0.9.8.10
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
TableMutatorAsyncScatterBuffer.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2007-2015 Hypertable, Inc.
3  *
4  * This file is part of Hypertable.
5  *
6  * Hypertable is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; version 3 of the
9  * License, or any later version.
10  *
11  * Hypertable is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19  * 02110-1301, USA.
20  */
21 
26 
27 #include <Common/Compat.h>
28 
30 
31 #include <Common/Config.h>
32 #include <Common/Random.h>
33 #include <Common/Timer.h>
34 
36 #include <Hypertable/Lib/Key.h>
37 #include <Hypertable/Lib/KeySpec.h>
38 #include <Hypertable/Lib/Table.h>
41 
42 #include <algorithm>
43 #include <chrono>
44 #include <thread>
45 
46 using namespace Hypertable;
47 using namespace std;
48 
51  const TableIdentifier *table_identifier, SchemaPtr &schema,
52  RangeLocatorPtr &range_locator, bool auto_refresh, uint32_t timeout_ms, uint32_t id)
53  : m_comm(comm), m_app_queue(app_queue), m_mutator(mutator), m_schema(schema),
54  m_range_locator(range_locator),
55  m_location_cache(range_locator->location_cache()),
56  m_range_server(comm, timeout_ms), m_table_identifier(*table_identifier),
57  m_auto_refresh(auto_refresh), m_timeout_ms(timeout_ms),
58  m_counter_value(9), m_timer(timeout_ms), m_id(id),
59  m_wait_time(ms_init_redo_wait_time) {
60 
62 
64  "Hypertable.Mutator.ScatterBuffer.FlushLimit.PerServer");
65 }
66 
68  dead = true;
69 }
70 
71 
/// Buffers one insert in the send buffer keyed by the address of the range
/// that holds key.row.
///
/// Resolves the row's range location, parses counter-column values from
/// ASCII, and appends the serialized key and value to the matching send
/// buffer.
///
/// @param key Decomposed key of the cell being written
/// @param cf Column family spec; when null and key.column_family_code is
///        non-zero it is looked up in m_schema
/// @param value Pointer to the cell value
/// @param value_len Length of value in bytes
/// @param incr_mem Amount added to m_memory_used
/// @throws Error::BAD_KEY (via HT_THROWF) when a counter value does not parse
///         as an integer
72 void
73 TableMutatorAsyncScatterBuffer::set(const Key &key, const ColumnFamilySpec *cf, const void *value,
74  uint32_t value_len, size_t incr_mem) {
75  RangeAddrInfo range_info;
76  TableMutatorAsyncSendBufferMap::const_iterator iter;
77  bool counter_reset = false;
78 
    // Location lookup runs before m_mutex is taken. On a cache miss the range
    // locator is consulted with a Timer built from m_timeout_ms.
79  if (!m_location_cache->lookup(m_table_identifier.id, key.row, &range_info)) {
80  Timer timer(m_timeout_ms, true);
81  RangeLocationInfo range_loc_info;
82  m_range_locator->find_loop(&m_table_identifier, key.row, &range_loc_info,
83  timer, false);
84  range_info = range_loc_info;
85  }
86 
87  {
88  lock_guard<mutex> lock(m_mutex);
89 
90  bool is_counter = false;
91  if (key.column_family_code) {
92  if (!cf)
93  cf = m_schema->get_column_family(key.column_family_code);
    // NOTE(review): cf is dereferenced without a null check after the schema
    // lookup -- confirm get_column_family() cannot return null here.
94  is_counter = cf->get_option_counter();
95  }
96 
97  // counter? make sure that a valid integer was specified and re-encode
98  // it as a 64bit value
99  if (is_counter) {
100  const char *ascii_value = (const char *)value;
101  char *endptr;
    // NOTE(review): this listing skips original source line 102 at this
    // point -- check the real file before relying on the state of
    // m_counter_value here.
103  m_counter_value.ensure(value_len+1);
    // A leading '=' requests a counter reset; a leading '+' is just skipped.
    // In both cases the prefix character is not copied.
104  if (value_len > 0 && (*ascii_value == '=' || *ascii_value == '+')) {
105  counter_reset = (*ascii_value == '=');
106  m_counter_value.add_unchecked(ascii_value+1, value_len-1);
107  }
108  else
109  m_counter_value.add_unchecked(value, value_len);
    // NUL-terminate the copy so strtoll() can parse it as a C string.
110  m_counter_value.add_unchecked((const void *)"\0",1);
111  int64_t val = strtoll((const char *)m_counter_value.base, &endptr, 0);
    // A non-NUL *endptr means characters were left over after the number.
112  if (*endptr)
113  HT_THROWF(Error::BAD_KEY, "Expected integer value, got %s, row=%s",
114  (char*)m_counter_value.base, key.row);
    // NOTE(review): original source lines 115-116 are missing from this
    // listing; val is not used in the visible code. Presumably it is
    // re-encoded into m_counter_value there -- confirm against the real file.
117  }
118 
119  iter = m_buffer_map.find(range_info.addr);
120 
    // First update for this address: create its send buffer.
121  if (iter == m_buffer_map.end()) {
122  iter = m_buffer_map.insert(std::make_pair(range_info.addr, make_shared<TableMutatorAsyncSendBuffer>(&m_table_identifier,
123  &m_completion_counter, m_range_locator.get()))).first;
124  (*iter).second->addr = range_info.addr;
125  }
126 
    // Record the offset in accum at which this key starts, then append it.
127  (*iter).second->key_offsets.push_back((*iter).second->accum.fill());
128  create_key_and_append((*iter).second->accum, key);
129 
130  // now append the counter
131  if (is_counter) {
132  if (counter_reset) {
    // A reset appends '=' to the buffered value, so 9 bytes are written
    // instead of 8.
133  *m_counter_value.ptr++ = '=';
134  append_as_byte_string((*iter).second->accum, m_counter_value.base, 9);
135  }
136  else
137  append_as_byte_string((*iter).second->accum, m_counter_value.base, 8);
138  }
139  else
140  append_as_byte_string((*iter).second->accum, value, value_len);
141 
    // Flag the scatter buffer as full once this send buffer's fill exceeds
    // m_server_flush_limit.
142  if ((*iter).second->accum.fill() > m_server_flush_limit)
143  m_full = true;
144  m_memory_used += incr_mem;
145  }
146 }
147 
148 
/// Buffers one delete in the send buffer keyed by the address of the range
/// that holds key.row.
///
/// @param key Decomposed key describing the delete; key.flag must not be
///        FLAG_INSERT
/// @param incr_mem Amount added to m_memory_used
/// @throws Error::BAD_KEY when key.flag is FLAG_INSERT, when the flag check
///         below finds column family code 0, or when a
///         FLAG_DELETE_CELL_VERSION key has timestamp AUTO_ASSIGN
149 void TableMutatorAsyncScatterBuffer::set_delete(const Key &key, size_t incr_mem) {
    // NOTE(review): m_mutex is held for the whole function, including the
    // find_loop() call below; the Key-based set() does its lookup before
    // locking. Confirm the difference is intended.
150  lock_guard<mutex> lock(m_mutex);
151 
152  RangeAddrInfo range_info;
153  TableMutatorAsyncSendBufferMap::const_iterator iter;
154 
155  if (key.flag == FLAG_INSERT)
156  HT_THROW(Error::BAD_KEY, "Key flag is FLAG_INSERT, expected delete");
157 
    // On a cache miss the range locator is consulted with a Timer built from
    // m_timeout_ms.
158  if (!m_location_cache->lookup(m_table_identifier.id, key.row, &range_info)) {
159  Timer timer(m_timeout_ms, true);
160  RangeLocationInfo range_loc_info;
161  m_range_locator->find_loop(&m_table_identifier, key.row, &range_loc_info,
162  timer, false);
163  range_info = range_loc_info;
164  }
165  iter = m_buffer_map.find(range_info.addr);
166 
    // First update for this address: create its send buffer.
167  if (iter == m_buffer_map.end()) {
168  iter = m_buffer_map.insert(std::make_pair(range_info.addr, make_shared<TableMutatorAsyncSendBuffer>(&m_table_identifier,
169  &m_completion_counter, m_range_locator.get()))).first;
170  (*iter).second->addr = range_info.addr;
171  }
172 
    // NOTE(review): the key offset is recorded before the validation below,
    // which can throw. A throw would leave an offset in key_offsets with no
    // key appended at it -- confirm whether callers tolerate that.
173  (*iter).second->key_offsets.push_back((*iter).second->accum.fill());
    // NOTE(review): original source line 175 (the rest of this condition and
    // its opening brace) is missing from this listing.
174  if (key.flag == FLAG_DELETE_COLUMN_FAMILY ||
176  if (key.column_family_code == 0)
177  HT_THROWF(Error::BAD_KEY, "key.flag set to %d but column family=0", key.flag);
178  if (key.flag == FLAG_DELETE_CELL || key.flag == FLAG_DELETE_CELL_VERSION) {
179  if (key.flag == FLAG_DELETE_CELL_VERSION && key.timestamp == AUTO_ASSIGN) {
180  HT_THROWF(Error::BAD_KEY, "key.flag set to %d but timestamp == AUTO_ASSIGN", key.flag);
181  }
182  }
183  }
184 
    // The key is followed by a zero-length byte string as its value.
185  create_key_and_append((*iter).second->accum, key);
186  append_as_byte_string((*iter).second->accum, 0, 0);
    // Flag the scatter buffer as full once this send buffer's fill exceeds
    // m_server_flush_limit.
187  if ((*iter).second->accum.fill() > m_server_flush_limit)
188  m_full = true;
189  m_memory_used += incr_mem;
190 }
191 
192 
193 void
195  lock_guard<mutex> lock(m_mutex);
196 
197  RangeAddrInfo range_info;
198  TableMutatorAsyncSendBufferMap::const_iterator iter;
199  const uint8_t *ptr = key.ptr;
200  size_t len = Serialization::decode_vi32(&ptr);
201 
202  if (!m_location_cache->lookup(m_table_identifier.id, (const char *)ptr+1,
203  &range_info)) {
204  Timer timer(m_timeout_ms, true);
205  RangeLocationInfo range_loc_info;
206  m_range_locator->find_loop(&m_table_identifier, (const char *)ptr+1,
207  &range_loc_info, timer, false);
208  range_info = range_loc_info;
209  }
210 
211  iter = m_buffer_map.find(range_info.addr);
212 
213  if (iter == m_buffer_map.end()) {
214  iter = m_buffer_map.insert(std::make_pair(range_info.addr, make_shared<TableMutatorAsyncSendBuffer>(&m_table_identifier,
215  &m_completion_counter, m_range_locator.get()))).first;
216  (*iter).second->addr = range_info.addr;
217  }
218 
219  (*iter).second->key_offsets.push_back((*iter).second->accum.fill());
220  (*iter).second->accum.add(key.ptr, (ptr-key.ptr)+len);
221  (*iter).second->accum.add(value.ptr, value.length());
222 
223  if ((*iter).second->accum.fill() > m_server_flush_limit)
224  m_full = true;
225  m_memory_used += incr_mem;
226 }
227 
228 
229 namespace {
230 
231  struct SendRec {
232  SerializedKey key;
233  const char* row;
234  };
235 
236  inline bool operator<(const SendRec &sr1, const SendRec &sr2) {
237  return strcmp(sr1.row, sr2.row) < 0;
238  }
239 }
240 
241 
243  lock_guard<mutex> lock(m_mutex);
244  bool outstanding=false;
245 
246  m_timer.start();
247  TableMutatorAsyncSendBufferPtr send_buffer;
248  std::vector<SendRec> send_vec;
249  uint8_t *ptr;
250  SerializedKey key;
251  SendRec send_rec;
252  size_t len;
253  string range_location;
254 
257 
258  for (TableMutatorAsyncSendBufferMap::const_iterator iter = m_buffer_map.begin();
259  iter != m_buffer_map.end(); ++iter) {
260  send_buffer = (*iter).second;
261 
262  if ((len = send_buffer->accum.fill()) == 0) {
264  continue;
265  }
266 
267  send_buffer->pending_updates.set(new uint8_t [len], len);
268 
269  if (send_buffer->resend()) {
270  memcpy(send_buffer->pending_updates.base,
271  send_buffer->accum.base, len);
272  send_buffer->send_count = send_buffer->retry_count;
273  }
274  else {
275  send_vec.clear();
276  send_vec.reserve(send_buffer->key_offsets.size());
277  for (auto it = send_buffer->key_offsets.begin(); it != send_buffer->key_offsets.end(); ++it) {
278  send_rec.key.ptr = send_buffer->accum.base + *it;
279  send_rec.row = send_rec.key.row();
280  send_vec.push_back(send_rec);
281  }
282  std::stable_sort(send_vec.begin(), send_vec.end());
283 
284  ptr = send_buffer->pending_updates.base;
285 
286  for (auto it = send_vec.begin(); it != send_vec.end(); ++it) {
287  key = it->key;
288  key.next(); // skip key
289  key.next(); // skip value
290  memcpy(ptr, it->key.ptr, key.ptr - it->key.ptr);
291  ptr += key.ptr - it->key.ptr;
292  }
293  HT_ASSERT((size_t)(ptr-send_buffer->pending_updates.base)==len);
294  send_buffer->dispatch_handler =
295  make_shared<TableMutatorAsyncDispatchHandler>(m_app_queue, m_mutator,
296  m_id, send_buffer.get(),
298  send_buffer->send_count = send_buffer->key_offsets.size();
299  }
300 
301  // clear and re-use the allocated memory
302  send_buffer->accum.clear();
303  send_buffer->key_offsets.clear();
304 
308  try {
309  m_send_flags = flags;
310  send_buffer->pending_updates.own = false;
311  m_range_server.update(send_buffer->addr, ClusterId::get(),
312  m_table_identifier, send_buffer->send_count,
313  send_buffer->pending_updates, flags,
314  send_buffer->dispatch_handler.get());
315 
316  outstanding = true;
317 
319  m_unsynced_rangeservers.insert(send_buffer->addr);
320 
321  }
322  catch (Exception &e) {
323  if (e.code() == Error::COMM_NOT_CONNECTED ||
326  m_range_locator->invalidate_host(send_buffer->addr.proxy);
327  send_buffer->add_retries(send_buffer->send_count, 0,
328  send_buffer->pending_updates.size);
329  if (e.code() == Error::COMM_NOT_CONNECTED ||
332  else
333  outstanding = true;
334  // Random wait between 0 and 5 seconds
335  this_thread::sleep_for(Random::duration_millis(5000));
336  }
337  else {
338  HT_FATALF("Problem sending updates to %s - %s (%s)",
339  send_buffer->addr.to_str().c_str(), Error::get_text(e.code()),
340  e.what());
341  }
342  }
343  send_buffer->pending_updates.own = true;
344  }
345 
346  if (outstanding)
347  m_outstanding = true;
348  else
349  m_app_queue->add_unlocked(new TableMutatorAsyncHandler(m_mutator, m_id));
350 }
351 
352 
354  unique_lock<mutex> lock(m_mutex);
355  m_cond.wait(lock, [this](){ return m_outstanding == 0; });
356 }
357 
358 
361  TableMutatorAsyncSendBufferPtr send_buffer;
363  SerializedKey key;
364  ByteString value, bs;
365  size_t incr_mem;
366 
367  try {
368  if (m_timer.remaining() < m_wait_time) {
371  format("Timer remaining=%lld wait_time=%lld",
373  }
374 
375  m_timer.start();
376  this_thread::sleep_for(chrono::milliseconds(m_wait_time));
377  m_timer.stop();
378  redo_buffer = make_shared<TableMutatorAsyncScatterBuffer>(m_comm, m_app_queue, m_mutator,
380  redo_buffer->m_timer = m_timer;
381  redo_buffer->m_wait_time = m_wait_time + 2000;
382 
383  for (TableMutatorAsyncSendBufferMap::const_iterator iter = m_buffer_map.begin();
384  iter != m_buffer_map.end(); ++iter) {
385  send_buffer = (*iter).second;
386 
387  if (send_buffer->accum.fill()) {
388  const uint8_t *endptr;
389 
390  bs.ptr = send_buffer->accum.base;
391  endptr = bs.ptr + send_buffer->accum.fill();
392 
393  // now add all of the old keys to the redo buffer
394  while (bs.ptr < endptr) {
395  key.ptr = bs.next();
396  value.ptr = bs.next();
397  incr_mem = key.length() + value.length();
398  redo_buffer->set(key, value, incr_mem);
399  m_resends++;
400  }
401  }
402  }
403  }
404  catch (Exception &e) {
406  HT_THROW(e.code(), e.what());
407  }
408  return redo_buffer;
409 }
410 
412  TableMutatorAsyncSendBufferPtr send_buffer;
413  Key key;
414  Cell cell;
415  ByteString value, bs;
416  ColumnFamilySpec *cf_spec;
417 
418  for (TableMutatorAsyncSendBufferMap::const_iterator iter = m_buffer_map.begin();
419  iter != m_buffer_map.end(); ++iter) {
420  send_buffer = (*iter).second;
421  if (send_buffer->accum.fill()) {
422  const uint8_t *endptr;
423 
424  bs.ptr = send_buffer->accum.base;
425  endptr = bs.ptr + send_buffer->accum.fill();
426 
427  while (bs.ptr < endptr) {
428  key.load((SerializedKey)bs);
429  cell.row_key = key.row;
430  cell.flag = key.flag;
431  if (cell.flag == FLAG_DELETE_ROW) {
432  cell.column_family = 0;
433  }
434  else {
435  cf_spec = m_schema->get_column_family(key.column_family_code);
436  HT_ASSERT(cf_spec);
437  cell.column_family = m_constant_strings.get(cf_spec->get_name().c_str());
438  }
440  cell.timestamp = key.timestamp;
441  bs.next();
442  cell.value_len = bs.decode_length(&cell.value);
443  bs.next();
444  m_failed_mutations.push_back(std::make_pair(cell, error));
445  }
446  }
447  }
448 }
449 
451  TableMutatorAsyncSendBufferPtr send_buffer;
452  std::vector<FailedRegionAsync> failed_regions;
453  int error = Error::OK;
454 
455  for (TableMutatorAsyncSendBufferMap::const_iterator it = m_buffer_map.begin();
456  it != m_buffer_map.end(); ++it) {
457  (*it).second->get_failed_regions(failed_regions);
458  (*it).second->failed_regions.clear();
459  }
460 
461  if (!failed_regions.empty()) {
462  Cell cell;
463  Key key;
464  ByteString bs;
465  const uint8_t *endptr;
466  ColumnFamilySpec *cf_spec;
467  for (size_t i=0; i<failed_regions.size(); i++) {
468  bs.ptr = failed_regions[i].base;
469  endptr = bs.ptr + failed_regions[i].len;
470  while (bs.ptr < endptr) {
471  key.load((SerializedKey)bs);
472  cell.row_key = key.row;
473  cell.flag = key.flag;
474  if (cell.flag == FLAG_DELETE_ROW) {
475  cell.column_family = 0;
476  }
477  else {
478  cf_spec = m_schema->get_column_family(key.column_family_code);
479  HT_ASSERT(cf_spec);
480  cell.column_family = m_constant_strings.get(cf_spec->get_name().c_str());
481  }
483  cell.timestamp = key.timestamp;
484  bs.next();
485  cell.value_len = bs.decode_length(&cell.value);
486  bs.next();
487  m_failed_mutations.push_back(std::make_pair(cell,
488  failed_regions[i].error));
489  }
490  }
491  }
492  if (m_failed_mutations.size() > 0)
493  error = failed_regions[0].error;
494  return error;
495 }
496 
498 
499  int error = Error::OK;
500  bool has_retries=false;
501 
503  error = set_failed_mutations();
504  // this prevents this mutation failure logic from being executed twice
505  // if this method gets called again
507  }
508 
510  has_retries = true;
511  }
512 
513  m_mutator->buffer_finish(m_id, error, has_retries);
514 
515  {
516  lock_guard<mutex> lock(m_mutex);
517  m_outstanding = false;
518  m_cond.notify_all();
519  }
520 }
TableMutatorAsyncScatterBuffer(Comm *comm, ApplicationQueueInterfacePtr &app_queue, TableMutatorAsync *mutator, const TableIdentifier *, SchemaPtr &, RangeLocatorPtr &, bool auto_refresh, uint32_t timeout_ms, uint32_t id)
int64_t timestamp
Definition: Key.h:134
const char * row
Definition: Key.h:129
PropertiesPtr properties
This singleton map stores all options.
Definition: Config.cc:47
static const uint32_t FLAG_DELETE_ROW
Definition: KeySpec.h:40
bool get_option_counter() const
Gets the counter option.
void set(const Key &, const ColumnFamilySpec *cf, const void *value, uint32_t value_len, size_t incr_mem)
String format(const char *fmt,...)
Returns a String using printf-like format facilities. Vanilla snprintf is about 1.5x faster than this...
Definition: String.cc:37
void stop()
Stops the timer.
Definition: Timer.h:77
std::shared_ptr< RangeLocator > RangeLocatorPtr
Smart pointer to RangeLocator.
Definition: RangeLocator.h:198
static const uint32_t FLAG_INSERT
Definition: KeySpec.h:47
void update(const CommAddress &addr, uint64_t cluster_id, const TableIdentifier &table, int32_t count, StaticBuffer &buffer, int32_t flags, DispatchHandler *handler)
Issues an "update" request asynchronously.
Definition: Client.cc:202
std::shared_ptr< TableMutatorAsyncScatterBuffer > TableMutatorAsyncScatterBufferPtr
Smart pointer to TableMutatorAsyncScatterBuffer.
Column family specification.
Holds range start and end row plus location.
const char * column_qualifier
Definition: Cell.h:68
static const uint32_t FLAG_DELETE_CELL
Definition: KeySpec.h:42
STL namespace.
uint32_t remaining()
Returns the remaining time till expiry.
Definition: Timer.h:101
Holds range location.
uint8_t * ptr
Pointer to the end of the used part of the buffer.
static const uint32_t FLAG_DELETE_COLUMN_FAMILY
Definition: KeySpec.h:41
void append_as_byte_string(DynamicBuffer &dst_buf, const void *value, uint32_t value_len)
Serializes and appends a byte array to a DynamicBuffer object.
Definition: ByteString.h:130
std::shared_ptr< TableMutatorAsyncSendBuffer > TableMutatorAsyncSendBufferPtr
Smart pointer to TableMutatorAsyncSendBuffer.
A class managing one or more serializable ByteStrings.
Definition: ByteString.h:47
#define HT_ASSERT(_e_)
Definition: Logger.h:396
Provides the ability to mutate a table in the form of adding and deleting rows and cells...
const char * get_text(int error)
Returns a descriptive error message.
Definition: Error.cc:330
void buffer_finish(uint32_t id, int error, bool retry)
This is where buffers call back into when their outstanding operations are complete.
A timer class to keep timeout states across AsyncComm related calls.
static uint64_t get()
Gets the cluster ID.
Definition: ClusterId.h:85
Compatibility Macros for C/C++.
const char * row_key
Definition: Cell.h:66
bool load(const SerializedKey &key)
Parses the opaque key and loads the components into the member variables.
Definition: Key.cc:158
void encode_i64(uint8_t **bufp, uint64_t val)
Encode a 64-bit integer in little-endian order.
std::shared_ptr< ApplicationQueueInterface > ApplicationQueueInterfacePtr
Smart pointer to ApplicationQueueInterface.
size_t length() const
Retrieves the length of the serialized string.
Definition: ByteString.h:62
TableMutatorAsyncScatterBufferPtr create_redo_buffer(uint32_t id)
const uint8_t * ptr
The pointer to the serialized data.
Definition: ByteString.h:121
bool operator<(const directory_entry< _Key, _Tp > &lhs, const directory_entry< _Key, _Tp > &rhs)
Definition: directory.h:128
Hypertable definitions
#define HT_FATALF(msg,...)
Definition: Logger.h:343
const std::string & get_name() const
Gets column family name.
long long int Lld
Shortcut for printf formats.
Definition: String.h:53
void create_key_and_append(DynamicBuffer &dst_buf, const Key &key, bool time_order_asc)
Definition: Key.cc:105
Entry point to AsyncComm service.
Definition: Comm.h:61
void clear()
Clears the buffer.
const char * get(const char *str)
Returns a copy of the string; this string is valid till the FlyweightString set is destructed...
void start()
Starts the timer.
Definition: Timer.h:64
const char * column_family
Definition: Cell.h:67
size_t decode_length(const uint8_t **dptr) const
Retrieves the decoded length and returns a pointer to the string.
Definition: ByteString.h:83
TableMutatorAsyncCompletionCounter m_completion_counter
#define HT_THROWF(_code_, _fmt_,...)
Definition: Error.h:490
Provides access to internal components of opaque key.
Definition: Key.h:40
Random number generator for int32, int64, double and ascii arrays.
uint8_t * base
Pointer to the allocated memory buffer.
A timer class to keep timeout states across AsyncComm related calls.
Definition: Timer.h:44
This is a generic exception class for Hypertable.
Definition: Error.h:314
uint32_t value_len
Definition: Cell.h:72
Declarations for TableMutatorAsyncScatterBuffer.
std::shared_ptr< Schema > SchemaPtr
Smart pointer to Schema.
Definition: Schema.h:465
uint8_t column_family_code
Definition: Key.h:127
static const int64_t AUTO_ASSIGN
Definition: KeySpec.h:38
uint8_t flag
Definition: Cell.h:73
Configuration settings.
uint8_t flag
Definition: Key.h:125
Encapsulates decomposed key and value.
Definition: Cell.h:32
const char * column_qualifier
Definition: Key.h:130
#define HT_THROW(_code_, _msg_)
Definition: Error.h:478
static const uint32_t FLAG_DELETE_CELL_VERSION
Definition: KeySpec.h:43
static std::chrono::milliseconds duration_millis(uint32_t maximum)
Returns a random millisecond duration.
Definition: Random.cc:77
void ensure(size_t len)
Ensure space for additional data. Will grow the space to 1.5 of the needed space with existing data un...
Definition: DynamicBuffer.h:82
uint32_t decode_vi32(const uint8_t **bufp, size_t *remainp)
Decode a variable length encoded integer up to 32-bit.
uint8_t * add_unchecked(const void *data, size_t len)
Adds additional data without boundary checks.
Declarations for ClusterId.
This class is a DispatchHandler.
const uint8_t * value
Definition: Cell.h:71
uint8_t * next()
Retrieves the next serialized String in the buffer.
Definition: ByteString.h:71
int code() const
Returns the error code.
Definition: Error.h:391
int64_t timestamp
Definition: Cell.h:69