0.9.8.10
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
UpdatePipeline.cc
Go to the documentation of this file.
1 /* -*- c++ -*-
2  * Copyright (C) 2007-2015 Hypertable, Inc.
3  *
4  * This file is part of Hypertable.
5  *
6  * Hypertable is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; either version 3
9  * of the License, or any later version.
10  *
11  * Hypertable is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19  * 02110-1301, USA.
20  */
21 
26 
27 #include <Common/Compat.h>
28 #include "UpdatePipeline.h"
29 
35 
38 
39 #include <Common/DynamicBuffer.h>
40 #include <Common/FailureInducer.h>
41 #include <Common/Logger.h>
42 #include <Common/Serialization.h>
43 
44 #include <chrono>
45 #include <set>
46 #include <thread>
47 
48 using namespace Hypertable;
49 using namespace Hypertable::RangeServer;
50 using namespace std;
51 
53  TimerHandlerPtr &timer_handler, CommitLogPtr &log,
54  Filesystem::Flags flags) :
55  m_context(context), m_query_cache(query_cache),
56  m_timer_handler(timer_handler), m_log(log), m_flags(flags) {
57  m_update_coalesce_limit = m_context->props->get_i64("Hypertable.RangeServer.UpdateCoalesceLimit");
58  m_maintenance_pause_interval = m_context->props->get_i32("Hypertable.RangeServer.Testing.MaintenanceNeeded.PauseInterval");
59  m_update_delay = m_context->props->get_i32("Hypertable.RangeServer.UpdateDelay", 0);
60  m_max_clock_skew = m_context->props->get_i32("Hypertable.RangeServer.ClockSkew.Max");
61  m_threads.reserve(3);
62  m_threads.push_back( thread(&UpdatePipeline::qualify_and_transform, this) );
63  m_threads.push_back( thread(&UpdatePipeline::commit, this) );
64  m_threads.push_back( thread(&UpdatePipeline::add_and_respond, this) );
65 }
66 
68  lock_guard<mutex> lock(m_qualify_queue_mutex);
69  m_qualify_queue.push_back(uc);
70  m_qualify_queue_cond.notify_all();
71 }
72 
73 
74 
76  m_shutdown = true;
77  m_qualify_queue_cond.notify_all();
78  m_commit_queue_cond.notify_all();
79  m_response_queue_cond.notify_all();
80  for (std::thread &t : m_threads)
81  t.join();
82 }
83 
84 
86  UpdateContext *uc;
87  SerializedKey key;
88  const uint8_t *mod, *mod_end;
89  const char *row;
90  String start_row, end_row;
91  UpdateRecRangeList *rulist;
92  int error = Error::OK;
93  int64_t latest_range_revision;
94  RangeTransferInfo transfer_info;
95  bool transfer_pending;
96  DynamicBuffer *cur_bufp;
97  DynamicBuffer *transfer_bufp;
98  uint32_t go_buf_reset_offset;
99  uint32_t root_buf_reset_offset;
100  CommitLogPtr transfer_log;
101  UpdateRecRange range_update;
102  RangePtr range;
104  condition_variable &cond = m_qualify_queue_cond;
105  std::list<UpdateContext *> &queue = m_qualify_queue;
106 
107  while (true) {
108 
109  {
110  unique_lock<std::mutex> lock(mutex);
111  cond.wait(lock, [this, &queue](){ return !queue.empty() || m_shutdown; });
112  if (m_shutdown)
113  return;
114  uc = queue.front();
115  queue.pop_front();
116  }
117 
118  rulist = 0;
119  transfer_bufp = 0;
120  go_buf_reset_offset = 0;
121  root_buf_reset_offset = 0;
122 
123  // This probably shouldn't happen for group commit, but since
124  // it's only for testing purposes, we'll leave it here
125  if (m_update_delay)
126  this_thread::sleep_for(chrono::milliseconds(m_update_delay));
127 
128  // Global commit log is only available after local recovery
130 
131  // TODO: Sanity check mod data (checksum validation)
132 
133  // hack to workaround xen timestamp issue
134  if (uc->auto_revision < m_last_revision)
136 
137  for (UpdateRecTable *table_update : uc->updates) {
138 
139  HT_DEBUG_OUT <<"Update: "<< table_update->id << HT_END;
140 
141  if (!table_update->id.is_system() && m_context->server_state->readonly()) {
143  continue;
144  }
145 
146  try {
147  if (!m_context->live_map->lookup(table_update->id.id, table_update->table_info)) {
148  table_update->error = Error::TABLE_NOT_FOUND;
149  table_update->error_msg = table_update->id.id;
150  continue;
151  }
152  }
153  catch (Exception &e) {
154  table_update->error = e.code();
155  table_update->error_msg = e.what();
156  continue;
157  }
158 
159  // verify schema
160  if (table_update->table_info->get_schema()->get_generation() !=
161  table_update->id.generation) {
163  table_update->error_msg =
164  format("Update schema generation mismatch for table %s (received %lld != %lld)",
165  table_update->id.id, (Lld)table_update->id.generation,
166  (Lld)table_update->table_info->get_schema()->get_generation());
167  continue;
168  }
169 
170  // Pre-allocate the go_buf - each key could expand by 8 or 9 bytes,
171  // if auto-assigned (8 for the ts or rev and maybe 1 for possible
172  // increase in vint length)
173  table_update->go_buf.reserve(table_update->id.encoded_length() +
174  table_update->total_buffer_size +
175  (table_update->total_count * 9));
176  table_update->id.encode(&table_update->go_buf.ptr);
177  table_update->go_buf.set_mark();
178 
179  for (UpdateRequest *request : table_update->requests) {
180  uc->total_updates++;
181 
182  mod_end = request->buffer.base + request->buffer.size;
183  mod = request->buffer.base;
184 
185  go_buf_reset_offset = table_update->go_buf.fill();
186  root_buf_reset_offset = uc->root_buf.fill();
187 
188  memset(&uc->send_back, 0, sizeof(uc->send_back));
189 
190  while (mod < mod_end) {
191  key.ptr = mod;
192  row = key.row();
193 
194  // error inducer for tests/integration/fail-index-mutator
195  if (HT_FAILURE_SIGNALLED("fail-index-mutator-0")) {
196  if (!strcmp(row, "1,+/JzamFvB6rqPqP5yNgI5nreCtZHkT\t\t01501")) {
197  uc->send_back.count++;
199  uc->send_back.offset = mod - request->buffer.base;
200  uc->send_back.len = strlen(row);
201  request->send_back_vector.push_back(uc->send_back);
202  memset(&uc->send_back, 0, sizeof(uc->send_back));
203  key.next(); // skip key
204  key.next(); // skip value;
205  mod = key.ptr;
206  continue;
207  }
208  }
209 
210  // If the row key starts with '\0' then the buffer is probably
211  // corrupt, so mark the remaining key/value pairs as bad
212  if (*row == 0) {
214  uc->send_back.count = request->count; // fix me !!!!
215  uc->send_back.offset = mod - request->buffer.base;
216  uc->send_back.len = mod_end - mod;
217  request->send_back_vector.push_back(uc->send_back);
218  memset(&uc->send_back, 0, sizeof(uc->send_back));
219  mod = mod_end;
220  continue;
221  }
222 
223  // Look for containing range, add to stop mods if not found
224  if (!table_update->table_info->find_containing_range(row, range,
225  start_row, end_row) ||
226  range->get_relinquish()) {
228  && uc->send_back.count > 0) {
229  uc->send_back.len = (mod - request->buffer.base) - uc->send_back.offset;
230  request->send_back_vector.push_back(uc->send_back);
231  memset(&uc->send_back, 0, sizeof(uc->send_back));
232  }
233  if (uc->send_back.count == 0) {
235  uc->send_back.offset = mod - request->buffer.base;
236  }
237  key.next(); // skip key
238  key.next(); // skip value;
239  mod = key.ptr;
240  uc->send_back.count++;
241  continue;
242  }
243 
244  if ((rulist = table_update->range_map[range.get()]) == 0) {
245  rulist = new UpdateRecRangeList();
246  rulist->range = range;
247  table_update->range_map[range.get()] = rulist;
248  }
249 
250  // See if range has some other error preventing it from receiving updates
251  if ((error = rulist->range->get_error()) != Error::OK) {
252  if (uc->send_back.error != error && uc->send_back.count > 0) {
253  uc->send_back.len = (mod - request->buffer.base) - uc->send_back.offset;
254  request->send_back_vector.push_back(uc->send_back);
255  memset(&uc->send_back, 0, sizeof(uc->send_back));
256  }
257  if (uc->send_back.count == 0) {
258  uc->send_back.error = error;
259  uc->send_back.offset = mod - request->buffer.base;
260  }
261  key.next(); // skip key
262  key.next(); // skip value;
263  mod = key.ptr;
264  uc->send_back.count++;
265  continue;
266  }
267 
268  if (uc->send_back.count > 0) {
269  uc->send_back.len = (mod - request->buffer.base) - uc->send_back.offset;
270  request->send_back_vector.push_back(uc->send_back);
271  memset(&uc->send_back, 0, sizeof(uc->send_back));
272  }
273 
274  /*
275  * Increment update count on range
276  * (block if maintenance in progress)
277  */
278  if (!rulist->range_blocked) {
279  if (!rulist->range->increment_update_counter()) {
281  uc->send_back.offset = mod - request->buffer.base;
282  uc->send_back.count++;
283  key.next(); // skip key
284  key.next(); // skip value;
285  mod = key.ptr;
286  continue;
287  }
288  rulist->range_blocked = true;
289  }
290 
291  String range_start_row, range_end_row;
292  rulist->range->get_boundary_rows(range_start_row, range_end_row);
293 
294  // Make sure range didn't just shrink
295  if (range_start_row != start_row || range_end_row != end_row) {
296  rulist->range->decrement_update_counter();
297  table_update->range_map.erase(rulist->range.get());
298  delete rulist;
299  continue;
300  }
301 
303  {
304  bool wait_for_maintenance;
305  transfer_pending = rulist->range->get_transfer_info(transfer_info, transfer_log,
306  &latest_range_revision, wait_for_maintenance);
307  }
308 
309  if (rulist->transfer_log.get() == 0)
310  rulist->transfer_log = transfer_log;
311 
312  HT_ASSERT(rulist->transfer_log.get() == transfer_log.get());
313 
314  bool in_transferring_region = false;
315 
316  // Check for clock skew
317  {
318  ByteString tmp_key;
319  const uint8_t *tmp;
320  int64_t difference, tmp_timestamp;
321  tmp_key.ptr = key.ptr;
322  tmp_key.decode_length(&tmp);
323  if ((*tmp & Key::HAVE_REVISION) == 0) {
324  if (latest_range_revision > TIMESTAMP_MIN
325  && uc->auto_revision < latest_range_revision) {
326  tmp_timestamp = Hypertable::get_ts64();
327  if (tmp_timestamp > uc->auto_revision)
328  uc->auto_revision = tmp_timestamp;
329  if (uc->auto_revision < latest_range_revision) {
330  difference = (int32_t)((latest_range_revision - uc->auto_revision)
331  / 1000LL);
332  if (difference > m_max_clock_skew && !Global::ignore_clock_skew_errors) {
334  HT_ERRORF("Clock skew of %lld microseconds exceeds maximum "
335  "(%lld) range=%s", (Lld)difference,
337  rulist->range->get_name().c_str());
338  uc->send_back.count = 0;
339  request->send_back_vector.clear();
340  break;
341  }
342  }
343  }
344  }
345  }
346 
347  if (transfer_pending) {
348  transfer_bufp = &rulist->transfer_buf;
349  if (transfer_bufp->empty()) {
350  transfer_bufp->reserve(table_update->id.encoded_length());
351  table_update->id.encode(&transfer_bufp->ptr);
352  transfer_bufp->set_mark();
353  }
354  rulist->transfer_buf_reset_offset = rulist->transfer_buf.fill();
355  }
356  else {
357  transfer_bufp = 0;
358  rulist->transfer_buf_reset_offset = 0;
359  }
360 
361  if (rulist->range->is_root()) {
362  if (uc->root_buf.empty()) {
363  uc->root_buf.reserve(table_update->id.encoded_length());
364  table_update->id.encode(&uc->root_buf.ptr);
365  uc->root_buf.set_mark();
366  root_buf_reset_offset = uc->root_buf.fill();
367  }
368  cur_bufp = &uc->root_buf;
369  }
370  else
371  cur_bufp = &table_update->go_buf;
372 
373  rulist->last_request = request;
374 
375  range_update.bufp = cur_bufp;
376  range_update.offset = cur_bufp->fill();
377 
378  while (mod < mod_end &&
379  (end_row == "" || (strcmp(row, end_row.c_str()) <= 0))) {
380 
381  if (transfer_pending) {
382 
383  if (transfer_info.transferring(row)) {
384  if (!in_transferring_region) {
385  range_update.len = cur_bufp->fill() - range_update.offset;
386  rulist->add_update(request, range_update);
387  cur_bufp = transfer_bufp;
388  range_update.bufp = cur_bufp;
389  range_update.offset = cur_bufp->fill();
390  in_transferring_region = true;
391  }
392  table_update->transfer_count++;
393  }
394  else {
395  if (in_transferring_region) {
396  range_update.len = cur_bufp->fill() - range_update.offset;
397  rulist->add_update(request, range_update);
398  cur_bufp = &table_update->go_buf;
399  range_update.bufp = cur_bufp;
400  range_update.offset = cur_bufp->fill();
401  in_transferring_region = false;
402  }
403  }
404  }
405 
406  try {
407  SchemaPtr schema = table_update->table_info->get_schema();
408  uint8_t family=*(key.ptr+1+strlen((const char *)key.ptr+1)+1);
409  ColumnFamilySpec *cf_spec = schema->get_column_family(family);
410 
411  // reset auto_revision if it's gotten behind
412  if (uc->auto_revision < latest_range_revision) {
414  if (uc->auto_revision < latest_range_revision) {
416  "Auto revision (%lld) is less than latest range "
417  "revision (%lld) for range %s",
418  (Lld)uc->auto_revision, (Lld)latest_range_revision,
419  rulist->range->get_name().c_str());
420  }
421  }
422 
423  // This will transform keys that need to be assigned a
424  // timestamp and/or revision number by re-writing the key
425  // with the added timestamp and/or revision tacked on to the end
426  transform_key(key, cur_bufp, ++uc->auto_revision,&m_last_revision,
427  cf_spec ? cf_spec->get_option_time_order_desc() : false);
428 
429  // Validate revision number
430  if (m_last_revision < latest_range_revision) {
431  if (m_last_revision != uc->auto_revision) {
433  "Supplied revision (%lld) is less than most recently "
434  "seen revision (%lld) for range %s",
435  (Lld)m_last_revision, (Lld)latest_range_revision,
436  rulist->range->get_name().c_str());
437  }
438  }
439  }
440  catch (Exception &e) {
441  HT_ERRORF("%s - %s", e.what(), Error::get_text(e.code()));
442  request->error = e.code();
443  break;
444  }
445 
446  // Now copy the value (with sanity check)
447  mod = key.ptr;
448  key.next(); // skip value
449  HT_ASSERT(key.ptr <= mod_end);
450  cur_bufp->add(mod, key.ptr-mod);
451  mod = key.ptr;
452 
453  table_update->total_added++;
454 
455  if (mod < mod_end)
456  row = key.row();
457  }
458 
459  if (request->error == Error::OK) {
460 
461  range_update.len = cur_bufp->fill() - range_update.offset;
462  rulist->add_update(request, range_update);
463 
464  // if there were transferring updates, record the latest revision
465  if (transfer_pending && rulist->transfer_buf_reset_offset < rulist->transfer_buf.fill()) {
468  }
469  }
470  else {
471  /*
472  * If we drop into here, this means that the request is
473  * being aborted, so reset all of the UpdateRecRangeLists,
474  * reset the go_buf and the root_buf
475  */
476  for (auto iter = table_update->range_map.begin();
477  iter != table_update->range_map.end(); ++iter)
478  (*iter).second->reset_updates(request);
479  table_update->go_buf.ptr = table_update->go_buf.base + go_buf_reset_offset;
480  if (root_buf_reset_offset)
481  uc->root_buf.ptr = uc->root_buf.base + root_buf_reset_offset;
482  uc->send_back.count = 0;
483  mod = mod_end;
484  }
485  range_update.bufp = 0;
486  }
487 
488  transfer_log = 0;
489 
490  if (uc->send_back.count > 0) {
491  uc->send_back.len = (mod - request->buffer.base) - uc->send_back.offset;
492  request->send_back_vector.push_back(uc->send_back);
493  memset(&uc->send_back, 0, sizeof(uc->send_back));
494  }
495  }
496 
497  HT_DEBUGF("Added %d (%d transferring) updates to '%s'",
498  table_update->total_added, table_update->transfer_count,
499  table_update->id.id);
500  if (!table_update->id.is_metadata())
501  uc->total_added += table_update->total_added;
502  }
503 
505 
506  // Enqueue update
507  {
508  lock_guard<std::mutex> lock(m_commit_queue_mutex);
509  m_commit_queue.push_back(uc);
510  m_commit_queue_cond.notify_all();
512  }
513  }
514 }
515 
517  UpdateContext *uc;
518  SerializedKey key;
519  std::list<UpdateContext *> coalesce_queue;
520  uint64_t coalesce_amount = 0;
521  int error = Error::OK;
522  uint32_t committed_transfer_data;
523  bool log_needs_syncing {};
524 
525  while (true) {
526 
527  // Dequeue next update
528  {
529  unique_lock<std::mutex> lock(m_commit_queue_mutex);
530  m_commit_queue_cond.wait(lock, [this](){
531  return !m_commit_queue.empty() || m_shutdown; });
532  if (m_shutdown)
533  return;
534  uc = m_commit_queue.front();
535  m_commit_queue.pop_front();
537  }
538 
539  committed_transfer_data = 0;
540  log_needs_syncing = false;
541 
542  // Commit ROOT mutations
543  if (uc->root_buf.ptr > uc->root_buf.mark) {
545  HT_FATALF("Problem writing %d bytes to ROOT commit log - %s",
546  (int)uc->root_buf.fill(), Error::get_text(error));
547  }
548  }
549 
550  for (UpdateRecTable *table_update : uc->updates) {
551 
552  coalesce_amount += table_update->total_buffer_size;
553 
554  // Iterate through all of the ranges, committing any transferring updates
555  for (auto iter = table_update->range_map.begin(); iter != table_update->range_map.end(); ++iter) {
556  if ((*iter).second->transfer_buf.ptr > (*iter).second->transfer_buf.mark) {
557  committed_transfer_data += (*iter).second->transfer_buf.ptr - (*iter).second->transfer_buf.mark;
558  if ((error = (*iter).second->transfer_log->write(ClusterId::get(), (*iter).second->transfer_buf,
559  (*iter).second->latest_transfer_revision,
560  m_flags)) != Error::OK) {
561  table_update->error = error;
562  table_update->error_msg = format("Problem writing %d bytes to transfer log",
563  (int)(*iter).second->transfer_buf.fill());
564  HT_ERRORF("%s - %s", table_update->error_msg.c_str(), Error::get_text(error));
565  break;
566  }
567  }
568  }
569 
570  if (table_update->error != Error::OK)
571  continue;
572 
573  constexpr uint32_t NO_LOG_SYNC_FLAGS =
576 
577  if ((table_update->flags & NO_LOG_SYNC_FLAGS) == 0)
578  log_needs_syncing = true;
579 
580  // Commit valid (go) mutations
581  if ((table_update->flags & Lib::RangeServer::Protocol::UPDATE_FLAG_NO_LOG) == 0 &&
582  table_update->go_buf.ptr > table_update->go_buf.mark) {
583 
584  if ((error = m_log->write(ClusterId::get(), table_update->go_buf, uc->last_revision, Filesystem::Flags::NONE)) != Error::OK) {
585  table_update->error_msg = format("Problem writing %d bytes to commit log (%s) - %s",
586  (int)table_update->go_buf.fill(),
587  m_log->get_log_dir().c_str(),
588  Error::get_text(error));
589  HT_ERRORF("%s", table_update->error_msg.c_str());
590  table_update->error = error;
591  continue;
592  }
593  }
594 
595  }
596 
597  bool do_sync = false;
598  if (log_needs_syncing) {
599  if (m_commit_queue_count > 0 && coalesce_amount < m_update_coalesce_limit) {
600  coalesce_queue.push_back(uc);
601  continue;
602  }
603  do_sync = true;
604  }
605  else if (!coalesce_queue.empty())
606  do_sync = true;
607 
608  // Now sync the commit log if needed
609  if (do_sync) {
610  size_t retry_count {};
611  uc->total_syncs++;
612 
613  while (true) {
614 
616  error = m_log->flush();
617  else if (m_flags == Filesystem::Flags::SYNC)
618  error = m_log->sync();
619  else
620  error = Error::OK;
621 
622  if (error != Error::OK) {
623  HT_ERRORF("Problem %sing log fragment (%s) - %s",
624  (m_flags == Filesystem::Flags::FLUSH ? "flush" : "sync"),
625  m_log->get_current_fragment_file().c_str(),
626  Error::get_text(error));
627  if (++retry_count == 6)
628  break;
629  this_thread::sleep_for(chrono::milliseconds(10000));
630  }
631  else
632  break;
633  }
634  }
635 
636  // Enqueue update
637  {
638  lock_guard<std::mutex> lock(m_response_queue_mutex);
639  coalesce_queue.push_back(uc);
640  while (!coalesce_queue.empty()) {
641  uc = coalesce_queue.front();
642  coalesce_queue.pop_front();
643  m_response_queue.push_back(uc);
644  }
645  coalesce_amount = 0;
646  m_response_queue_cond.notify_all();
647  }
648  }
649 }
650 
652  UpdateContext *uc;
653  SerializedKey key;
654  int error = Error::OK;
655 
656  while (true) {
657 
658  // Dequeue next update
659  {
660  unique_lock<std::mutex> lock(m_response_queue_mutex);
661  m_response_queue_cond.wait(lock, [this](){
662  return !m_response_queue.empty() || m_shutdown; });
663  if (m_shutdown)
664  return;
665  uc = m_response_queue.front();
666  m_response_queue.pop_front();
667  }
668 
672  for (UpdateRecTable *table_update : uc->updates) {
673 
674  // Iterate through all of the ranges, inserting updates
675  for (auto iter = table_update->range_map.begin(); iter != table_update->range_map.end(); ++iter) {
676  ByteString value;
677  Key key_comps;
678 
679  for (UpdateRecRange &update : (*iter).second->updates) {
680  Range *rangep = (*iter).first;
681  lock_guard<Range> lock(*rangep);
682  uint8_t *ptr = update.bufp->base + update.offset;
683  uint8_t *end = ptr + update.len;
684 
685  if (!table_update->id.is_metadata())
686  uc->total_bytes_added += update.len;
687 
688  rangep->add_bytes_written( update.len );
689  std::set<uint8_t> columns;
690  bool invalidate {};
691  const char *current_row {};
692  uint64_t count = 0;
693  while (ptr < end) {
694  key.ptr = ptr;
695  key_comps.load(key);
696  if (current_row == nullptr)
697  current_row = key_comps.row;
698  count++;
699  ptr += key_comps.length;
700  value.ptr = ptr;
701  ptr += value.length();
702  if (key_comps.column_family_code == 0 && key_comps.flag != FLAG_DELETE_ROW) {
703  HT_ERRORF("Skipping bad key - column family not specified in "
704  "non-delete row update on %s row=%s",
705  table_update->id.id, key_comps.row);
706  continue;
707  }
708  rangep->add(key_comps, value);
709  // invalidate
710  if (m_query_cache) {
711  if (strcmp(current_row, key_comps.row)) {
712  if (invalidate)
713  columns.clear();
714  m_query_cache->invalidate(table_update->id.id, current_row, columns);
715  columns.clear();
716  invalidate = false;
717  current_row = key_comps.row;
718  }
719  if (key_comps.flag == FLAG_DELETE_ROW)
720  invalidate = true;
721  else
722  columns.insert(key_comps.column_family_code);
723  }
724  }
725 
726  if (m_query_cache && current_row) {
727  if (invalidate)
728  columns.clear();
729  m_query_cache->invalidate(table_update->id.id, current_row, columns);
730  }
731 
732  rangep->add_cells_written(count);
733  }
734  }
735  }
736 
737  // Decrement usage counters for all referenced ranges
738  for (UpdateRecTable *table_update : uc->updates) {
739  for (auto iter = table_update->range_map.begin(); iter != table_update->range_map.end(); ++iter) {
740  if ((*iter).second->range_blocked)
741  (*iter).first->decrement_update_counter();
742  }
743  }
744 
748  bool maintenance_needed = false;
749  for (UpdateRecTable *table_update : uc->updates) {
750 
751  /*
752  * If any of the newly updated ranges needs maintenance,
753  * schedule immediately
754  */
755  for (auto iter = table_update->range_map.begin(); iter != table_update->range_map.end(); ++iter) {
756  if ((*iter).first->need_maintenance() &&
757  !Global::maintenance_queue->contains((*iter).first)) {
758  maintenance_needed = true;
759  HT_MAYBE_FAIL_X("metadata-update-and-respond", (*iter).first->is_metadata());
760  if (m_timer_handler)
761  m_timer_handler->schedule_immediate_maintenance();
762  break;
763  }
764  }
765 
766  for (UpdateRequest *request : table_update->requests) {
767  Response::Callback::Update cb(m_context->comm, request->event);
768 
769  if (table_update->error != Error::OK) {
770  if ((error = cb.error(table_update->error, table_update->error_msg)) != Error::OK)
771  HT_ERRORF("Problem sending error response - %s", Error::get_text(error));
772  continue;
773  }
774 
775  if (request->error == Error::OK) {
779  if (!request->send_back_vector.empty()) {
780  StaticBuffer ext(new uint8_t [request->send_back_vector.size() * 16],
781  request->send_back_vector.size() * 16);
782  uint8_t *ptr = ext.base;
783  for (size_t i=0; i<request->send_back_vector.size(); i++) {
784  Serialization::encode_i32(&ptr, request->send_back_vector[i].error);
785  Serialization::encode_i32(&ptr, request->send_back_vector[i].count);
786  Serialization::encode_i32(&ptr, request->send_back_vector[i].offset);
787  Serialization::encode_i32(&ptr, request->send_back_vector[i].len);
788  /*
789  HT_INFOF("Sending back error %x, count %d, offset %d, len %d, table id %s",
790  request->send_back_vector[i].error, request->send_back_vector[i].count,
791  request->send_back_vector[i].offset, request->send_back_vector[i].len,
792  table_update->id.id);
793  */
794  }
795  if ((error = cb.response(ext)) != Error::OK)
796  HT_ERRORF("Problem sending OK response - %s", Error::get_text(error));
797  }
798  else {
799  if ((error = cb.response_ok()) != Error::OK)
800  HT_ERRORF("Problem sending OK response - %s", Error::get_text(error));
801  }
802  }
803  else {
804  if ((error = cb.error(request->error, "")) != Error::OK)
805  HT_ERRORF("Problem sending error response - %s", Error::get_text(error));
806  }
807  }
808 
809  }
810 
811  {
812  lock_guard<LoadStatistics> lock(*Global::load_statistics);
814  }
815 
816  delete uc;
817 
818  // For testing
819  if (m_maintenance_pause_interval > 0 && maintenance_needed)
820  this_thread::sleep_for(chrono::milliseconds(m_maintenance_pause_interval));
821 
822  }
823 }
824 
825 
826 void
828  int64_t auto_revision, int64_t *revisionp,
829  bool timeorder_desc) {
830  size_t len;
831  const uint8_t *ptr;
832 
833  len = bskey.decode_length(&ptr);
834 
836 
837  // if TIME_ORDER DESC was set for this column then we store the timestamps
838  // NOT in 1-complements!
839  if (timeorder_desc) {
840  // if the timestamp was specified by the user: unpack it and pack it
841  // again w/o 1-complement
842  if (*ptr == Key::HAVE_TIMESTAMP) {
843  uint8_t *p=(uint8_t *)ptr+len-8;
844  int64_t ts=Key::decode_ts64((const uint8_t **)&p);
845  p=(uint8_t *)ptr+len-8;
846  Key::encode_ts64((uint8_t **)&p, ts, false);
847  }
848  }
849 
850  dest_bufp->ensure((ptr-bskey.ptr) + len + 9);
851  Serialization::encode_vi32(&dest_bufp->ptr, len+8);
852  memcpy(dest_bufp->ptr, ptr, len);
853  if (*ptr == Key::AUTO_TIMESTAMP)
854  *dest_bufp->ptr = Key::HAVE_REVISION
856  else
857  *dest_bufp->ptr = Key::HAVE_REVISION
859 
860  // if TIME_ORDER DESC then store a flag in the key
861  if (timeorder_desc)
862  *dest_bufp->ptr |= Key::TS_CHRONOLOGICAL;
863 
864  dest_bufp->ptr += len;
865  Key::encode_ts64(&dest_bufp->ptr, auto_revision,
866  timeorder_desc ? false : true);
867  *revisionp = auto_revision;
868  bskey.ptr = ptr + len;
869 }
std::list< UpdateContext * > m_commit_queue
Stage 2 input queue.
A memory buffer of static size.
Definition: StaticBuffer.h:45
bool empty() const
Returns true if the buffer is empty.
Definition: DynamicBuffer.h:73
static int64_t decode_ts64(const uint8_t **bufp, bool ascending=true)
Definition: Key.h:71
static std::mutex mutex
Definition: Logger.cc:43
std::vector< SendBackRec > send_back_vector
Vector of SendBackRec objects describing rejected key/value pairs.
Definition: UpdateRequest.h:63
std::mutex m_commit_queue_mutex
Mutex protecting stage 2 input queue
void shutdown()
Shuts down the pipeline. Sets m_shutdown to true, signals the three pipeline condition variables...
std::vector< UpdateRecTable * > updates
Definition: UpdateContext.h:53
const char * row
Definition: Key.h:129
The FailureInducer simulates errors.
void set_mark()
Sets the mark; the mark can be used by the caller just like a bookmark.
std::shared_ptr< Context > m_context
Range server context
static const uint32_t FLAG_DELETE_ROW
Definition: KeySpec.h:40
std::string String
A String is simply a typedef to std::string.
Definition: String.h:44
uint32_t length
Definition: Key.h:124
uint32_t count
Count of serialized key/value pairs in buffer.
Definition: UpdateRequest.h:59
std::vector< UpdateRequest * > requests
Vector of corresponding client requests.
const char * row() const
Definition: SerializedKey.h:53
UpdatePipeline(ContextPtr &context, QueryCachePtr &query_cache, TimerHandlerPtr &timer_handler, CommitLogPtr &log, Filesystem::Flags flags)
Constructor.
int error
Error code.
Definition: UpdateRequest.h:44
String format(const char *fmt,...)
Returns a String using printf-like format facilities. Vanilla snprintf is about 1.5x faster than this...
Definition: String.cc:37
bool get_option_time_order_desc() const
Gets time order desc option.
virtual int response_ok()
Sends a simple success response back to the client, which is just the 4-byte error code Error::OK...
uint32_t offset
Starting byte offset within update buffer of rejected key/value pairs.
Definition: UpdateRequest.h:48
virtual size_t encoded_length() const
Returns serialized object length.
Definition: Serializable.cc:37
void commit()
Thread function for stage 2 of update pipeline.
static const uint8_t HAVE_REVISION
Definition: Key.h:43
Holds updates destined for a specific table.
Flags
Enumeration type for append flags.
Definition: Filesystem.h:76
int64_t m_last_revision
Last (largest) assigned revision number.
Column family specification.
void transform_key(ByteString &bskey, DynamicBuffer *dest_bufp, int64_t revision, int64_t *revisionp, bool timeorder_desc)
Filesystem::Flags m_flags
Commit log flush flag (NONE, FLUSH, or SYNC)
STL namespace.
void add_cells_written(uint64_t n)
Definition: Range.h:279
std::unordered_map< Range *, UpdateRecRangeList * > range_map
bool m_shutdown
Flag indicating if pipeline is being shut down.
int32_t m_maintenance_pause_interval
Millisecond pause time at the end of the pipeline (TESTING)
uint8_t * ptr
Pointer to the end of the used part of the buffer.
A dynamic, resizable and reference counted memory buffer.
Definition: DynamicBuffer.h:42
static const int64_t TIMESTAMP_MIN
Definition: KeySpec.h:34
std::mutex m_response_queue_mutex
Mutex protecting stage 3 input queue
std::vector< std::thread > m_threads
Update pipeline threads.
Represents a table row range.
Definition: Range.h:69
std::shared_ptr< Context > ContextPtr
Smart pointer to Context.
Definition: Context.h:265
A class managing one or more serializable ByteStrings.
Definition: ByteString.h:47
#define HT_ASSERT(_e_)
Definition: Logger.h:396
Declarations for RangeServerProtocol.
std::mutex m_qualify_queue_mutex
Mutex protecting stage 1 input queue
std::condition_variable m_response_queue_cond
Condition variable signaling addition to stage 3 input queue.
std::list< UpdateContext * > m_qualify_queue
Stage 1 input queue.
StaticBuffer buffer
Update buffer containing serialized key/value pairs.
Definition: UpdateRequest.h:57
void add_and_respond()
Thread function for stage 3 of update pipeline.
void add(const Key &key, const ByteString value)
This method must not fail.
Definition: Range.cc:445
void add_bytes_written(uint64_t n)
Definition: Range.h:275
uint32_t error
Error code that applies to entire buffer.
Definition: UpdateRequest.h:65
A dynamic, resizable memory buffer.
uint64_t m_update_coalesce_limit
Commit log coalesce limit.
virtual void encode(uint8_t **bufp) const
Writes serialized representation of object to a buffer.
Definition: Serializable.cc:64
TimerHandlerPtr m_timer_handler
Pointer to timer handler.
EventPtr event
Event object of originating update requst.
Definition: UpdateRequest.h:61
DynamicBuffer * bufp
Pointer to buffer holding updates (serialized key/value pairs).
const char * get_text(int error)
Returns a descriptive error message.
Definition: Error.cc:330
uint8_t * add(const void *data, size_t len)
Adds more data WITH boundary checks; if required the buffer is resized and existing data is preserved...
static CommitLogPtr root_log
Definition: Global.h:80
Logging routines and macros.
std::list< UpdateContext * > m_response_queue
Stage 3 input queue.
void encode_i32(uint8_t **bufp, uint32_t val)
Encode a 32-bit integer in little-endian order.
static uint64_t get()
Gets the cluster ID.
Definition: ClusterId.h:85
Compatibility Macros for C/C++.
int32_t m_commit_queue_count
Count of objects in stage 2 input queue.
static const uint8_t HAVE_TIMESTAMP
Definition: Key.h:44
bool load(const SerializedKey &key)
Parses the opaque key and loads the components into the member variables.
Definition: Key.cc:158
#define HT_END
Definition: Logger.h:220
std::shared_ptr< TimerHandler > TimerHandlerPtr
Smart pointer to TimerHandler.
Definition: TimerHandler.h:226
Functions to serialize/deserialize primitives to/from a memory buffer.
size_t length() const
Retrieves the length of the serialized string.
Definition: ByteString.h:62
TableInfoPtr table_info
TableInfo object for destination table.
static const uint8_t TS_CHRONOLOGICAL
Definition: Key.h:47
std::shared_ptr< QueryCache > QueryCachePtr
Smart pointer to QueryCache.
Definition: QueryCache.h:252
int32_t m_max_clock_skew
Maximum allowable clock skew.
Context record for update request passed into UpdatePipeline.
Definition: UpdateContext.h:38
uint32_t len
Length (in bytes) from offset covering key/value pairs rejected.
Definition: UpdateRequest.h:50
const uint8_t * ptr
The pointer to the serialized data.
Definition: ByteString.h:121
Hypertable definitions
TableIdentifier id
Table identifier for destination table.
#define HT_FATALF(msg,...)
Definition: Logger.h:343
#define HT_DEBUGF(msg,...)
Definition: Logger.h:260
static Hypertable::MaintenanceQueuePtr maintenance_queue
Definition: Global.h:67
long long int Lld
Shortcut for printf formats.
Definition: String.h:53
void encode_vi32(uint8_t **bufp, uint32_t val)
Encode an integer (up to 32-bit) in variable-length encoding.
static bool ignore_clock_skew_errors
Definition: Global.h:111
static const uint8_t AUTO_TIMESTAMP
Definition: Key.h:45
std::condition_variable m_commit_queue_cond
Condition variable signaling addition to stage 2 input queue.
virtual int error(int error, const String &msg)
Sends a standard error response back to the client.
uint32_t m_update_delay
Update delay at start of pipeline (TESTING)
Declarations for UpdatePipeline.
uint32_t count
Number of key/value pairs to which error applies.
Definition: UpdateRequest.h:46
std::condition_variable m_qualify_queue_cond
Condition variable signaling addition to stage 1 input queue.
size_t decode_length(const uint8_t **dptr) const
Retrieves the decoded length and returns a pointer to the string.
Definition: ByteString.h:83
#define HT_THROWF(_code_, _fmt_,...)
Definition: Error.h:490
void qualify_and_transform()
Thread function for stage 1 of update pipeline.
Provides access to internal components of opaque key.
Definition: Key.h:40
std::shared_ptr< Range > RangePtr
Smart pointer to Range.
Definition: Range.h:404
uint8_t * base
Pointer to the allocated memory buffer.
size_t fill() const
Returns the size of the used portion.
Definition: DynamicBuffer.h:70
#define HT_FAILURE_SIGNALLED(_label_)
std::shared_ptr< CommitLog > CommitLogPtr
Smart pointer to CommitLog.
Definition: CommitLog.h:223
static void encode_ts64(uint8_t **bufp, int64_t val, bool ascending=true)
Definition: Key.h:52
uint64_t offset
Offset of beginning of update range within bufp.
void add_update(UpdateRequest *request, UpdateRecRange &update)
This is a generic exception class for Hypertable.
Definition: Error.h:314
Holds client update request and error state.
Definition: UpdateRequest.h:54
#define HT_MAYBE_FAIL_X(_label_, _exp_)
static LoadStatisticsPtr load_statistics
Definition: Global.h:72
#define HT_ERRORF(msg,...)
Definition: Logger.h:300
Declarations for UpdateRecRange.
void add(UpdateContext *uc)
Adds updates to pipeline. Adds uc to m_qualify_queue and signals m_qualify_queue_cond.
std::shared_ptr< Schema > SchemaPtr
Smart pointer to Schema.
Definition: Schema.h:465
uint8_t column_family_code
Definition: Key.h:127
uint8_t * mark
A "bookmark", can be set by the caller.
bool transferring(const char *row)
CommitLogPtr m_log
Pointer to commit log.
uint8_t flag
Definition: Key.h:125
QueryCachePtr m_query_cache
Pointer to query cache.
Holds updates destined for a specific range.
Declarations for UpdateRecTable.
void ensure(size_t len)
Ensure space for additional data. Will grow the space to 1.5 of the needed space with existing data un...
Definition: DynamicBuffer.h:82
Declarations for ClusterId.
#define HT_DEBUG_OUT
Definition: Logger.h:261
Specifies a range of updates (key/value pairs) within a buffer.
int64_t get_ts64()
Returns the current time in nanoseconds as a 64bit number.
Definition: Time.cc:40
uint8_t * next()
Retrieves the next serialized String in the buffer.
Definition: ByteString.h:71
int code() const
Returns the error code.
Definition: Error.h:391
void reserve(size_t len, bool nocopy=false)
Reserve space for additional data. Will grow the space to exactly what's needed.
Definition: DynamicBuffer.h:95
uint64_t len
Length of update range within bufp starting at offset.
static const uint8_t REV_IS_TS
Definition: Key.h:46