0.9.8.10
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
Operation.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2007-2015 Hypertable, Inc.
3  *
4  * This file is part of Hypertable.
5  *
6  * Hypertable is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; version 3 of the
9  * License, or any later version.
10  *
11  * Hypertable is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19  * 02110-1301, USA.
20  */
21 
29 #include <Common/Compat.h>
30 #include "Operation.h"
31 
33 
34 #include <Common/Serialization.h>
35 
36 #include <algorithm>
37 #include <ctime>
38 #include <iterator>
39 #include <sstream>
40 #include <unordered_map>
41 
42 using namespace Hypertable;
43 using namespace std;
44 
45 const char *Dependency::INIT = "INIT";
46 const char *Dependency::SERVERS = "SERVERS";
47 const char *Dependency::ROOT = "ROOT";
48 const char *Dependency::METADATA = "METADATA";
49 const char *Dependency::SYSTEM = "SYSTEM";
50 const char *Dependency::USER = "USER";
51 const char *Dependency::RECOVER_SERVER = "RECOVER_SERVER";
52 const char *Dependency::RECOVERY_BLOCKER= "RECOVERY_BLOCKER";
53 const char *Dependency::RECOVERY = "RECOVERY";
54 
55 namespace Hypertable {
56  namespace OperationState {
57  const char *get_text(int32_t state);
58  }
59 }
60 
61 Operation::Operation(ContextPtr &context, int32_t type)
62  : MetaLog::Entity(type), m_context(context) {
63  int32_t timeout = m_context->props->get_i32("Hypertable.Request.Timeout");
64  m_expiration_time = ClockT::now() + chrono::milliseconds(timeout);
65  m_hash_code = (int64_t)header.id;
66 }
67 
68 Operation::Operation(ContextPtr &context, EventPtr &event, int32_t type)
69  : MetaLog::Entity(type), m_context(context), m_event(event) {
70  m_expiration_time = ClockT::now() + chrono::milliseconds(m_event->header.timeout_ms);
71  m_hash_code = (int64_t)header.id;
72 }
73 
75  : MetaLog::Entity(header_), m_context(context) {
76  m_hash_code = (int64_t)header.id;
77 }
78 
79 void Operation::display(std::ostream &os) {
80 
81  os << " state=" << OperationState::get_text(m_state);
82  os << " remove_approvals=" << m_remove_approvals;
83  os << " remove_approval_mask=" << m_remove_approval_mask;
84  if (m_state == OperationState::COMPLETE) {
85  os << " [" << Error::get_text(m_error) << "] ";
86  if (m_error != Error::OK)
87  os << m_error_msg << " ";
88  }
89  else {
90  bool first = true;
91  display_state(os);
92 
93  os << " exclusivities=";
94  for (auto & str : m_exclusivities) {
95  if (first) {
96  os << "\"" << str << "\"";
97  first = false;
98  }
99  else
100  os << ",\"" << str << "\"";
101  }
102 
103  first = true;
104  os << " dependencies=";
105  for (auto & str : m_dependencies) {
106  if (first) {
107  os << "\"" << str << "\"";
108  first = false;
109  }
110  else
111  os << ",\"" << str << "\"";
112  }
113 
114  first = true;
115  os << " obstructions=";
116  for (auto & str : m_obstructions) {
117  if (first) {
118  os << "\"" << str << "\"";
119  first = false;
120  }
121  else
122  os << ",\"" << str << "\"";
123  }
124 
125  first = true;
126  os << " permanent_obstructions=";
127  for (auto & str : m_obstructions_permanent) {
128  if (first) {
129  os << "\"" << str << "\"";
130  first = false;
131  }
132  else
133  os << ",\"" << str << "\"";
134  }
135  os << " ";
136  }
137 }
138 
140  return 1;
141 }
142 
144  size_t length = 20;
145 
146  if (m_state == OperationState::COMPLETE)
147  length += 8 + encoded_result_length();
148  else {
149  size_t state_length = encoded_length_state();
150  length += 1 + Serialization::encoded_length_vi32(state_length) + state_length;
154  for (auto & str : m_exclusivities)
156  for (auto & str : m_dependencies)
158  for (auto & str : m_obstructions)
160  // permanent obstructions
162  for (auto & str : m_obstructions_permanent)
164  // sub operations
166  for (int64_t id : m_sub_ops)
168  }
169  return length;
170 }
171 
172 void Operation::encode_internal(uint8_t **bufp) const {
173  Serialization::encode_i32(bufp, m_state);
174  auto nanos = chrono::duration_cast<chrono::nanoseconds>(m_expiration_time.time_since_epoch()).count();
175  Serialization::encode_i64(bufp, (int64_t)(nanos / 1000000000LL));
176  Serialization::encode_i32(bufp, (int32_t)(nanos % 1000000000LL));
179  if (m_state == OperationState::COMPLETE) {
181  encode_result(bufp);
182  }
183  else {
186  encode_state(bufp);
188  for (auto & str : m_exclusivities)
191  for (auto & str : m_dependencies)
194  for (auto & str : m_obstructions)
197  for (auto & str : m_obstructions_permanent)
200  for (int64_t id : m_sub_ops)
201  Serialization::encode_vi64(bufp, id);
202  }
203 }
204 
205 void Operation::decode_internal(uint8_t version, const uint8_t **bufp,
206  size_t *remainp) {
207 
208  m_state = Serialization::decode_i32(bufp, remainp);
209  int64_t nanos = Serialization::decode_i64(bufp, remainp) * 1000000000LL;
210  nanos += Serialization::decode_i32(bufp, remainp);
211  m_expiration_time = ClockT::time_point(chrono::duration_cast<ClockT::duration>(chrono::nanoseconds(nanos)));
214  if (m_state == OperationState::COMPLETE) {
215  m_hash_code = Serialization::decode_i64(bufp, remainp);
216  decode_result(bufp, remainp);
217  }
218  else {
219 
220  // Decode operation state
221  uint8_t version = Serialization::decode_i8(bufp, remainp);
222  if (version > encoding_version_state())
223  HT_THROWF(Error::PROTOCOL_ERROR, "Unsupported version %d", (int)version);
224  size_t encoding_length = Serialization::decode_vi32(bufp, remainp);
225  const uint8_t *end = *bufp + encoding_length;
226  size_t tmp_remain = encoding_length;
227  decode_state(version, bufp, &tmp_remain);
228  HT_ASSERT(*bufp <= end);
229  *remainp -= encoding_length;
230  // If encoding is longer than we expect, that means we're decoding a newer
231  // version, so skip the newer portion that we don't know about
232  if (*bufp < end)
233  *bufp = end;
234 
235  String str;
236  size_t length;
237  m_exclusivities.clear();
238  length = Serialization::decode_vi32(bufp, remainp);
239  for (size_t i=0; i<length; i++) {
240  str = Serialization::decode_vstr(bufp, remainp);
241  m_exclusivities.insert(str);
242  }
243 
244  m_dependencies.clear();
245  length = Serialization::decode_vi32(bufp, remainp);
246  for (size_t i=0; i<length; i++) {
247  str = Serialization::decode_vstr(bufp, remainp);
248  m_dependencies.insert(str);
249  }
250 
251  m_obstructions.clear();
252  length = Serialization::decode_vi32(bufp, remainp);
253  for (size_t i=0; i<length; i++) {
254  str = Serialization::decode_vstr(bufp, remainp);
255  m_obstructions.insert(str);
256  }
257 
258  // permanent obstructions
259  m_obstructions_permanent.clear();
260  length = Serialization::decode_vi32(bufp, remainp);
261  for (size_t i=0; i<length; i++) {
262  str = Serialization::decode_vstr(bufp, remainp);
263  m_obstructions_permanent.insert(str);
264  }
265  // sub operations
266  m_sub_ops.clear();
267  length = Serialization::decode_vi32(bufp, remainp);
268  for (size_t i=0; i<length; i++)
269  m_sub_ops.push_back( Serialization::decode_vi64(bufp, remainp) );
270 
271  }
272 
273 }
274 
275 void Operation::decode(const uint8_t **bufp, size_t *remainp,
276  uint16_t definition_version) {
277  if (definition_version < 4)
278  decode_old(bufp, remainp, definition_version);
279  else
280  Entity::decode(bufp, remainp);
281 }
282 
283 void Operation::decode_old(const uint8_t **bufp, size_t *remainp,
284  uint16_t definition_version) {
285  String str;
286  size_t length;
287  uint8_t version {};
288 
289  if (definition_version >= 2)
290  version = (uint8_t)Serialization::decode_i16(bufp, remainp);
291  m_state = Serialization::decode_i32(bufp, remainp);
292  int64_t nanos = Serialization::decode_i64(bufp, remainp) * 1000000000LL;
293  nanos += Serialization::decode_i32(bufp, remainp);
294  m_expiration_time = ClockT::time_point(chrono::duration_cast<ClockT::duration>(chrono::nanoseconds(nanos)));
295  if (m_original_type == 0 || (m_original_type & 0xF0000L) > 0x20000L) {
298  }
299  if (m_state == OperationState::COMPLETE) {
300  m_hash_code = Serialization::decode_i64(bufp, remainp);
301  decode_result(bufp, remainp);
302  }
303  else {
304  decode_state_old(version, bufp, remainp);
305 
306  m_exclusivities.clear();
307  length = Serialization::decode_vi32(bufp, remainp);
308  for (size_t i=0; i<length; i++) {
309  str = Serialization::decode_vstr(bufp, remainp);
310  m_exclusivities.insert(str);
311  }
312 
313  m_dependencies.clear();
314  length = Serialization::decode_vi32(bufp, remainp);
315  for (size_t i=0; i<length; i++) {
316  str = Serialization::decode_vstr(bufp, remainp);
317  m_dependencies.insert(str);
318  }
319 
320  m_obstructions.clear();
321  length = Serialization::decode_vi32(bufp, remainp);
322  for (size_t i=0; i<length; i++) {
323  str = Serialization::decode_vstr(bufp, remainp);
324  m_obstructions.insert(str);
325  }
326 
327  if (definition_version >= 3) {
328  // permanent obstructions
329  m_obstructions_permanent.clear();
330  length = Serialization::decode_vi32(bufp, remainp);
331  for (size_t i=0; i<length; i++) {
332  str = Serialization::decode_vstr(bufp, remainp);
333  m_obstructions_permanent.insert(str);
334  }
335  // sub operations
336  m_sub_ops.clear();
337  length = Serialization::decode_vi32(bufp, remainp);
338  for (size_t i=0; i<length; i++)
339  m_sub_ops.push_back( Serialization::decode_vi64(bufp, remainp) );
340  }
341  }
342 }
343 
345  if (m_error == Error::OK)
346  return 4;
348 }
349 
350 void Operation::encode_result(uint8_t **bufp) const {
352  if (m_error != Error::OK)
354 }
355 
356 void Operation::decode_result(const uint8_t **bufp, size_t *remainp) {
357  m_error = Serialization::decode_i32(bufp, remainp);
358  if (m_error != Error::OK)
359  m_error_msg = Serialization::decode_vstr(bufp, remainp);
360 }
361 
363  lock_guard<mutex> lock(m_mutex);
365 }
366 
367 void Operation::record_state(std::vector<MetaLog::EntityPtr> &additional) {
368  std::vector<MetaLog::EntityPtr> entities;
369  entities.reserve(1 + additional.size() + m_sub_ops.size());
370  // Add this
371  if (removal_approved())
373  entities.push_back(shared_from_this());
374  // Add additional entities
375  for (auto & entity : additional) {
376  OperationPtr op = dynamic_pointer_cast<Operation>(entity);
377  if (op && op->removal_approved())
378  op->mark_for_removal();
379  entities.push_back(entity);
380  }
381  // Add sub operations
382  std::vector<int64_t> new_sub_ops;
383  for (int64_t id : m_sub_ops) {
384  OperationPtr op = m_context->reference_manager->get(id);
385  if (op->removal_approved())
386  op->mark_for_removal();
387  else
388  new_sub_ops.push_back(op->id());
389  entities.push_back(op);
390  }
391  m_context->mml_writer->record_state(entities);
392  for (auto & entity : entities) {
393  OperationPtr op = dynamic_pointer_cast<Operation>(entity);
394  if (op && op->marked_for_removal())
395  m_context->reference_manager->remove(op);
396  }
397  m_sub_ops.swap(new_sub_ops);
398 }
399 
400 void Operation::complete_error(int error, const String &msg,
401  std::vector<MetaLog::EntityPtr> &additional) {
402  {
403  lock_guard<mutex> lock(m_mutex);
404  m_state = OperationState::COMPLETE;
405  m_error = error;
406  m_error_msg = msg;
407  m_dependencies.clear();
408  m_obstructions.clear();
409  m_exclusivities.clear();
410  for (int64_t id : m_sub_ops) {
411  OperationPtr op = m_context->reference_manager->get(id);
412  op->remove_approval_add(op->get_remove_approval_mask());
413  }
414  }
415 
416  std::stringstream sout;
417  sout << "Operation failed (" << *this << ") " << Error::get_text(error) << " - " << msg;
418  HT_INFOF("%s", sout.str().c_str());
419 
420  if (m_ephemeral) {
421  HT_ASSERT(additional.empty());
422  return;
423  }
424 
425  record_state(additional);
426 }
427 
428 void Operation::complete_error(int error, const String &msg, MetaLog::EntityPtr additional) {
429  std::vector<MetaLog::EntityPtr> entities;
430  if (additional)
431  entities.push_back(additional);
432  complete_error(error, msg, entities);
433 }
434 
435 
436 void Operation::complete_ok(std::vector<MetaLog::EntityPtr> &additional) {
437  {
438  lock_guard<mutex> lock(m_mutex);
439  m_state = OperationState::COMPLETE;
440  m_error = Error::OK;
441  m_dependencies.clear();
442  m_obstructions.clear();
443  m_exclusivities.clear();
444  if (m_ephemeral) {
445  HT_ASSERT(additional.empty());
446  return;
447  }
448  }
449  record_state(additional);
450 }
451 
453  std::vector<MetaLog::EntityPtr> entities;
454  if (additional)
455  entities.push_back(additional);
456  complete_ok(entities);
457 }
458 
460  lock_guard<mutex> lock(m_mutex);
461  exclusivities = m_exclusivities;
462 }
463 
465  lock_guard<mutex> lock(m_mutex);
466  dependencies = m_dependencies;
467 }
468 
470  lock_guard<mutex> lock(m_mutex);
471  obstructions = m_obstructions;
472  obstructions.insert(m_obstructions_permanent.begin(), m_obstructions_permanent.end());
473 }
474 
475 void Operation::fetch_sub_operations(std::vector<OperationPtr> &sub_ops) {
476  lock_guard<mutex> lock(m_mutex);
477  for (int64_t id : m_sub_ops)
478  sub_ops.push_back(m_context->reference_manager->get(id));
479 }
480 
481 
483  lock_guard<mutex> lock(m_mutex);
484  m_unblock_on_exit=false;
485 }
486 
487 
489  lock_guard<mutex> lock(m_mutex);
490  if (m_unblock_on_exit)
491  m_blocked = false;
492 }
493 
494 
496  lock_guard<mutex> lock(m_mutex);
497  if (!m_blocked) {
498  m_blocked = true;
499  return true;
500  }
501  return false;
502 }
503 
505  lock_guard<mutex> lock(m_mutex);
506  bool blocked_on_entry = m_blocked;
507  m_unblock_on_exit = true;
508  m_blocked = false;
509  return blocked_on_entry;
510 }
511 
513  vector<MetaLog::EntityPtr> entities;
514 
515  for (int64_t id : m_sub_ops) {
516  OperationPtr op = m_context->reference_manager->get(id);
517  if (op->get_error()) {
518  complete_error(op->get_error(), op->get_error_msg());
519  return false;
520  }
521  op->remove_approval_add(op->get_remove_approval_mask());
522  string dependency_string =
523  format("%s subop %s %lld", name().c_str(), op->name().c_str(),
524  (Lld)op->hash_code());
525  lock_guard<mutex> lock(m_mutex);
526  m_dependencies.erase(dependency_string);
527  entities.push_back(op);
528  }
529 
530  m_sub_ops.clear();
531  record_state(entities);
532 
533  return true;
534 }
535 
537  string dependency_string =
538  format("%s subop %s %lld", name().c_str(), operation->name().c_str(),
539  (Lld)operation->hash_code());
540  operation->add_obstruction_permanent(dependency_string);
541  operation->set_remove_approval_mask(0x01);
542  m_context->reference_manager->add(operation);
543  {
544  lock_guard<mutex> lock(m_mutex);
545  add_dependency(dependency_string);
546  m_sub_ops.push_back(operation->id());
547  }
548 }
549 
550 
551 
552 namespace {
553  struct StateInfo {
554  int32_t code;
555  const char *text;
556  };
557 
558  StateInfo state_info[] = {
560  { Hypertable::OperationState::COMPLETE, "COMPLETE" },
563  { Hypertable::OperationState::ASSIGN_ID, "ASSIGN_ID" },
564  { Hypertable::OperationState::ASSIGN_LOCATION,"ASSIGN_LOCATION" },
565  { Hypertable::OperationState::ASSIGN_METADATA_RANGES, "ASSIGN_METADATA_RANGES" },
566  { Hypertable::OperationState::LOAD_RANGE, "LOAD_RANGE" },
567  { Hypertable::OperationState::LOAD_ROOT_METADATA_RANGE, "LOAD_ROOT_METADATA_RANGE" },
568  { Hypertable::OperationState::LOAD_SECOND_METADATA_RANGE, "LOAD_SECOND_METADATA_RANGE" },
569  { Hypertable::OperationState::WRITE_METADATA, "WRITE_METADATA" },
570  { Hypertable::OperationState::CREATE_RS_METRICS, "CREATE_RS_METRICS" },
571  { Hypertable::OperationState::VALIDATE_SCHEMA, "VALIDATE_SCHEMA" },
572  { Hypertable::OperationState::SCAN_METADATA, "SCAN_METADATA" },
573  { Hypertable::OperationState::ISSUE_REQUESTS, "ISSUE_REQUESTS" },
574  { Hypertable::OperationState::UPDATE_HYPERSPACE, "UPDATE_HYPERSPACE" },
575  { Hypertable::OperationState::ACKNOWLEDGE, "ACKNOWLEDGE" },
576  { Hypertable::OperationState::FINALIZE, "FINALIZE" },
577  { Hypertable::OperationState::CREATE_INDEX, "CREATE_INDEX" },
578  { Hypertable::OperationState::CREATE_QUALIFIER_INDEX, "CREATE_QUALIFIER_INDEX" },
581  { Hypertable::OperationState::PHANTOM_LOAD, "PHANTOM_LOAD" },
582  { Hypertable::OperationState::REPLAY_FRAGMENTS, "REPLAY_FRAGMENTS" },
583  { Hypertable::OperationState::CREATE_INDICES, "CREATE_INDICES" },
584  { Hypertable::OperationState::DROP_INDICES, "DROP_INDICES" },
585  { Hypertable::OperationState::SUSPEND_TABLE_MAINTENANCE, "SUSPEND_TABLE_MAINTENANCE" },
586  { Hypertable::OperationState::RESUME_TABLE_MAINTENANCE, "RESUME_TABLE_MAINTENANCE" },
587  { Hypertable::OperationState::DROP_VALUE_INDEX, "DROP_VALUE_INDEX" },
588  { Hypertable::OperationState::DROP_QUALIFIER_INDEX, "DROP_QUALIFIER_INDEX" },
589  { Hypertable::OperationState::RENAME_VALUE_INDEX, "RENAME VALUE INDEX" },
590  { Hypertable::OperationState::RENAME_QUALIFIER_INDEX, "RENAME QUALIFIER INDEX" },
591  { 0, 0 }
592  };
593 
594  typedef std::unordered_map<int32_t, const char *> TextMap;
595 
596  TextMap &build_text_map() {
597  TextMap *map = new TextMap();
598  for (int i=0; state_info[i].text != 0; i++)
599  (*map)[state_info[i].code] = state_info[i].text;
600  return *map;
601  }
602 
603  TextMap &text_map = build_text_map();
604 
605 } // local namespace
606 
607 namespace Hypertable {
608  namespace OperationState {
609  const char *get_text(int32_t state) {
610  const char *text = text_map[state];
611  if (text == 0)
612  return "STATE NOT REGISTERED";
613  return text;
614  }
615  }
616 }
virtual void encode_result(uint8_t **bufp) const
Encode operation result.
Definition: Operation.cc:350
char * decode_vstr(const uint8_t **bufp, size_t *remainp)
Decode a vstr (vint64, data, null).
virtual void display_state(std::ostream &os)=0
Write human readable operation state to output stream.
ContextPtr m_context
Pointer to Master context.
Definition: Operation.h:553
uint16_t m_remove_approvals
Remove approvals received.
Definition: Operation.h:562
String m_error_msg
Result error message.
Definition: Operation.h:574
virtual size_t encoded_length_state() const =0
Encoded length of operation state.
std::string String
A String is simply a typedef to std::string.
Definition: String.h:44
std::shared_ptr< Entity > EntityPtr
Smart pointer to Entity.
String format(const char *fmt,...)
Returns a String using printf-like format facilities. Vanilla snprintf is about 1.5x faster than this...
Definition: String.cc:37
chrono::time_point< fast_clock > time_point
Definition: fast_clock.h:42
const char * SERVERS
Definition: Operation.cc:46
bool validate_subops()
Handles the results of sub operations.
Definition: Operation.cc:512
int64_t id
Unique ID of entity.
virtual void decode_result(const uint8_t **bufp, size_t *remainp)
Decode operation result.
Definition: Operation.cc:356
Po::typed_value< String > * str(String *v=0)
Definition: Properties.h:166
void decode(const uint8_t **bufp, size_t *remainp, uint16_t definition_version) override
Decode operation.
Definition: Operation.cc:275
Declarations for Operation.
std::mutex m_mutex
Mutex for serializing access to members
virtual uint8_t encoding_version_state() const =0
Returns version of encoding format of state.
virtual void obstructions(DependencySet &obstructions)
Definition: Operation.cc:469
std::shared_ptr< Event > EventPtr
Smart pointer to Event.
Definition: Event.h:228
uint16_t m_remove_approval_mask
Remove approval mask.
Definition: Operation.h:565
STL namespace.
int32_t m_original_type
Original entity type read from MML (prior to conversion)
Definition: Operation.h:568
EntityHeader header
Entity header
size_t encoded_length_vstr(size_t len)
Computes the encoded length of vstr (vint64, data, null)
const char * get_text(int32_t state)
Definition: Operation.cc:609
const char * RECOVER_SERVER
Definition: Operation.cc:51
void mark_for_removal()
Marks entity for removal.
uint32_t decode_i32(const uint8_t **bufp, size_t *remainp)
Decode a 32-bit integer in little-endian order.
Declarations for ReferenceManager.
int64_t m_hash_code
Hash code uniquely identifying operation.
Definition: Operation.h:589
EventPtr m_event
Pointer to client event (if any) that originated the operation.
Definition: Operation.h:556
std::shared_ptr< Context > ContextPtr
Smart pointer to Context.
Definition: Context.h:265
#define HT_ASSERT(_e_)
Definition: Logger.h:396
uint8_t encoding_version() const override
Returns encoding version.
Definition: Operation.cc:139
virtual void dependencies(DependencySet &dependencies)
Definition: Operation.cc:464
uint8_t decode_i8(const uint8_t **bufp, size_t *remainp)
Decode an 8-bit integer (a byte/character)
Definition: Serialization.h:60
uint64_t decode_i64(const uint8_t **bufp, size_t *remainp)
Decode a 64-bit integer in little-endian order.
int encoded_length_vi32(uint32_t val)
Length of a variable length encoded 32-bit integer (up to 5 bytes)
void encode_vi64(uint8_t **bufp, uint64_t val)
Encode an integer (up to 64-bit) in variable length encoding.
static time_point now() noexcept
Definition: fast_clock.cc:37
const char * USER
Definition: Operation.cc:50
const char * get_text(int error)
Returns a descriptive error message.
Definition: Error.cc:330
size_t encoded_length_internal() const override
Returns internal serialized length.
Definition: Operation.cc:143
uint16_t decode_i16(const uint8_t **bufp, size_t *remainp)
Decode a 16-bit integer in little-endian order.
int32_t m_error
Result error code.
Definition: Operation.h:571
void add_dependency(const String &dependency)
Definition: Operation.h:462
const char * INIT
Definition: Operation.cc:45
void encode_i32(uint8_t **bufp, uint32_t val)
Encode a 32-bit integer in little-endian order.
virtual void decode_state_old(uint8_t version, const uint8_t **bufp, size_t *remainp)=0
Compatibility Macros for C/C++.
DependencySet m_obstructions_permanent
Set of permanent obstructions.
Definition: Operation.h:601
void stage_subop(std::shared_ptr< Operation > operation)
Stages a sub operation for execution.
Definition: Operation.cc:536
bool removal_approved()
Checks if all remove approvals have been received.
Definition: Operation.cc:362
void encode_i16(uint8_t **bufp, uint16_t val)
Encode a 16-bit integer in little-endian order.
void encode_internal(uint8_t **bufp) const override
Writes serialized representation of object to a buffer.
Definition: Operation.cc:172
void encode_i64(uint8_t **bufp, uint64_t val)
Encode a 64-bit integer in little-endian order.
Functions to serialize/deserialize primitives to/from a memory buffer.
void record_state()
Records operation state to the MML.
Definition: Operation.h:401
const char * METADATA
Definition: Operation.cc:48
const String name() override=0
Name of operation used for exclusivity.
bool m_blocked
Flag indicating if operation is blocked.
Definition: Operation.h:580
bool m_ephemeral
Indicates if operation is ephemeral and does not get persisted to MML.
Definition: Operation.h:583
void decode_internal(uint8_t version, const uint8_t **bufp, size_t *remainp) override
Reads serialized representation of object from a buffer.
Definition: Operation.cc:205
const char * RECOVERY_BLOCKER
Definition: Operation.cc:52
void lock()
Locks the entity's mutex.
Definition: MetaLogEntity.h:97
void encode_vstr(uint8_t **bufp, const void *buf, size_t len)
Encode a buffer as variable length string (vint64, data, null)
Hypertable definitions
uint64_t decode_vi64(const uint8_t **bufp, size_t *remainp)
Decode a variable length encoded integer up to 64-bit.
long long int Lld
Shortcut for printf formats.
Definition: String.h:53
void encode_vi32(uint8_t **bufp, uint32_t val)
Encode an integer (up to 32-bit) in variable length encoding.
DependencySet m_obstructions
Set of obstructions.
Definition: Operation.h:598
virtual void exclusivities(DependencySet &exclusivities)
Definition: Operation.cc:459
DependencySet m_dependencies
Set of dependencies.
Definition: Operation.h:595
void fetch_sub_operations(std::vector< std::shared_ptr< Operation > > &sub_ops)
Definition: Operation.cc:475
void decode_old(const uint8_t **bufp, size_t *remainp, uint16_t definition_version)
Definition: Operation.cc:283
#define HT_INFOF(msg,...)
Definition: Logger.h:272
#define HT_THROWF(_code_, _fmt_,...)
Definition: Error.h:490
const char * SYSTEM
Definition: Operation.cc:49
Abstract base class for master operations.
Definition: Operation.h:124
const char * RECOVERY
Definition: Operation.cc:53
Operation(ContextPtr &context, int32_t type)
Constructor with operation type specifier.
Definition: Operation.cc:61
void display(std::ostream &os) override
Write human readable string representation of operation to output stream.
Definition: Operation.cc:79
const char * ROOT
Definition: Operation.cc:47
std::vector< int64_t > m_sub_ops
Vector of sub operations IDs.
Definition: Operation.h:604
virtual void decode_state(uint8_t version, const uint8_t **bufp, size_t *remainp)=0
Decode operation state.
bool m_unblock_on_exit
Flag to signal operation to be unblocked on exit (post_run())
Definition: Operation.h:577
virtual void encode_state(uint8_t **bufp) const =0
Encode operation state.
DependencySet m_exclusivities
Set of exclusivities.
Definition: Operation.h:592
std::set< String > DependencySet
Set of dependency string.
Definition: Operation.h:107
void complete_ok(std::vector< MetaLog::EntityPtr > &additional)
Definition: Operation.cc:436
std::shared_ptr< Operation > OperationPtr
Smart pointer to Operation.
Definition: Operation.h:609
void complete_error(int error, const String &msg, std::vector< MetaLog::EntityPtr > &additional)
Completes operation with error.
Definition: Operation.cc:400
uint32_t decode_vi32(const uint8_t **bufp, size_t *remainp)
Decode a variable length encoded integer up to 32-bit.
virtual size_t encoded_result_length() const
Length of encoded operation result.
Definition: Operation.cc:344
void encode_i8(uint8_t **bufp, uint8_t val)
Encodes a byte into the given buffer.
Definition: Serialization.h:49
int encoded_length_vi64(uint64_t val)
Length of a variable length encoded 64-bit integer (up to 9 bytes)
ClockT::time_point m_expiration_time
Expiration time (used by ResponseManager)
Definition: Operation.h:586