0.9.8.10
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
AccessGroup.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2007-2015 Hypertable, Inc.
3  *
4  * This file is part of Hypertable.
5  *
6  * Hypertable is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; version 3 of the
9  * License, or any later version.
10  *
11  * Hypertable is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19  * 02110-1301, USA.
20  */
21 
26 
27 #include <Common/Compat.h>
28 #include "AccessGroup.h"
29 
41 
42 #include <Common/DynamicBuffer.h>
43 #include <Common/Error.h>
44 #include <Common/FailureInducer.h>
45 #include <Common/md5.h>
46 
47 #include <algorithm>
48 #include <cassert>
49 #include <cstdlib>
50 #include <cstring>
51 #include <iterator>
52 #include <vector>
53 
54 using namespace Hypertable;
55 using namespace std;
56 
58  SchemaPtr &schema, AccessGroupSpec *ag_spec,
59  const RangeSpec *range, const Hints *hints)
60  : m_identifier(*identifier), m_schema(schema), m_name(ag_spec->get_name()),
61  m_cell_cache_manager {make_shared<CellCacheManager>()},
62  m_file_tracker(identifier, schema, range, ag_spec->get_name()),
64 
66  m_start_row = range->start_row;
67  m_end_row = range->end_row;
68  m_range_name = m_table_name + "[" + m_start_row + ".." + m_end_row + "]";
69  m_full_name = m_range_name + "(" + m_name + ")";
70 
72 
73  for (auto cf_spec : ag_spec->columns())
74  m_column_families.insert(cf_spec->get_id());
75 
76  m_is_root = (m_identifier.is_metadata() && *range->start_row == 0
77  && !strcmp(range->end_row, Key::END_ROOT_ROW));
78  m_in_memory = ag_spec->get_option_in_memory();
79 
80  update_schema(schema, ag_spec);
81 
82  // Restore state from hints
83  if (hints) {
86  m_disk_usage = hints->disk_usage;
87  }
88 }
89 
90 
99  AccessGroupSpec *ag_spec) {
100  lock_guard<mutex> lock(m_schema_mutex);
101  set<uint8_t>::iterator iter;
102 
103  if (!m_cellstore_props ||
104  schema->get_generation() > m_schema->get_generation()) {
105 
107 
108  m_cellstore_props = make_shared<Properties>();
109  m_cellstore_props->set("compressor", ag_spec->get_option_compressor());
110  m_cellstore_props->set("blocksize", ag_spec->get_option_blocksize());
111  if (ag_spec->get_option_replication() != -1)
112  m_cellstore_props->set("replication",
113  (int32_t)ag_spec->get_option_replication());
114 
115  if (!ag_spec->get_option_bloom_filter().empty())
118  else {
119  assert(Config::properties); // requires Config::init* first
121  Config::get_str("Hypertable.RangeServer.CellStore.DefaultBloomFilter"),
123  }
124 
125  for (auto cf_spec : ag_spec->columns()) {
126  iter = m_column_families.find(cf_spec->get_id());
127  if (iter == m_column_families.end()) {
128  // Add new column families
129  if (!cf_spec->get_deleted())
130  m_column_families.insert(cf_spec->get_id());
131  }
132  else {
133  // Delete existing cfs
134  if (cf_spec->get_deleted())
135  m_column_families.erase(iter);
136 
137  // TODO: In future other types of updates
138  // such as alter table modify etc will go in here
139  }
140  }
141 
142  // Update schema ptr
143  lock_guard<mutex> lock(m_mutex);
144  m_schema = schema;
145  }
146 }
147 
153 void AccessGroup::add(const Key &key, const ByteString value) {
154 
155  assert(m_start_row.compare(key.row) < 0 && m_end_row.compare(key.row) >= 0);
156 
157  if (!m_dirty)
158  m_dirty = true;
159 
163  if (m_schema->column_is_counter(key.column_family_code))
164  return m_cell_cache_manager->add_counter(key, value);
165  else
166  return m_cell_cache_manager->add(key, value);
167  }
168  else if (!m_recovering) {
170  HT_ERRORF("Revision (clock) skew detected! Key '%s' revision=%lld, latest_stored=%lld",
172  if (m_schema->column_is_counter(key.column_family_code))
173  return m_cell_cache_manager->add_counter(key, value);
174  else
175  return m_cell_cache_manager->add(key, value);
176  }
177  }
178  else if (m_in_memory) {
179  if (m_schema->column_is_counter(key.column_family_code))
180  return m_cell_cache_manager->add_counter(key, value);
181  else
182  return m_cell_cache_manager->add(key, value);
183  }
184 }
185 
186 
188  uint32_t flags = (scan_ctx->spec && scan_ctx->spec->return_deletes) ?
190  MergeScannerAccessGroup *scanner =
193 
194  CellStoreReleaseCallback callback(this);
195 
196  {
197  lock_guard<mutex> lock(m_outstanding_scanner_mutex);
199  }
200 
201  try {
202  lock_guard<mutex> lock(m_mutex);
203  uint64_t initial_bytes_read;
204 
205  m_cell_cache_manager->add_scanners(scanner, scan_ctx);
206 
207  if (!m_in_memory) {
208  bool bloom_filter_disabled;
209 
210  for (size_t i=0; i<m_stores.size(); ++i) {
211 
212  if (scan_ctx->time_interval.first > m_stores[i].timestamp_max ||
213  scan_ctx->time_interval.second < m_stores[i].timestamp_min)
214  continue;
215 
216  bloom_filter_disabled = boost::any_cast<uint8_t>(m_stores[i].cs->get_trailer()->get("bloom_filter_mode")) == BLOOM_FILTER_DISABLED;
217 
218  initial_bytes_read = m_stores[i].cs->bytes_read();
219 
220  // Query bloomfilter only if it is enabled and a start row has been specified
221  // (ie query is not something like select bar from foo;)
222  if (bloom_filter_disabled ||
223  !scan_ctx->single_row ||
224  scan_ctx->start_row == "") {
225  if (m_stores[i].shadow_cache) {
226  scanner->add_scanner(m_stores[i].shadow_cache->create_scanner(scan_ctx));
227  m_stores[i].shadow_cache_hits++;
228  }
229  else
230  scanner->add_scanner(m_stores[i].cs->create_scanner(scan_ctx));
231  callback.add_file(m_stores[i].cs->get_filename());
232  }
233  else {
234  m_stores[i].bloom_filter_accesses++;
235  if (m_stores[i].cs->may_contain(scan_ctx)) {
236  m_stores[i].bloom_filter_maybes++;
237  if (m_stores[i].shadow_cache) {
238  scanner->add_scanner(m_stores[i].shadow_cache->create_scanner(scan_ctx));
239  m_stores[i].shadow_cache_hits++;
240  }
241  else
242  scanner->add_scanner(m_stores[i].cs->create_scanner(scan_ctx));
243  callback.add_file(m_stores[i].cs->get_filename());
244  }
245  }
246 
247  if (m_stores[i].cs->bytes_read() > initial_bytes_read)
248  scanner->add_disk_read(m_stores[i].cs->bytes_read() - initial_bytes_read);
249 
250  }
251  }
252  }
253  catch (Exception &e) {
254  lock_guard<mutex> lock(m_outstanding_scanner_mutex);
255  if (--m_outstanding_scanner_count == 0)
256  m_outstanding_scanner_cond.notify_all();
257  delete scanner;
258  HT_THROW2F(e.code(), e, "Problem creating scanner on access group %s",
259  m_full_name.c_str());
260  }
261 
263  scanner->install_release_callback(callback);
264 
265  return scanner;
266 }
267 
269  lock_guard<mutex> lock(m_schema_mutex);
270  for (set<uint8_t>::iterator iter = m_column_families.begin();
271  iter != m_column_families.end(); ++iter) {
272  if (scan_ctx->family_mask[*iter])
273  return true;
274  }
275  return false;
276 }
277 
278 
280  lock_guard<mutex> lock(m_mutex);
281  m_cell_cache_manager->split_row_estimate_data(split_row_data);
282 }
283 
284 
286  lock_guard<mutex> lock(m_mutex);
287  if (!m_in_memory) {
288  for (auto &csinfo : m_stores)
289  csinfo.cs->split_row_estimate_data(split_row_data);
290  }
291 }
292 
294  lock_guard<mutex> lock(m_mutex);
295  for (auto &csinfo : m_stores)
296  csinfo.cs->populate_index_pseudo_table_scanner(scanner);
297 }
298 
299 
301  lock_guard<mutex> lock(m_mutex);
302  uint64_t du = (m_in_memory) ? 0 : m_disk_usage;
303  uint64_t mu = m_cell_cache_manager->memory_used();
304  return du + (uint64_t)(m_compression_ratio * (float)mu);
305 }
306 
308  lock_guard<mutex> lock(m_mutex);
309  return m_cell_cache_manager->memory_used();
310 }
311 
312 void AccessGroup::space_usage(int64_t *memp, int64_t *diskp) {
313  lock_guard<mutex> lock(m_mutex);
314  *memp = m_cell_cache_manager->memory_used();
315  *diskp = (m_in_memory) ? 0 : m_disk_usage;
316  *diskp += (int64_t)(m_compression_ratio * (float)*memp);
317 }
318 
319 
321  lock_guard<mutex> lock(m_mutex);
322  uint64_t memory_purged = 0;
323  int flags;
324 
325  {
326  lock_guard<mutex> lock(m_outstanding_scanner_mutex);
327  for (size_t i=0; i<m_stores.size(); i++) {
328  flags = subtask_map.flags(m_stores[i].cs.get());
330  m_stores[i].shadow_cache) {
331  memory_purged += m_stores[i].shadow_cache->memory_allocated();
332  m_stores[i].shadow_cache = 0;
333  }
334  if (m_outstanding_scanner_count == 0 &&
336  memory_purged += m_stores[i].cs->purge_indexes();
337  }
338  }
339 
340  return memory_purged;
341 }
342 
344 AccessGroup::get_maintenance_data(ByteArena &arena, time_t now, int flags) {
345  lock_guard<mutex> lock(m_mutex);
346  MaintenanceData *mdata = (MaintenanceData *)arena.alloc(sizeof(MaintenanceData));
347 
348  memset(mdata, 0, sizeof(MaintenanceData));
349  mdata->ag = this;
350 
353 
356  else
358 
360 
361  CellCache::Statistics cache_stats;
362  m_cell_cache_manager->get_cache_statistics(cache_stats);
363  mdata->cell_count = cache_stats.size;
364  mdata->key_bytes = cache_stats.key_bytes;
365  mdata->value_bytes = cache_stats.value_bytes;
366  mdata->mem_allocated = cache_stats.memory_allocated;
367  mdata->mem_used = cache_stats.memory_used;
368  mdata->deletes = cache_stats.deletes;
369 
371 
372  mdata->disk_used = m_disk_usage;
373  int64_t du = m_in_memory ? 0 : m_disk_usage;
374  mdata->disk_estimate = du + (int64_t)(m_compression_ratio * (float)mdata->mem_used);
376  mdata->in_memory = m_in_memory;
377 
378  CellStoreMaintenanceData **tailp = 0;
379  mdata->csdata = 0;
380  for (size_t i=0; i<m_stores.size(); i++) {
381  if (mdata->csdata == 0) {
383  mdata->csdata->cs = m_stores[i].cs.get();
384  tailp = &mdata->csdata;
385  }
386  else {
387  (*tailp)->next = (CellStoreMaintenanceData *)arena.alloc(sizeof(CellStoreMaintenanceData));
388  (*tailp)->next->cs = m_stores[i].cs.get();
389  tailp = &(*tailp)->next;
390  }
391  m_stores[i].cs->get_index_memory_stats( &(*tailp)->index_stats );
392  mdata->block_index_memory += ((*tailp)->index_stats).block_index_memory;
393  mdata->bloom_filter_memory += ((*tailp)->index_stats).bloom_filter_memory;
394 
395  mdata->bloom_filter_accesses += m_stores[i].bloom_filter_accesses;
396  mdata->bloom_filter_maybes += m_stores[i].bloom_filter_maybes;
397  mdata->bloom_filter_fps += m_stores[i].bloom_filter_fps;
398  if (!m_in_memory) {
399  mdata->cell_count += m_stores[i].cell_count;
400  mdata->key_bytes += m_stores[i].key_bytes;
401  mdata->value_bytes += m_stores[i].value_bytes;
402  }
403 
404  if (m_stores[i].shadow_cache) {
405  (*tailp)->shadow_cache_size = m_stores[i].shadow_cache->memory_allocated();
406  (*tailp)->shadow_cache_ecr = m_stores[i].shadow_cache_ecr;
407  (*tailp)->shadow_cache_hits = m_stores[i].shadow_cache_hits;
408  }
409  else {
410  (*tailp)->shadow_cache_size = 0;
411  (*tailp)->shadow_cache_ecr = TIMESTAMP_MAX;
412  (*tailp)->shadow_cache_hits = 0;
413  }
414  (*tailp)->maintenance_flags = 0;
415  (*tailp)->next = 0;
416 
417  mdata->shadow_cache_memory += (*tailp)->shadow_cache_size;
418  }
419  mdata->file_count = m_stores.size();
420 
421  // If immutable cache installed, compaction in progress
422  if (m_cell_cache_manager->immutable_cache())
423  mdata->gc_needed = false;
424  else
426 
428  mdata->end_merge = m_end_merge;
429 
430  mdata->maintenance_flags = 0;
431 
432  return mdata;
433 }
434 
435 
437 
438  // Record the latest stored revision
439  int64_t revision = boost::any_cast<int64_t>
440  (cellstore->get_trailer()->get("revision"));
441  if (revision > m_latest_stored_revision)
442  m_latest_stored_revision = revision;
443 
444  if (m_in_memory) {
445  HT_ASSERT(m_stores.empty());
446  ScanContextPtr scan_ctx = make_shared<ScanContext>(m_schema);
447  CellListScannerPtr scanner = cellstore->create_scanner(scan_ctx.get());
448  m_cell_cache_manager->add(scanner);
449  }
450 
451  m_stores.push_back(cellstore);
452 
453  int64_t total_index_entries = 0;
454  recompute_compression_ratio(&total_index_entries);
455  m_file_tracker.add_live_noupdate(cellstore->get_filename(), total_index_entries);
456 }
457 
458 void AccessGroup::measure_garbage(double *total, double *garbage) {
459  ScanContextPtr scan_ctx = make_shared<ScanContext>(m_schema);
461  = make_shared<MergeScannerAccessGroup>(m_table_name, scan_ctx.get());
462  ByteString value;
463  Key key;
464 
465  CellListScannerPtr immutable_scanner =
466  m_cell_cache_manager->create_immutable_scanner(scan_ctx.get());
467 
468  if (immutable_scanner)
469  mscanner->add_scanner(immutable_scanner);
470 
471  if (!m_in_memory) {
472  for (size_t i=0; i<m_stores.size(); i++) {
473  HT_ASSERT(m_stores[i].cs);
474  mscanner->add_scanner(m_stores[i].cs->create_scanner(scan_ctx.get()));
475  }
476  }
477 
478  while (mscanner->get(key, value))
479  mscanner->forward();
480 
481  *total = (double)mscanner->get_input_bytes();
482  *garbage = *total - (double)mscanner->get_output_bytes();
483 }
484 
485 
486 
487 void AccessGroup::run_compaction(int maintenance_flags, Hints *hints) {
488  ByteString bskey;
489  ByteString value;
490  Key key;
491  CellStorePtr cellstore;
492  CellCachePtr filtered_cache, shadow_cache;
493  String metadata_key_str;
494  bool abort_loop = true;
495  bool minor = false;
496  bool merging = false;
497  bool major = false;
498  bool gc = false;
499  bool cellstore_created = false;
500  size_t merge_offset=0, merge_length=0;
501  String added_file;
502 
503  hints->ag_name = m_name;
505 
506  while (abort_loop) {
507  lock_guard<mutex> lock(m_mutex);
508  if (m_in_memory) {
510  break;
511  HT_INFOF("Starting InMemory Compaction of %s", m_full_name.c_str());
512  Global::load_statistics->increment_compactions_minor();
513  }
514  else if (MaintenanceFlag::major_compaction(maintenance_flags) ||
515  MaintenanceFlag::move_compaction(maintenance_flags)) {
516  if ((m_cell_cache_manager->immutable_cache_empty()) &&
517  m_stores.size() <= (size_t)1 &&
518  (!MaintenanceFlag::split(maintenance_flags) &&
519  !MaintenanceFlag::move_compaction(maintenance_flags) &&
520  !MaintenanceFlag::gc_compaction(maintenance_flags)))
521  break;
522  major = true;
523  HT_INFOF("Starting Major Compaction of %s", m_full_name.c_str());
524  Global::load_statistics->increment_compactions_major();
525  }
526  else {
527  if (MaintenanceFlag::merging_compaction(maintenance_flags)) {
528  m_needs_merging = find_merge_run(&merge_offset, &merge_length);
529  if (!m_needs_merging)
530  break;
531  m_end_merge = (merge_offset + merge_length) == m_stores.size();
532  HT_INFOF("Starting Merging Compaction of %s (end_merge=%s)",
533  m_full_name.c_str(), m_end_merge ? "true" : "false");
534  Global::load_statistics->increment_compactions_merging();
535  if (merge_length == m_stores.size())
536  major = true;
537  else {
538  merging = true;
539  if (!m_end_merge)
540  merge_caches();
541  }
542  }
543  else if (MaintenanceFlag::gc_compaction(maintenance_flags)) {
544  gc = true;
545  HT_INFOF("Starting GC Compaction of %s", m_full_name.c_str());
546  Global::load_statistics->increment_compactions_gc();
547  }
548  else {
549  if (m_cell_cache_manager->immutable_cache_empty())
550  break;
551  minor = true;
552  HT_INFOF("Starting Minor Compaction of %s", m_full_name.c_str());
553  Global::load_statistics->increment_compactions_minor();
554  }
555  }
556  abort_loop = false;
557  }
558 
559  if (abort_loop) {
560  lock_guard<mutex> lock(m_mutex);
561  merge_caches();
563  hints->disk_usage = m_disk_usage;
564  return;
565  }
566 
567  String cs_file;
568  PropertiesPtr cellstore_props;
569  {
570  lock_guard<mutex> lock(m_schema_mutex);
571  cellstore_props = m_cellstore_props;
572  }
573 
574  try {
575  time_t now = time(0);
576  int64_t max_num_entries {};
577  CellListScannerPtr scanner;
579  ScanContextPtr scan_ctx;
580 
581  {
582  lock_guard<mutex> lock(m_mutex);
583  scan_ctx = make_shared<ScanContext>(m_schema);
584 
585  cs_file = format("%s/tables/%s/%s/%s/cs%d",
586  Global::toplevel_dir.c_str(),
587  m_identifier.id, m_name.c_str(),
588  m_range_dir.c_str(),
589  m_next_cs_id++);
590 
596  if (gc || (minor && m_garbage_tracker.check_needed(now))) {
597  double total, garbage;
598  measure_garbage(&total, &garbage);
599  m_garbage_tracker.adjust_targets(now, total, garbage);
600  if (m_garbage_tracker.collection_needed(total, garbage)) {
601  if (minor || merging)
602  HT_INFOF("Switching to major compaction to collect %.2f%% garbage",
603  (garbage/total)*100.00);
604  major = true;
605  minor = false;
606  merging = false;
607  }
608  else if (gc) {
609  HT_INFOF("Aborting GC compaction because measured garbage of %.2f%% "
610  "is below threshold", (garbage/total)*100.00);
611  merge_caches();
613  hints->disk_usage = m_disk_usage;
614  return;
615  }
616  }
617 
618  cellstore = make_shared<CellStoreV7>(Global::dfs.get(), m_schema);
619 
620  max_num_entries = m_cell_cache_manager->immutable_items();
621 
622  if (m_in_memory) {
623  mscanner = make_shared<MergeScannerAccessGroup>(m_table_name, scan_ctx.get(),
626  m_cell_cache_manager->add_immutable_scanner(mscanner.get(), scan_ctx.get());
627  filtered_cache = make_shared<CellCache>();
628  }
629  else if (merging) {
630  mscanner = make_shared<MergeScannerAccessGroup>(m_table_name, scan_ctx.get(),
633  // If we're merging up to the end of the vector of stores, add in the cell cache
634  if (m_end_merge) {
635  HT_ASSERT((merge_offset + merge_length) == m_stores.size());
636  m_cell_cache_manager->add_immutable_scanner(mscanner.get(), scan_ctx.get());
637  }
638  else
639  max_num_entries = 0;
640  for (size_t i=merge_offset; i<merge_offset+merge_length; i++) {
641  HT_ASSERT(m_stores[i].cs);
642  mscanner->add_scanner(m_stores[i].cs->create_scanner(scan_ctx.get()));
643  int divisor = (boost::any_cast<uint32_t>(m_stores[i].cs->get_trailer()->get("flags")) & CellStoreTrailerV7::SPLIT) ? 2: 1;
644  max_num_entries += (boost::any_cast<int64_t>
645  (m_stores[i].cs->get_trailer()->get("total_entries")))/divisor;
646  }
647  }
648  else if (major) {
649  mscanner = make_shared<MergeScannerAccessGroup>(m_table_name, scan_ctx.get(),
652  m_cell_cache_manager->add_immutable_scanner(mscanner.get(), scan_ctx.get());
653  for (size_t i=0; i<m_stores.size(); i++) {
654  HT_ASSERT(m_stores[i].cs);
655  mscanner->add_scanner(m_stores[i].cs->create_scanner(scan_ctx.get()));
656  int divisor = (boost::any_cast<uint32_t>(m_stores[i].cs->get_trailer()->get("flags")) & CellStoreTrailerV7::SPLIT) ? 2: 1;
657  max_num_entries += (boost::any_cast<int64_t>
658  (m_stores[i].cs->get_trailer()->get("total_entries")))/divisor;
659  }
660  }
661  else {
662  scanner = m_cell_cache_manager->create_immutable_scanner(scan_ctx.get());
663  HT_ASSERT(scanner);
664  }
665  }
666 
667  cellstore->create(cs_file.c_str(), max_num_entries, cellstore_props, &m_identifier);
668 
669  if (mscanner) {
670  while (mscanner->get(key, value)) {
671  cellstore->add(key, value);
672  if (m_in_memory)
673  filtered_cache->add(key, value);
674  mscanner->forward();
675  }
676  m_garbage_tracker.adjust_targets(now, mscanner.get());
677  }
678  else {
679  while (scanner->get(key, value)) {
680  cellstore->add(key, value);
681  if (m_in_memory)
682  filtered_cache->add(key, value);
683  scanner->forward();
684  }
685  }
686 
687  CellStoreTrailerV7 *trailer = dynamic_cast<CellStoreTrailerV7 *>(cellstore->get_trailer());
688 
689  if (major)
690  HT_ASSERT(mscanner);
691 
692  if (major)
694 
695  if (maintenance_flags & MaintenanceFlag::SPLIT)
696  trailer->flags |= CellStoreTrailerV7::SPLIT;
697 
698  cellstore->finalize(&m_identifier);
699 
700  if (FailureInducer::enabled()) {
701  if (MaintenanceFlag::split(maintenance_flags))
702  FailureInducer::instance->maybe_fail("compact-split-1");
703  if (MaintenanceFlag::relinquish(maintenance_flags))
704  FailureInducer::instance->maybe_fail("compact-relinquish-1");
705  if (!MaintenanceFlag::split(maintenance_flags) &&
706  !MaintenanceFlag::relinquish(maintenance_flags))
707  FailureInducer::instance->maybe_fail("compact-manual-1");
708  }
709 
710  cellstore_created = true;
711 
715  vector<String> removed_files;
716  int64_t total_index_entries = 0;
717  {
718  lock_guard<mutex> lock(m_mutex);
719 
720  if (merging) {
721  vector<CellStoreInfo> new_stores;
722  new_stores.reserve(m_stores.size() - (merge_length-1));
723  for (size_t i=0; i<merge_offset; i++)
724  new_stores.push_back(m_stores[i]);
725  for (size_t i=merge_offset; i<merge_offset+merge_length; i++)
726  removed_files.push_back(m_stores[i].cs->get_filename());
727  if (cellstore->get_total_entries() > 0) {
728  new_stores.push_back(cellstore);
729  added_file = cellstore->get_filename();
730  }
731  for (size_t i=merge_offset+merge_length; i<m_stores.size(); i++)
732  new_stores.push_back(m_stores[i]);
733  m_stores.swap(new_stores);
734 
735  // If cell cache was included in the merge, drop it
736  if (m_end_merge)
737  m_cell_cache_manager->drop_immutable_cache();
738 
739  }
740  else {
741 
742  if (m_in_memory) {
743  m_cell_cache_manager->install_new_immutable_cache(filtered_cache);
744  m_cell_cache_manager->merge_caches(m_schema);
745  for (size_t i=0; i<m_stores.size(); i++)
746  removed_files.push_back(m_stores[i].cs->get_filename());
747  m_stores.clear();
748  }
749  else {
750 
751  if (minor && Global::enable_shadow_cache &&
752  !MaintenanceFlag::purge_shadow_cache(maintenance_flags))
753  shadow_cache = m_cell_cache_manager->immutable_cache();
754 
755  m_cell_cache_manager->drop_immutable_cache();
756 
758  if (major) {
759  for (size_t i=0; i<m_stores.size(); i++)
760  removed_files.push_back(m_stores[i].cs->get_filename());
761  m_stores.clear();
762  }
763  }
764 
768  if (cellstore->get_total_entries() > 0) {
769  if (shadow_cache)
770  m_stores.push_back( CellStoreInfo(cellstore, shadow_cache, m_earliest_cached_revision_saved) );
771  else
772  m_stores.push_back(cellstore);
773  added_file = cellstore->get_filename();
774  }
775  }
776 
778 
779  // If compaction included CellCache, recompute latest stored revision
780  if (!merging || m_end_merge) {
781  m_latest_stored_revision = boost::any_cast<int64_t>
782  (cellstore->get_trailer()->get("revision"));
784  HT_ERROR("Revision (clock) skew detected! May result in data loss.");
786  }
787 
789  recompute_compression_ratio(&total_index_entries);
791  hints->disk_usage = m_disk_usage;
792  }
793 
794  if (cellstore->get_total_entries() == 0) {
795  String fname = cellstore->get_filename();
796  cellstore = 0;
797  try {
798  Global::dfs->remove(fname);
799  }
800  catch (Hypertable::Exception &e) {
801  HT_WARN_OUT << "Problem removing empty CellStore '" << fname << "' " << e << HT_END;
802  }
803  }
804 
805  m_file_tracker.update_live(added_file, removed_files, m_next_cs_id, total_index_entries);
808 
809  {
810  lock_guard<mutex> lock(m_mutex);
812  }
813 
814  if (FailureInducer::enabled()) {
815  if (MaintenanceFlag::split(maintenance_flags))
816  FailureInducer::instance->maybe_fail("compact-split-2");
817  if (MaintenanceFlag::relinquish(maintenance_flags))
818  FailureInducer::instance->maybe_fail("compact-relinquish-2");
819  if (!MaintenanceFlag::split(maintenance_flags) &&
820  !MaintenanceFlag::relinquish(maintenance_flags))
821  FailureInducer::instance->maybe_fail("compact-manual-2");
822  }
823 
824  HT_INFOF("Finished Compaction of %s(%s) to %s", m_range_name.c_str(),
825  m_name.c_str(), added_file.c_str());
826 
827  }
828  catch (Exception &e) {
829  // Remove newly created file
830  if (!cellstore_created) {
831  if (!cs_file.empty()) {
832  try {
833  Global::dfs->remove(cs_file);
834  }
835  catch (Hypertable::Exception &e) {
836  }
837  }
838  HT_ERROR_OUT << m_full_name << " " << e << HT_END;
839  throw;
840  }
841  HT_FATALF("Problem compacting access group %s: %s - %s",
842  m_full_name.c_str(), Error::get_text(e.code()), e.what());
843  }
844 }
845 
847  hints->ag_name = m_name;
849  lock_guard<mutex> lock(m_mutex);
851  hints->disk_usage = m_disk_usage;
852 }
853 
855  String str = format("%s cellstores={", m_full_name.c_str());
856  lock_guard<mutex> lock(m_mutex);
857  for (size_t i=0; i<m_stores.size(); i++) {
858  if (i>0)
859  str += ";";
860  str += m_stores[i].cs->get_filename();
861  }
862  str += "}";
863  return str;
864 }
865 
866 
868  lock_guard<mutex> lock(m_mutex);
869  ScanContextPtr scan_ctx = make_shared<ScanContext>(m_schema);
870  Key key;
871  ByteString value;
872 
875 
876  CellCachePtr old_cell_cache = m_cell_cache_manager->active_cache();
877  m_cell_cache_manager->install_new_active_cache(make_shared<CellCache>());
878 
879  lock_guard<CellCacheManager> ccm_lock(*m_cell_cache_manager);
880 
881  CellListScannerPtr old_scanner = old_cell_cache->create_scanner(scan_ctx.get());
882 
883  m_recovering = true;
884  while (old_scanner->get(key, value)) {
885  if (key.revision > m_latest_stored_revision)
886  add(key, value);
887  old_scanner->forward();
888  }
889  m_recovering = false;
890 
892 }
893 
894 
898 void
899 AccessGroup::shrink(String &split_row, bool drop_high, Hints *hints) {
900  lock_guard<mutex> lock(m_mutex);
901  ScanContextPtr scan_ctx = make_shared<ScanContext>(m_schema);
902  ByteString key;
903  ByteString value;
904  Key key_comps;
905  CellStorePtr new_cell_store;
906  int cmp;
907 
908  hints->ag_name = m_name;
910 
911  CellCachePtr old_cell_cache = m_cell_cache_manager->active_cache();
912 
913  m_recovering = true;
914 
917 
918  try {
919 
920  if (drop_high) {
921  m_end_row = split_row;
923  }
924  else
925  m_start_row = split_row;
926 
927  m_range_name = m_table_name + "[" + m_start_row + ".." + m_end_row + "]";
928  m_full_name = m_range_name + "(" + m_name + ")";
929 
931 
932  m_cell_cache_manager->install_new_active_cache(make_shared<CellCache>());
933  {
934  lock_guard<CellCacheManager> ccm_lock(*m_cell_cache_manager);
935 
936  CellListScannerPtr old_scanner = old_cell_cache->create_scanner(scan_ctx.get());
937 
941  while (old_scanner->get(key_comps, value)) {
942 
943  cmp = strcmp(key_comps.row, split_row.c_str());
944 
945  if ((cmp > 0 && !drop_high) || (cmp <= 0 && drop_high)) {
946  /*
947  * For IN_MEMORY access groups, record earliest cached
948  * revision that is > latest_stored. For normal access groups,
949  * record absolute earliest cached revision
950  */
951  if (m_in_memory) {
952  if (key_comps.revision > m_latest_stored_revision &&
955  }
956  else if (key_comps.revision < m_earliest_cached_revision)
958  add(key_comps, value);
959  }
960  old_scanner->forward();
961  }
962  }
963 
964  bool cellstores_shrunk = false;
965  {
966  lock_guard<mutex> lock(m_outstanding_scanner_mutex);
967  // Shrink without having to re-create CellStores
968  if (m_outstanding_scanner_count == 0) {
969  for (size_t i=0; i<m_stores.size(); i++)
970  m_stores[i].cs->rescope(m_start_row, m_end_row);
971  cellstores_shrunk = true;
972  }
973  }
974  // If we didn't shrink using the method above, do it the expensive way
975  if (!cellstores_shrunk) {
976  vector<CellStoreInfo> new_stores;
977  for (size_t i=0; i<m_stores.size(); i++) {
978  String filename = m_stores[i].cs->get_filename();
979  new_cell_store = CellStoreFactory::open(filename, m_start_row.c_str(),
980  m_end_row.c_str());
981  new_stores.push_back( new_cell_store );
982  }
983  m_stores = new_stores;
984  }
985 
986  // This recomputes m_disk_usage as well
989  hints->disk_usage = m_disk_usage;
991 
993 
994  m_recovering = false;
995  }
996  catch (Exception &e) {
997  m_recovering = false;
998  m_cell_cache_manager->install_new_active_cache(old_cell_cache);
1001  throw;
1002  }
1003 }
1004 
1005 
1006 
1009 void AccessGroup::release_files(const vector<String> &files) {
1010  {
1011  lock_guard<mutex> lock(m_outstanding_scanner_mutex);
1013  if (--m_outstanding_scanner_count == 0)
1014  m_outstanding_scanner_cond.notify_all();
1015  }
1018 }
1019 
1020 
1022  lock_guard<mutex> lock(m_mutex);
1023  HT_ASSERT(m_cell_cache_manager->immutable_cache_empty());
1024  if (m_cell_cache_manager->empty())
1025  return;
1026  m_cell_cache_manager->freeze();
1027  if (m_dirty) {
1029  m_dirty = false;
1030  }
1033 }
1034 
1035 
1037  lock_guard<mutex> lock(m_mutex);
1038  merge_caches();
1039 }
1040 
1041 
1048  m_dirty = true;
1049  }
1053  }
1054  m_cell_cache_manager->merge_caches(m_schema);
1055 }
1056 
1057 extern "C" {
1058 #include <unistd.h>
1059 }
1060 
1062  char hash_str[33];
1063  if (m_end_row == "")
1064  memset(hash_str, '0', 16);
1065  else
1066  md5_trunc_modified_base64(m_end_row.c_str(), hash_str);
1067  hash_str[16] = 0;
1068  m_range_dir = hash_str;
1069 
1070  String abs_range_dir = format("%s/tables/%s/%s/%s",
1071  Global::toplevel_dir.c_str(),
1072  m_identifier.id, m_name.c_str(),
1073  m_range_dir.c_str());
1074 
1075  m_next_cs_id = 0;
1076 
1077  try {
1078  if (!Global::dfs->exists(abs_range_dir))
1079  Global::dfs->mkdirs(abs_range_dir);
1080  else {
1081  uint32_t id;
1082  vector<Filesystem::Dirent> listing;
1083  Global::dfs->readdir(abs_range_dir, listing);
1084  for (size_t i=0; i<listing.size(); i++) {
1085  const char *fname = listing[i].name.c_str();
1086  if (!strncmp(fname, "cs", 2)) {
1087  id = atoi(&fname[2]);
1088  if (id >= m_next_cs_id)
1089  m_next_cs_id = id+1;
1090  }
1091  }
1092  }
1093  }
1094  catch (Exception &e) {
1095  HT_FATAL_OUT << e << HT_END;
1096  }
1097 
1104 }
1105 
1106 void AccessGroup::recompute_compression_ratio(int64_t *total_index_entriesp) {
1107  m_disk_usage = 0;
1108  m_compression_ratio = 0.0;
1109  if (total_index_entriesp)
1110  *total_index_entriesp = 0;
1111  for (size_t i=0; i<m_stores.size(); i++) {
1112  HT_ASSERT(m_stores[i].cs);
1113  if (total_index_entriesp)
1114  *total_index_entriesp += (int64_t)m_stores[i].cs->block_count();
1115  double disk_usage = m_stores[i].cs->disk_usage();
1116  m_disk_usage += (uint64_t)disk_usage;
1117  m_compression_ratio += disk_usage / m_stores[i].cs->compression_ratio();
1118  }
1119  if (m_disk_usage != 0)
1121  else
1122  m_compression_ratio = 1.0;
1123 }
1124 
1125 
1126 bool AccessGroup::find_merge_run(size_t *indexp, size_t *lenp) {
1127  size_t index = 0;
1128  size_t i = 0;
1129  size_t count;
1130  int64_t running_total = 0;
1131 
1132  if (m_in_memory || m_stores.size() <= 1)
1133  return false;
1134 
1135  vector<int64_t> disk_usage(m_stores.size());
1136 
1137  // If in "low activity" window, first try to be more aggressive
1138  if (Global::low_activity_time.within_window()) {
1139  bool run_found = false;
1140  for (int64_t target = Global::cellstore_target_size_min*2;
1143  index = 0;
1144  i = 0;
1145  running_total = 0;
1146 
1147  do {
1148  disk_usage[i] = m_stores[i].cs->disk_usage();
1149  running_total += disk_usage[i];
1150 
1151  if (running_total >= target) {
1152  count = (i - index) + 1;
1153  if (count >= (size_t)2) {
1154  if (indexp)
1155  *indexp = index;
1156  if (lenp)
1157  *lenp = count;
1158  run_found = true;
1159  break;
1160  }
1161  // Otherwise, move the index forward by one and try again
1162  running_total -= disk_usage[index];
1163  index++;
1164  }
1165  i++;
1166  } while (i < m_stores.size());
1167  if (i == m_stores.size())
1168  break;
1169  }
1170  if (run_found)
1171  return true;
1172  }
1173 
1174  index = 0;
1175  i = 0;
1176  running_total = 0;
1177  do {
1178  disk_usage[i] = m_stores[i].cs->disk_usage();
1179  running_total += disk_usage[i];
1180 
1181  if (running_total >= Global::cellstore_target_size_min) {
1182  count = (i - index) + 1;
1183  if (count >= (size_t)Global::merge_cellstore_run_length_threshold) {
1184  if (indexp)
1185  *indexp = index;
1186  if (lenp)
1187  *lenp = count;
1188  return true;
1189  }
1190  // Otherwise, move the index forward by one and try again
1191  running_total -= disk_usage[index];
1192  index++;
1193  }
1194  i++;
1195  } while (i < m_stores.size());
1196 
1197  if ((i-index) >= (size_t)Global::merge_cellstore_run_length_threshold) {
1198  if (indexp)
1199  *indexp = index;
1200  if (lenp)
1201  *lenp = i-index;
1202  return true;
1203  }
1204 
1205  return false;
1206 }
1207 
1208 namespace {
1209  struct LtCellStoreInfoTimestamp {
1210  bool operator()(const CellStoreInfo &x, const CellStoreInfo &y) const {
1211  return x.timestamp_min < y.timestamp_min;
1212  }
1213  };
1214 }
1215 
1216 
1218  LtCellStoreInfoTimestamp order;
1219  sort(m_stores.begin(), m_stores.end(), order);
1220 }
1221 
1222 void AccessGroup::dump_keys(ofstream &out) {
1223  lock_guard<mutex> lock(m_mutex);
1224  ColumnFamilySpec *cf_spec;
1225  const char *family;
1226  KeySet keys;
1227 
1228  // write header line
1229  out << "\n" << m_full_name << " Keys:\n";
1230 
1231  m_cell_cache_manager->populate_key_set(keys);
1232 
1233  for (KeySet::iterator iter = keys.begin();
1234  iter != keys.end(); ++iter) {
1235  if ((cf_spec = m_schema->get_column_family((*iter).column_family_code, true)))
1236  family = cf_spec->get_name().c_str();
1237  else
1238  family = "UNKNOWN";
1239  out << (*iter).row << " " << family;
1240  if (*(*iter).column_qualifier)
1241  out << ":" << (*iter).column_qualifier;
1242  out << " 0x" << hex << (int)(*iter).flag << dec
1243  << " ts=" << (*iter).timestamp
1244  << " rev=" << (*iter).revision << "\n";
1245  }
1246 }
1247 
1250 }
1251 
1252 
1253 ostream &Hypertable::operator<<(ostream &os, const AccessGroup::MaintenanceData &mdata) {
1254  os << "ACCESS GROUP " << mdata.ag->get_full_name() << "\n";
1255  os << "earliest_cached_revision=" << mdata.earliest_cached_revision << "\n";
1256  os << "latest_stored_revision=" << mdata.latest_stored_revision << "\n";
1257  os << "mem_used=" << mdata.mem_used << "\n";
1258  os << "mem_allocated=" << mdata.mem_allocated << "\n";
1259  os << "cell_count=" << mdata.cell_count << "\n";
1260  os << "disk_used=" << mdata.disk_used << "\n";
1261  os << "disk_estimate=" << mdata.disk_estimate << "\n";
1262  os << "log_space_pinned=" << mdata.log_space_pinned << "\n";
1263  os << "key_bytes=" << mdata.key_bytes << "\n";
1264  os << "value_bytes=" << mdata.value_bytes << "\n";
1265  os << "file_count=" << mdata.file_count << "\n";
1266  os << "deletes=" << mdata.deletes << "\n";
1267  os << "outstanding_scanners=" << mdata.outstanding_scanners << "\n";
1268  os << "compression_ratio=" << mdata.compression_ratio << "\n";
1269  os << "maintenance_flags=" << mdata.maintenance_flags << "\n";
1270  os << "block_index_memory=" << mdata.block_index_memory << "\n";
1271  os << "bloom_filter_memory=" << mdata.bloom_filter_memory << "\n";
1272  os << "bloom_filter_accesses=" << mdata.bloom_filter_accesses << "\n";
1273  os << "bloom_filter_maybes=" << mdata.bloom_filter_maybes << "\n";
1274  os << "bloom_filter_fps=" << mdata.bloom_filter_fps << "\n";
1275  os << "shadow_cache_memory=" << mdata.shadow_cache_memory << "\n";
1276  os << "in_memory=" << (mdata.in_memory ? "true" : "false") << "\n";
1277  os << "gc_needed=" << (mdata.gc_needed ? "true" : "false") << "\n";
1278  os << "needs_merging=" << (mdata.needs_merging ? "true" : "false") << "\n";
1279  return os;
1280 }
static bool enable_shadow_cache
Definition: Global.h:107
MaintenanceData * get_maintenance_data(ByteArena &arena, time_t now, int flags)
Definition: AccessGroup.cc:344
#define HT_THROW2F(_code_, _ex_, _fmt_,...)
Definition: Error.h:494
void update_cellstore_info(std::vector< CellStoreInfo > &stores, time_t t=0, bool collection_performed=true)
Updates stored data statistics from current set of CellStores.
std::set< uint8_t > m_column_families
Definition: AccessGroup.h:274
void add_references(const std::vector< String > &filev)
Adds a set of files to the referenced file set.
Declarations for AccessGroup.
Holds cache statistics.
Definition: CellCache.h:57
Cell list scanner over a buffer of cells.
const char * row
Definition: Key.h:129
Declarations for CellStoreFactory.
static int32_t merge_cellstore_run_length_threshold
Definition: Global.h:110
The FailureInducer simulates errors.
static String filename
Definition: Config.cc:48
Range specification.
Definition: RangeSpec.h:40
Declarations for MergeScannerAccessGroup.
PropertiesPtr properties
This singleton map stores all options.
Definition: Config.cc:47
Declarations for CellStoreV7.
std::string String
A String is simply a typedef to std::string.
Definition: String.h:44
bool purge_cellstore(int flags)
Tests the PURGE_CELLSTORE bit of flags
void measure_garbage(double *total, double *garbage)
Definition: AccessGroup.cc:458
String format(const char *fmt,...)
Returns a String using printf-like format facilities. Vanilla snprintf is about 1.5x faster than this...
Definition: String.cc:37
void update_live(const String &add, std::vector< String > &deletes, uint32_t nextcsid, int64_t total_blocks)
Updates the live file set.
std::mutex m_outstanding_scanner_mutex
Definition: AccessGroup.h:269
int32_t m_outstanding_scanner_count
Definition: AccessGroup.h:271
void install_release_callback(CellStoreReleaseCallback &cb)
void update_schema(SchemaPtr &schema, AccessGroupSpec *ag_spec)
Currently supports only adding and deleting column families from AccessGroup.
Definition: AccessGroup.cc:98
Po::typed_value< String > * str(String *v=0)
Definition: Properties.h:166
Column family specification.
static void parse_bloom_filter(const std::string &spec, PropertiesPtr &props)
Parses a bloom filter specification and sets properties.
CellStoreMaintenanceData * csdata
Definition: AccessGroup.h:86
bool purge_shadow_cache(int flags)
Tests the PURGE_SHADOW_CACHE bit of flags
Maps object pointers to bit fields.
STL namespace.
bool recompute_merge_run(int flags)
Tests the RECOMPUTE_MERGE_RUN bit of flags
void load_hints(Hints *hints)
Definition: AccessGroup.cc:846
static FailureInducer * instance
This is a singleton class.
Scan context information.
Definition: ScanContext.h:52
bool include_in_scan(ScanContext *scan_ctx)
Definition: AccessGroup.cc:268
int16_t get_option_replication() const
Gets replication option.
int64_t m_latest_stored_revision_hint
Definition: AccessGroup.h:291
static bool enabled()
Returns true if the FailureInducer is enabled (= if an instance was allocated)
void split_row_estimate_data_stored(CellList::SplitRowDataMapT &split_row_data)
Definition: AccessGroup.cc:285
CharT * alloc(size_t sz)
Allocate sz bytes.
Definition: PageArena.h:216
void md5_trunc_modified_base64(const char *input, char output[17])
Get the modified base64 encoded string of the first 12 Bytes of the 16 Byte MD5 code of a null termin...
Definition: md5.cc:425
void run_compaction(int maintenance_flags, Hints *hints)
Definition: AccessGroup.cc:487
A class managing one or more serializable ByteStrings.
Definition: ByteString.h:47
#define HT_ASSERT(_e_)
Definition: Logger.h:396
int flags(const void *key)
Returns bit field for a given pointer.
void get_merge_info(bool &needs_merging, bool &end_merge)
Gets merging compaction information.
Definition: AccessGroup.h:258
std::set< Key, key_revision_lt > KeySet
Definition: CellCache.h:44
int64_t m_earliest_cached_revision
Definition: AccessGroup.h:288
int64_t m_latest_stored_revision
Definition: AccessGroup.h:290
void merge_caches()
Assumes mutex is locked.
const char * end_row
Definition: RangeSpec.h:60
std::map< const char *, int64_t, LtCstr, SplitRowDataAlloc > SplitRowDataMapT
Definition: CellList.h:66
A dynamic, resizable memory buffer.
std::condition_variable m_outstanding_scanner_cond
Definition: AccessGroup.h:270
void maybe_fail(const String &label)
Tests and executes the induced failures.
static std::string toplevel_dir
Definition: Global.h:108
bool get_option_in_memory() const
Gets in memory option.
const char * get_text(int error)
Returns a descriptive error message.
Definition: Error.cc:330
void split_row_estimate_data_cached(CellList::SplitRowDataMapT &split_row_data)
Definition: AccessGroup.cc:279
const ScanSpec * spec
Definition: ScanContext.h:55
const std::string & get_name() const
Gets access group name.
std::shared_ptr< MergeScannerAccessGroup > MergeScannerAccessGroupPtr
Shared pointer to MergeScannerAccessGroup.
void remove_references(const std::vector< String > &filev)
Decrements the reference count of each file in the given vector.
std::shared_ptr< Properties > PropertiesPtr
Definition: Properties.h:447
bool check_needed(time_t now)
Signals if garbage collection is likely needed.
void shrink(String &split_row, bool drop_high, Hints *hints)
Definition: AccessGroup.cc:899
Compatibility Macros for C/C++.
void populate_cellstore_index_pseudo_table_scanner(CellListScannerBuffer *scanner)
Populates scanner with data for .cellstore.index pseudo table.
Definition: AccessGroup.cc:293
std::ostream & operator<<(std::ostream &os, const crontab_entry &entry)
Helper function to write crontab_entry to an ostream.
Definition: Crontab.cc:301
void update_schema(AccessGroupSpec *ag_spec)
Updates control variables from access group schema definition.
#define HT_END
Definition: Logger.h:220
void dump_garbage_tracker_statistics(std::ofstream &out)
Prints human-readable representation of garbage tracker state to an output stream.
static Hypertable::FilesystemPtr dfs
Definition: Global.h:64
void space_usage(int64_t *memp, int64_t *diskp)
Definition: AccessGroup.cc:312
#define HT_ERROR_OUT
Definition: Logger.h:301
pair< int64_t, int64_t > time_interval
Definition: ScanContext.h:70
static int64_t cellstore_target_size_min
Definition: Global.h:97
void add(const Key &key, const ByteString value)
Adds a key/value pair.
Definition: AccessGroup.cc:153
void purge_stored_cells_from_cache()
Definition: AccessGroup.cc:867
Access group specification.
void add_scanner(CellListScannerPtr scanner)
#define HT_WARN_OUT
Definition: Logger.h:291
int32_t get_option_blocksize() const
Gets blocksize option.
bool move_compaction(int flags)
Tests the COMPACT_MOVE bit of flags
std::shared_ptr< CellStore > CellStorePtr
Smart pointer to CellStore.
Definition: CellStore.h:340
Hypertable definitions
#define HT_FATALF(msg,...)
Definition: Logger.h:343
const std::string & get_name() const
Gets column family name.
long long int Lld
Shortcut for printf formats.
Definition: String.h:53
static bool ignore_clock_skew_errors
Definition: Global.h:111
CellCacheManagerPtr m_cell_cache_manager
Definition: AccessGroup.h:284
static const int64_t TIMESTAMP_MAX
Definition: KeySpec.h:35
LiveFileTracker m_file_tracker
Definition: AccessGroup.h:292
void update_files_column()
Updates the 'Files' METADATA column if it needs updating.
const char * start_row
Definition: RangeSpec.h:59
AccessGroupGarbageTracker m_garbage_tracker
Definition: AccessGroup.h:293
#define HT_ERROR(msg)
Definition: Logger.h:299
const char * get_full_name()
Definition: AccessGroup.h:207
ColumnFamilySpecs & columns()
Returns reference to column specifications.
Represents the trailer for CellStore version 7.
#define HT_INFOF(msg,...)
Definition: Logger.h:272
Provides access to internal components of opaque key.
Definition: Key.h:40
std::vector< CellStoreInfo > m_stores
Definition: AccessGroup.h:282
static int64_t cellstore_target_size_max
Definition: Global.h:98
void get_file_list(String &file_list)
Populates string with live files separated by ';'.
bool split(int flags)
Tests the SPLIT bit of flags
PropertiesPtr m_cellstore_props
Definition: AccessGroup.h:283
This is a generic exception class for Hypertable.
Definition: Error.h:314
const std::string & get_option_bloom_filter() const
Gets bloom filter option.
void recompute_compression_ratio(int64_t *total_index_entriesp=0)
static const char * END_ROOT_ROW
Definition: Key.h:50
static LoadStatisticsPtr load_statistics
Definition: Global.h:72
TableIdentifierManaged m_identifier
Definition: AccessGroup.h:272
int64_t revision
Definition: Key.h:135
void output_state(std::ofstream &out, const std::string &label)
Prints a human-readable representation of internal state to an output stream.
#define HT_ERRORF(msg,...)
Definition: Logger.h:300
void change_range(const String &start_row, const String &end_row)
std::shared_ptr< Schema > SchemaPtr
Smart pointer to Schema.
Definition: Schema.h:465
uint8_t column_family_code
Definition: Key.h:127
std::shared_ptr< CellListScanner > CellListScannerPtr
Definition: CellList.h:35
static bool ignore_cells_with_clock_skew
Definition: Global.h:76
AccessGroup(const TableIdentifier *identifier, SchemaPtr &schema, AccessGroupSpec *ag_spec, const RangeSpec *range, const Hints *hints=nullptr)
Definition: AccessGroup.cc:57
static TimeWindow low_activity_time
Definition: Global.h:116
bool find_merge_run(size_t *indexp=0, size_t *lenp=0)
bool relinquish(int flags)
Tests the RELINQUISH bit of flags
MergeScannerAccessGroup * create_scanner(ScanContext *scan_ctx)
Definition: AccessGroup.cc:187
Declarations for MaintenanceFlag. This file contains declarations that are part of the MaintenanceFlag...
bool major_compaction(int flags)
Tests the COMPACT_MAJOR bit of flags
static CellStorePtr open(const String &name, const char *start_row, const char *end_row)
Creates a CellStore object from a given cell store file.
void dump_keys(std::ofstream &out)
void release_files(const std::vector< String > &files)
Error codes, Exception handling, error logging.
md5 digest routines.
#define HT_FATAL_OUT
Definition: Logger.h:347
const std::string get_option_compressor() const
Gets compressor option.
int64_t m_earliest_cached_revision_saved
Definition: AccessGroup.h:289
void add_live_noupdate(const String &fname, int64_t total_blocks)
Adds a file to the live file set without setting the 'need_update' bit.
uint64_t purge_memory(MaintenanceFlag::Map &subtask_map)
Definition: AccessGroup.cc:320
void load_cellstore(CellStorePtr &cellstore)
Definition: AccessGroup.cc:436
void adjust_targets(time_t now, double total, double garbage)
Adjusts targets based on measured garbage.
bool gc_compaction(int flags)
Tests the COMPACT_GC bit of flags
bool collection_needed(double total, double garbage)
Determines if garbage collection is actually needed.
bool merging_compaction(int flags)
Tests the COMPACT_MERGING bit of flags
std::shared_ptr< CellCache > CellCachePtr
Shared smart pointer to CellCache.
Definition: CellCache.h:163
int code() const
Returns the error code.
Definition: Error.h:391
std::shared_ptr< ScanContext > ScanContextPtr
Definition: ScanContext.h:169
Merge scanner for access groups.