0.9.8.10
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
TableMutatorAsync.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2007-2015 Hypertable, Inc.
3  *
4  * This file is part of Hypertable.
5  *
6  * Hypertable is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; version 3 of the
9  * License, or any later version.
10  *
11  * Hypertable is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19  * 02110-1301, USA.
20  */
21 
22 #include <Common/Compat.h>
23 
24 #include "TableMutatorAsync.h"
25 
27 #include <Hypertable/Lib/Key.h>
30 #include <Hypertable/Lib/Table.h>
34 
35 #include <Common/Config.h>
36 #include <Common/StringExt.h>
37 
38 using namespace Hypertable;
39 using namespace Hypertable::Config;
40 using namespace std;
41 
43  try {
44  throw;
45  }
46  catch (Hypertable::Exception &ex) {
47  // issue 922: update the table if it was dropped and re-created
48  if (ex.code() == Error::TABLE_NOT_FOUND)
49  m_table->refresh();
50  throw;
51  }
52  catch (std::bad_alloc&) {
53  HT_ERRORF("caught bad_alloc here, %s", info.c_str());
54  }
55  catch (std::exception &e) {
56  HT_ERRORF("caught std::exception: %s, %s", e.what(), info.c_str());
57  }
58  catch (...) {
59  HT_ERRORF("caught unknown exception here, %s", info.c_str());
60  }
61 }
62 
64  ApplicationQueueInterfacePtr &app_queue, Table *table,
65  RangeLocatorPtr &range_locator, uint32_t timeout_ms,
66  ResultCallback *cb, uint32_t flags, bool explicit_block_only)
67  : m_comm(comm), m_app_queue(app_queue), m_table(table),
68  m_range_locator(range_locator),
69  m_timeout_ms(timeout_ms), m_cb(cb), m_flags(flags), m_mutex(m_buffer_mutex),
70  m_cond(m_buffer_cond), m_explicit_block_only(explicit_block_only) {
71  initialize(props);
72 }
73 
74 
75 TableMutatorAsync::TableMutatorAsync(std::mutex &mutex, std::condition_variable &cond,
76  PropertiesPtr &props, Comm *comm, ApplicationQueueInterfacePtr &app_queue,
77  Table *table, RangeLocatorPtr &range_locator, uint32_t timeout_ms,
78  ResultCallback *cb, uint32_t flags, bool explicit_block_only,
79  TableMutator *mutator)
80  : m_comm(comm), m_app_queue(app_queue), m_table(table),
81  m_range_locator(range_locator),
82  m_timeout_ms(timeout_ms), m_cb(cb), m_flags(flags), m_mutex(mutex),
83  m_cond(cond), m_mutator(mutator), m_explicit_block_only(explicit_block_only) {
84  initialize(props);
85 }
86 
90 
91  m_max_memory = props->get_i64("Hypertable.Mutator.ScatterBuffer.FlushLimit.Aggregate");
92 
93  uint32_t buffer_id = ++m_next_buffer_id;
94  m_current_buffer = make_shared<TableMutatorAsyncScatterBuffer>(m_comm, m_app_queue,
96  m_table->auto_refresh(), m_timeout_ms, buffer_id);
97 
98  // if there are indices then initialize the index mutators
99  initialize_indices(props);
100 
101  if (m_cb)
102  m_cb->register_mutator(this);
103 }
104 
105 
107  try {
108  // call sync on any unsynced rangeservers and flush current buffer if needed
109  flush();
110 
113  if (m_cb)
114  m_cb->deregister_mutator(this);
115  }
116  catch (Exception &e) {
117  HT_ERROR_OUT << e.what() << HT_END;
118  }
119 
120  // once more make sure that index mutators are deleted
121  if (m_index_mutator)
122  m_index_mutator = 0;
125 }
126 
128 {
130  m_use_index = false;
131  return;
132  }
133 
134  m_use_index = true;
135 
136  m_imc = make_shared<IndexMutatorCallback>(this, m_cb, m_max_memory);
137  m_cb = &(*m_imc);
138 
139  // create new index mutator
140  if (m_table->has_index_table())
142  make_shared<TableMutatorAsync>(props, m_comm, m_app_queue,
143  m_table->get_index_table().get(),
145  m_flags);
146  // create new qualifier index mutator
149  make_shared<TableMutatorAsync>(props, m_comm, m_app_queue,
152  m_flags);
153 }
154 
156  unique_lock<mutex> lock(m_mutex);
157  m_cond.wait(lock, [this](){ return m_outstanding_buffers.empty(); });
158 }
159 
160 void
162  const void *value, uint32_t value_len) {
163  HT_ASSERT(m_use_index == true);
164  HT_ASSERT(cf && (cf->get_value_index() || cf->get_qualifier_index()));
165 
166  // indexed keys get an auto-assigned timestamp to make sure that the
167  // index key and the original key have identical timestamps
168  if (key.flag == FLAG_INSERT && key.timestamp == AUTO_ASSIGN)
169  key.timestamp = get_ts64();
170 
171  // first store the original key in our callback
172  m_imc->buffer_key(key, value, value_len);
173 
174  // if this is a DELETE then return, otherwise update the index
175  if (key.flag != FLAG_INSERT)
176  return;
177 
178  TableMutatorAsync *value_index_mutator = 0;
179  TableMutatorAsync *qualifier_index_mutator = 0;
180 
181  if (cf->get_value_index())
182  value_index_mutator = m_index_mutator.get();
183 
184  if (cf->get_qualifier_index())
185  qualifier_index_mutator = m_qualifier_index_mutator.get();
186 
187  IndexTables::add(key, key.flag, value, value_len,
188  value_index_mutator, qualifier_index_mutator);
189 }
190 
191 void
192 TableMutatorAsync::set(const KeySpec &key, const void *value,
193  uint32_t value_len) {
194  {
195  lock_guard<mutex> lock(m_member_mutex);
196  ColumnFamilySpec *cf = 0;
197 
198  try {
199  key.sanity_check();
200 
201  Key full_key;
202  to_full_key(key, full_key, &cf);
203 
204  // ensures row len
205  if (key.row_len)
206  full_key.row_len = key.row_len;
207  else if (full_key.row)
208  full_key.row_len = strlen(full_key.row);
209 
210  // if there's an index: buffer the key and update the index
211  if (key.flag == FLAG_INSERT && m_use_index &&
212  cf && (cf->get_value_index() || cf->get_qualifier_index())) {
213  update_with_index(full_key, cf, value, value_len);
214  }
215  else {
216  update_without_index(full_key, cf, value, value_len);
217  }
218  }
219  catch (...) {
221  format("row=%s, cf=%s, cq=%s, value_len=%d (%s:%d)",
222  (const char*)key.row,
223  key.column_family,
224  key.column_qualifier ? key.column_qualifier : "-",
225  value_len,
226  __FILE__,
227  __LINE__));
228  throw;
229  }
230  }
231 
232  if (m_imc && m_imc->needs_flush())
233  flush();
234 }
235 
236 void
238 {
239  // the cell.column_family string contains the numeric ID of the
240  // column family
241  int cf_id = (int)strtoul(cell.column_family, 0, 0);
242  if (!cf_id)
243  HT_THROW(Error::BAD_KEY, "Invalid column family");
244 
245  Key k;
246  k.flag = cell.flag;
247  k.column_family_code = cf_id;
248  k.row = cell.row_key;
250  k.row_len = cell.row_key ? strlen(cell.row_key) : 0;
252  cell.column_qualifier ? strlen(cell.column_qualifier) : 0;
253  k.timestamp = cell.timestamp;
254  k.revision = cell.revision;
255 
256  if (k.flag != FLAG_INSERT) {
257  size_t incr_mem = 20 + k.row_len + k.column_qualifier_len;
258  m_current_buffer->set_delete(k, incr_mem);
259  m_memory_used += incr_mem;
260  }
261  else {
262  size_t incr_mem = 20 + k.row_len + k.column_qualifier_len
263  + cell.value_len;
264  m_current_buffer->set(k, 0, cell.value, cell.value_len, incr_mem);
265  m_memory_used += incr_mem;
266  }
267 }
268 
269 void
271  const Cell &cell)
272 {
273  update_without_index(full_key, cf, cell.value, cell.value_len);
274 }
275 
276 void
278  const void *value, size_t value_len)
279 {
280  if (full_key.flag != FLAG_INSERT) {
281  size_t incr_mem = 20 + full_key.row_len + full_key.column_qualifier_len;
282  m_current_buffer->set_delete(full_key, incr_mem);
283  m_memory_used += incr_mem;
284  }
285  else {
286  size_t incr_mem = 20 + full_key.row_len + full_key.column_qualifier_len
287  + value_len;
288  m_current_buffer->set(full_key, cf, value, value_len, incr_mem);
289  m_memory_used += incr_mem;
290  }
291 }
292 
293 void
294 TableMutatorAsync::set_cells(Cells::const_iterator it,
295  Cells::const_iterator end) {
296  {
297  lock_guard<mutex> lock(m_member_mutex);
298  ColumnFamilySpec *cf = 0;
299 
300  try {
301  for (; it != end; ++it) {
302  Key full_key;
303  const Cell &cell = *it;
304  cell.sanity_check();
305 
306  if (!cell.column_family) {
307  if (cell.flag != FLAG_DELETE_ROW)
309  (String)"Column family not specified in non-delete row set "
310  "on row=" + (String)cell.row_key);
311  full_key.row = cell.row_key;
312  full_key.timestamp = cell.timestamp;
313  full_key.revision = cell.revision;
314  full_key.flag = cell.flag;
315  }
316  else {
317  to_full_key(cell, full_key, &cf);
318  }
319 
320  if (cell.row_key)
321  full_key.row_len = strlen(cell.row_key);
322 
323  // if there's an index: buffer the key and update the index
324  if (cell.flag == FLAG_INSERT && m_use_index
325  && cf && (cf->get_value_index() || cf->get_qualifier_index())) {
326  update_with_index(full_key, cf, cell.value, cell.value_len);
327  }
328  else {
329  update_without_index(full_key, cf, cell);
330  }
331  }
332  }
333  catch (...) {
335  format("row=%s, cf=%s, cq=%s, value_len=%d (%s:%d)",
336  it->row_key,
337  it->column_family,
338  it->column_qualifier ? it->column_qualifier : "-",
339  it->value_len,
340  __FILE__,
341  __LINE__));
342  throw;
343  }
344  }
345 
346  if (m_imc && m_imc->needs_flush())
347  flush();
348 }
349 
351  Key full_key;
352 
353  {
354  lock_guard<mutex> lock(m_member_mutex);
355  ColumnFamilySpec *cf = 0;
356 
357  try {
358  key.sanity_check();
359 
360  if (!key.column_family) {
361  full_key.row = (const char *)key.row;
362  full_key.timestamp = key.timestamp;
363  full_key.revision = key.revision;
364  full_key.flag = key.flag;
365  }
366  else {
367  to_full_key(key, full_key, &cf);
368  }
369 
370  // ensures row len
371  if (key.row_len)
372  full_key.row_len = key.row_len;
373  else if (full_key.row)
374  full_key.row_len = strlen(full_key.row);
375 
376  // if there's an index: buffer the key and update the index
377  if (m_use_index && cf && (cf->get_value_index() || cf->get_qualifier_index())) {
378  update_with_index(full_key, cf, 0, 0);
379  }
380  else {
381  update_without_index(full_key, cf, 0, 0);
382  }
383  }
384  catch (...) {
386  format("row=%s, cf=%s, cq=%s (%s:%d)",
387  (const char*)key.row,
388  key.column_family,
389  key.column_qualifier ? key.column_qualifier : "-",
390  __FILE__,
391  __LINE__));
392  throw;
393  }
394  }
395 
396  if (m_imc && m_imc->needs_flush())
397  flush();
398 }
399 
400 void
401 TableMutatorAsync::to_full_key(const void *row, const char *column_family,
402  const void *column_qualifier, int64_t timestamp, int64_t revision,
403  uint8_t flag, Key &full_key, ColumnFamilySpec **pcf) {
404  ColumnFamilySpec *cf;
405 
406  if (flag > FLAG_DELETE_ROW) {
407  if (!column_family)
408  HT_THROW(Error::BAD_KEY, "Column family not specified");
409 
410  cf = m_schema->get_column_family(column_family);
411 
412  if (!cf) {
413  if (m_table->auto_refresh()) {
415  m_current_buffer->refresh_schema(m_table_identifier, m_schema);
416  cf = m_schema->get_column_family(column_family);
417  if (!cf) {
418  HT_THROWF(Error::BAD_KEY, "Bad column family '%s'", column_family);
419  }
420  }
421  else {
422  HT_THROWF(Error::BAD_KEY, "Bad column family '%s'", column_family);
423  }
424  }
425  full_key.column_family_code = (uint8_t)cf->get_id();
426  }
427  else {
428  full_key.column_family_code = 0;
429  cf = 0;
430  }
431 
432  full_key.row = (const char *)row;
433  if (column_qualifier) {
434  full_key.column_qualifier = (const char *)column_qualifier;
435  full_key.column_qualifier_len = strlen((const char *)column_qualifier);
436  }
437  else {
438  full_key.column_qualifier = "";
439  full_key.column_qualifier_len = 0;
440  }
441  full_key.timestamp = timestamp;
442  full_key.revision = revision;
443  full_key.flag = flag;
444 
445  if (pcf)
446  *pcf = cf;
447 }
448 
450  lock_guard<mutex> lock(m_member_mutex);
451  m_cancelled = true;
452 }
453 
455  lock_guard<mutex> lock(m_member_mutex);
456  return m_cancelled;
457 }
458 
460  lock_guard<mutex> lock(m_member_mutex);
462  return true;
463  if (m_use_index)
464  return m_imc->needs_flush();
465  return false;
466 }
467 
468 void TableMutatorAsync::flush(bool sync) {
470 }
471 
473  // if an index is used: make sure that the index is updated
474  // BEFORE the primary table is flushed!
475  if (m_use_index) {
476  if (m_index_mutator) {
477  m_index_mutator->flush();
478  if (mutator)
480  m_index_mutator->wait_for_completion();
481  }
483  m_qualifier_index_mutator->flush();
484  if (mutator)
486  m_qualifier_index_mutator->wait_for_completion();
487  }
488 
489  if (is_cancelled())
490  return;
491 
492  // propagate all index failures to the original callback
493  m_imc->propagate_failures();
494 
495  // now copy all regular cells to this mutator's buffers
496  m_imc->consume_keybuffer(this);
497 
498  // then fall through
499  }
500 
501  if (is_cancelled())
502  return;
503 
504  uint32_t flags;
505 
506  if (sync)
507  flags = m_flags & ~Table::MUTATOR_FLAG_NO_LOG_SYNC;
508  else
510 
511  try {
512  {
513  lock_guard<mutex> lock(m_mutex);
514  lock_guard<mutex> member_lock(m_member_mutex);
515  if (m_current_buffer->memory_used() > 0) {
516  m_current_buffer->send(flags);
517  uint32_t buffer_id = ++m_next_buffer_id;
518  if (m_outstanding_buffers.size() == 0 && m_cb)
521  m_current_buffer = make_shared<TableMutatorAsyncScatterBuffer>(m_comm,
524  buffer_id);
525  m_memory_used = 0;
526  }
527  }
528 
529  // sync any unsynced RS
530  if (sync)
531  do_sync();
532  }
533  HT_RETHROW("flushing")
534 }
535 
536 void TableMutatorAsync::get_unsynced_rangeservers(std::vector<CommAddress> &unsynced) {
537  lock_guard<mutex> lock(m_member_mutex);
538  unsynced.clear();
539  for (const auto &comm_addr : m_unsynced_rangeservers)
540  unsynced.push_back(comm_addr);
541 }
542 
544  TableIdentifierManaged table_identifier;
545 
546  {
547  lock_guard<mutex> lock(m_member_mutex);
548  if (m_unsynced_rangeservers.empty())
549  return;
550  table_identifier = m_table_identifier;
551  }
552 
553  uint32_t retry_count = 0;
554  bool retry_failed;
555 
556  // sync unsynced rangeservers
557  try {
558  TableMutatorSyncDispatchHandler sync_handler(m_comm, table_identifier,
559  m_timeout_ms);
560 
561  {
562  lock_guard<mutex> lock(m_member_mutex);
563  for (auto addr : m_unsynced_rangeservers)
564  sync_handler.add(addr);
565  }
566 
567  if (!sync_handler.wait_for_completion()) {
568  std::vector<TableMutatorSyncDispatchHandler::ErrorResult> errors;
569  do {
570  bool do_refresh = false;
571  retry_count++;
572  sync_handler.get_errors(errors);
573  for (size_t i=0; i<errors.size(); i++) {
574  if (m_table->auto_refresh() &&
575  (errors[i].error == Error::RANGESERVER_GENERATION_MISMATCH ||
576  (!mutated() && errors[i].error == Error::TABLE_NOT_FOUND)))
577  do_refresh = true;
578  else
579  HT_ERRORF("commit log sync error - %s - %s", errors[i].msg.c_str(),
580  Error::get_text(errors[i].error));
581  }
582  if (do_refresh) {
583  lock_guard<mutex> lock(m_member_mutex);
585  m_current_buffer->refresh_schema(m_table_identifier, m_schema);
586  }
587  sync_handler.retry();
588  }
589  while ((retry_failed = (!sync_handler.wait_for_completion())) &&
590  retry_count < ms_max_sync_retries);
594  if (retry_failed) {
595  sync_handler.get_errors(errors);
596  string error_str =
597  format("commit log sync error '%s' '%s' max retry limit=%d hit.",
598  errors[0].msg.c_str(), Error::get_text(errors[0].error),
599  (int)ms_max_sync_retries);
600  HT_THROW(errors[0].error, error_str);
601  }
602  }
603 
604  }
605  catch (Exception &e) {
606  HT_ERROR_OUT << e << HT_END;
607  throw;
608  }
609  catch (...) {
611  format("retry_count=%d (%s:%d)",
612  retry_count,
613  __FILE__,
614  __LINE__));
615  throw;
616  }
617 }
618 
620  lock_guard<mutex> lock(m_mutex);
622  ScatterBufferAsyncMap::iterator it = m_outstanding_buffers.find(id);
623  if (it != m_outstanding_buffers.end())
624  buffer = it->second;
625  return buffer;
626 }
627 
629  for (const auto &comm_addr : unsynced)
630  m_unsynced_rangeservers.insert(comm_addr);
631 }
632 
634  m_outstanding_buffers.erase(buffer->get_id());
635  if (m_outstanding_buffers.size()==0) {
636  m_cond.notify_one();
637  if (m_cb)
639  }
640  m_cond.notify_one();
641 }
642 
643 void TableMutatorAsync::buffer_finish(uint32_t id, int error, bool retry) {
644  bool cancelled = false;
645  bool mutated = false;
646  uint32_t next_id = 0;
649  ScatterBufferAsyncMap::iterator it;
650 
651  {
652  lock_guard<mutex> lock(m_mutex);
653  it = m_outstanding_buffers.find(id);
654  HT_ASSERT(it != m_outstanding_buffers.end());
655 
656  buffer = it->second;
657  {
658  lock_guard<mutex> lock(m_member_mutex);
659  m_failed_mutations.clear();
660  update_unsynced_rangeservers(buffer->get_unsynced_rangeservers());
661  cancelled = m_cancelled;
662  mutated = m_mutated;
663  }
664 
665  if (cancelled) {
666  update_outstanding(buffer);
667  return;
668  }
669 
670  if (error != Error::OK) {
672  (!mutated && error == Error::TABLE_NOT_FOUND)) {
673  lock_guard<mutex> lock(m_member_mutex);
674  // retry possible
676  buffer->refresh_schema(m_table_identifier, m_schema);
677  retry = true;
678  }
679  else {
680  if (retry)
681  buffer->set_retries_to_fail(error);
682  // send error to callback
683  {
684  lock_guard<mutex> lock(m_member_mutex);
685  buffer->get_failed_mutations(m_failed_mutations);
686  if (m_cb != 0)
687  m_cb->update_error(this, error, m_failed_mutations);
688  }
689  update_outstanding(buffer);
690  return;
691  }
692  }
693 
694  next_id = ++m_next_buffer_id;
695  }
696 
697  if (retry) {
698  // create & send redo buffer
699  lock_guard<mutex> lock(m_member_mutex);
700  try {
701  redo = buffer->create_redo_buffer(next_id);
702  }
703  catch (Exception &e) {
704  error = e.code();
705  redo = 0;
706  }
707  }
708 
709  lock_guard<mutex> lock(m_mutex);
710  if (retry) {
711  if (!redo) {
712  {
713  lock_guard<mutex> lock(m_member_mutex);
714  buffer->get_failed_mutations(m_failed_mutations);
715  // send error to callback
716  if (m_cb != 0)
717  m_cb->update_error(this, error, m_failed_mutations);
718  }
719  update_outstanding(buffer);
720  }
721  else {
722  HT_ASSERT(redo);
723  m_resends += buffer->get_resend_count();
724  m_outstanding_buffers.erase(it);
725  redo->send(buffer->get_send_flags());
726  m_outstanding_buffers[next_id] = redo;
727  }
728  }
729  else {
730  // everything went well
731  {
732  lock_guard<mutex> lock(m_member_mutex);
733  m_mutated = true;
734  }
735  if (m_cb != 0)
736  m_cb->update_ok(this);
737  update_outstanding(buffer);
738  }
739 }
int64_t timestamp
Definition: KeySpec.h:130
#define HT_RETHROW(_s_)
Definition: Error.h:514
static std::mutex mutex
Definition: Logger.cc:43
void wait_for_flush_completion(TableMutatorAsync *mutator)
int64_t timestamp
Definition: Key.h:134
const char * row
Definition: Key.h:129
void initialize(PropertiesPtr &props)
bool has_qualifier_index_table()
returns true if this table has a qualifier index
Definition: Table.h:215
bool auto_refresh()
Definition: Table.h:178
void initialize_indices(PropertiesPtr &props)
static const uint32_t FLAG_DELETE_ROW
Definition: KeySpec.h:40
std::string String
A String is simply a typedef to std::string.
Definition: String.h:44
String format(const char *fmt,...)
Returns a String using printf like format facilities Vanilla snprintf is about 1.5x faster than this...
Definition: String.cc:37
std::shared_ptr< RangeLocator > RangeLocatorPtr
Smart pointer to RangeLocator.
Definition: RangeLocator.h:198
static const uint32_t FLAG_INSERT
Definition: KeySpec.h:47
ScatterBufferAsyncMap m_outstanding_buffers
TableIdentifierManaged m_table_identifier
const char * column_qualifier
Definition: KeySpec.h:128
void set(const KeySpec &key, const void *value, uint32_t value_len)
Inserts a cell into the table.
bool get_value_index() const
Gets value index flag.
std::vector< String > errors
std::shared_ptr< TableMutatorAsyncScatterBuffer > TableMutatorAsyncScatterBufferPtr
Smart pointer to TableMutatorAsyncScatterBuffer.
virtual void register_mutator(TableMutatorAsync *mutator)
Hook for derived classes which want to keep track of scanners/mutators.
Column family specification.
TableMutatorAsyncPtr m_qualifier_index_mutator
const char * column_qualifier
Definition: Cell.h:68
void to_full_key(const void *row, const char *cf, const void *cq, int64_t ts, int64_t rev, uint8_t flag, Key &full_key, ColumnFamilySpec **pcf=0)
void set_delete(const KeySpec &key)
Deletes an entire row, a column family in a particular row, or a specific cell within a row...
void flush_with_tablequeue(TableMutator *mutator, bool sync=true)
int64_t revision
Definition: KeySpec.h:131
STL namespace.
void get(TableIdentifierManaged &table_identifier, SchemaPtr &schema)
Get a copy of table identifier and schema atomically.
Definition: Table.cc:140
void flush(bool sync=true)
Flushes the current buffer accumulated mutations to their respective range servers.
const void * row
Definition: KeySpec.h:125
void handle_send_exceptions(const String &info)
void add(const Key &key, uint8_t flag, const void *value, uint32_t value_len, TableMutatorAsync *value_index_mutator, TableMutatorAsync *qualifier_index_mutator)
Definition: IndexTables.cc:34
void update_with_index(Key &key, const ColumnFamilySpec *cf, const void *value, uint32_t value_len)
#define HT_ASSERT(_e_)
Definition: Logger.h:396
Provides the ability to mutate a table in the form of adding and deleting rows and cells...
Wrapper for TableIdentifier providing member storage.
Declarations for TableMutatorSyncDispatchHandler.
uint32_t row_len
Definition: Key.h:131
Represents an open table.
Definition: Table.h:58
Provides the ability to mutate a table in the form of adding and deleting rows and cells...
Definition: TableMutator.h:55
IndexMutatorCallbackPtr m_imc
void set_cells(const Cells &cells)
Insert a bunch of cells into the table (atomically if cells are in the same range/row) ...
const char * get_text(int error)
Returns a descriptive error message.
Definition: Error.cc:330
virtual void update_error(TableMutatorAsync *mutator, int error, FailedMutations &failures)=0
Callback method for update errors.
virtual ~TableMutatorAsync()
Destructor for TableMutatorAsync object Make sure buffers are flushed and unsynced rangeservers get s...
virtual void update_ok(TableMutatorAsync *mutator)=0
Callback method for successful update.
bool get_qualifier_index() const
Gets qualifier index flag.
uint64_t revision
Definition: Cell.h:70
void buffer_finish(uint32_t id, int error, bool retry)
This is where buffers call back into when their outstanding operations are complete.
std::shared_ptr< Properties > PropertiesPtr
Definition: Properties.h:447
void update_unsynced_rangeservers(const CommAddressSet &unsynced)
TableMutatorAsyncPtr m_index_mutator
Compatibility Macros for C/C++.
void update_without_index(const Cell &cell)
static const uint32_t ms_max_sync_retries
const char * row_key
Definition: Cell.h:66
#define HT_END
Definition: Logger.h:220
std::shared_ptr< ApplicationQueueInterface > ApplicationQueueInterfacePtr
Smart pointer to ApplicationQueueInterface.
TableMutatorAsyncScatterBufferPtr m_current_buffer
TableMutatorAsync(PropertiesPtr &props, Comm *comm, ApplicationQueueInterfacePtr &app_queue, Table *table, RangeLocatorPtr &range_locator, uint32_t timeout_ms, ResultCallback *cb, uint32_t flags=0, bool explicit_block_only=false)
Constructs the TableMutatorAsync object.
int32_t get_id() const
Gets column ID.
virtual void deregister_mutator(TableMutatorAsync *mutator)
Hook for derived classes which want to keep track of scanners/mutators.
#define HT_ERROR_OUT
Definition: Logger.h:301
void do_sync()
Calls sync on any unsynced rangeservers and waits for completion.
bool has_index_table()
returns true if this table has an index
Definition: Table.h:209
Hypertable definitions
std::set< CommAddress > CommAddressSet
Set of CommAddress objects.
Definition: CommAddress.h:212
void sanity_check() const
Definition: Cell.h:43
This class is a DispatchHandler class that is used for collecting asynchronous commit log sync reques...
Entry point to AsyncComm service.
Definition: Comm.h:61
TableMutatorAsyncScatterBufferPtr get_outstanding_buffer(size_t id)
void get_unsynced_rangeservers(std::vector< CommAddress > &unsynced)
void update_outstanding(TableMutatorAsyncScatterBufferPtr &buffer)
const char * column_family
Definition: Cell.h:67
std::condition_variable & m_cond
TablePtr get_index_table()
Definition: Table.h:235
#define HT_THROWF(_code_, _fmt_,...)
Definition: Error.h:490
Provides access to internal components of opaque key.
Definition: Key.h:40
uint32_t column_qualifier_len
Definition: Key.h:132
void sanity_check() const
Definition: KeySpec.h:79
This is a generic exception class for Hypertable.
Definition: Error.h:314
uint32_t value_len
Definition: Cell.h:72
int64_t revision
Definition: Key.h:135
#define HT_ERRORF(msg,...)
Definition: Logger.h:300
uint8_t column_family_code
Definition: Key.h:127
static const int64_t AUTO_ASSIGN
Definition: KeySpec.h:38
uint8_t flag
Definition: Cell.h:73
Configuration settings.
uint8_t flag
Definition: Key.h:125
ApplicationQueueInterfacePtr m_app_queue
Encapsulates decomposed key and value.
Definition: Cell.h:32
const char * column_qualifier
Definition: Key.h:130
Represents an open table.
String extensions and helpers: sets, maps, append operators etc.
const char * column_family
Definition: KeySpec.h:127
#define HT_THROW(_code_, _msg_)
Definition: Error.h:478
void refresh()
Refresh schema etc.
Definition: Table.cc:132
const uint8_t * value
Definition: Cell.h:71
int64_t get_ts64()
Returns the current time in nanoseconds as a 64bit number.
Definition: Time.cc:40
int code() const
Returns the error code.
Definition: Error.h:391
TablePtr get_qualifier_index_table()
Definition: Table.h:239
int64_t timestamp
Definition: Cell.h:69