0.9.8.10
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
OperationBalance.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2007-2015 Hypertable, Inc.
3  *
4  * This file is part of Hypertable.
5  *
6  * Hypertable is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; version 3 of the
9  * License, or any later version.
10  *
11  * Hypertable is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19  * 02110-1301, USA.
20  */
21 
22 #include <Common/Compat.h>
23 
24 #include "LoadBalancer.h"
25 #include "OperationBalance.h"
26 #include "OperationProcessor.h"
27 #include "Utility.h"
28 #include "BalancePlanAuthority.h"
29 
30 #include <Hypertable/Lib/Key.h>
33 
35 
36 #include <Common/Error.h>
37 #include <Common/FailureInducer.h>
38 #include <Common/ScopeGuard.h>
39 #include <Common/Serialization.h>
40 #include <Common/StringExt.h>
41 #include <Common/System.h>
42 #include <Common/md5.h>
43 
44 #include <chrono>
45 #include <sstream>
46 #include <thread>
47 
48 using namespace Hypertable;
49 using namespace Hyperspace;
50 using namespace std;
51 
53  : Operation(context, MetaLog::EntityType::OPERATION_BALANCE) {
55  m_plan = make_shared<BalancePlan>();
56 }
57 
59  const MetaLog::EntityHeader &header_)
60  : Operation(context, header_) {
61  m_hash_code = md5_hash("OperationBalance");
62 }
63 
65  : Operation(context, event, MetaLog::EntityType::OPERATION_BALANCE) {
67  const uint8_t *ptr = event->payload;
68  size_t remaining = event->payload_len;
69  m_params.decode(&ptr, &remaining);
70  m_plan = make_shared<BalancePlan>(m_params.plan());
71 }
72 
74  m_hash_code = md5_hash("OperationBalance");
75  m_dependencies.clear();
82 }
83 
84 
86  int32_t state = get_state();
87 
88  HT_INFOF("Entering Balance-%lld algorithm= %s state=%s",
89  (Lld)header.id, m_plan->algorithm.c_str(),
91 
92  switch (state) {
93 
95 
96  try {
97  std::vector<RangeServerConnectionPtr> balanced;
98  std::vector<MetaLog::EntityPtr> entities;
99  int generation = m_context->get_balance_plan_authority()->get_generation();
100 
101  if (m_plan->empty())
102  m_context->balancer->create_plan(m_plan, balanced);
103 
105  entities.push_back(shared_from_this());
106  for (auto & rsc : balanced) {
107  rsc->set_balanced();
108  entities.push_back(rsc);
109  }
110 
111  if (!m_context->get_balance_plan_authority()->register_balance_plan(m_plan, generation, entities))
112  return;
113 
114  // Un-pause the balancer
115  m_context->balancer->unpause();
116  }
117  catch (Exception &e) {
118  complete_error(e);
119  HT_ERROR_OUT << e << HT_END;
120  break;
121  }
122 
124  {
125  RangeServer::Client rsc(m_context->comm);
126  CommAddress addr;
127  MetaLog::EntityPtr bpa_entity;
128 
129  if (!m_plan->moves.empty()) {
130  uint32_t wait_millis = m_plan->duration_millis / m_plan->moves.size();
131 
132  for (auto &move : m_plan->moves) {
133  addr.set_proxy(move->source_location);
134  try {
135  rsc.relinquish_range(addr, move->table, move->range);
136  this_thread::sleep_for(chrono::milliseconds(wait_millis));
137  }
138  catch (Exception &e) {
139  move->complete = true;
140  move->error = e.code();
141  if (!bpa_entity)
142  m_context->get_balance_plan_authority(bpa_entity);
143  static_pointer_cast<BalancePlanAuthority>(bpa_entity)->balance_move_complete(move->table, move->range);
144  }
145  }
146  std::stringstream sout;
147  for (auto &move : m_plan->moves) {
148  sout.str("");
149  sout << *move;
150  HT_INFOF("%s", sout.str().c_str());
151  }
152  }
153  complete_ok(bpa_entity);
154  }
155  break;
156 
157  default:
158  HT_FATALF("Unrecognized state %d", state);
159  }
160 
161  HT_INFOF("Leaving Balance-%lld algorithm=%s", (Lld)header.id,
162  m_plan->algorithm.c_str());
163 }
164 
165 void OperationBalance::display_state(std::ostream &os) {
166  os << *(m_plan.get());
167 }
168 
170  return "OperationBalance";
171 }
172 
174  if (m_plan)
175  return format("Balance %s (%u moves)", m_plan->algorithm.c_str(),
176  (unsigned)m_plan->moves.size());
177  return name();
178 }
179 
180 
182  return 1;
183 }
184 
186  return m_params.encoded_length() + m_plan->encoded_length();
187 }
188 
189 void OperationBalance::encode_state(uint8_t **bufp) const {
190  m_params.encode(bufp);
191  m_plan->encode(bufp);
192 }
193 
194 void OperationBalance::decode_state(uint8_t version,
195  const uint8_t **bufp,
196  size_t *remainp) {
197  m_params.decode(bufp, remainp);
198  m_plan = make_shared<BalancePlan>();
199  m_plan->decode(bufp, remainp);
200 }
201 
203  const uint8_t **bufp,
204  size_t *remainp) {
205  m_plan = make_shared<BalancePlan>();
206  legacy_decode(bufp, remainp, m_plan.get());
207 }
const String label() override
Human readable label for operation.
Retrieves system information (hardware, installation directory, etc)
void display_state(std::ostream &os) override
Write human readable operation state to output stream.
The FailureInducer simulates errors.
ContextPtr m_context
Pointer to Master context.
Definition: Operation.h:553
int64_t md5_hash(const char *input)
Returns a 64-bit hash checksum of a null terminated input buffer.
Definition: md5.cc:388
std::string String
A String is simply a typedef to std::string.
Definition: String.h:44
std::shared_ptr< Entity > EntityPtr
Smart pointer to Entity.
String format(const char *fmt,...)
Returns a String using printf like format facilities Vanilla snprintf is about 1.5x faster than this...
Definition: String.cc:37
const char * SERVERS
Definition: Operation.cc:46
virtual size_t encoded_length() const
Returns serialized object length.
Definition: Serializable.cc:37
int64_t id
Unique ID of entity.
Declarations for OperationProcessor.
const String name() override
Name of operation used for exclusivity.
std::shared_ptr< Event > EventPtr
Smart pointer to Event.
Definition: Event.h:228
STL namespace.
void encode_state(uint8_t **bufp) const override
Encode operation state.
EntityHeader header
Entity header
BalancePlanPtr m_plan
Balance plan.
const char * get_text(int32_t state)
Definition: Operation.cc:609
const char * RECOVER_SERVER
Definition: Operation.cc:51
int64_t m_hash_code
Hash code uniqely identifying operation.
Definition: Operation.h:589
std::shared_ptr< Context > ContextPtr
Smart pointer to Context.
Definition: Context.h:265
Hyperspace definitions
void execute() override
Executes (carries out) the operation.
virtual void encode(uint8_t **bufp) const
Writes serialized representation of object to a buffer.
Definition: Serializable.cc:64
void set_state(int32_t state)
Definition: Operation.h:473
const char * INIT
Definition: Operation.cc:45
void set_proxy(const String &str)
Sets address type to CommAddress::PROXY and proxy name to p.
Definition: CommAddress.h:76
Compatibility Macros for C/C++.
void decode_state(uint8_t version, const uint8_t **bufp, size_t *remainp) override
Decode operation state.
#define HT_END
Definition: Logger.h:220
Functions to serialize/deserialize primitives to/from a memory buffer.
#define HT_ERROR_OUT
Definition: Logger.h:301
const char * METADATA
Definition: Operation.cc:48
size_t encoded_length_state() const override
Encoded length of operation state.
virtual void decode(const uint8_t **bufp, size_t *remainp)
Reads serialized representation of object from a buffer.
Definition: Serializable.cc:70
void relinquish_range(const CommAddress &addr, const TableIdentifier &table, const RangeSpec &range)
Issues a "relinquish range" request synchronously.
Definition: Client.cc:669
OperationBalance(ContextPtr &context)
BalancePlan & plan()
Gets balance plan.
Definition: Balance.h:63
Hypertable definitions
Lib::Master::Request::Parameters::Balance m_params
Request parmaeters.
#define HT_FATALF(msg,...)
Definition: Logger.h:343
long long int Lld
Shortcut for printf formats.
Definition: String.h:53
Declarations for general-purpose utility functions.
void legacy_decode(const uint8_t **bufp, size_t *remainp, BalancePlan *plan)
Central authority for balance plans.
void decode_state_old(uint8_t version, const uint8_t **bufp, size_t *remainp) override
DependencySet m_dependencies
Set of dependencies.
Definition: Operation.h:595
#define HT_INFOF(msg,...)
Definition: Logger.h:272
Client interface to RangeServer.
Definition: Client.h:63
const char * SYSTEM
Definition: Operation.cc:49
Abstract base class for master operations.
Definition: Operation.h:124
uint8_t encoding_version_state() const override
Returns version of encoding format of state.
const char * ROOT
Definition: Operation.cc:47
This is a generic exception class for Hypertable.
Definition: Error.h:314
void complete_ok(std::vector< MetaLog::EntityPtr > &additional)
Definition: Operation.cc:436
Declarations for BalancePlanAuthority.
Declarations for ResponseCallback.
String extensions and helpers: sets, maps, append operators etc.
Error codes, Exception handling, error logging.
void complete_error(int error, const String &msg, std::vector< MetaLog::EntityPtr > &additional)
Completes operation with error.
Definition: Operation.cc:400
md5 digest routines.
Address abstraction to hold either proxy name or IPv4:port address.
Definition: CommAddress.h:52
int code() const
Returns the error code.
Definition: Error.h:391
Executes user-defined functions when leaving the current scope.