0.9.8.10
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
Range.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2007-2015 Hypertable, Inc.
3  *
4  * This file is part of Hypertable.
5  *
6  * Hypertable is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; version 3 of the
9  * License, or any later version.
10  *
11  * Hypertable is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19  * 02110-1301, USA.
20  */
21 
26 
27 #include <Common/Compat.h>
28 #include "Range.h"
29 
38 
42 
43 #include <Common/Config.h>
44 #include <Common/Error.h>
45 #include <Common/FailureInducer.h>
46 #include <Common/FileUtils.h>
47 #include <Common/Random.h>
48 #include <Common/ScopeGuard.h>
49 #include <Common/StringExt.h>
50 #include <Common/md5.h>
51 
52 #include <boost/algorithm/string.hpp>
53 #include <boost/algorithm/string/predicate.hpp>
54 
55 #include <re2/re2.h>
56 
57 #include <cassert>
58 #include <chrono>
59 #include <string>
60 #include <thread>
61 #include <vector>
62 
63 extern "C" {
64 #include <string.h>
65 }
66 
67 using namespace Hypertable;
68 using namespace std;
69 
71  const TableIdentifier &identifier, SchemaPtr &schema,
72  const RangeSpec &range, RangeSet *range_set,
73  const RangeState &state, bool needs_compaction)
74  : m_master_client(master_client),
75  m_hints_file(identifier.id, range.start_row, range.end_row),
76  m_schema(schema), m_range_set(range_set),
77  m_load_metrics(identifier.id, range.start_row, range.end_row) {
78  m_metalog_entity = make_shared<MetaLogEntityRange>(identifier, range, state, needs_compaction);
79  initialize();
80 }
81 
83  MetaLogEntityRangePtr &range_entity, RangeSet *range_set)
84  : m_master_client(master_client), m_metalog_entity(range_entity),
85  m_hints_file(range_entity->get_table_id(), range_entity->get_start_row(),
86  range_entity->get_end_row()),
87  m_schema(schema), m_range_set(range_set),
88  m_load_metrics(range_entity->get_table_id(), range_entity->get_start_row(),
89  range_entity->get_end_row()) {
90  initialize();
91 }
92 
94  AccessGroupPtr ag;
95  String start_row, end_row;
96 
97  m_metalog_entity->get_boundary_rows(start_row, end_row);
98 
99  m_metalog_entity->get_table_identifier(m_table);
100 
101  m_name = format("%s[%s..%s]", m_table.id, start_row.c_str(), end_row.c_str());
102 
104 
105  m_is_root = m_is_metadata && start_row.empty() &&
106  end_row.compare(Key::END_ROOT_ROW) == 0;
107 
108  memset(m_added_deletes, 0, sizeof(m_added_deletes));
109 
110  uint64_t soft_limit = m_metalog_entity->get_soft_limit();
111  if (m_is_metadata) {
112  if (soft_limit == 0) {
114  m_metalog_entity->set_soft_limit(soft_limit);
115  }
116  m_split_threshold = soft_limit;
117  }
118  else {
119  if (soft_limit == 0 || soft_limit > (uint64_t)Global::range_split_size) {
120  soft_limit = Global::range_split_size;
121  m_metalog_entity->set_soft_limit(soft_limit);
122  }
123  {
124  lock_guard<mutex> lock(Global::mutex);
125  m_split_threshold = soft_limit + (Random::number64() % soft_limit);
126  }
127  }
128 
133  m_metalog_entity->get_state() == RangeState::SPLIT_SHRUNK) {
134  String split_row = m_metalog_entity->get_split_row();
135  String old_boundary_row = m_metalog_entity->get_old_boundary_row();
136  if (split_row.compare(old_boundary_row) < 0)
137  m_split_off_high = true;
138  }
139  else {
140  String split_off = Config::get_str("Hypertable.RangeServer.Range.SplitOff");
141  if (split_off == "high")
142  m_split_off_high = true;
143  else
144  HT_ASSERT(split_off == "low");
145  }
146 
147  m_column_family_vector.resize(m_schema->get_max_column_family_id() + 1);
148 
149  // If no transfer log, check to see if hints file exists and if not, write
150  // one. This is to handle the case of the missing hints file for the
151  // initially loaded range in a table
152  if (m_metalog_entity->get_transfer_log().empty() && !m_hints_file.exists())
154 
155  // Read hints file and load AGname-to-hints map
156  std::map<String, const AccessGroup::Hints *> hints_map;
157  m_hints_file.read();
158  for (const auto &h : m_hints_file.get())
159  hints_map[h.ag_name] = &h;
160 
161  RangeSpecManaged range_spec;
162  m_metalog_entity->get_range_spec(range_spec);
163  for (auto ag_spec : m_schema->get_access_groups()) {
164  const AccessGroup::Hints *h = 0;
165  std::map<String, const AccessGroup::Hints *>::iterator iter = hints_map.find(ag_spec->get_name());
166  if (iter != hints_map.end())
167  h = iter->second;
168  ag = make_shared<AccessGroup>(&m_table, m_schema, ag_spec, &range_spec, h);
169  m_access_group_map[ag_spec->get_name()] = ag;
170  m_access_group_vector.push_back(ag);
171 
172  for (auto cf_spec : ag_spec->columns())
173  m_column_family_vector[cf_spec->get_id()] = ag;
174  }
175 
176 }
177 
178 
181 
182  if (m_initialized)
183  return;
184 
187 
189 
190  m_initialized = true;
191 }
192 
193 void Range::deferred_initialization(uint32_t timeout_millis) {
194 
195  if (m_initialized)
196  return;
197 
198  auto expiration_time = chrono::fast_clock::now() +
199  chrono::milliseconds(timeout_millis);
200 
201  deferred_initialization(expiration_time);
202 }
203 
205 
206  if (m_initialized)
207  return;
208 
209  while (true) {
210  try {
212  }
213  catch (Exception &e) {
214  auto now = chrono::fast_clock::now();
215  if (now < expire_time) {
216  this_thread::sleep_for(chrono::milliseconds(10000));
217  continue;
218  }
219  throw;
220  }
221  break;
222  }
223 }
224 
225 
227  String metadata_key_str, start_row, end_row;
228  KeySpec key;
229  TableMutatorPtr mutator(Global::metadata_table->create_mutator());
230 
231  m_metalog_entity->get_boundary_rows(start_row, end_row);
232 
233  // Reset start row
234  metadata_key_str = String(m_table.id) + ":" + end_row;
235  key.row = metadata_key_str.c_str();
236  key.row_len = metadata_key_str.length();
237  key.column_qualifier = 0;
238  key.column_qualifier_len = 0;
239  key.column_family = "StartRow";
240  mutator->set(key, (uint8_t *)start_row.c_str(), start_row.length());
241 
242  // Get rid of new range
243  metadata_key_str = String(m_table.id) + ":" + m_metalog_entity->get_split_row();
244  key.flag = FLAG_DELETE_ROW;
245  key.row = metadata_key_str.c_str();
246  key.row_len = metadata_key_str.length();
247  key.column_qualifier = 0;
248  key.column_qualifier_len = 0;
249  key.column_family = 0;
250  mutator->set_delete(key);
251 
252  mutator->flush();
253 }
254 
255 namespace {
256  void delete_metadata_pointer(Metadata **metadata) {
257  delete *metadata;
258  *metadata = 0;
259  }
260 }
261 
262 
264  Metadata *metadata = 0;
265  AccessGroupPtr ag;
266  CellStorePtr cellstore;
267  const char *base, *ptr, *end;
268  std::vector<String> csvec;
269  String ag_name;
270  String files;
271  String file_str;
272  String start_row, end_row;
273  uint32_t nextcsid;
274 
275  HT_INFOF("Loading cellstores for '%s'", m_name.c_str());
276 
277  HT_ON_SCOPE_EXIT(&delete_metadata_pointer, &metadata);
278 
279  m_metalog_entity->get_boundary_rows(start_row, end_row);
280 
281  if (m_is_root) {
282  lock_guard<mutex> schema_lock(m_schema_mutex);
283  metadata = new MetadataRoot(m_schema);
284  }
285  else
286  metadata = new MetadataNormal(&m_table, end_row);
287 
288  metadata->reset_files_scan();
289 
290  {
291  lock_guard<mutex> schema_lock(m_schema_mutex);
292  for (auto ag : m_access_group_vector)
293  ag->pre_load_cellstores();
294  }
295 
296  while (metadata->get_next_files(ag_name, files, &nextcsid)) {
297  lock_guard<mutex> schema_lock(m_schema_mutex);
298  csvec.clear();
299 
300  if ((ag = m_access_group_map[ag_name]) == 0) {
301  HT_ERRORF("Unrecognized access group name '%s' found in METADATA for "
302  "table '%s'", ag_name.c_str(), m_table.id);
303  HT_ABORT;
304  }
305 
306  ag->set_next_csid(nextcsid);
307 
308  ptr = base = (const char *)files.c_str();
309  end = base + strlen(base);
310  while (ptr < end) {
311 
312  while (*ptr != ';' && ptr < end)
313  ptr++;
314 
315  file_str = String(base, ptr-base);
316  boost::trim(file_str);
317 
318  if (!file_str.empty()) {
319  if (file_str[0] == '#') {
320  ++ptr;
321  base = ptr;
322  continue;
323  }
324 
325  csvec.push_back(file_str);
326  }
327  ++ptr;
328  base = ptr;
329  }
330 
331  files = "";
332 
333  String file_basename = Global::toplevel_dir + "/tables/";
334 
335  bool skip_not_found = Config::properties->get_bool("Hypertable.RangeServer.CellStore.SkipNotFound");
336  bool skip_bad = Config::properties->get_bool("Hypertable.RangeServer.CellStore.SkipBad");
337 
338  for (size_t i=0; i<csvec.size(); i++) {
339 
340  files += csvec[i] + ";\n";
341 
342  HT_INFOF("Loading CellStore %s", csvec[i].c_str());
343 
344  try {
345  cellstore = CellStoreFactory::open(file_basename + csvec[i],
346  start_row.c_str(), end_row.c_str());
347  }
348  catch (Exception &e) {
349  // issue 986: mapr returns IO_ERROR if CellStore does not exist
353  if (skip_not_found) {
354  HT_WARNF("CellStore file '%s' not found, skipping", csvec[i].c_str());
355  continue;
356  }
357  }
359  if (skip_bad) {
360  HT_WARNF("CellStore file '%s' is corrupt, skipping", csvec[i].c_str());
361  continue;
362  }
363  }
364  HT_FATALF("Problem opening CellStore file '%s' - %s", csvec[i].c_str(),
365  Error::get_text(e.code()));
366  }
367 
368  int64_t revision = boost::any_cast<int64_t>
369  (cellstore->get_trailer()->get("revision"));
370  if (revision > m_latest_revision)
371  m_latest_revision = revision;
372 
373  ag->load_cellstore(cellstore);
374  }
375  }
376 
377  {
378  lock_guard<mutex> schema_lock(m_schema_mutex);
379  for (auto ag : m_access_group_vector)
380  ag->post_load_cellstores();
381  }
382 
383  HT_INFOF("Finished loading cellstores for '%s'", m_name.c_str());
384 }
385 
386 
388  lock_guard<mutex> lock(m_schema_mutex);
389 
390  vector<AccessGroupSpec*> new_access_groups;
391  AccessGroupPtr ag;
392  AccessGroupMap::iterator ag_iter;
393  size_t max_column_family_id = schema->get_max_column_family_id();
394 
395  // only update schema if there is more recent version
396  if(schema->get_generation() <= m_schema->get_generation())
397  return;
398 
399  // resize column family vector if needed
400  if (max_column_family_id > m_column_family_vector.size()-1)
401  m_column_family_vector.resize(max_column_family_id+1);
402 
403  // update all existing access groups & create new ones as needed
404  for (auto ag_spec : schema->get_access_groups()) {
405  if( (ag_iter = m_access_group_map.find(ag_spec->get_name())) !=
406  m_access_group_map.end()) {
407  ag_iter->second->update_schema(schema, ag_spec);
408  for (auto cf_spec : ag_spec->columns()) {
409  if (!cf_spec->get_deleted())
410  m_column_family_vector[cf_spec->get_id()] = ag_iter->second;
411  }
412  }
413  else {
414  new_access_groups.push_back(ag_spec);
415  }
416  }
417 
418  // create new access groups
419  {
420  lock_guard<mutex> lock(m_mutex);
421  m_table.generation = schema->get_generation();
422  m_metalog_entity->set_table_generation(m_table.generation);
423  RangeSpecManaged range_spec;
424  m_metalog_entity->get_range_spec(range_spec);
425  for (auto ag_spec : new_access_groups) {
426  ag = make_shared<AccessGroup>(&m_table, schema, ag_spec, &range_spec);
427  m_access_group_map[ag_spec->get_name()] = ag;
428  m_access_group_vector.push_back(ag);
429  for (auto cf_spec : ag_spec->columns()) {
430  if (!cf_spec->get_deleted())
431  m_column_family_vector[cf_spec->get_id()] = ag;
432  }
433  }
434  }
435 
436  // TODO: remove deleted access groups
437  m_schema = schema;
438  return;
439 }
440 
441 
445 void Range::add(const Key &key, const ByteString value) {
446  HT_DEBUG_OUT <<"key="<< key <<" value='";
447  const uint8_t *p;
448  size_t len = value.decode_length(&p);
449  _out_ << format_bytes(20, p, len) << HT_END;
450 
451  if (key.flag != FLAG_INSERT && key.flag >= KEYSPEC_DELETE_MAX) {
452  HT_ERRORF("Unknown key flag encountered (%d), skipping..", (int)key.flag);
453  return;
454  }
455 
456  if (key.flag == FLAG_DELETE_ROW) {
457  for (size_t i=0; i<m_access_group_vector.size(); ++i)
458  m_access_group_vector[i]->add(key, value);
459  }
460  else {
461  if (key.column_family_code >= m_column_family_vector.size() ||
462  m_column_family_vector[key.column_family_code] == 0) {
463  HT_ERRORF("Bad column family code encountered (%d) for table %s, skipping...",
464  (int)key.column_family_code, m_table.id);
465  return;
466  }
467  m_column_family_vector[key.column_family_code]->add(key, value);
468  }
469 
470  if (key.flag == FLAG_INSERT)
471  m_added_inserts++;
472  else
473  m_added_deletes[key.flag]++;
474 
475  if (key.revision > m_revision)
476  m_revision = key.revision;
477 }
478 
479 
481  scanner = std::make_shared<MergeScannerRange>(m_table.id, scan_ctx);
482  AccessGroupVector ag_vector(0);
483 
485 
486  {
487  lock_guard<mutex> lock(m_schema_mutex);
488  ag_vector = m_access_group_vector;
489  m_scans++;
490  }
491 
492  try {
493  for (auto & ag : ag_vector) {
494  if (ag->include_in_scan(scan_ctx.get()))
495  scanner->add_scanner(ag->create_scanner(scan_ctx.get()));
496  }
497  }
498  catch (Exception &e) {
499  HT_THROW2(e.code(), e, "");
500  }
501 
502  // increment #scanners
503 }
504 
506  const String &table_name) {
507  CellListScannerBuffer *scanner = 0;
508  AccessGroupVector ag_vector(0);
509 
510  if (!m_initialized)
511  deferred_initialization(scan_ctx->timeout_ms);
512 
513  {
514  lock_guard<mutex> lock(m_schema_mutex);
515  ag_vector = m_access_group_vector;
516  m_scans++;
517  }
518 
519  if (table_name != ".cellstore.index")
521 
522  scanner = new CellListScannerBuffer(scan_ctx);
523 
524  try {
525  for (auto &ag : ag_vector)
526  ag->populate_cellstore_index_pseudo_table_scanner(scanner);
527  }
528  catch (Exception &e) {
529  delete scanner;
530  HT_THROW2(e.code(), e, "");
531  }
532 
533  return scanner;
534 }
535 
536 
538  lock_guard<mutex> lock(m_schema_mutex);
539  bool needed = false;
540  int64_t mem, disk, disk_total = 0;
541  if (!m_metalog_entity->get_load_acknowledged() || m_unsplittable)
542  return false;
543  for (size_t i=0; i<m_access_group_vector.size(); ++i) {
544  m_access_group_vector[i]->space_usage(&mem, &disk);
545  disk_total += disk;
546  if (mem >= Global::access_group_max_mem)
547  needed = true;
548  }
549  if (disk_total >= m_split_threshold)
550  needed = true;
551  return needed;
552 }
553 
554 
556  return m_dropped ? true : false;
557 }
558 
559 
562  int flags, TableMutator *mutator) {
563  MaintenanceData *mdata = (MaintenanceData *)arena.alloc( sizeof(MaintenanceData) );
564  AccessGroup::MaintenanceData **tailp = 0;
565  AccessGroupVector ag_vector(0);
566  int64_t size=0;
567  int64_t starting_maintenance_generation;
568 
569  memset(mdata, 0, sizeof(MaintenanceData));
570 
571  {
572  lock_guard<mutex> lock(m_schema_mutex);
573  ag_vector = m_access_group_vector;
574  mdata->load_factors.scans = m_scans;
575  mdata->load_factors.updates = m_updates;
579  }
580 
581  mdata->relinquish = m_relinquish;
582 
583  // record starting maintenance generation
584  {
585  lock_guard<mutex> lock(m_mutex);
586  starting_maintenance_generation = m_maintenance_generation;
592  mdata->table_id = m_table.id;
593  mdata->is_metadata = m_is_metadata;
594  mdata->is_system = m_table.is_system();
595  mdata->state = m_metalog_entity->get_state();
596  mdata->soft_limit = m_metalog_entity->get_soft_limit();
597  mdata->busy = m_maintenance_guard.in_progress() || !m_metalog_entity->get_load_acknowledged();
598  mdata->needs_major_compaction = m_metalog_entity->get_needs_compaction();
599  mdata->initialized = m_initialized;
601  }
602 
603  for (size_t i=0; i<ag_vector.size(); i++) {
604  if (mdata->agdata == 0) {
605  mdata->agdata = ag_vector[i]->get_maintenance_data(arena, now, flags);
606  tailp = &mdata->agdata;
607  }
608  else {
609  (*tailp)->next = ag_vector[i]->get_maintenance_data(arena, now, flags);
610  tailp = &(*tailp)->next;
611  }
612  size += (*tailp)->disk_estimate;
613  mdata->disk_used += (*tailp)->disk_used;
614  mdata->compression_ratio += (double)(*tailp)->disk_used / (*tailp)->compression_ratio;
615  mdata->disk_estimate += (*tailp)->disk_estimate;
616  mdata->memory_used += (*tailp)->mem_used;
617  mdata->memory_allocated += (*tailp)->mem_allocated;
618  mdata->block_index_memory += (*tailp)->block_index_memory;
619  mdata->bloom_filter_memory += (*tailp)->bloom_filter_memory;
620  mdata->bloom_filter_accesses += (*tailp)->bloom_filter_accesses;
621  mdata->bloom_filter_maybes += (*tailp)->bloom_filter_maybes;
622  mdata->bloom_filter_fps += (*tailp)->bloom_filter_fps;
623  mdata->shadow_cache_memory += (*tailp)->shadow_cache_memory;
624  mdata->cell_count += (*tailp)->cell_count;
625  mdata->file_count += (*tailp)->file_count;
626  mdata->key_bytes += (*tailp)->key_bytes;
627  mdata->value_bytes += (*tailp)->value_bytes;
628  }
629 
630  if (mdata->disk_used)
631  mdata->compression_ratio = (double)mdata->disk_used / mdata->compression_ratio;
632  else
633  mdata->compression_ratio = 1.0;
634 
635  if (tailp)
636  (*tailp)->next = 0;
637 
638  if (!m_unsplittable && size >= m_split_threshold)
639  mdata->needs_split = true;
640 
641  mdata->unsplittable = m_unsplittable;
642 
643  if (size > Global::range_maximum_size) {
644  lock_guard<mutex> lock(m_mutex);
645  if (starting_maintenance_generation == m_maintenance_generation)
647  }
648 
650 
651  if (mutator)
652  m_load_metrics.compute_and_store(mutator, now, mdata->load_factors,
653  mdata->disk_used, mdata->memory_used,
654  mdata->compression_ratio);
655 
656  return mdata;
657 }
658 
659 
661 
662  if (!m_initialized)
664 
666 
667  int state = m_metalog_entity->get_state();
668 
669  // Make sure range is in a relinquishable state
670  if (state != RangeState::STEADY &&
673  HT_INFOF("Cancelling relinquish because range is not in relinquishable state (%s)",
674  RangeState::get_text(state).c_str());
675  return;
676  }
677 
678  try {
679  switch (state) {
680  case (RangeState::STEADY):
681  {
682  RangeSpecManaged range_spec;
683  m_metalog_entity->get_range_spec(range_spec);
685  HT_WARNF("Aborting relinquish of %s because marked immovable.", m_name.c_str());
686  return;
687  }
688  }
694  }
695  }
696  catch (Exception &e) {
697  if (e.code() == Error::CANCELLED || cancel_maintenance())
698  return;
699  throw;
700  }
701 
702  {
703  lock_guard<mutex> lock(m_mutex);
706  }
707 
708  HT_INFO("Relinquish Complete.");
709 }
710 
711 
713  String logname;
714  AccessGroupVector ag_vector(0);
715 
716  {
717  lock_guard<mutex> lock(m_schema_mutex);
718  ag_vector = m_access_group_vector;
719  }
720 
721  if (cancel_maintenance())
723 
724  {
725  lock_guard<mutex> lock(m_mutex);
726 
728  m_table.id, m_metalog_entity->get_end_row()).name();
729 
730  Global::log_dfs->mkdirs(logname);
731 
732  m_metalog_entity->set_transfer_log(logname);
733 
734 
740 
741  for (int i=0; true; i++) {
742  try {
743  Global::rsml_writer->record_state(m_metalog_entity);
744  break;
745  }
746  catch (Exception &e) {
747  if (i<3) {
748  HT_WARNF("%s - %s", Error::get_text(e.code()), e.what());
749  this_thread::sleep_for(chrono::milliseconds(5000));
750  continue;
751  }
752  HT_ERRORF("Problem updating meta log entry with RELINQUISH_LOG_INSTALLED state for %s",
753  m_name.c_str());
754  HT_FATAL_OUT << e << HT_END;
755  }
756  }
757  }
758 
762  {
764  lock_guard<mutex> lock(m_mutex);
765  m_transfer_log = make_shared<CommitLog>(Global::dfs, logname, !m_table.is_user());
766  for (size_t i=0; i<ag_vector.size(); i++)
767  ag_vector[i]->stage_compaction();
768  }
769 
770 }
771 
773  String location = Global::location_initializer->get();
774  AccessGroupVector ag_vector(0);
775 
776  {
777  lock_guard<mutex> lock(m_schema_mutex);
778  ag_vector = m_access_group_vector;
779  }
780 
781  if (cancel_maintenance())
783 
787  std::vector<AccessGroup::Hints> hints(ag_vector.size());
788  for (size_t i=0; i<ag_vector.size(); i++)
789  ag_vector[i]->run_compaction(MaintenanceFlag::COMPACT_MINOR |
790  MaintenanceFlag::RELINQUISH, &hints[i]);
791  m_hints_file.set(hints);
792  m_hints_file.write(location);
793 
794  String end_row = m_metalog_entity->get_end_row();
795 
796  // Record "move" in sys/RS_METRICS
798  TableMutatorPtr mutator(Global::rs_metrics_table->create_mutator());
799  KeySpec key;
800  String row = location + ":" + m_table.id;
801  key.row = row.c_str();
802  key.row_len = row.length();
803  key.column_family = "range_move";
804  key.column_qualifier = end_row.c_str();
805  key.column_qualifier_len = end_row.length();
806  try {
807  mutator->set(key, 0, 0);
808  mutator->flush();
809  }
810  catch (Exception &e) {
811  HT_ERROR_OUT << "Problem updating sys/RS_METRICS - " << e << HT_END;
812  }
813  }
814 
815  // Mark range as "dropped" preventing further scans and updates
816  drop();
817 
820 
821 }
822 
824  TableIdentifierManaged table_frozen;
825  String start_row, end_row;
826 
827  {
828  lock_guard<mutex> lock(m_schema_mutex);
829  lock_guard<mutex> lock2(m_mutex);
830  table_frozen = m_table;
831  }
832 
833  m_metalog_entity->get_boundary_rows(start_row, end_row);
834 
835  HT_INFOF("Reporting relinquished range %s to Master", m_name.c_str());
836 
837  RangeSpecManaged range_spec;
838  m_metalog_entity->get_range_spec(range_spec);
839 
840  HT_MAYBE_FAIL("relinquish-move-range");
841 
842  m_master_client->move_range(m_metalog_entity->get_source(),
843  m_metalog_entity->id(),
844  table_frozen, range_spec,
845  m_metalog_entity->get_transfer_log(),
846  m_metalog_entity->get_soft_limit(), false);
847 
848  // Remove range from TableInfo
849  if (!m_range_set->remove(start_row, end_row)) {
850  HT_ERROR_OUT << "Problem removing range " << m_name << HT_END;
851  HT_ABORT;
852  }
853 
854  // Mark the Range entity for removal
855  std::vector<MetaLog::EntityPtr> entities;
856  m_metalog_entity->mark_for_removal();
857  entities.push_back(m_metalog_entity);
858 
859  // Add acknowledge relinquish task
860  MetaLog::EntityTaskPtr acknowledge_relinquish_task =
861  make_shared<MetaLog::EntityTaskAcknowledgeRelinquish>(m_metalog_entity->get_source(),
862  m_metalog_entity->id(),
863  table_frozen, range_spec);
864  entities.push_back(acknowledge_relinquish_task);
865 
869  for (int i=0; true; i++) {
870  try {
871  Global::rsml_writer->record_state(entities);
872  break;
873  }
874  catch (Exception &e) {
875  if (i<6) {
876  HT_ERRORF("%s - %s", Error::get_text(e.code()), e.what());
877  this_thread::sleep_for(chrono::milliseconds(5000));
878  continue;
879  }
880  HT_ERRORF("Problem recording removal for range %s", m_name.c_str());
881  HT_FATAL_OUT << e << HT_END;
882  }
883  }
884 
885  // Add tasks to work queue
886  Global::add_to_work_queue(acknowledge_relinquish_task);
887 
888  // disables any further maintenance
890 }
891 
892 
893 
894 
895 
896 void Range::split() {
897 
898  if (!m_initialized)
900 
902  String old_start_row;
903 
904  // do not split if the RangeServer is not yet fully initialized
905  if (Global::rsml_writer.get() == 0)
906  return;
907 
909 
910  int state = m_metalog_entity->get_state();
911 
912  // Make sure range is in a splittable state
913  if (state != RangeState::STEADY &&
915  state != RangeState::SPLIT_SHRUNK) {
916  HT_INFOF("Cancelling split because range is not in splittable state (%s)",
917  RangeState::get_text(state).c_str());
918  return;
919  }
920 
921  if (m_unsplittable) {
922  HT_WARNF("Split attempted on range %s, but marked unsplittable",
923  m_name.c_str());
924  return;
925  }
926 
927  try {
928  switch (state) {
929 
930  case (RangeState::STEADY):
932 
935 
938  }
939  }
940  catch (Exception &e) {
941  if (e.code() == Error::CANCELLED || cancel_maintenance())
942  return;
943  throw;
944  }
945 
946  {
947  lock_guard<mutex> lock(m_mutex);
950  }
951 
952  HT_INFOF("Split Complete. New Range end_row=%s",
953  m_metalog_entity->get_end_row().c_str());
954 }
955 
956 
957 
961  String split_row;
962  std::vector<String> split_rows;
963  AccessGroupVector ag_vector(0);
964  String logname;
965  String start_row, end_row;
966 
967  m_metalog_entity->get_boundary_rows(start_row, end_row);
968 
969  {
970  lock_guard<mutex> lock(m_schema_mutex);
971  ag_vector = m_access_group_vector;
972  }
973 
974  if (cancel_maintenance())
976 
983  {
984  StlArena arena(128000);
985  CellList::SplitRowDataMapT split_row_data =
987 
988  // Fetch CellStore block index split row data from
989  for (const auto &ag : ag_vector)
990  ag->split_row_estimate_data_stored(split_row_data);
991 
992  // Fetch CellCache split row data from
993  for (const auto &ag : ag_vector)
994  ag->split_row_estimate_data_cached(split_row_data);
995 
996  // Estimate split row from split row data
997  if (!estimate_split_row(split_row_data, split_row)) {
999  m_unsplittable = true;
1000  HT_WARNF("Split attempt aborted for range %s because it is marked unsplittable",
1001  m_name.c_str());
1003  }
1006  "Unable to determine split row for range %s",
1007  m_name.c_str());
1008  }
1009 
1010  // Instrumentation for issue 1193
1011  if (split_row.compare(end_row) >= 0 || split_row.compare(start_row) <= 0) {
1012  LoadDataEscape escaper;
1013  String escaped_start_row, escaped_end_row, escaped_split_row;
1014  for (auto &entry : split_row_data) {
1015  escaper.escape(entry.first, strlen(entry.first), escaped_split_row);
1016  HT_ERRORF("[split_row_data] %lld %s", (Lld)entry.second, escaped_split_row.c_str());
1017  }
1018  escaper.escape(start_row.c_str(), start_row.length(), escaped_start_row);
1019  escaper.escape(end_row.c_str(), end_row.length(), escaped_end_row);
1020  escaper.escape(split_row.c_str(), split_row.length(), escaped_split_row);
1021  HT_FATALF("Bad split row estimate (%s) for range %s[%s..%s]",
1022  escaped_split_row.c_str(), m_table.id, escaped_start_row.c_str(),
1023  escaped_end_row.c_str());
1024  }
1025 
1026  HT_INFOF("Split row estimate for %s is '%s'",
1027  m_name.c_str(), split_row.c_str());
1028  }
1029 
1030 
1031  {
1032  lock_guard<mutex> lock(m_mutex);
1033  m_metalog_entity->set_split_row(split_row);
1034 
1036  m_table.id, split_row).name();
1037 
1038  Global::log_dfs->mkdirs(logname);
1039 
1040  m_metalog_entity->set_transfer_log(logname);
1041 
1042  if (m_split_off_high)
1043  m_metalog_entity->set_old_boundary_row(end_row);
1044  else
1045  m_metalog_entity->set_old_boundary_row(start_row);
1046 
1052  for (int i=0; true; i++) {
1053  try {
1054  Global::rsml_writer->record_state(m_metalog_entity);
1055  break;
1056  }
1057  catch (Exception &e) {
1058  if (i<3) {
1059  HT_WARNF("%s - %s", Error::get_text(e.code()), e.what());
1060  this_thread::sleep_for(chrono::milliseconds(5000));
1061  continue;
1062  }
1063  HT_ERRORF("Problem updating meta log with SPLIT_LOG_INSTALLED state for %s "
1064  "split-point='%s'", m_name.c_str(), split_row.c_str());
1065  HT_FATAL_OUT << e << HT_END;
1066  }
1067  }
1068  }
1069 
1073  {
1075  lock_guard<mutex> lock(m_mutex);
1076  m_split_row = split_row;
1077  for (size_t i=0; i<ag_vector.size(); i++)
1078  ag_vector[i]->stage_compaction();
1079  m_transfer_log = make_shared<CommitLog>(Global::dfs, logname, !m_table.is_user());
1080  }
1081 
1082  HT_MAYBE_FAIL("split-1");
1083  HT_MAYBE_FAIL_X("metadata-split-1", m_is_metadata);
1084 }
1085 
1087 
1088  // Set target to half the total number of keys
1089  int64_t target = 0;
1090  for (CellList::SplitRowDataMapT::iterator iter=split_row_data.begin();
1091  iter != split_row_data.end(); ++iter)
1092  target += iter->second;
1093  target /= 2;
1094 
1095  row.clear();
1096  if (target == 0)
1097  return false;
1098 
1099  int64_t cumulative = 0;
1100  for (CellList::SplitRowDataMapT::iterator iter=split_row_data.begin();
1101  iter != split_row_data.end(); ++iter) {
1102  if (cumulative + iter->second >= target) {
1103  if (cumulative > 0)
1104  --iter;
1105  row = iter->first;
1106  break;
1107  }
1108  cumulative += iter->second;
1109  }
1110  HT_ASSERT(!row.empty());
1111  // If row chosen above is same as end row, find largest row <= end_row
1112  String end_row = m_metalog_entity->get_end_row();
1113  if (row.compare(end_row) >= 0) {
1114  row.clear();
1115  for (CellList::SplitRowDataMapT::iterator iter=split_row_data.begin();
1116  iter != split_row_data.end(); ++iter) {
1117  if (strcmp(iter->first, end_row.c_str()) < 0)
1118  row = iter->first;
1119  else
1120  break;
1121  }
1122  return !row.empty();
1123  }
1124  return true;
1125 }
1126 
1127 
1129  int error;
1130  String start_row, end_row, split_row;
1131  AccessGroupVector ag_vector(0);
1132  String location = Global::location_initializer->get();
1133 
1134  m_metalog_entity->get_boundary_rows(start_row, end_row);
1135  split_row = m_metalog_entity->get_split_row();
1136 
1137  {
1138  lock_guard<mutex> lock(m_schema_mutex);
1139  ag_vector = m_access_group_vector;
1140  }
1141 
1142  if (cancel_maintenance())
1144 
1145  AccessGroupHintsFile new_hints_file(m_table.id, start_row, end_row);
1146  std::vector<AccessGroup::Hints> hints(ag_vector.size());
1147 
1151  for (size_t i=0; i<ag_vector.size(); i++)
1152  ag_vector[i]->run_compaction(MaintenanceFlag::COMPACT_MAJOR|MaintenanceFlag::SPLIT,
1153  &hints[i]);
1154 
1155  m_hints_file.set(hints);
1156  new_hints_file.set(hints);
1157 
1158  String files;
1159  String metadata_row_low, metadata_row_high;
1160  int64_t total_blocks;
1161  KeySpec key_low, key_high;
1162  char buf[32];
1163 
1164  TableMutatorPtr mutator(Global::metadata_table->create_mutator());
1165 
1166  // For new range with existing end row, update METADATA entry with new
1167  // 'StartRow' column.
1168 
1169  metadata_row_high = String("") + m_table.id + ":" + end_row;
1170  key_high.row = metadata_row_high.c_str();
1171  key_high.row_len = metadata_row_high.length();
1172  key_high.column_qualifier = 0;
1173  key_high.column_qualifier_len = 0;
1174  key_high.column_family = "StartRow";
1175  mutator->set(key_high, (uint8_t *)split_row.c_str(), split_row.length());
1176 
1177  // This is needed to strip out the "live file" references
1178  if (m_split_off_high) {
1179  key_high.column_family = "Files";
1180  for (size_t i=0; i<ag_vector.size(); i++) {
1181  key_high.column_qualifier = ag_vector[i]->get_name();
1182  key_high.column_qualifier_len = strlen(ag_vector[i]->get_name());
1183  ag_vector[i]->get_file_data(files, &total_blocks, false);
1184  if (files != "")
1185  mutator->set(key_high, (uint8_t *)files.c_str(), files.length());
1186  }
1187  }
1188 
1189  // For new range whose end row is the split point, create a new METADATA
1190  // entry
1191  metadata_row_low = format("%s:%s", m_table.id, split_row.c_str());
1192  key_low.row = metadata_row_low.c_str();
1193  key_low.row_len = metadata_row_low.length();
1194  key_low.column_qualifier = 0;
1195  key_low.column_qualifier_len = 0;
1196 
1197  key_low.column_family = "StartRow";
1198  mutator->set(key_low, start_row.c_str(), start_row.length());
1199 
1200  for (size_t i=0; i<ag_vector.size(); i++) {
1201  ag_vector[i]->get_file_data(files, &total_blocks, m_split_off_high);
1202  key_low.column_family = key_high.column_family = "BlockCount";
1203  key_low.column_qualifier = key_high.column_qualifier = ag_vector[i]->get_name();
1204  key_low.column_qualifier_len = key_high.column_qualifier_len = strlen(ag_vector[i]->get_name());
1205  sprintf(buf, "%llu", (Llu)total_blocks/2);
1206  mutator->set(key_low, (uint8_t *)buf, strlen(buf));
1207  mutator->set(key_high, (uint8_t *)buf, strlen(buf));
1208  if (files != "") {
1209  key_low.column_family = "Files";
1210  mutator->set(key_low, (uint8_t *)files.c_str(), files.length());
1211  }
1212  }
1213  if (m_split_off_high) {
1214  key_low.column_qualifier = 0;
1215  key_low.column_qualifier_len = 0;
1216  key_low.column_family = "Location";
1217  mutator->set(key_low, location.c_str(), location.length());
1218  }
1219 
1220  mutator->flush();
1221 
1225  {
1228  lock_guard<mutex> lock(m_mutex);
1229 
1230  // Shrink access groups
1231  if (m_split_off_high)
1232  m_range_set->change_end_row(start_row, end_row, split_row);
1233  else
1234  m_range_set->change_start_row(start_row, split_row, end_row);
1235 
1236  // Shrink access groups
1237  if (m_split_off_high) {
1238  m_metalog_entity->set_end_row(split_row);
1239  m_hints_file.change_end_row(split_row);
1240  new_hints_file.change_start_row(split_row);
1241  end_row = split_row;
1242  }
1243  else {
1244  m_metalog_entity->set_start_row(split_row);
1245  m_hints_file.change_start_row(split_row);
1246  new_hints_file.change_end_row(split_row);
1247  start_row = split_row;
1248  }
1249 
1250  m_load_metrics.change_rows(start_row, end_row);
1251 
1252  m_name = String(m_table.id)+"["+start_row+".."+end_row+"]";
1253  for (size_t i=0; i<ag_vector.size(); i++)
1254  ag_vector[i]->shrink(split_row, m_split_off_high, &hints[i]);
1255 
1256  // Close and uninstall split log
1257  m_split_row = "";
1258  if ((error = m_transfer_log->close()) != Error::OK) {
1259  HT_ERRORF("Problem closing split log '%s' - %s",
1260  m_transfer_log->get_log_dir().c_str(), Error::get_text(error));
1261  }
1262  m_transfer_log = 0;
1263 
1271  m_hints_file.set(hints);
1272  m_hints_file.write(location);
1273  for (size_t i=0; i<new_hints_file.get().size(); i++) {
1274  if (hints[i].disk_usage > new_hints_file.get()[i].disk_usage) {
1275  // issue 1159
1276  HT_ERRORF("hints[%d].disk_usage (%llu) > new_hints_file.get()[%d].disk_usage (%llu)",
1277  (int)i, (Llu)hints[i].disk_usage, (int)i, (Llu)new_hints_file.get()[i].disk_usage);
1278  HT_ERRORF("%s", ag_vector[i]->describe().c_str());
1279  HT_ASSERT(hints[i].disk_usage <= new_hints_file.get()[i].disk_usage);
1280  }
1281  new_hints_file.get()[i].disk_usage -= hints[i].disk_usage;
1282  }
1283  new_hints_file.write("");
1284  }
1285 
1286  if (m_split_off_high) {
1288  {
1289  char md5DigestStr[33];
1290  String table_dir, range_dir;
1291 
1292  md5_trunc_modified_base64(end_row.c_str(), md5DigestStr);
1293  md5DigestStr[16] = 0;
1294  table_dir = Global::toplevel_dir + "/tables/" + m_table.id;
1295 
1296  {
1297  lock_guard<mutex> lock(m_schema_mutex);
1298  for (auto ag_spec : m_schema->get_access_groups()) {
1299  // notice the below variables are different "range" vs. "table"
1300  range_dir = table_dir + "/" + ag_spec->get_name() + "/" + md5DigestStr;
1301  Global::dfs->mkdirs(range_dir);
1302  }
1303  }
1304  }
1305 
1306  }
1307 
1311  {
1312  lock_guard<mutex> lock(m_mutex);
1313  m_metalog_entity->set_state(RangeState::SPLIT_SHRUNK, location);
1314  for (int i=0; true; i++) {
1315  try {
1316  Global::rsml_writer->record_state(m_metalog_entity);
1317  break;
1318  }
1319  catch (Exception &e) {
1320  if (i<3) {
1321  HT_ERRORF("%s - %s", Error::get_text(e.code()), e.what());
1322  this_thread::sleep_for(chrono::milliseconds(5000));
1323  continue;
1324  }
1325  HT_ERRORF("Problem updating meta log entry with SPLIT_SHRUNK state %s "
1326  "split-point='%s'", m_name.c_str(), split_row.c_str());
1327  HT_FATAL_OUT << e << HT_END;
1328  }
1329  }
1330  }
1331 
1332  HT_MAYBE_FAIL("split-2");
1333  HT_MAYBE_FAIL_X("metadata-split-2", m_is_metadata);
1334 
1335 }
1336 
1337 
1339  RangeSpecManaged range;
1340  TableIdentifierManaged table_frozen;
1341  String start_row, end_row, old_boundary_row;
1342  int64_t soft_limit = m_metalog_entity->get_soft_limit();
1343 
1344  m_metalog_entity->get_boundary_rows(start_row, end_row);
1345  old_boundary_row = m_metalog_entity->get_old_boundary_row();
1346 
1347  if (cancel_maintenance())
1349 
1350  if (m_split_off_high) {
1351  range.set_start_row(end_row);
1352  range.set_end_row(old_boundary_row);
1353  }
1354  else {
1355  range.set_start_row(old_boundary_row);
1356  range.set_end_row(start_row);
1357  }
1358 
1359  // update the latest generation, this should probably be protected
1360  {
1361  lock_guard<mutex> lock(m_schema_mutex);
1362  table_frozen = m_table;
1363  }
1364 
1365  HT_INFOF("Reporting newly split off range %s[%s..%s] to Master",
1366  m_table.id, range.start_row, range.end_row);
1367 
1368  if (!m_is_metadata && soft_limit < Global::range_split_size) {
1369  soft_limit *= 2;
1370  if (soft_limit > Global::range_split_size)
1371  soft_limit = Global::range_split_size;
1372  }
1373 
1374  m_master_client->move_range(m_metalog_entity->get_source(),
1375  m_metalog_entity->id(),
1376  table_frozen, range,
1377  m_metalog_entity->get_transfer_log(),
1378  soft_limit, true);
1379 
1385  HT_MAYBE_FAIL("split-3");
1386  HT_MAYBE_FAIL_X("metadata-split-3", m_is_metadata);
1387 
1388  MetaLog::EntityTaskPtr acknowledge_relinquish_task;
1389  std::vector<MetaLog::EntityPtr> entities;
1390 
1391  // Add Range entity with updated state
1392  entities.push_back(m_metalog_entity);
1393 
1394  // Add acknowledge relinquish task
1395  acknowledge_relinquish_task =
1396  make_shared<MetaLog::EntityTaskAcknowledgeRelinquish>(m_metalog_entity->get_source(),
1397  m_metalog_entity->id(),
1398  table_frozen, range);
1399  entities.push_back(acknowledge_relinquish_task);
1400 
1404  m_metalog_entity->clear_state();
1405  m_metalog_entity->set_soft_limit(soft_limit);
1406 
1407  for (int i=0; true; i++) {
1408  try {
1409  Global::rsml_writer->record_state(entities);
1410  break;
1411  }
1412  catch (Exception &e) {
1413  if (i<2) {
1414  HT_ERRORF("%s - %s", Error::get_text(e.code()), e.what());
1415  this_thread::sleep_for(chrono::milliseconds(5000));
1416  continue;
1417  }
1418  HT_ERRORF("Problem updating meta log with STEADY state for %s",
1419  m_name.c_str());
1420  HT_FATAL_OUT << e << HT_END;
1421  }
1422  }
1423 
1424  // Add tasks to work queue
1425  Global::add_to_work_queue(acknowledge_relinquish_task);
1426 
1427  HT_MAYBE_FAIL("split-4");
1428  HT_MAYBE_FAIL_X("metadata-split-4", m_is_metadata);
1429 }
1430 
1431 
1433 
1434  if (!m_initialized)
1436 
1438  AccessGroupVector ag_vector(0);
1439  int flags = 0;
1440  int state = m_metalog_entity->get_state();
1441 
1442  // Make sure range is in a compactible state
1443  if (state == RangeState::RELINQUISH_LOG_INSTALLED ||
1445  state == RangeState::SPLIT_SHRUNK) {
1446  HT_INFOF("Cancelling compact because range is not in compactable state (%s)",
1447  RangeState::get_text(state).c_str());
1448  return;
1449  }
1450 
1451  {
1452  lock_guard<mutex> lock(m_schema_mutex);
1453  ag_vector = m_access_group_vector;
1454  }
1455 
1456  try {
1457 
1458  // Initiate minor compactions (freeze cell cache)
1459  {
1461  lock_guard<mutex> lock(m_mutex);
1462  for (size_t i=0; i<ag_vector.size(); i++) {
1463  if (m_metalog_entity->get_needs_compaction() ||
1464  subtask_map.compaction(ag_vector[i].get()))
1465  ag_vector[i]->stage_compaction();
1466  }
1467  }
1468 
1469  // do compactions
1470  bool successfully_compacted = false;
1471  std::vector<AccessGroup::Hints> hints(ag_vector.size());
1472  for (size_t i=0; i<ag_vector.size(); i++) {
1473 
1474  if (m_metalog_entity->get_needs_compaction())
1476  else
1477  flags = subtask_map.flags(ag_vector[i].get());
1478 
1479  if (flags & MaintenanceFlag::COMPACT) {
1480  try {
1481  ag_vector[i]->run_compaction(flags, &hints[i]);
1482  successfully_compacted = true;
1483  }
1484  catch (Exception &e) {
1485  ag_vector[i]->unstage_compaction();
1486  ag_vector[i]->load_hints(&hints[i]);
1487  }
1488  }
1489  else
1490  ag_vector[i]->load_hints(&hints[i]);
1491  }
1492  if (successfully_compacted) {
1493  m_hints_file.set(hints);
1495  }
1496  }
1497  catch (Exception &e) {
1498  if (e.code() == Error::CANCELLED || cancel_maintenance())
1499  return;
1500  throw;
1501  }
1502 
1503  if (m_metalog_entity->get_needs_compaction()) {
1504  try {
1505  lock_guard<mutex> lock(m_mutex);
1506  m_metalog_entity->set_needs_compaction(false);
1507  Global::rsml_writer->record_state(m_metalog_entity);
1508  }
1509  catch (Exception &e) {
1510  HT_ERRORF("Problem updating meta log entry for %s", m_name.c_str());
1511  }
1512  }
1513 
1514  {
1515  lock_guard<mutex> lock(m_mutex);
1519  }
1520 }
1521 
1522 
1523 
1525 
1526  if (!m_initialized)
1528 
1530  AccessGroupVector ag_vector(0);
1531  uint64_t memory_purged = 0;
1532  int state = m_metalog_entity->get_state();
1533 
1534  // Make sure range is in a compactible state
1535  if (state == RangeState::RELINQUISH_LOG_INSTALLED ||
1537  state == RangeState::SPLIT_SHRUNK) {
1538  HT_INFOF("Cancelling memory purge because range is not in purgeable state (%s)",
1539  RangeState::get_text(state).c_str());
1540  return;
1541  }
1542 
1543  {
1544  lock_guard<mutex> lock(m_schema_mutex);
1545  ag_vector = m_access_group_vector;
1546  }
1547 
1548  try {
1549  for (size_t i=0; i<ag_vector.size(); i++) {
1550  if ( subtask_map.memory_purge(ag_vector[i].get()) )
1551  memory_purged += ag_vector[i]->purge_memory(subtask_map);
1552  }
1553  }
1554  catch (Exception &e) {
1555  if (e.code() == Error::CANCELLED || cancel_maintenance())
1556  return;
1557  throw;
1558  }
1559 
1560  {
1561  lock_guard<mutex> lock(m_mutex);
1563  }
1564 
1565  HT_INFOF("Memory Purge complete for range %s. Purged %llu bytes of memory",
1566  m_name.c_str(), (Llu)memory_purged);
1567 
1568 }
1569 
1570 
1575  int state = m_metalog_entity->get_state();
1576 
1577  if ((state & RangeState::SPLIT_LOG_INSTALLED)
1578  == RangeState::SPLIT_LOG_INSTALLED ||
1580  == RangeState::RELINQUISH_LOG_INSTALLED) {
1581  CommitLogReaderPtr commit_log_reader =
1582  make_shared<CommitLogReader>(Global::dfs, m_metalog_entity->get_transfer_log());
1583 
1584  replay_transfer_log(commit_log_reader.get());
1585 
1586  commit_log_reader = 0;
1587 
1588  m_transfer_log = make_shared<CommitLog>(Global::dfs, m_metalog_entity->get_transfer_log(),
1589  !m_table.is_user());
1590 
1591  // re-initiate compaction
1592  for (size_t i=0; i<m_access_group_vector.size(); i++)
1593  m_access_group_vector[i]->stage_compaction();
1594 
1595  String transfer_log = m_metalog_entity->get_transfer_log();
1596  if ((state & RangeState::SPLIT_LOG_INSTALLED)
1597  == RangeState::SPLIT_LOG_INSTALLED) {
1598  m_split_row = m_metalog_entity->get_split_row();
1599  HT_INFOF("Restored range state to SPLIT_LOG_INSTALLED (split point='%s' "
1600  "xfer log='%s')", m_split_row.c_str(), transfer_log.c_str());
1601  }
1602  else
1603  HT_INFOF("Restored range state to RELINQUISH_LOG_INSTALLED (xfer "
1604  "log='%s')", transfer_log.c_str());
1605  }
1606 
1607  for (size_t i=0; i<m_access_group_vector.size(); i++)
1609 }
1610 
1611 
1612 void Range::lock() {
1613  m_schema_mutex.lock();
1614  m_updates++; // assumes this method is called for updates only
1615  for (size_t i=0; i<m_access_group_vector.size(); ++i)
1618 }
1619 
1620 
1622 
1623  // this needs to happen before the subsequent m_mutex lock
1624  // because the lock ordering is assumed to be Range::m_mutex -> AccessGroup::lock
1625  for (size_t i=0; i<m_access_group_vector.size(); ++i)
1627 
1628  {
1629  lock_guard<mutex> lock(m_mutex);
1632  }
1633 
1634  m_schema_mutex.unlock();
1635 }
1636 
1637 
1642  BlockHeaderCommitLog header;
1643  const uint8_t *base, *ptr, *end;
1644  size_t len;
1645  ByteString key, value;
1646  Key key_comps;
1647  size_t nblocks = 0;
1648  size_t count = 0;
1649  TableIdentifier table_id;
1650 
1652 
1653  try {
1654 
1655  while (commit_log_reader->next(&base, &len, &header)) {
1656 
1657  ptr = base;
1658  end = base + len;
1659 
1660  table_id.decode(&ptr, &len);
1661 
1662  if (strcmp(m_table.id, table_id.id))
1664  "Table name mis-match in split log replay \"%s\" != \"%s\"",
1665  m_table.id, table_id.id);
1666 
1667  while (ptr < end) {
1668  key.ptr = (uint8_t *)ptr;
1669  key_comps.load(key);
1670  ptr += key_comps.length;
1671  value.ptr = (uint8_t *)ptr;
1672  ptr += value.length();
1673  add(key_comps, value);
1674  count++;
1675  }
1676  nblocks++;
1677  }
1678 
1681 
1682  HT_INFOF("Replayed %d updates (%d blocks) from transfer log '%s' into %s",
1683  (int)count, (int)nblocks, commit_log_reader->get_log_dir().c_str(),
1684  m_name.c_str());
1685 
1686  m_added_inserts = 0;
1687  memset(m_added_deletes, 0, 3*sizeof(int64_t));
1688 
1689  }
1690  catch (Hypertable::Exception &e) {
1691  HT_ERROR_OUT << "Problem replaying split log - " << e << HT_END;
1694  throw;
1695  }
1696 }
1697 
1698 
1699 int64_t Range::get_scan_revision(uint32_t timeout_ms) {
1700 
1701  if (!m_initialized)
1702  deferred_initialization(timeout_ms);
1703 
1704  lock_guard<mutex> lock(m_mutex);
1705  return m_latest_revision;
1706 }
1707 
1708 void Range::acknowledge_load(uint32_t timeout_ms) {
1709 
1710  if (!m_initialized)
1711  deferred_initialization(timeout_ms);
1712 
1713  lock_guard<mutex> lock(m_mutex);
1714  m_metalog_entity->set_load_acknowledged(true);
1715 
1716  if (Global::rsml_writer == 0)
1717  HT_THROW(Error::SERVER_SHUTTING_DOWN, "Pointer to RSML Writer is NULL");
1718 
1719  HT_MAYBE_FAIL_X("user-range-acknowledge-load-pause-1", !m_table.is_system());
1720  HT_MAYBE_FAIL_X("user-range-acknowledge-load-1", !m_table.is_system());
1721 
1722  try {
1723  Global::rsml_writer->record_state(m_metalog_entity);
1724  }
1725  catch (Exception &e) {
1726  m_metalog_entity->set_load_acknowledged(false);
1727  throw;
1728  }
1729 }
1730 
1731 
1732 std::ostream &Hypertable::operator<<(std::ostream &os, const Range::MaintenanceData &mdata) {
1733  os << "table_id=" << mdata.table_id << "\n";
1734  os << "scans=" << mdata.load_factors.scans << "\n";
1735  os << "updates=" << mdata.load_factors.updates << "\n";
1736  os << "cells_scanned=" << mdata.load_factors.cells_scanned << "\n";
1737  os << "cells_returned=" << mdata.cells_returned << "\n";
1738  os << "cells_written=" << mdata.load_factors.cells_written << "\n";
1739  os << "bytes_scanned=" << mdata.load_factors.bytes_scanned << "\n";
1740  os << "bytes_returned=" << mdata.bytes_returned << "\n";
1741  os << "bytes_written=" << mdata.load_factors.bytes_written << "\n";
1742  os << "disk_bytes_read=" << mdata.load_factors.disk_bytes_read << "\n";
1743  os << "purgeable_index_memory=" << mdata.purgeable_index_memory << "\n";
1744  os << "compact_memory=" << mdata.compact_memory << "\n";
1745  os << "soft_limit=" << mdata.soft_limit << "\n";
1746  os << "schema_generation=" << mdata.schema_generation << "\n";
1747  os << "priority=" << mdata.priority << "\n";
1748  os << "state=" << mdata.state << "\n";
1749  os << "maintenance_flags=" << mdata.maintenance_flags << "\n";
1750  os << "file_count=" << mdata.file_count << "\n";
1751  os << "cell_count=" << mdata.cell_count << "\n";
1752  os << "memory_used=" << mdata.memory_used << "\n";
1753  os << "memory_allocated=" << mdata.memory_allocated << "\n";
1754  os << "key_bytes=" << mdata.key_bytes << "\n";
1755  os << "value_bytes=" << mdata.value_bytes << "\n";
1756  os << "compression_ratio=" << mdata.compression_ratio << "\n";
1757  os << "disk_used=" << mdata.disk_used << "\n";
1758  os << "disk_estimate=" << mdata.disk_estimate << "\n";
1759  os << "shadow_cache_memory=" << mdata.shadow_cache_memory << "\n";
1760  os << "block_index_memory=" << mdata.block_index_memory << "\n";
1761  os << "bloom_filter_memory=" << mdata.bloom_filter_memory << "\n";
1762  os << "bloom_filter_accesses=" << mdata.bloom_filter_accesses << "\n";
1763  os << "bloom_filter_maybes=" << mdata.bloom_filter_maybes << "\n";
1764  os << "bloom_filter_fps=" << mdata.bloom_filter_fps << "\n";
1765  os << "busy=" << (mdata.busy ? "true" : "false") << "\n";
1766  os << "is_metadata=" << (mdata.is_metadata ? "true" : "false") << "\n";
1767  os << "is_system=" << (mdata.is_system ? "true" : "false") << "\n";
1768  os << "relinquish=" << (mdata.relinquish ? "true" : "false") << "\n";
1769  os << "needs_major_compaction=" << (mdata.needs_major_compaction ? "true" : "false") << "\n";
1770  os << "needs_split=" << (mdata.needs_split ? "true" : "false") << "\n";
1771  os << "load_acknowledged=" << (mdata.load_acknowledged ? "true" : "false") << "\n";
1772  os << "unsplittable=" << (mdata.unsplittable ? "true" : "false") << "\n";
1773  return os;
1774 }
void split_install_log()
Definition: Range.cc:960
static LocationInitializerPtr location_initializer
Definition: Global.h:83
bool need_maintenance()
Definition: Range.cc:537
uint64_t m_added_inserts
Definition: Range.h:391
void change_end_row(const String &end_row)
Changes the end row.
AccessGroupMap m_access_group_map
Definition: Range.h:375
uint64_t m_bytes_written
Definition: Range.h:364
uint64_t m_cells_written
Definition: Range.h:360
Reads and writes access group "hints" file.
bool m_is_root
Definition: Range.h:387
virtual void change_end_row(const String &start_row, const String &old_end_row, const String &new_end_row)=0
Changes the end row key associated with a range.
Cell list scanner over a buffer of cells.
#define HT_WARNF(msg,...)
Definition: Logger.h:290
String end_row()
Definition: Range.h:165
Declarations for CellStoreFactory.
The FailureInducer simulates errors.
Range specification.
Definition: RangeSpec.h:40
MetaLogEntityRangePtr m_metalog_entity
Definition: Range.h:370
int64_t m_revision
Definition: Range.h:379
int64_t m_latest_revision
Definition: Range.h:380
static TablePtr metadata_table
Definition: Global.h:91
AccessGroupHintsFile m_hints_file
Definition: Range.h:371
PropertiesPtr properties
This singleton map stores all options.
Definition: Config.cc:47
static const uint32_t FLAG_DELETE_ROW
Definition: KeySpec.h:40
std::string String
A String is simply a typedef to std::string.
Definition: String.h:44
uint32_t length
Definition: Key.h:124
bool load_acknowledged()
Definition: Range.h:328
String format(const char *fmt,...)
Returns a String using printf like format facilities Vanilla snprintf is about 1.5x faster than this...
Definition: String.cc:37
chrono::time_point< fast_clock > time_point
Definition: fast_clock.h:42
void split_install_log_rollback_metadata()
Definition: Range.cc:226
static const uint32_t FLAG_INSERT
Definition: KeySpec.h:47
uint64_t m_scans
Definition: Range.h:357
std::vector< AccessGroupPtr > AccessGroupVector
Definition: Range.h:121
Declarations for CommitLogReader.
void change_rows(const String &start_row, const String &end_row)
virtual void change_start_row(const String &old_start_row, const String &new_start_row, const String &new_end_row)=0
Changes the start row key associated with a range.
bool estimate_split_row(CellList::SplitRowDataMapT &split_row_data, String &row)
Definition: Range.cc:1086
long long unsigned int Llu
Shortcut for printf formats.
Definition: String.h:50
const char * column_qualifier
Definition: KeySpec.h:128
string & name()
Returns pathname of transfer log.
Definition: TransferLog.h:80
void deferred_initialization()
Definition: Range.cc:179
void drop()
Definition: Range.h:289
uint64_t m_updates
Definition: Range.h:361
void split_compact_and_shrink()
Definition: Range.cc:1128
MaintenanceData * get_maintenance_data(ByteArena &arena, time_t now, int flags, TableMutator *mutator=0)
Definition: Range.cc:561
int64_t get_scan_revision(uint32_t timeout_ms)
Definition: Range.cc:1699
Barrier m_update_barrier
Definition: Range.h:384
std::string & get_log_dir()
#define HT_ABORT
Definition: Logger.h:175
#define HT_INFO(msg)
Definition: Logger.h:271
Maps object pointers to bit fields.
STL namespace.
Interface for removing a range or changing its end row in a Range set.
Definition: RangeSet.h:32
void relinquish_install_log()
Definition: Range.cc:712
void change_start_row(const String &start_row)
Changes the start row.
bool escape(const char *in_buf, size_t in_len, const char **out_bufp, size_t *out_lenp)
uint64_t m_cells_scanned
Definition: Range.h:358
bool compaction(const void *key)
Test if compaction needs to be performed on object.
Lib::Master::ClientPtr m_master_client
Definition: Range.h:369
#define HT_ON_SCOPE_EXIT(...)
Definition: ScopeGuard.h:301
void acknowledge_load(uint32_t timeout_ms)
Definition: Range.cc:1708
size_t column_qualifier_len
Definition: KeySpec.h:129
String format_bytes(size_t n, const void *buf, size_t len, const char *trailer)
Return first n bytes of buffer with an optional trailer if the size of the buffer exceeds n...
Definition: String.cc:103
std::shared_ptr< EntityTask > EntityTaskPtr
const void * row
Definition: KeySpec.h:125
CommitLogPtr m_transfer_log
Definition: Range.h:383
static void add_to_work_queue(MetaLog::EntityTaskPtr entity)
Definition: Global.cc:84
TableIdentifier m_table
Definition: Range.h:374
static const int64_t TIMESTAMP_MIN
Definition: KeySpec.h:34
uint64_t m_bytes_returned
Definition: Range.h:363
int64_t m_maintenance_generation
Definition: Range.h:395
CharT * alloc(size_t sz)
Allocate sz bytes.
Definition: PageArena.h:216
std::shared_ptr< CommitLogReader > CommitLogReaderPtr
Smart pointer to CommitLogReader.
void md5_trunc_modified_base64(const char *input, char output[17])
Get the modified base64 encoded string of the first 12 Bytes of the 16 Byte MD5 code of a null termin...
Definition: md5.cc:425
std::shared_ptr< Client > ClientPtr
Definition: Client.h:156
A class managing one or more serializable ByteStrings.
Definition: ByteString.h:47
#define HT_ASSERT(_e_)
Definition: Logger.h:396
Wrapper for TableIdentifier providing member storage.
Range(Lib::Master::ClientPtr &, const TableIdentifier &, SchemaPtr &, const RangeSpec &, RangeSet *, const RangeState &, bool needs_compaction=false)
Definition: Range.cc:70
Declarations for TransferLog. This file contains type declarations for TransferLog, a class for creating a unique range transfer log path.
void set_start_row(const std::string &s)
Definition: RangeSpec.h:112
int flags(const void *key)
Returns bit field for a given pointer.
std::shared_ptr< TableMutator > TableMutatorPtr
Smart pointer to TableMutator.
Definition: TableMutator.h:257
Provides the ability to mutate a table in the form of adding and deleting rows and cells...
Definition: TableMutator.h:55
uint64_t m_cells_returned
Definition: Range.h:359
void add(const Key &key, const ByteString value)
This method must not fail.
Definition: Range.cc:445
File system utility functions.
void load_cell_stores()
Definition: Range.cc:263
const char * end_row
Definition: RangeSpec.h:60
virtual void reset_files_scan()=0
bool m_relinquish
Definition: Range.h:399
std::map< const char *, int64_t, LtCstr, SplitRowDataAlloc > SplitRowDataMapT
Definition: CellList.h:66
static int64_t number64(int64_t maximum=0)
Returns a random 64-bit unsigned integer.
Definition: Random.cc:63
void set(const std::vector< AccessGroup::Hints > &hints)
Replaces contents of internal hints vector.
void initialize()
Definition: Range.cc:93
static std::string toplevel_dir
Definition: Global.h:108
String get_name()
Definition: Range.h:296
const char * get_text(int error)
Returns a descriptive error message.
Definition: Error.cc:330
uint64_t m_disk_bytes_read
Definition: Range.h:365
void compact(MaintenanceFlag::Map &subtask_map)
Definition: Range.cc:1432
std::shared_ptr< MergeScannerRange > MergeScannerRangePtr
Smart pointer to MergeScannerRange.
virtual bool get_next_files(String &ag_name, String &files, uint32_t *nextcsidp)=0
int m_compaction_type_needed
Definition: Range.h:394
static TablePtr rs_metrics_table
Definition: Global.h:92
void create_scanner(ScanContextPtr &scan_ctx, MergeScannerRangePtr &scanner)
Definition: Range.cc:480
int64_t m_split_threshold
Definition: Range.h:381
static MetaLog::WriterPtr rsml_writer
Definition: Global.h:81
bool exists()
Checks if hints file exists.
Stl compatible memory allocator based on a PageArena.
Definition: StlAllocator.h:41
STL Strict Weak Ordering for comparing c-style strings.
Definition: StringExt.h:45
void relinquish_finalize()
Definition: Range.cc:823
Compatibility Macros for C/C++.
Barrier m_scan_barrier
Definition: Range.h:385
std::ostream & operator<<(std::ostream &os, const crontab_entry &entry)
Helper function to write crontab_entry to an ostream.
Definition: Crontab.cc:301
bool m_dropped
Definition: Range.h:397
bool load(const SerializedKey &key)
Parses the opaque key and loads the components into the member variables.
Definition: Key.cc:158
uint64_t m_added_deletes[KEYSPEC_DELETE_MAX]
Definition: Range.h:390
std::shared_ptr< AccessGroup > AccessGroupPtr
Definition: AccessGroup.h:303
#define HT_END
Definition: Logger.h:220
static Hypertable::FilesystemPtr dfs
Definition: Global.h:64
bool m_split_off_high
Definition: Range.h:386
size_t length() const
Retrieves the length of the serialized string.
Definition: ByteString.h:62
AccessGroupVector m_access_group_vector
Definition: Range.h:376
LoadMetricsRange m_load_metrics
Definition: Range.h:396
#define HT_ERROR_OUT
Definition: Logger.h:301
Declarations for Range.
bool m_capacity_exceeded_throttle
Definition: Range.h:398
virtual void decode(const uint8_t **bufp, size_t *remainp)
Reads serialized representation of object from a buffer.
Definition: Serializable.cc:70
Provides sequential access to blocks in a commit log.
static std::string get_text(uint8_t state)
Returns string representation of range state value.
Definition: RangeState.cc:48
static int64_t range_split_size
Definition: Global.h:84
void split_notify_master()
Definition: Range.cc:1338
std::vector< AccessGroupPtr > m_column_family_vector
Definition: Range.h:377
const uint8_t * ptr
The pointer to the serialized data.
Definition: ByteString.h:121
std::shared_ptr< CellStore > CellStorePtr
Smart pointer to CellStore.
Definition: CellStore.h:340
Hypertable definitions
#define HT_FATALF(msg,...)
Definition: Logger.h:343
bool m_is_metadata
Definition: Range.h:388
long long int Lld
Shortcut for printf formats.
Definition: String.h:53
SchemaPtr m_schema
Definition: Range.h:372
void write(String location)
Write hints file.
Relinquish - range compacted.
Definition: RangeState.h:57
bool m_initialized
Definition: Range.h:400
std::shared_ptr< MetaLogEntityRange > MetaLogEntityRangePtr
Smart pointer to MetaLogEntityRange.
const char * start_row
Definition: RangeSpec.h:59
uint64_t m_bytes_scanned
Definition: Range.h:362
void update_schema(SchemaPtr &schema)
Definition: Range.cc:387
Split - range shrunk.
Definition: RangeState.h:55
virtual bool remove(const String &start_row, const String &end_row)=0
Removes the range associated with the given end_row.
std::mutex m_schema_mutex
Definition: Range.h:368
#define HT_INFOF(msg,...)
Definition: Logger.h:272
size_t decode_length(const uint8_t **dptr) const
Retrieves the decoded length and returns a pointer to the string.
Definition: ByteString.h:83
bool cancel_maintenance()
Definition: Range.cc:555
#define HT_THROWF(_code_, _fmt_,...)
Definition: Error.h:490
Provides access to internal components of opaque key.
Definition: Key.h:40
AccessGroup::MaintenanceData * agdata
Definition: Range.h:81
Random number generator for int32, int64, double and ascii arrays.
static int64_t range_metadata_split_size
Definition: Global.h:93
RangeSet * m_range_set
Definition: Range.h:392
static int32_t access_group_max_mem
Definition: Global.h:88
String m_split_row
Definition: Range.h:382
std::mutex m_mutex
Definition: Range.h:367
This is a generic exception class for Hypertable.
Definition: Error.h:314
static std::mutex mutex
Definition: Global.h:62
void purge_memory(MaintenanceFlag::Map &subtask_map)
Definition: Range.cc:1524
static const char * END_ROOT_ROW
Definition: Key.h:50
#define HT_MAYBE_FAIL_X(_label_, _exp_)
bool next(const uint8_t **blockp, size_t *lenp, BlockHeaderCommitLog *)
static Hypertable::FilesystemPtr log_dfs
Definition: Global.h:65
#define HT_ERRORF(msg,...)
Definition: Logger.h:300
Declarations for CommitLog.
CellListScanner * create_scanner_pseudo_table(ScanContextPtr &scan_ctx, const String &table_name)
Creates a scanner over the pseudo-table indicated by table_name.
Definition: Range.cc:505
bool m_unsplittable
Definition: Range.h:389
std::shared_ptr< Schema > SchemaPtr
Smart pointer to Schema.
Definition: Schema.h:465
void set_end_row(const std::string &e)
Definition: RangeSpec.h:116
Range state.
Definition: RangeState.h:48
String m_name
Definition: Range.h:373
Configuration settings.
RangeMaintenanceGuard m_maintenance_guard
Definition: Range.h:378
#define HT_MAYBE_FAIL(_label_)
int32_t m_error
Definition: Range.h:393
void compute_and_store(TableMutator *mutator, time_t now, LoadFactors &load_factors, uint64_t disk_used, uint64_t memory_used, double compression_ratio)
Value format for version 1:
static CellStorePtr open(const String &name, const char *start_row, const char *end_row)
Creates a CellStore object from a given cell store file.
std::vector< AccessGroup::Hints > & get()
Returns reference to internal hints vector.
void relinquish_compact()
Definition: Range.cc:772
String extensions and helpers: sets, maps, append operators etc.
const char * column_family
Definition: KeySpec.h:127
Error codes, Exception handling, error logging.
#define HT_THROW(_code_, _msg_)
Definition: Error.h:478
void replay_transfer_log(CommitLogReader *commit_log_reader)
Called before range has been flipped live so no locking needed.
Definition: Range.cc:1641
Creates a unique range transfer log pathname.
Definition: TransferLog.h:46
void relinquish()
Definition: Range.cc:660
md5 digest routines.
Wrapper for RangeSpec providing member storage.
Definition: RangeSpec.h:89
#define HT_FATAL_OUT
Definition: Logger.h:347
static bool row_size_unlimited
Definition: Global.h:75
#define HT_DEBUG_OUT
Definition: Logger.h:261
Declarations for MergeScannerRange.
static int64_t range_maximum_size
Definition: Global.h:85
int code() const
Returns the error code.
Definition: Error.h:391
static bool immovable_range_set_contains(const TableIdentifier &table, const RangeSpec &spec)
Definition: Global.cc:106
std::shared_ptr< ScanContext > ScanContextPtr
Definition: ScanContext.h:169
bool memory_purge(const void *key)
Test if memory purge needs to be performed on object.
#define HT_THROW2(_code_, _ex_, _msg_)
Definition: Error.h:484
Executes user-defined functions when leaving the current scope.
A helper class to put up a barrier when entering a scope and take it down when leaving the scope...
Definition: Barrier.h:93
void recovery_finalize()
This method is called when the range is offline so no locking is needed.
Definition: Range.cc:1574
void split()
Definition: Range.cc:896