0.9.8.10
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
OperationMoveRange.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2007-2015 Hypertable, Inc.
3  *
4  * This file is part of Hypertable.
5  *
6  * Hypertable is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; version 3 of the
9  * License, or any later version.
10  *
11  * Hypertable is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19  * 02110-1301, USA.
20  */
21 
22 #include <Common/Compat.h>
23 
24 #include "OperationMoveRange.h"
25 #include "OperationProcessor.h"
26 #include "Utility.h"
27 #include "BalancePlanAuthority.h"
28 
29 #include <Hypertable/Lib/Key.h>
31 
33 
34 #include <Common/Error.h>
35 #include <Common/FailureInducer.h>
36 #include <Common/ScopeGuard.h>
37 #include <Common/Serialization.h>
38 #include <Common/System.h>
39 #include <Common/md5.h>
40 
41 #include <chrono>
42 #include <string>
43 #include <thread>
44 
45 using namespace Hypertable;
46 using namespace Hyperspace;
47 using namespace std;
48 
50 OperationMoveRange(ContextPtr &context,const string &source, int64_t range_id,
51  const TableIdentifier &table, const RangeSpec &range,
52  const string &transfer_log, int64_t soft_limit,bool is_split)
53  : Operation(context, MetaLog::EntityType::OPERATION_MOVE_RANGE),
54  m_params(source, range_id, table, range, transfer_log, soft_limit, is_split) {
55  m_range_name = format("%s[%s..%s]", table.id, range.start_row,
56  range.end_row);
58  m_hash_code = hash_code(table, range, source, range_id);
60 }
61 
63  const MetaLog::EntityHeader &header_)
64  : Operation(context, header_) {
65 }
66 
68  : Operation(context, event, MetaLog::EntityType::OPERATION_MOVE_RANGE) {
69  const uint8_t *ptr = event->payload;
70  size_t remaining = event->payload_len;
71  m_params.decode(&ptr, &remaining);
72  m_range_name = format("%s[%s..%s]", m_params.table().id,
79 }
80 
84  "OperationMoveRange"));
85 
89  m_params.range_spec(), ""));
90 
93  }
94  else if (!strncmp(m_params.table().id, "0/", 2)) {
97  }
98  else {
102  }
103  m_obstructions.insert(string("OperationMove ") + m_range_name);
104  m_obstructions.insert(string(m_params.table().id) + " move range");
105 }
106 
108  int32_t state = get_state();
109  string old_destination = m_destination;
110  MetaLog::EntityPtr bpa_entity;
111  m_context->get_balance_plan_authority(bpa_entity);
113  = static_pointer_cast<BalancePlanAuthority>(bpa_entity);
114 
115  HT_INFOF("Entering MoveRange-%lld %s (id=%lld) state=%s",
116  (Lld)header.id, m_range_name.c_str(), (Lld)m_params.range_id(),
117  OperationState::get_text(state));
118 
119  switch (state) {
120 
122 
123  if (!bpa->get_balance_destination(m_params.table(), m_params.range_spec(),
124  m_destination)) {
125  m_context->op->unblock(Dependency::SERVERS);
126  return;
127  }
128 
129  // if the destination changed (i.e. because the old destination is
130  // recovered) then update the metalog!
131  HT_INFOF("MoveRange %s: destination is %s (previous: %s)",
132  m_range_name.c_str(), m_destination.c_str(),
133  old_destination.c_str());
135  HT_MAYBE_FAIL("move-range-INITIAL-a");
136  m_context->mml_writer->record_state(shared_from_this());
137  HT_MAYBE_FAIL("move-range-INITIAL-b");
138  return;
139 
141  if (m_event) {
142  try {
144  cb.response_ok();
145  }
146  catch (Exception &e) {
147  HT_WARN_OUT << e << HT_END;
148  }
149  m_event = 0;
150  }
152  HT_MAYBE_FAIL("move-range-STARTED");
153 
155  try {
156  RangeServer::Client rsc(m_context->comm);
157  CommAddress addr;
158  RangeState range_state;
159 
160  addr.set_proxy(m_destination);
161  range_state.soft_limit = m_params.soft_limit();
162  range_state.transfer_log = m_params.transfer_log().c_str();
163  if (m_context->test_mode)
164  HT_WARNF("Skipping %s::load_range() because in TEST MODE",
165  m_destination.c_str());
166  else
168  range_state, !m_params.is_split());
169  }
170  catch (Exception &e) {
172 
173  // If not yet relinquished, wait a couple of seconds and try again
175  HT_INFOF("%s - %s", Error::get_text(e.code()), e.what());
176  this_thread::sleep_for(chrono::milliseconds(2000));
177  return;
178  }
179 
181  HT_WARNF("Aborting MoveRange %s because table no longer exists",
182  m_range_name.c_str());
183  bpa->balance_move_complete(m_params.table(), m_params.range_spec());
184  remove_approval_add(0x03);
185  complete_ok(bpa_entity);
186  OperationPtr operation
187  = dynamic_pointer_cast<Operation>(shared_from_this());
188  m_context->remove_move_operation(operation);
189  return;
190  }
191 
192  // server might be down - go back to the initial state and pick a
193  // new destination
194  HT_WARNF("Problem moving range %s to %s: %s - %s",
195  m_range_name.c_str(), m_destination.c_str(),
196  Error::get_text(e.code()), e.what());
197  this_thread::sleep_for(chrono::milliseconds(5000));
199  return;
200  }
201  }
202  HT_MAYBE_FAIL("move-range-LOAD_RANGE");
204  m_context->mml_writer->record_state(shared_from_this());
205 
207  {
208  RangeServer::Client rsc(m_context->comm);
209  CommAddress addr;
210 
211  addr.set_proxy(m_destination);
212  if (m_context->test_mode) {
213  HT_WARNF("Skipping %s::acknowledge_load() because in TEST MODE",
214  m_destination.c_str());
215  }
216  else {
217  try {
219  vector<QualifiedRangeSpec *> range_vec;
220  map<QualifiedRangeSpec, int> response_map;
221  range_vec.push_back(&qrs);
222  rsc.acknowledge_load(addr, range_vec, response_map);
223  map<QualifiedRangeSpec, int>::iterator it = response_map.begin();
224  if (it->second != Error::OK &&
225  it->second != Error::TABLE_NOT_FOUND &&
227  HT_THROWF(it->second, "Problem acknowledging load range %s to %s",
228  m_range_name.c_str(), m_destination.c_str());
229  }
230  catch (Exception &e) {
231  // Destination might be down - go back to the initial state
232  HT_WARNF("Problem acknowledging load range %s: %s - %s (dest %s)",
233  m_range_name.c_str(), Error::get_text(e.code()),
234  e.what(), m_destination.c_str());
235  this_thread::sleep_for(chrono::milliseconds(5000));
236  // Fetch new destination, if it changed, and then try again
237  if (!bpa->get_balance_destination(m_params.table(),
239  m_destination))
240  m_context->op->unblock(Dependency::SERVERS);
241  return;
242  }
243  }
244  }
245  bpa->balance_move_complete(m_params.table(), m_params.range_spec());
246  {
247  bool remove_ok = remove_approval_add(0x02);
248  complete_ok(bpa_entity);
249  if (remove_ok) {
250  OperationPtr operation
251  = dynamic_pointer_cast<Operation>(shared_from_this());
252  m_context->remove_move_operation(operation);
253  }
254  }
255  break;
256 
258  break;
259 
260  default:
261  HT_FATALF("Unrecognized state %d", state);
262  }
263 
264  HT_INFOF("Leaving MoveRange-%lld %s (id=%lld) -> %s (state=%s)",
265  (Lld)header.id, m_range_name.c_str(), (Lld)m_params.range_id(),
266  m_destination.c_str(), OperationState::get_text(m_state));
267 }
268 
269 void OperationMoveRange::display_state(std::ostream &os) {
270  os << " " << m_params.table() << " " << m_params.range_spec()
271  << " transfer-log='" << m_params.transfer_log()
272  << "' soft-limit=" << m_params.soft_limit() << " is_split="
273  << (m_params.is_split() ? "true" : "false")
274  << "source='" << m_params.source() << "' location='"
275  << m_destination << "' ";
276 }
277 
279  return 1;
280 }
281 
283  return m_params.encoded_length() +
285 }
286 
287 void OperationMoveRange::encode_state(uint8_t **bufp) const {
288  m_params.encode(bufp);
290 }
291 
292 void OperationMoveRange::decode_state(uint8_t version, const uint8_t **bufp,
293  size_t *remainp) {
294  m_params.decode(bufp, remainp);
295  m_destination = Serialization::decode_vstr(bufp, remainp);
297  = format("%s[%s..%s]", m_params.table().id, m_params.range_spec().start_row,
301  string("OperationMoveRange-")+m_params.source());
302 }
303 
304 void OperationMoveRange::decode_state_old(uint8_t version, const uint8_t **bufp,
305  size_t *remainp) {
306  {
307  string source;
309  source = Serialization::decode_vstr(bufp, remainp);
310  TableIdentifier table;
311  legacy_decode(bufp, remainp, &table);
312  RangeSpec range_spec;
313  legacy_decode(bufp, remainp, &range_spec);
314  string transfer_log = Serialization::decode_vstr(bufp, remainp);
315  int64_t soft_limit = Serialization::decode_i64(bufp, remainp);
316  bool is_split = Serialization::decode_bool(bufp, remainp);
317  m_params = Lib::Master::Request::Parameters::MoveRange(source, 0, table, range_spec,
318  transfer_log, soft_limit,
319  is_split);
320  }
321  m_destination = Serialization::decode_vstr(bufp, remainp);
322  m_range_name = format("%s[%s..%s]", m_params.table().id,
326  string("OperationMoveRange-") + m_params.source());
329 }
330 
331 void OperationMoveRange::decode_result(const uint8_t **bufp, size_t *remainp) {
333  Operation::decode_result(bufp, remainp);
334 }
335 
336 const string OperationMoveRange::name() {
337  if (m_state == OperationState::COMPLETE)
338  return "OperationMoveRange";
339  return format("OperationMoveRange %s %s[%s..%s] -> %s",
340  m_params.source().c_str(), m_params.table().id,
342  m_destination.c_str());
343 }
344 
346  if (m_state == OperationState::COMPLETE)
347  return "OperationMoveRange";
348  return format("MoveRange %s %s[%s..%s] -> %s",
349  m_params.source().c_str(), m_params.table().id,
351  m_destination.c_str());
352 }
353 
355  string start_row = m_params.range_spec().start_row;
356  string end_row = m_params.range_spec().end_row;
357 
358  if (start_row.length() > 20)
359  start_row = start_row.substr(0, 10) + ".."
360  + start_row.substr(start_row.length()-10, 10);
361 
362  if (!strcmp(end_row.c_str(), Key::END_ROW_MARKER))
363  end_row = "END_ROW_MARKER";
364  else if (end_row.length() > 20)
365  end_row = end_row.substr(0, 10) + ".."
366  + end_row.substr(end_row.length() - 10, 10);
367 
368  return format("MoveRange %s\\n%s\\n%s", m_params.table().id,
369  start_row.c_str(), end_row.c_str());
370 }
371 
372 
374  const RangeSpec &range,
375  const std::string &source,
376  int64_t range_id) {
377  if (range_id)
378  return Utility::range_hash_code(table, range, string("OperationMoveRange-") +
379  source + ":" + range_id);
380  return Utility::range_hash_code(table, range, string("OperationMoveRange-") +
381  source);
382 }
Retrieves system information (hardware, installation directory, etc)
char * decode_vstr(const uint8_t **bufp, size_t *remainp)
Decode a vstr (vint64, data, null).
void decode_state(uint8_t version, const uint8_t **bufp, size_t *remainp) override
Decode operation state.
void display_state(std::ostream &os) override
Write human readable operation state to output stream.
#define HT_WARNF(msg,...)
Definition: Logger.h:290
The FailureInducer simulates errors.
ContextPtr m_context
Pointer to Master context.
Definition: Operation.h:553
static const char * METADATA_ID
Range specification.
Definition: RangeSpec.h:40
void decode_result(const uint8_t **bufp, size_t *remainp) override
Decode operation result.
std::shared_ptr< Entity > EntityPtr
Smart pointer to Entity.
String format(const char *fmt,...)
Returns a String using printf like format facilities Vanilla snprintf is about 1.5x faster than this...
Definition: String.cc:37
const char * SERVERS
Definition: Operation.cc:46
virtual int response_ok()
Sends a a simple success response back to the client which is just the 4-byte error code Error::OK...
virtual size_t encoded_length() const
Returns serialized object length.
Definition: Serializable.cc:37
int64_t id
Unique ID of entity.
Declarations for OperationProcessor.
virtual void decode_result(const uint8_t **bufp, size_t *remainp)
Decode operation result.
Definition: Operation.cc:356
void acknowledge_load(const CommAddress &addr, const vector< QualifiedRangeSpec * > &ranges, std::map< QualifiedRangeSpec, int > &response_map)
Issues a synchronous "acknowledge load" request for multiple ranges.
Definition: Client.cc:163
uint64_t soft_limit
Soft split size limit.
Definition: RangeState.h:108
std::shared_ptr< Event > EventPtr
Smart pointer to Event.
Definition: Event.h:228
int64_t range_hash_code(const TableIdentifier &table, const RangeSpec &range, const String &qualifier)
Returns a hash code for a range with an optional qualifer string.
Definition: Utility.cc:379
STL namespace.
virtual int64_t hash_code() const
Definition: Operation.h:455
size_t encoded_length_state() const override
Encoded length of operation state.
EntityHeader header
Entity header
size_t encoded_length_vstr(size_t len)
Computes the encoded length of vstr (vint64, data, null)
const String graphviz_label() override
Human readable operation label used in graphviz output.
const char * get_text(int32_t state)
Definition: Operation.cc:609
int64_t m_hash_code
Hash code uniqely identifying operation.
Definition: Operation.h:589
EventPtr m_event
Pointer to client event (if any) that originated the operation.
Definition: Operation.h:556
std::shared_ptr< Context > ContextPtr
Smart pointer to Context.
Definition: Context.h:265
Hyperspace definitions
Request parameters for move range operation.
Definition: MoveRange.h:49
String range_hash_string(const TableIdentifier &table, const RangeSpec &range, const String &qualifier)
Returns string representation of hash code for a range with an optional qualifer string.
Definition: Utility.cc:385
Lib::Master::Request::Parameters::MoveRange m_params
Request parmaeters.
uint64_t decode_i64(const uint8_t **bufp, size_t *remainp)
Decode a 64-bit integer in little-endian order.
const char * end_row
Definition: RangeSpec.h:60
virtual void encode(uint8_t **bufp) const
Writes serialized representation of object to a buffer.
Definition: Serializable.cc:64
void set_state(int32_t state)
Definition: Operation.h:473
bool decode_bool(const uint8_t **bufp, size_t *remainp)
Decodes a boolean value from the given buffer.
Definition: Serialization.h:96
const char * get_text(int error)
Returns a descriptive error message.
Definition: Error.cc:330
OperationMoveRange(ContextPtr &context, const String &source, int64_t range_id, const TableIdentifier &table, const RangeSpec &range, const String &transfer_log, int64_t soft_limit, bool is_split)
const char * INIT
Definition: Operation.cc:45
void set_proxy(const String &str)
Sets address type to CommAddress::PROXY and proxy name to p.
Definition: CommAddress.h:76
Compatibility Macros for C/C++.
const char * transfer_log
Full pathname of transfer log.
Definition: RangeState.h:111
TableIdentifier & table()
Gets table identifier.
Definition: MoveRange.h:82
#define HT_END
Definition: Logger.h:220
void set_remove_approval_mask(uint16_t mask)
Sets the remove approvals bit mask.
Definition: Operation.h:345
Functions to serialize/deserialize primitives to/from a memory buffer.
const String label() override
Human readable label for operation.
const String name() override
Name of operation used for exclusivity.
const char * METADATA
Definition: Operation.cc:48
virtual void decode(const uint8_t **bufp, size_t *remainp)
Reads serialized representation of object from a buffer.
Definition: Serializable.cc:70
RangeSpec & range_spec()
Gets range specification.
Definition: MoveRange.h:86
#define HT_WARN_OUT
Definition: Logger.h:291
void encode_state(uint8_t **bufp) const override
Encode operation state.
This class is used to generate and deliver standard responses back to a client.
void encode_vstr(uint8_t **bufp, const void *buf, size_t len)
Encode a buffer as variable length string (vint64, data, null)
Hypertable definitions
#define HT_FATALF(msg,...)
Definition: Logger.h:343
long long int Lld
Shortcut for printf formats.
Definition: String.h:53
bool table_exists(ContextPtr &context, const String &name, String &id)
Checks if table exists and returns table ID.
Definition: Utility.cc:100
DependencySet m_obstructions
Set of obstructions.
Definition: Operation.h:598
Declarations for general-purpose utility functions.
void legacy_decode(const uint8_t **bufp, size_t *remainp, BalancePlan *plan)
Central authority for balance plans.
const char * start_row
Definition: RangeSpec.h:59
DependencySet m_dependencies
Set of dependencies.
Definition: Operation.h:595
uint8_t encoding_version_state() const override
Returns version of encoding format of state.
#define HT_INFOF(msg,...)
Definition: Logger.h:272
#define HT_THROWF(_code_, _fmt_,...)
Definition: Error.h:490
Client interface to RangeServer.
Definition: Client.h:63
const char * SYSTEM
Definition: Operation.cc:49
Abstract base class for master operations.
Definition: Operation.h:124
const char * ROOT
Definition: Operation.cc:47
This is a generic exception class for Hypertable.
Definition: Error.h:314
void execute() override
Executes (carries out) the operation.
Qualified (with table identifier) range specification.
DependencySet m_exclusivities
Set of exclusivities.
Definition: Operation.h:592
const string & transfer_log() const
Gets transfer log.
Definition: MoveRange.h:90
Range state.
Definition: RangeState.h:48
void load_range(const CommAddress &addr, const TableIdentifier &table, const RangeSpec &range_spec, const RangeState &range_state, bool needs_compaction)
Issues a synchronous "load range" request.
Definition: Client.cc:125
void decode_state_old(uint8_t version, const uint8_t **bufp, size_t *remainp) override
#define HT_MAYBE_FAIL(_label_)
void complete_ok(std::vector< MetaLog::EntityPtr > &additional)
Definition: Operation.cc:436
Declarations for BalancePlanAuthority.
std::shared_ptr< Operation > OperationPtr
Smart pointer to Operation.
Definition: Operation.h:609
Declarations for ResponseCallback.
std::shared_ptr< BalancePlanAuthority > BalancePlanAuthorityPtr
Smart pointer to BalancePlanAuthority.
int32_t get_original_type()
Definition: Operation.h:480
bool remove_approval_add(uint16_t approval)
Sets remove approval bits.
Definition: Operation.h:367
Error codes, Exception handling, error logging.
int64_t range_id() const
Gets range MetaLog entry identifier.
Definition: MoveRange.h:78
static const char * END_ROW_MARKER
Definition: Key.h:49
md5 digest routines.
String m_range_name
Range name for logging purposes.
Address abstraction to hold either proxy name or IPv4:port address.
Definition: CommAddress.h:52
const string & source() const
Gets name of source RangeServer.
Definition: MoveRange.h:74
int code() const
Returns the error code.
Definition: Error.h:391
Executes user-defined functions when leaving the current scope.
String m_destination
Destination server.