0.9.8.10
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
RangeServer.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2007-2015 Hypertable, Inc.
3  *
4  * This file is part of Hypertable.
5  *
6  * Hypertable is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; version 3 of the
9  * License, or any later version.
10  *
11  * Hypertable is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19  * 02110-1301, USA.
20  */
21 
25 
26 #include <Common/Compat.h>
27 
28 #include "RangeServer.h"
29 
49 
52 #include <Hypertable/Lib/Key.h>
60 
61 #include <FsBroker/Lib/Client.h>
62 
63 #include <Common/FailureInducer.h>
64 #include <Common/FileUtils.h>
65 #include <Common/Random.h>
66 #include <Common/ScopeGuard.h>
67 #include <Common/Status.h>
68 #include <Common/StatusPersister.h>
69 #include <Common/StringExt.h>
70 #include <Common/SystemInfo.h>
71 #include <Common/md5.h>
72 
73 #include <boost/algorithm/string.hpp>
74 
75 #if defined(TCMALLOC_MINIMAL)
76 #include <gperftools/tcmalloc.h>
77 #include <gperftools/malloc_extension.h>
78 #elif defined(TCMALLOC)
79 #include <gperftools/heap-checker.h>
80 #include <gperftools/heap-profiler.h>
81 #include <gperftools/malloc_extension.h>
82 #include <gperftools/tcmalloc.h>
83 #endif
84 
85 #include <algorithm>
86 #include <cassert>
87 #include <chrono>
88 #include <cstdio>
89 #include <cstring>
90 #include <fstream>
91 #include <iostream>
92 #include <sstream>
93 #include <thread>
94 #include <unordered_map>
95 
96 extern "C" {
97 #include <fcntl.h>
98 #include <math.h>
99 #include <sys/time.h>
100 #include <sys/resource.h>
101 }
102 
103 using namespace std;
104 using namespace Hypertable;
105 using namespace Hypertable::Lib;
106 using namespace Hypertable::RangeServer;
107 using namespace Serialization;
108 using namespace Hypertable::Property;
109 
110 Apps::RangeServer::RangeServer(PropertiesPtr &props, ConnectionManagerPtr &conn_mgr,
111  ApplicationQueuePtr &app_queue, Hyperspace::SessionPtr &hyperspace)
112  : m_props(props), m_conn_manager(conn_mgr),
113  m_app_queue(app_queue), m_hyperspace(hyperspace) {
114 
116  std::make_shared<MetricsCollectorGanglia>("rangeserver", props);
117 
118  m_context = std::make_shared<Context>();
119  m_context->props = props;
120  m_context->comm = conn_mgr->get_comm();
121  m_context->server_state = std::make_shared<ServerState>();
122  m_context->live_map = make_shared<TableInfoMap>();
123 
124  m_log_replay_barrier = std::make_shared<LogReplayBarrier>();
125 
126  uint16_t port;
128  HT_ASSERT(m_cores != 0);
129  SubProperties cfg(props, "Hypertable.RangeServer.");
130 
131  m_verbose = props->get_bool("verbose");
132  Global::row_size_unlimited = cfg.get_bool("Range.RowSize.Unlimited", false);
134  = cfg.get_bool("Range.IgnoreCellsWithClockSkew");
135  Global::failover_timeout = props->get_i32("Hypertable.Failover.Timeout");
136  Global::range_split_size = cfg.get_i64("Range.SplitSize");
137  Global::range_maximum_size = cfg.get_i64("Range.MaximumSize");
138  Global::range_metadata_split_size = cfg.get_i64("Range.MetadataSplitSize",
141  cfg.get_i32("AccessGroup.GarbageThreshold.Percentage");
142  Global::access_group_max_mem = cfg.get_i64("AccessGroup.MaxMemory");
143  Global::enable_shadow_cache = cfg.get_bool("AccessGroup.ShadowCache");
144  Global::cellstore_target_size_min = cfg.get_i64("CellStore.TargetSize.Minimum");
145  Global::cellstore_target_size_max = cfg.get_i64("CellStore.TargetSize.Maximum");
147  m_scanner_buffer_size = cfg.get_i64("Scanner.BufferSize");
148  port = cfg.get_i16("Port");
149 
150  m_control_file_check_interval = cfg.get_i32("ControlFile.CheckInterval");
151  m_last_control_file_check = chrono::steady_clock::now();
152 
153  // Initialize "low activity" window
154  {
155  vector<String> specs = cfg.get_strs("LowActivityPeriod");
156  if (specs.empty())
157  specs.push_back("* 2-4 * * *");
158  else if (find(specs.begin(), specs.end(), "none") != specs.end())
159  specs.clear();
161  }
162 
164  uint32_t maintenance_threads;
165  {
166  int32_t disk_count = System::get_drive_count();
167  maintenance_threads = std::max(((disk_count*3)/2), (int32_t)m_cores);
168  if (maintenance_threads < 2)
169  maintenance_threads = 2;
170  maintenance_threads = cfg.get_i32("MaintenanceThreads", maintenance_threads);
171  HT_INFOF("drive count = %d, maintenance threads = %d", disk_count, maintenance_threads);
172  }
173 
174  Global::toplevel_dir = props->get_str("Hypertable.Directory");
175  boost::trim_if(Global::toplevel_dir, boost::is_any_of("/"));
177 
178  Global::merge_cellstore_run_length_threshold = cfg.get_i32("CellStore.Merge.RunLengthThreshold");
179  Global::ignore_clock_skew_errors = cfg.get_bool("IgnoreClockSkewErrors");
180 
181  int64_t interval = (int64_t)cfg.get_i32("Maintenance.Interval");
182 
183  Global::load_statistics = make_shared<LoadStatistics>(interval);
184 
185  m_stats = make_shared<StatsRangeServer>(m_props);
186 
187  m_namemap = make_shared<NameIdMapper>(m_hyperspace, Global::toplevel_dir);
188 
189  m_scanner_ttl = (time_t)cfg.get_i32("Scanner.Ttl");
190 
191  Global::metrics_interval = props->get_i32("Hypertable.LoadMetrics.Interval");
192  if (HT_FAILURE_SIGNALLED("report-metrics-immediately")) {
193  m_next_metrics_update = time(0);
194  }
195  else {
198  // randomly pick a time within 5 minutes of the next update
200  + (Random::number32() % 300);
201  }
202 
204  cfg.get_i32("AccessGroup.CellCache.ScannerCacheSize");
205 
206  if (m_scanner_ttl < (time_t)10000) {
207  HT_WARNF("Value %u for Hypertable.RangeServer.Scanner.ttl is too small, "
208  "setting to 10000", (unsigned int)m_scanner_ttl);
209  m_scanner_ttl = (time_t)10000;
210  }
211 
212  const MemStat &mem_stat = System::mem_stat();
213  if (cfg.has("MemoryLimit"))
214  Global::memory_limit = cfg.get_i64("MemoryLimit");
215  else {
216  double pct = std::max(1.0, std::min((double)cfg.get_i32("MemoryLimit.Percentage"), 99.0)) / 100.0;
217  Global::memory_limit = (int64_t)(mem_stat.ram * Property::MiB * pct);
218  }
219 
220  if (cfg.has("MemoryLimit.EnsureUnused"))
221  Global::memory_limit_ensure_unused = cfg.get_i64("MemoryLimit.EnsureUnused");
222  else if (cfg.has("MemoryLimit.EnsureUnused.Percentage")) {
223  double pct = std::max(1.0, std::min((double)cfg.get_i32("MemoryLimit.EnsureUnused.Percentage"), 99.0)) / 100.0;
224  Global::memory_limit_ensure_unused = (int64_t)(mem_stat.ram * Property::MiB * pct);
225  }
226 
228  // adjust current limit according to the actual memory situation
229  int64_t free_memory_50pct = (int64_t)(0.5 * mem_stat.free * Property::MiB);
232  HT_NOTICEF("Start up in low memory condition (free memory %.2fMB)", mem_stat.free);
233  }
234 
235  int64_t block_cache_min = cfg.get_i64("BlockCache.MinMemory");
236  int64_t block_cache_max = cfg.get_i64("BlockCache.MaxMemory");
237  if (block_cache_max == -1) {
238  double physical_ram = mem_stat.ram * Property::MiB;
239  block_cache_max = (int64_t)physical_ram;
240  }
241  if (block_cache_min > block_cache_max)
242  block_cache_min = block_cache_max;
243 
244  if (block_cache_max > 0)
245  Global::block_cache = new FileBlockCache(block_cache_min, block_cache_max,
246  cfg.get_bool("BlockCache.Compressed"));
247 
248  int64_t query_cache_memory = cfg.get_i64("QueryCache.MaxMemory");
249  if (query_cache_memory > 0) {
250  // reduce query cache if required
251  if ((double)query_cache_memory > (double)Global::memory_limit * 0.2) {
252  query_cache_memory = (int64_t)((double)Global::memory_limit * 0.2);
253  props->set("Hypertable.RangeServer.QueryCache.MaxMemory", query_cache_memory);
254  HT_INFOF("Maximum size of query cache has been reduced to %.2fMB", (double)query_cache_memory / Property::MiB);
255  }
256  m_query_cache = std::make_shared<QueryCache>(query_cache_memory);
257  }
258 
260 
261  FsBroker::Lib::ClientPtr dfsclient = std::make_shared<FsBroker::Lib::Client>(conn_mgr, props);
262 
263  int dfs_timeout;
264  if (props->has("FsBroker.Timeout"))
265  dfs_timeout = props->get_i32("FsBroker.Timeout");
266  else
267  dfs_timeout = props->get_i32("Hypertable.Request.Timeout");
268 
269  if (!dfsclient->wait_for_connection(dfs_timeout))
270  HT_THROW(Error::REQUEST_TIMEOUT, "connecting to FS Broker");
271 
272  Global::dfs = dfsclient;
273 
274  m_log_roll_limit = cfg.get_i64("CommitLog.RollLimit");
275 
279  if (cfg.has("CommitLog.DfsBroker.Host")) {
280  String loghost = cfg.get_str("CommitLog.DfsBroker.Host");
281  uint16_t logport = cfg.get_i16("CommitLog.DfsBroker.Port");
282  InetAddr addr(loghost, logport);
283 
284  dfsclient = std::make_shared<FsBroker::Lib::Client>(conn_mgr, addr, dfs_timeout);
285 
286  if (!dfsclient->wait_for_connection(30000))
287  HT_THROW(Error::REQUEST_TIMEOUT, "connecting to commit log FS broker");
288 
289  Global::log_dfs = dfsclient;
290  }
291  else
293 
294  // Create the maintenance queue
295  Global::maintenance_queue = make_shared<MaintenanceQueue>(maintenance_threads);
296 
301  make_shared<HandlerFactory>(m_context->comm, m_app_queue, RangeServerPtr(this));
302 
303  InetAddr listen_addr(INADDR_ANY, port);
304  try {
305  m_context->comm->listen(listen_addr, chfp);
306  }
307  catch (Exception &e) {
308  HT_ERRORF("Unable to listen on port %u - %s - %s",
309  port, Error::get_text(e.code()), e.what());
310  quick_exit(EXIT_SUCCESS);
311  }
312 
313  Global::location_initializer = make_shared<LocationInitializer>(m_context);
314 
315  if(Global::location_initializer->is_removed(Global::toplevel_dir+"/servers", m_hyperspace)) {
316  HT_ERROR_OUT << "location " << Global::location_initializer->get()
317  << " has been marked removed in hyperspace" << HT_END;
318  quick_exit(EXIT_FAILURE);
319  }
320 
321  // Create Master client
322  int timeout = props->get_i32("Hypertable.Request.Timeout");
324  = make_shared<ConnectionHandler>(m_context->comm, m_app_queue, this);
327  make_shared<Lib::Master::Client>(m_conn_manager, m_hyperspace,
328  Global::toplevel_dir, timeout, aq,
332 
333  Global::location_initializer->wait_for_handshake();
334 
335  initialize(props);
336 
337  Global::log_prune_threshold_min = cfg.get_i64("CommitLog.PruneThreshold.Min");
338 
339  uint32_t max_memory_percentage =
340  cfg.get_i32("CommitLog.PruneThreshold.Max.MemoryPercentage");
341 
342  HT_ASSERT(max_memory_percentage >= 0 && max_memory_percentage <= 100);
343 
344  double max_memory_ratio = (double)max_memory_percentage / 100.0;
345 
346  int64_t threshold_max = (int64_t)(mem_stat.ram *
347  max_memory_ratio * (double)MiB);
348 
349  Global::log_prune_threshold_max = cfg.get_i64("CommitLog.PruneThreshold.Max", threshold_max);
350 
352  convert(props->get_str("Hypertable.LogFlushMethod.Meta"));
353 
355  convert(props->get_str("Hypertable.LogFlushMethod.User"));
356 
358  std::make_shared<MaintenanceScheduler>(Global::maintenance_queue,
359  m_context->live_map);
360 
361  // Install maintenance timer
362  m_timer_handler = make_shared<TimerHandler>(m_context->comm, this);
363 
364  local_recover();
365 
366  m_timer_handler->start();
367 
368  HT_INFOF("Prune thresholds - min=%lld, max=%lld", (Lld)Global::log_prune_threshold_min,
370 
371  m_startup = false;
372 
373 }
374 
377  if (m_startup)
379  else if (m_shutdown)
382  status.set(Status::Code::WARNING, "Range initialization not yet complete");
383  else {
384  Timer timer(cb->event()->header.timeout_ms, true);
385  Global::dfs->status(status, &timer);
386  Status::Code code;
387  string text;
388  status.get(&code, text);
389  if (code != Status::Code::OK)
390  status.set(code, format("[fsbroker] %s", text.c_str()));
391  else
392  StatusPersister::get(status);
393  }
394  cb->response(status);
395 }
396 
398 
399  try {
400 
401  m_shutdown = true;
402 
403  // stop maintenance timer
404  if (m_timer_handler)
405  m_timer_handler->shutdown();
406 
407  // stop maintenance queue
408  Global::maintenance_queue->shutdown();
409  //Global::maintenance_queue->join();
410 
411  // stop application queue
412  m_app_queue->stop();
413 
414  // wait no more than 30 seconds
415  auto deadline = chrono::steady_clock::now() + chrono::seconds(30);
416  m_app_queue->wait_for_idle(deadline, 1);
417 
418  lock_guard<mutex> lock(m_stats_mutex);
419 
420  if (m_group_commit_timer_handler)
421  m_group_commit_timer_handler->shutdown();
422 
423  // Kill update pipelines
424  m_update_pipeline_user->shutdown();
425  if (m_update_pipeline_system)
426  m_update_pipeline_system->shutdown();
427  if (m_update_pipeline_metadata)
428  m_update_pipeline_metadata->shutdown();
429 
430  Global::range_locator.reset();
431 
432  if (Global::rsml_writer) {
433  Global::rsml_writer->close();
434  //Global::rsml_writer.reset();
435  }
436  if (Global::root_log) {
437  Global::root_log->close();
438  //Global::root_log.reset();
439  }
440  if (Global::metadata_log) {
441  Global::metadata_log->close();
442  //Global::metadata_log.reset();
443  }
444  if (Global::system_log) {
445  Global::system_log->close();
446  //Global::system_log.reset();
447  }
448  if (Global::user_log) {
449  Global::user_log->close();
450  //Global::user_log.reset();
451  }
452 
453  /*
454  if (Global::block_cache) {
455  delete Global::block_cache;
456  Global::block_cache = 0;
457  }
458  Global::maintenance_queue = 0;
459  Global::metadata_table = 0;
460  Global::rs_metrics_table = 0;
461  Global::hyperspace = 0;
462  Global::log_dfs = 0;
463  Global::dfs = 0;
464  delete Global::memory_tracker;
465  Global::memory_tracker = 0;
466  */
467 
468  m_app_queue->shutdown();
469  }
470  catch (Exception &e) {
471  HT_ERROR_OUT << e << HT_END;
472  quick_exit(EXIT_FAILURE);
473  }
474 
475 }
476 
478 }
479 
480 
487  String top_dir = Global::toplevel_dir + "/servers/" + Global::location_initializer->get();
488  LockStatus lock_status;
489  uint32_t oflags = OPEN_FLAG_READ | OPEN_FLAG_WRITE | OPEN_FLAG_LOCK;
490 
491  m_existence_file_handle = m_hyperspace->open(top_dir, oflags);
492 
493  while (true) {
494  lock_status = (LockStatus)0;
495 
496  m_hyperspace->try_lock(m_existence_file_handle, LOCK_MODE_EXCLUSIVE,
497  &lock_status, &m_existence_file_sequencer);
498 
499  if (lock_status == LOCK_STATUS_GRANTED) {
500  Global::location_initializer->set_lock_held();
501  break;
502  }
503 
504  HT_INFOF("Waiting for exclusive lock on hyperspace:/%s ...",
505  top_dir.c_str());
506  this_thread::sleep_for(chrono::milliseconds(5000));
507  }
508 
509  Global::log_dir = top_dir + "/log";
510 
514  String path;
515  try {
516  path = Global::log_dir + "/user";
517  Global::log_dfs->mkdirs(path);
518  }
519  catch (Exception &e) {
520  HT_THROW2F(e.code(), e, "Problem creating commit log directory '%s': %s",
521  path.c_str(), e.what());
522  }
523 
524  HT_INFOF("log_dir=%s", Global::log_dir.c_str());
525 }
526 
527 
528 namespace {
529 
530  struct ByFragmentNumber {
531  bool operator()(const Filesystem::Dirent &x, const Filesystem::Dirent &y) const {
532  int num_x = atoi(x.name.c_str());
533  int num_y = atoi(y.name.c_str());
534  return num_x < num_y;
535  }
536  };
537 
538  void add_mark_file_to_commit_logs(const String &logname) {
539  vector<Filesystem::Dirent> listing;
540  vector<Filesystem::Dirent> listing2;
541  String logdir = Global::log_dir + "/" + logname;
542 
543  try {
544  if (!Global::log_dfs->exists(logdir))
545  return;
546  Global::log_dfs->readdir(logdir, listing);
547  }
548  catch (Hypertable::Exception &) {
549  HT_FATALF("Unable to read log directory '%s'", logdir.c_str());
550  }
551 
552  if (listing.size() == 0)
553  return;
554 
555  sort(listing.begin(), listing.end(), ByFragmentNumber());
556 
557  // Remove zero-length files
558  for (auto &entry : listing) {
559  String fragment_file = logdir + "/" + entry.name;
560  try {
561  if (Global::log_dfs->length(fragment_file) == 0) {
562  HT_INFOF("Removing log fragment '%s' because it has zero length",
563  fragment_file.c_str());
564  Global::log_dfs->remove(fragment_file);
565  }
566  else
567  listing2.push_back(entry);
568  }
569  catch (Hypertable::Exception &) {
570  HT_FATALF("Unable to check fragment file '%s'", fragment_file.c_str());
571  }
572  }
573 
574  if (listing2.size() == 0)
575  return;
576 
577  char *endptr;
578  long num = strtol(listing2.back().name.c_str(), &endptr, 10);
579  String mark_filename = logdir + "/" + (int64_t)num + ".mark";
580 
581  try {
582  int fd = Global::log_dfs->create(mark_filename, 0, -1, -1, -1);
583  StaticBuffer buf(1);
584  *buf.base = '0';
585  Global::log_dfs->append(fd, buf);
586  Global::log_dfs->close(fd);
587  }
588  catch (Hypertable::Exception &) {
589  HT_FATALF("Unable to create file '%s'", mark_filename.c_str());
590  }
591  }
592 
593 }
594 
595 
597  MetaLog::DefinitionPtr rsml_definition =
598  make_shared<MetaLog::DefinitionRangeServer>(Global::location_initializer->get().c_str());
599  MetaLog::ReaderPtr rsml_reader;
600  CommitLogReaderPtr root_log_reader;
601  CommitLogReaderPtr system_log_reader;
602  CommitLogReaderPtr metadata_log_reader;
603  CommitLogReaderPtr user_log_reader;
604  Ranges ranges;
605  std::vector<MetaLog::EntityPtr> entities, stripped_entities;
606  StringSet transfer_logs;
607  TableInfoMap replay_map(make_shared<HyperspaceTableCache>(m_hyperspace, Global::toplevel_dir));
608  int priority = 0;
609  String rsml_dir = Global::log_dir + "/" + rsml_definition->name();
610 
611  try {
612  rsml_reader =
613  make_shared<MetaLog::Reader>(Global::log_dfs, rsml_definition, rsml_dir);
614  }
615  catch (Exception &e) {
616  HT_FATALF("Problem reading RSML %s: %s - %s", rsml_dir.c_str(),
617  Error::get_text(e.code()), e.what());
618  }
619 
620  try {
621  std::vector<MaintenanceTask*> maintenance_tasks;
622  auto now = chrono::steady_clock::now();
623 
624  rsml_reader->get_entities(entities);
625 
626  if (!entities.empty()) {
627  HT_DEBUG_OUT << "Found "<< Global::log_dir << "/"
628  << rsml_definition->name() <<", start recovering"<< HT_END;
629 
630  // Temporary code to handle upgrade from RANGE to RANGE2
631  // Metalog entries. Should be removed around 12/2013
633  add_mark_file_to_commit_logs("root");
634  add_mark_file_to_commit_logs("metadata");
635  add_mark_file_to_commit_logs("system");
636  add_mark_file_to_commit_logs("user");
637  }
638 
639  // Populated Global::work_queue and strip out PHANTOM ranges
640  {
641  for (auto & entity : entities) {
642  if (dynamic_pointer_cast<MetaLog::EntityTask>(entity))
643  Global::add_to_work_queue(static_pointer_cast<MetaLog::EntityTask>(entity));
644  else if (dynamic_cast<MetaLogEntityRange *>(entity.get())) {
645  MetaLogEntityRangePtr range_entity = static_pointer_cast<MetaLogEntityRange>(entity);
646  if ((range_entity->get_state() & RangeState::PHANTOM) != 0) {
647  // If log was created originally for the phantom range, remove it
648  String transfer_log = range_entity->get_transfer_log();
649  if (strstr(transfer_log.c_str(), "phantom") != 0) {
650  try {
651  Global::log_dfs->rmdir(transfer_log);
652  }
653  catch (Exception &e) {
654  HT_WARNF("Problem removing phantom log %s - %s", transfer_log.c_str(),
655  Error::get_text(e.code()));
656  }
657  }
658  continue;
659  }
660  }
661  else if (dynamic_cast<MetaLogEntityRemoveOkLogs *>(entity.get())) {
662  MetaLogEntityRemoveOkLogsPtr remove_ok_logs =
663  dynamic_pointer_cast<MetaLogEntityRemoveOkLogs>(entity);
664  if (remove_ok_logs->decode_version() > 1)
665  Global::remove_ok_logs = remove_ok_logs;
666  else
667  continue;
668  }
669  stripped_entities.push_back(entity);
670  }
671  }
672 
673  entities.swap(stripped_entities);
674 
676  make_shared<MetaLog::Writer>(Global::log_dfs, rsml_definition,
677  Global::log_dir + "/" + rsml_definition->name(),
678  entities);
679 
680  replay_map.clear();
681  for (auto & entity : entities) {
682  MetaLogEntityRangePtr range_entity = dynamic_pointer_cast<MetaLogEntityRange>(entity);
683  if (range_entity) {
684  TableIdentifier table;
685  String end_row = range_entity->get_end_row();
686  range_entity->get_table_identifier(table);
687  if (table.is_metadata() && !end_row.compare(Key::END_ROOT_ROW))
688  replay_load_range(replay_map, range_entity);
689  }
690  }
691 
692  if (!replay_map.empty()) {
693  root_log_reader = make_shared<CommitLogReader>(Global::log_dfs,
694  Global::log_dir + "/root");
695  replay_log(replay_map, root_log_reader);
696 
697  root_log_reader->get_linked_logs(transfer_logs);
698 
699  // Perform any range specific post-replay tasks
700  ranges.array.clear();
701  replay_map.get_ranges(ranges);
702  for (auto &rd : ranges.array) {
703  rd.range->recovery_finalize();
704  if (rd.range->get_state() == RangeState::SPLIT_LOG_INSTALLED ||
705  rd.range->get_state() == RangeState::SPLIT_SHRUNK)
706  maintenance_tasks.push_back(new MaintenanceTaskSplit(0, priority++, now, rd.range));
707  else if (rd.range->get_state() == RangeState::RELINQUISH_LOG_INSTALLED)
708  maintenance_tasks.push_back(new MaintenanceTaskRelinquish(0, priority++, now, rd.range));
709  else
710  HT_ASSERT(rd.range->get_state() == RangeState::STEADY);
711  }
712  }
713 
714  m_context->live_map->merge(&replay_map);
715 
716  if (root_log_reader)
717  Global::root_log = make_shared<CommitLog>(Global::log_dfs, Global::log_dir
718  + "/root", m_props, root_log_reader.get());
719 
720  m_log_replay_barrier->set_root_complete();
721 
722  // Finish mid-maintenance
723  if (!maintenance_tasks.empty()) {
724  for (size_t i=0; i<maintenance_tasks.size(); i++)
725  Global::maintenance_queue->add(maintenance_tasks[i]);
726  maintenance_tasks.clear();
727  }
728 
729  replay_map.clear();
730  for (auto & entity : entities) {
731  MetaLogEntityRangePtr range_entity = dynamic_pointer_cast<MetaLogEntityRange>(entity);
732  if (range_entity) {
733  TableIdentifier table;
734  String end_row = range_entity->get_end_row();
735  range_entity->get_table_identifier(table);
736  if (table.is_metadata() && end_row.compare(Key::END_ROOT_ROW))
737  replay_load_range(replay_map, range_entity);
738  }
739  }
740 
741  if (!replay_map.empty()) {
742  metadata_log_reader =
743  make_shared<CommitLogReader>(Global::log_dfs, Global::log_dir + "/metadata");
744 
745  replay_log(replay_map, metadata_log_reader);
746 
747  metadata_log_reader->get_linked_logs(transfer_logs);
748 
749  // Perform any range specific post-replay tasks
750  ranges.array.clear();
751  replay_map.get_ranges(ranges);
752  for (auto &rd : ranges.array) {
753  rd.range->recovery_finalize();
754  if (rd.range->get_state() == RangeState::SPLIT_LOG_INSTALLED ||
755  rd.range->get_state() == RangeState::SPLIT_SHRUNK)
756  maintenance_tasks.push_back(new MaintenanceTaskSplit(1, priority++, now, rd.range));
757  else if (rd.range->get_state() == RangeState::RELINQUISH_LOG_INSTALLED)
758  maintenance_tasks.push_back(new MaintenanceTaskRelinquish(1, priority++, now, rd.range));
759  else
760  HT_ASSERT(rd.range->get_state() == RangeState::STEADY);
761  }
762  }
763 
764  m_context->live_map->merge(&replay_map);
765 
766  if (root_log_reader || metadata_log_reader) {
767  Global::metadata_log = make_shared<CommitLog>(Global::log_dfs,
768  Global::log_dir + "/metadata",
769  m_props, metadata_log_reader.get());
770  m_update_pipeline_metadata =
771  make_shared<UpdatePipeline>(m_context, m_query_cache, m_timer_handler,
772  Global::metadata_log, m_log_flush_method_meta);
773  }
774 
775  m_log_replay_barrier->set_metadata_complete();
776 
777  // Finish mid-maintenance
778  if (!maintenance_tasks.empty()) {
779  for (size_t i=0; i<maintenance_tasks.size(); i++)
780  Global::maintenance_queue->add(maintenance_tasks[i]);
781  maintenance_tasks.clear();
782  }
783 
784  replay_map.clear();
785  for (auto & entity : entities) {
786  MetaLogEntityRangePtr range_entity = dynamic_pointer_cast<MetaLogEntityRange>(entity);
787  if (range_entity) {
788  TableIdentifier table;
789  range_entity->get_table_identifier(table);
790  if (table.is_system() && !table.is_metadata())
791  replay_load_range(replay_map, range_entity);
792  }
793  }
794 
795  if (!replay_map.empty()) {
796  system_log_reader =
797  make_shared<CommitLogReader>(Global::log_dfs, Global::log_dir + "/system");
798 
799  replay_log(replay_map, system_log_reader);
800 
801  system_log_reader->get_linked_logs(transfer_logs);
802 
803  // Perform any range specific post-replay tasks
804  ranges.array.clear();
805  replay_map.get_ranges(ranges);
806  for (auto &rd : ranges.array) {
807  rd.range->recovery_finalize();
808  if (rd.range->get_state() == RangeState::SPLIT_LOG_INSTALLED ||
809  rd.range->get_state() == RangeState::SPLIT_SHRUNK)
810  maintenance_tasks.push_back(new MaintenanceTaskSplit(2, priority++, now, rd.range));
811  else if (rd.range->get_state() == RangeState::RELINQUISH_LOG_INSTALLED)
812  maintenance_tasks.push_back(new MaintenanceTaskRelinquish(2, priority++, now, rd.range));
813  else
814  HT_ASSERT(rd.range->get_state() == RangeState::STEADY);
815  }
816  }
817 
818  m_context->live_map->merge(&replay_map);
819 
820  // Create system log and wake up anybody waiting for system replay to
821  // complete
822  if (system_log_reader) {
823  Global::system_log = make_shared<CommitLog>(Global::log_dfs,
824  Global::log_dir + "/system", m_props,
825  system_log_reader.get());
826  m_update_pipeline_system =
827  make_shared<UpdatePipeline>(m_context, m_query_cache, m_timer_handler,
828  Global::system_log, m_log_flush_method_user);
829  }
830 
831  m_log_replay_barrier->set_system_complete();
832 
833  // Finish mid-maintenance
834  if (!maintenance_tasks.empty()) {
835  for (size_t i=0; i<maintenance_tasks.size(); i++)
836  Global::maintenance_queue->add(maintenance_tasks[i]);
837  maintenance_tasks.clear();
838  }
839 
840  if (m_props->get_bool("Hypertable.RangeServer.LoadSystemTablesOnly"))
841  return;
842 
843  replay_map.clear();
844  for (auto & entity : entities) {
845  MetaLogEntityRangePtr range_entity = dynamic_pointer_cast<MetaLogEntityRange>(entity);
846  if (range_entity) {
847  TableIdentifier table;
848  range_entity->get_table_identifier(table);
849  if (!table.is_system())
850  replay_load_range(replay_map, range_entity);
851  }
852  }
853 
854  if (!replay_map.empty()) {
855  user_log_reader = make_shared<CommitLogReader>(Global::log_dfs,
856  Global::log_dir + "/user");
857 
858  replay_log(replay_map, user_log_reader);
859 
860  user_log_reader->get_linked_logs(transfer_logs);
861 
862  // Perform any range specific post-replay tasks
863  ranges.array.clear();
864  replay_map.get_ranges(ranges);
865  for (auto &rd : ranges.array) {
866  rd.range->recovery_finalize();
867  if (rd.range->get_state() == RangeState::SPLIT_LOG_INSTALLED ||
868  rd.range->get_state() == RangeState::SPLIT_SHRUNK)
869  maintenance_tasks.push_back(new MaintenanceTaskSplit(3, priority++, now, rd.range));
870  else if (rd.range->get_state() == RangeState::RELINQUISH_LOG_INSTALLED)
871  maintenance_tasks.push_back(new MaintenanceTaskRelinquish(3, priority++, now, rd.range));
872  else
873  HT_ASSERT((rd.range->get_state() & ~RangeState::PHANTOM)
874  == RangeState::STEADY);
875  }
876  }
877 
878  m_context->live_map->merge(&replay_map);
879 
880  Global::user_log = make_shared<CommitLog>(Global::log_dfs, Global::log_dir
881  + "/user", m_props, user_log_reader.get(), false);
882 
883  m_update_pipeline_user =
884  make_shared<UpdatePipeline>(m_context, m_query_cache, m_timer_handler,
885  Global::user_log, m_log_flush_method_user);
886 
887  m_log_replay_barrier->set_user_complete();
888 
889  // Finish mid-maintenance
890  if (!maintenance_tasks.empty()) {
891  for (size_t i=0; i<maintenance_tasks.size(); i++)
892  Global::maintenance_queue->add(maintenance_tasks[i]);
893  maintenance_tasks.clear();
894  }
895 
896  HT_NOTICE("Replay finished");
897 
898  }
899  else {
900  lock_guard<mutex> lock(m_mutex);
901 
906  if (root_log_reader)
907  Global::root_log = make_shared<CommitLog>(Global::log_dfs, Global::log_dir
908  + "/root", m_props, root_log_reader.get());
909 
910  if (root_log_reader || metadata_log_reader) {
911  Global::metadata_log = make_shared<CommitLog>(Global::log_dfs, Global::log_dir
912  + "/metadata", m_props, metadata_log_reader.get());
913  m_update_pipeline_metadata =
914  make_shared<UpdatePipeline>(m_context, m_query_cache, m_timer_handler,
915  Global::metadata_log, m_log_flush_method_meta);
916  }
917 
918  if (system_log_reader) {
919  Global::system_log = make_shared<CommitLog>(Global::log_dfs, Global::log_dir
920  + "/system", m_props, system_log_reader.get());
921  m_update_pipeline_system =
922  make_shared<UpdatePipeline>(m_context, m_query_cache, m_timer_handler,
923  Global::system_log, m_log_flush_method_user);
924  }
925 
926  Global::user_log = make_shared<CommitLog>(Global::log_dfs, Global::log_dir
927  + "/user", m_props, user_log_reader.get(), false);
928 
929  m_update_pipeline_user =
930  make_shared<UpdatePipeline>(m_context, m_query_cache, m_timer_handler,
931  Global::user_log, m_log_flush_method_user);
932 
934  make_shared<MetaLog::Writer>(Global::log_dfs, rsml_definition,
935  Global::log_dir + "/" + rsml_definition->name(),
936  entities);
937 
938  m_log_replay_barrier->set_root_complete();
939  m_log_replay_barrier->set_metadata_complete();
940  m_log_replay_barrier->set_system_complete();
941  m_log_replay_barrier->set_user_complete();
942  }
943 
944  if (!Global::remove_ok_logs) {
945  Global::remove_ok_logs = make_shared<MetaLogEntityRemoveOkLogs>();
946  Global::remove_ok_logs->insert(transfer_logs);
948  }
949 
950  }
951  catch (Exception &e) {
952  HT_ERROR_OUT << e << HT_END;
953  HT_ABORT;
954  }
955 }
956 
957 void Apps::RangeServer::decode_table_id(const uint8_t **bufp, size_t *remainp, TableIdentifier *tid) {
958  const uint8_t *buf_saved = *bufp;
959  size_t remain_saved = *remainp;
960  try {
961  tid->decode(bufp, remainp);
962  }
963  catch (Exception &e) {
964  if (e.code() == Error::PROTOCOL_ERROR) {
965  *bufp = buf_saved;
966  *remainp = remain_saved;
967  legacy_decode(bufp, remainp, tid);
968  }
969  else
970  throw;
971  }
972 }
973 
974 
// Replays a range load recorded as a MetaLogEntityRange during log replay.
//
// Steps:
//  - Looks up the table in replay_map and in the live map.
//  - Asserts the range is not already live.
//  - Upgrades the entity's table generation if the live schema is newer.
//  - Lazily creates Global::metadata_table.
//  - Constructs the Range, calls recovery_initialize(), and adds it to the
//    replay TableInfo.
//
// RANGESERVER_TABLE_NOT_FOUND for a non-system table is logged and the range
// is skipped. Any other Hypertable::Exception is reported via HT_FATAL_OUT.
//
// NOTE(review): unlike metadata_sync() and load_range(), the metadata_table
// null check is not repeated after Global::mutex is acquired, so two
// concurrent callers could both construct it. Confirm replay is
// single-threaded.
975 void
977  MetaLogEntityRangePtr &range_entity) {
978  SchemaPtr schema;
979  TableInfoPtr table_info, live_table_info;
980  TableIdentifier table;
981  RangeSpecManaged range_spec;
982  RangePtr range;
983 
984  HT_DEBUG_OUT << "replay_load_range " << *range_entity << HT_END;
985 
986  range_entity->get_table_identifier(table);
987  range_entity->get_range_spec(range_spec);
988 
989  try {
990 
991  replay_map.get(table.id, table_info);
992 
993  // If maintenance has been disabled for the table, tell the maintenance
994  // scheduler to not schedule maintenance for it
995  if (table_info->maintenance_disabled())
996  m_maintenance_scheduler->exclude(table);
997 
998  m_context->live_map->get(table.id, live_table_info);
999 
1000  // Range should not already be loaded
1001  HT_ASSERT(!live_table_info->get_range(range_spec, range));
1002 
1003  // Check table generation. If table generation obtained from TableInfoMap
1004  // is greater than the table generation in the range entity, then
1005  // automatically upgrade to new generation
1006  int64_t generation = live_table_info->get_schema()->get_generation();
1007  if (generation > table.generation) {
1008  range_entity->set_table_generation(generation);
1009  table.generation = generation;
1010  }
// An entity whose generation is newer than the live schema trips this assert.
1011  HT_ASSERT(generation == table.generation);
1012 
1016  if (!Global::metadata_table) {
1017  lock_guard<mutex> lock(Global::mutex);
1018  uint32_t timeout_ms = m_props->get_i32("Hypertable.Request.Timeout");
1019  if (!Global::range_locator)
1020  Global::range_locator = make_shared<Hypertable::RangeLocator>(m_props,
1021  m_conn_manager, Global::hyperspace, timeout_ms);
1023  Global::metadata_table = make_shared<Table>(m_props, Global::range_locator,
1024  m_conn_manager, Global::hyperspace, aq,
1025  m_namemap, TableIdentifier::METADATA_NAME, 0, timeout_ms);
1026  }
1027 
1028  schema = table_info->get_schema();
1029 
// The Range is built against the live TableInfo but registered in the replay one.
1030  range = make_shared<Range>(m_master_client, schema, range_entity,
1031  live_table_info.get());
1032 
1033  range->recovery_initialize();
1034 
1035  table_info->add_range(range);
1036 
1037  HT_INFOF("Successfully replay loaded range %s", range->get_name().c_str());
1038 
1039  }
1040  catch (Hypertable::Exception &e) {
1041  if (e.code() == Error::RANGESERVER_TABLE_NOT_FOUND && !table.is_system()) {
1042  HT_WARNF("Skipping recovery of %s[%s..%s] - %s",
1043  table.id, range_spec.start_row, range_spec.end_row,
1044  e.what());
1045  return;
1046  }
1047  HT_FATAL_OUT << "Problem loading range during replay - " << e << HT_END;
1048  }
1049 }
1050 
1051 
1053 CommitLogReaderPtr &log_reader) {
// Replays the update blocks read by log_reader into the ranges of replay_map.
//
// Block format: each block starts with an encoded TableIdentifier (decoded by
// decode_table_id()), followed by serialized key/value pairs.
//
// Behavior:
//  - Blocks for tables absent from replay_map are skipped and not counted.
//  - For each pair, the containing range is located and the pair is added
//    under that range's lock.
//  - Consecutive pairs whose row stays within (start_row, end_row] of the same
//    range are added without re-acquiring the lock.
//  - Pairs for which no containing range exists are dropped.
//
// Throws Error::REQUEST_TRUNCATED if a key or value extends past the block end.
1054  BlockHeaderCommitLog header;
1055  TableIdentifier table_id;
1056  TableInfoPtr table_info;
1057  Key key;
1058  SerializedKey skey;
1059  ByteString value;
1060  RangePtr range;
1061  String start_row, end_row;
1062  unsigned long block_count = 0;
1063  uint8_t *base;
1064  size_t len;
1065 
1066  while (log_reader->next((const uint8_t **)&base, &len, &header)) {
1067 
1068  const uint8_t *ptr = base;
1069  const uint8_t *end = base + len;
1070 
1071  decode_table_id(&ptr, &len, &table_id);
1072 
1073  // Fetch table info
1074  if (!replay_map.lookup(table_id.id, table_info))
1075  continue;
1076 
// pair_loaded: key/value already decoded but not yet added to a range.
1077  bool pair_loaded = false;
1078 
1079  while (ptr < end) {
1080 
1081  if (!pair_loaded) {
1082  // extract the key
1083  skey.ptr = ptr;
1084  key.load(skey);
1085  ptr += skey.length();
1086  if (ptr > end)
1087  HT_THROW(Error::REQUEST_TRUNCATED, "Problem decoding key");
1088  // extract the value
1089  value.ptr = ptr;
1090  ptr += value.length();
1091  if (ptr > end)
1092  HT_THROW(Error::REQUEST_TRUNCATED, "Problem decoding value");
1093  pair_loaded = true;
1094  }
1095 
1096  while (pair_loaded) {
// No containing range: drop this pair and let the outer loop decode the next.
1097  if (!table_info->find_containing_range(key.row, range, start_row, end_row)) {
1098  pair_loaded = false;
1099  continue;
1100  }
1101  lock_guard<Range> lock(*range);
// Keep adding while the next row remains in (start_row, end_row].
1102  do {
1103  range->add(key, value);
1104  if (ptr == end) {
1105  pair_loaded = false;
1106  break;
1107  }
1108  // extract the key
1109  skey.ptr = ptr;
1110  key.load(skey);
1111  ptr += skey.length();
1112  if (ptr > end)
1113  HT_THROW(Error::REQUEST_TRUNCATED, "Problem decoding key");
1114  // extract the value
1115  value.ptr = ptr;
1116  ptr += value.length();
1117  if (ptr > end)
1118  HT_THROW(Error::REQUEST_TRUNCATED, "Problem decoding value");
1119  } while (start_row.compare(key.row) < 0 && end_row.compare(key.row) >= 0);
1120  }
1121 
1122  }
1123  block_count++;
1124  }
1125 
1126  HT_INFOF("Replayed %lu blocks of updates from '%s'", block_count,
1127  log_reader->get_log_dir().c_str());
1128 }
1129 
// Handles a compaction request.
//
// The compaction type is derived from the COMPACT_FLAG_* bits in flags and
// defaults to MaintenanceFlag::COMPACT_MAJOR. It is recorded on each selected
// range via set_compaction_type_needed(); no compaction runs here.
//
// Range selection:
//  - If table.id is non-empty: the range containing row (when row is
//    non-empty), or every range of that table.
//  - Otherwise: all live tables are scanned, and METADATA, ROOT, SYSTEM and
//    USER ranges are chosen according to flags. Only the first root range
//    found is marked.
//
// Responds OK once ranges are marked. A Hypertable::Exception is reported back
// through cb->error().
1130 void
1132  const char *row, int32_t flags) {
1133  Ranges ranges;
1134  TableInfoPtr table_info;
1135  String start_row, end_row;
1136  RangePtr range;
1137  size_t range_count = 0;
1138 
1139  if (*table.id)
1140  HT_INFOF("compacting table ID=%s ROW=%s FLAGS=%s", table.id, row,
1142  else
1143  HT_INFOF("compacting ranges FLAGS=%s",
1145 
1146  if (!m_log_replay_barrier->wait_for_user(cb->event()->deadline()))
1147  return;
1148 
1149  int compaction_type = MaintenanceFlag::COMPACT_MAJOR;
1152  compaction_type = MaintenanceFlag::COMPACT_MINOR;
1155  compaction_type = MaintenanceFlag::COMPACT_MERGING;
1156  else if ((flags & Lib::RangeServer::Protocol::COMPACT_FLAG_GC) ==
1158  compaction_type = MaintenanceFlag::COMPACT_GC;
1159 
1160  HT_INFOF("compaction type = 0x%x", compaction_type);
1161 
1162  try {
1163 
1164  if (*table.id) {
1165 
1166  if (!m_context->live_map->lookup(table.id, table_info)) {
1167  cb->error(Error::TABLE_NOT_FOUND, table.id);
1168  return;
1169  }
1170 
1171  if (*row) {
1172  if (!table_info->find_containing_range(row, range, start_row, end_row)) {
1174  format("Unable to find range for row '%s'", row));
1175  return;
1176  }
1177  range->set_compaction_type_needed(compaction_type);
1178  range_count = 1;
1179  }
1180  else {
1181  ranges.array.clear();
1182  table_info->get_ranges(ranges);
1183  for (auto &rd : ranges.array)
1184  rd.range->set_compaction_type_needed(compaction_type);
1185  range_count = ranges.array.size();
1186  }
1187  }
1188  else {
1189  std::vector<TableInfoPtr> tables;
1190 
1191  m_context->live_map->get_all(tables);
1192 
1193  for (size_t i=0; i<tables.size(); i++) {
1194 
1195  if (tables[i]->identifier().is_metadata()) {
1196 
1199  ranges.array.clear();
1200  tables[i]->get_ranges(ranges);
1201  for (auto &rd : ranges.array)
1202  rd.range->set_compaction_type_needed(compaction_type);
1203  range_count += ranges.array.size();
1204  }
1205  else if ((flags & Lib::RangeServer::Protocol::COMPACT_FLAG_ROOT) ==
1206  Lib::RangeServer::Protocol::COMPACT_FLAG_ROOT) {
1207  ranges.array.clear();
1208  tables[i]->get_ranges(ranges);
1209  for (auto &rd : ranges.array) {
1210  if (rd.range->is_root()) {
1211  rd.range->set_compaction_type_needed(compaction_type);
1212  range_count++;
1213  break;
1214  }
1215  }
1216  }
1217  }
1218  else if (tables[i]->identifier().is_system()) {
1219  if ((flags & Lib::RangeServer::Protocol::COMPACT_FLAG_SYSTEM) == Lib::RangeServer::Protocol::COMPACT_FLAG_SYSTEM) {
1220  ranges.array.clear();
1221  tables[i]->get_ranges(ranges);
1222  for (auto &rd : ranges.array)
1223  rd.range->set_compaction_type_needed(compaction_type);
1224  range_count += ranges.array.size();
1225  }
1226  }
1227  else {
1228  if ((flags & Lib::RangeServer::Protocol::COMPACT_FLAG_USER) == Lib::RangeServer::Protocol::COMPACT_FLAG_USER) {
1229  ranges.array.clear();
1230  tables[i]->get_ranges(ranges);
1231  for (auto &rd : ranges.array)
1232  rd.range->set_compaction_type_needed(compaction_type);
1233  range_count += ranges.array.size();
1234  }
1235  }
1236  }
1237  }
1238 
1239  HT_INFOF("Compaction scheduled for %d ranges", (int)range_count);
1240 
1241  cb->response_ok();
1242 
1243  }
1244  catch (Hypertable::Exception &e) {
1245  int error;
1246  HT_ERROR_OUT << e << HT_END;
1247  if (cb && (error = cb->error(e.code(), e.what())) != Error::OK) {
1248  HT_ERRORF("Problem sending error response - %s", Error::get_text(error));
1249  }
1250  }
1251 }
1252 
1253 namespace {
1254 
1255  void do_metadata_sync(RangeData &rd, TableMutatorPtr &mutator,
1256  const char *table_id, bool do_start_row, bool do_location) {
1257  String metadata_key_str;
1258  String start_row, end_row;
1259  KeySpec key;
1260 
1261  rd.range->get_boundary_rows(start_row, end_row);
1262 
1263  metadata_key_str = String(table_id) + ":" + end_row;
1264  key.row = metadata_key_str.c_str();
1265  key.row_len = metadata_key_str.length();
1266  key.column_qualifier = 0;
1267  key.column_qualifier_len = 0;
1268 
1269  if (do_start_row) {
1270  key.column_family = "StartRow";
1271  mutator->set(key, start_row);
1272  }
1273  if (do_location) {
1274  key.column_family = "Location";
1275  mutator->set(key, Global::location_initializer->get());
1276  }
1277 
1278  }
1279 
1280  void do_metadata_sync(Ranges &ranges, TableMutatorPtr &mutator,
1281  const char *table_id, bool do_start_row, bool do_location) {
1282  for (auto &rd : ranges.array)
1283  do_metadata_sync(rd, mutator, table_id, do_start_row, do_location);
1284  }
1285 
1286 }
1287 
// Handles a METADATA sync request: rewrites the StartRow and/or Location
// columns of sys/METADATA for the selected ranges.
//
// Columns:
//  - If columns is non-empty, only the supported names it contains
//    ("StartRow", "Location") are written; other names are logged.
//  - Otherwise both columns are written.
//
// Ranges come either from table_id (when non-empty) or from all live tables,
// filtered by the COMPACT_FLAG_* bits in flags. The mutator is flushed only if
// at least one range was processed.
//
// NOTE(review): in the all-tables branch, table_id is the empty string (that
// branch runs only when *table_id == 0), yet it is still passed to
// do_metadata_sync(). That function builds the row key as
// table_id + ":" + end_row, so these writes appear to use keys of the form
// ":<end_row>". Passing tables[i]->identifier().id looks intended; confirm
// before fixing.
1288 void
1290  uint32_t flags, std::vector<const char *> columns) {
1291  Ranges ranges;
1292  TableInfoPtr table_info;
1293  size_t range_count = 0;
1294  TableMutatorPtr mutator;
1295  bool do_start_row = true;
1296  bool do_location = true;
1297  String columns_str;
1298 
1299  if (!columns.empty()) {
1300  columns_str = String("COLUMNS=") + columns[0];
1301  for (size_t i=1; i<columns.size(); i++)
1302  columns_str += String(",") + columns[i];
1303  }
1304 
1305  if (*table_id)
1306  HT_INFOF("metadata sync table ID=%s %s", table_id, columns_str.c_str());
1307  else
1308  HT_INFOF("metadata sync ranges FLAGS=%s %s",
1310  columns_str.c_str());
1311 
1312  if (!m_log_replay_barrier->wait_for_user(cb->event()->deadline()))
1313  return;
1314 
1315  if (!Global::metadata_table) {
1316  lock_guard<mutex> lock(Global::mutex);
1317  // double-check locking (works fine on x86 and amd64 but may fail
1318  // on other archs without using a memory barrier
1319  if (!Global::metadata_table) {
1320  uint32_t timeout_ms = m_props->get_i32("Hypertable.Request.Timeout");
1321  if (!Global::range_locator)
1322  Global::range_locator = make_shared<Hypertable::RangeLocator>(m_props,
1323  m_conn_manager, Global::hyperspace, timeout_ms);
1325  Global::metadata_table = make_shared<Table>(m_props, Global::range_locator,
1326  m_conn_manager, Global::hyperspace, aq,
1327  m_namemap, TableIdentifier::METADATA_NAME, 0, timeout_ms);
1328  }
1329  }
1330 
// Restrict the written columns to the supported names listed in columns.
1331  if (!columns.empty()) {
1332  do_start_row = do_location = false;
1333  for (size_t i=0; i<columns.size(); i++) {
1334  if (!strcmp(columns[i], "StartRow"))
1335  do_start_row = true;
1336  else if (!strcmp(columns[i], "Location"))
1337  do_location = true;
1338  else
1339  HT_WARNF("Unsupported METADATA column: %s", columns[i]);
1340  }
1341  }
1342 
1343  try {
1344 
1345  if (*table_id) {
1346 
1347  if (!m_context->live_map->lookup(table_id, table_info)) {
1348  cb->error(Error::TABLE_NOT_FOUND, table_id);
1349  return;
1350  }
1351 
1352  mutator.reset( Global::metadata_table->create_mutator() );
1353 
1354  ranges.array.clear();
1355  table_info->get_ranges(ranges);
1356 
1357  do_metadata_sync(ranges, mutator, table_id, do_start_row, do_location);
1358  range_count = ranges.array.size();
1359 
1360  }
1361  else {
1362  std::vector<TableInfoPtr> tables;
1363 
1364  m_context->live_map->get_all(tables);
1365 
1366  mutator.reset( Global::metadata_table->create_mutator() );
1367 
1368  for (size_t i=0; i<tables.size(); i++) {
1369 
1370  if (tables[i]->identifier().is_metadata()) {
1371 
1372  ranges.array.clear();
1373  tables[i]->get_ranges(ranges);
1374 
1375  if (!ranges.array.empty()) {
1376  if (ranges.array[0].range->is_root() &&
1378  do_metadata_sync(ranges.array[0], mutator, table_id, do_start_row, do_location);
1379  range_count++;
1380  }
1382  Lib::RangeServer::Protocol::COMPACT_FLAG_METADATA) {
1383  do_metadata_sync(ranges, mutator, table_id, do_start_row, do_location);
1384  range_count += ranges.array.size();
1385  }
1386  }
1387  }
1388  else if (tables[i]->identifier().is_system()) {
1390  Lib::RangeServer::Protocol::COMPACT_FLAG_SYSTEM) {
1391  ranges.array.clear();
1392  tables[i]->get_ranges(ranges);
1393  do_metadata_sync(ranges, mutator, table_id, do_start_row, do_location);
1394  range_count += ranges.array.size();
1395  }
1396  }
1397  else if (tables[i]->identifier().is_user()) {
1399  Lib::RangeServer::Protocol::COMPACT_FLAG_USER) {
1400  ranges.array.clear();
1401  tables[i]->get_ranges(ranges);
1402  do_metadata_sync(ranges, mutator, table_id, do_start_row, do_location);
1403  range_count += ranges.array.size();
1404  }
1405  }
1406  }
1407  }
1408 
1409  if (range_count)
1410  mutator->flush();
1411 
1412  HT_INFOF("METADATA sync'd for %d ranges", (int)range_count);
1413 
1414  cb->response_ok();
1415 
1416  }
1417  catch (Hypertable::Exception &e) {
1418  int error;
1419  HT_ERROR_OUT << e << HT_END;
1420  if (cb && (error = cb->error(e.code(), e.what())) != Error::OK) {
1421  HT_ERRORF("Problem sending error response - %s", Error::get_text(error));
1422  }
1423  }
1424 }
1425 
// Creates a scanner over one range and returns the first block of cells.
//
// Validation:
//  - At most one row interval unless scan_and_filter_rows is set.
//  - Not both row and cell intervals.
//  - At most one cell interval.
// It then locates the table and range and checks the client's schema
// generation.
//
// Scan counter: increment_scan_counter() is held from range lookup until
// create_scanner() returns. decrement_needed tells the exception path whether
// the counter must still be released.
//
// Query cache (only when cache_key is supplied, the cache exists, and the
// table is not METADATA):
//  - A cache hit is answered directly.
//  - A complete result (more == false) is inserted into the cache.
//
// Result:
//  - If cells remain, the scanner is registered in m_scanner_map and its id is
//    returned.
//  - Otherwise id 0 is returned and the scanner is released.
1426 void
1428  const TableIdentifier &table, const RangeSpec &range_spec,
1429  const ScanSpec &scan_spec, QueryCache::Key *cache_key) {
1430  int error = Error::OK;
1431  String errmsg;
1432  TableInfoPtr table_info;
1433  RangePtr range;
1434  MergeScannerRangePtr scanner;
1435  bool more = true;
1436  uint32_t id = 0;
1437  SchemaPtr schema;
1438  ScanContextPtr scan_ctx;
1439  ProfileDataScanner profile_data;
1440  bool decrement_needed=false;
1441 
1442  //HT_DEBUG_OUT <<"Creating scanner:\n"<< *table << *range_spec
1443  //<< *scan_spec << HT_END;
1444 
1445  if (!m_log_replay_barrier->wait(cb->event()->deadline(), table, range_spec))
1446  return;
1447 
1448  try {
1449  DynamicBuffer rbuf;
1450 
1451  HT_MAYBE_FAIL("create-scanner-1");
1452  HT_MAYBE_FAIL_X("create-scanner-user-1", !table.is_system());
1453  if (scan_spec.row_intervals.size() > 0) {
1454  if (scan_spec.row_intervals.size() > 1 && !scan_spec.scan_and_filter_rows)
1456  "can only scan one row interval");
1457  if (scan_spec.cell_intervals.size() > 0)
1459  "both row and cell intervals defined");
1460  }
1461 
1462  if (scan_spec.cell_intervals.size() > 1)
1464  "can only scan one cell interval");
1465 
1466  if (!m_context->live_map->lookup(table.id, table_info))
1468 
1469  if (!table_info->get_range(range_spec, range))
1471  table.id, range_spec.start_row, range_spec.end_row);
1472 
1473  schema = table_info->get_schema();
1474 
1475  // verify schema
1476  if (schema->get_generation() != table.generation) {
1478  "RangeServer Schema generation for table '%s'"
1479  " is %lld but supplied is %lld",
1480  table.id, (Lld)schema->get_generation(),
1481  (Lld)table.generation);
1482  }
1483 
1484  range->deferred_initialization(cb->event()->header.timeout_ms);
1485 
1486  if (!range->increment_scan_counter())
1488  "Range %s[%s..%s] dropped or relinquished",
1489  table.id, range_spec.start_row, range_spec.end_row);
1490 
1491  decrement_needed = true;
1492 
1493  String start_row, end_row;
1494  range->get_boundary_rows(start_row, end_row);
1495 
1496  // Check to see if range just shrunk
1497  if (strcmp(start_row.c_str(), range_spec.start_row) ||
1498  strcmp(end_row.c_str(), range_spec.end_row))
1500  table.id, range_spec.start_row, range_spec.end_row);
1501 
1502  // check query cache
1503  if (cache_key && m_query_cache && !table.is_metadata()) {
1504  boost::shared_array<uint8_t> ext_buffer;
1505  uint32_t ext_len;
1506  uint32_t cell_count;
1507  if (m_query_cache->lookup(cache_key, ext_buffer, &ext_len, &cell_count)) {
1508  if ((error = cb->response(id, 0, 0, false, profile_data, ext_buffer, ext_len))
1509  != Error::OK)
1510  HT_ERRORF("Problem sending OK response - %s", Error::get_text(error));
1511  range->decrement_scan_counter();
1512  lock_guard<LoadStatistics> lock(*Global::load_statistics);
1513  Global::load_statistics->add_cached_scan_data(1, cell_count, ext_len);
1514  return;
1515  }
1516  }
1517  std::set<uint8_t> columns;
1518  scan_ctx = make_shared<ScanContext>(range->get_scan_revision(cb->event()->header.timeout_ms),
1519  &scan_spec, &range_spec, schema, &columns);
1520  scan_ctx->timeout_ms = cb->event()->header.timeout_ms;
1521 
1522  range->create_scanner(scan_ctx, scanner);
1523 
1524  range->decrement_scan_counter();
1525  decrement_needed = false;
1526 
1527  uint32_t cell_count {};
1528 
1529  more = FillScanBlock(scanner, rbuf, &cell_count, m_scanner_buffer_size);
1530 
1531  profile_data.cells_scanned = scanner->get_input_cells();
1532  profile_data.cells_returned = scanner->get_output_cells();
1533  profile_data.bytes_scanned = scanner->get_input_bytes();
1534  profile_data.bytes_returned = scanner->get_output_bytes();
1535  profile_data.disk_read = scanner->get_disk_read();
1536 
1537  int64_t output_cells = scanner->get_output_cells();
1538 
1539  {
1540  lock_guard<LoadStatistics> lock(*Global::load_statistics);
1541  Global::load_statistics->add_scan_data(1,
1542  profile_data.cells_scanned,
1543  profile_data.cells_returned,
1544  profile_data.bytes_scanned,
1545  profile_data.bytes_returned);
1546  range->add_read_data(profile_data.cells_scanned,
1547  profile_data.cells_returned,
1548  profile_data.bytes_scanned,
1549  profile_data.bytes_returned,
1550  profile_data.disk_read);
1551  }
1552 
1553  int skipped_rows = scanner->get_skipped_rows();
1554  int skipped_cells = scanner->get_skipped_cells();
1555 
1556  if (more) {
1557  scan_ctx->deep_copy_specs();
1558  id = m_scanner_map.put(scanner, range, table, profile_data);
1559  }
1560  else {
1561  id = 0;
1562  scanner.reset();
1563  }
1564 
1565  //HT_INFOF("scanner=%d cell_count=%d %s", (int)id, (int)cell_count, profile_data.to_string().c_str());
1566 
1567  if (table.is_metadata())
1568  HT_INFOF("Successfully created scanner (id=%u) on table '%s', returning "
1569  "%lld k/v pairs, more=%lld", id, table.id,
1570  (Lld)output_cells, (Lld) more);
1571 
1575  if (cache_key && m_query_cache && !table.is_metadata() && !more) {
1576  const char *cache_row_key = scan_spec.cache_key();
1577  char *row_key_ptr, *tablename_ptr;
// Cache buffer layout: <scan block bytes><cache_row_key>\0<table.id>\0
// Ownership is handed to the boost::shared_array below.
1578  uint8_t *buffer = new uint8_t [ rbuf.fill() + strlen(cache_row_key) + strlen(table.id) + 2 ];
1579  memcpy(buffer, rbuf.base, rbuf.fill());
1580  row_key_ptr = (char *)buffer + rbuf.fill();
1581  strcpy(row_key_ptr, cache_row_key);
1582  tablename_ptr = row_key_ptr + strlen(row_key_ptr) + 1;
1583  strcpy(tablename_ptr, table.id);
1584  boost::shared_array<uint8_t> ext_buffer(buffer);
1585  m_query_cache->insert(cache_key, tablename_ptr, row_key_ptr,
1586  columns, cell_count, ext_buffer, rbuf.fill());
1587  if ((error = cb->response(id, skipped_rows, skipped_cells, false,
1588  profile_data, ext_buffer, rbuf.fill())) != Error::OK) {
1589  HT_ERRORF("Problem sending OK response - %s", Error::get_text(error));
1590  }
1591  }
1592  else {
1593  StaticBuffer ext(rbuf);
1594  if ((error = cb->response(id, skipped_rows, skipped_cells, more,
1595  profile_data, ext)) != Error::OK) {
1596  HT_ERRORF("Problem sending OK response - %s", Error::get_text(error));
1597  }
1598  }
1599 
1600  }
1601  catch (Hypertable::Exception &e) {
1602  int error;
1603  if (decrement_needed)
1604  range->decrement_scan_counter();
1607  HT_INFOF("%s - %s", Error::get_text(e.code()), e.what());
1608  else
1609  HT_ERROR_OUT << e << HT_END;
1610  if ((error = cb->error(e.code(), e.what())) != Error::OK)
1611  HT_ERRORF("Problem sending error response - %s", Error::get_text(error));
1612  }
1613 }
1614 
// Removes the scanner identified by scanner_id from m_scanner_map and
// responds OK.
// NOTE(review): unlike the other handlers, the return value of
// cb->response_ok() is not checked here.
1615 void
1617  HT_DEBUGF("destroying scanner id=%u", scanner_id);
1618  m_scanner_map.remove(scanner_id);
1619  cb->response_ok();
1620 }
1621 
// Returns the next block of cells for a scanner previously registered by
// create_scanner().
//
// The scanner is removed from m_scanner_map either on a schema generation
// mismatch or once it is exhausted (more == false).
//
// Profile counters read from the scanner are cumulative. The cumulative values
// are stored back in the map, and the previous snapshot (profile_data_before)
// is subtracted to get this call's delta. That delta is used for load
// statistics and the response.
//
// NOTE(review): the debug message formats ext.size-4 with %u. Verify that the
// type of StaticBuffer::size matches %u and that ext.size >= 4 always holds.
1622 void
1624  int32_t scanner_id) {
1625  String errmsg;
1626  int error = Error::OK;
1627  MergeScannerRangePtr scanner;
1628  RangePtr range;
1629  bool more = true;
1630  DynamicBuffer rbuf;
1631  TableInfoPtr table_info;
1632  TableIdentifierManaged scanner_table;
1633  SchemaPtr schema;
1634  ProfileDataScanner profile_data_before;
1635  ProfileDataScanner profile_data;
1636 
1637  HT_DEBUG_OUT <<"Scanner ID = " << scanner_id << HT_END;
1638 
1639  try {
1640 
1641  if (!m_scanner_map.get(scanner_id, scanner, range, scanner_table, &profile_data_before))
1643  format("scanner ID %d", scanner_id));
1644 
1645  HT_MAYBE_FAIL_X("fetch-scanblock-user-1", !scanner_table.is_system());
1646 
1647  if (!m_context->live_map->lookup(scanner_table.id, table_info))
1648  HT_THROWF(Error::TABLE_NOT_FOUND, "%s", scanner_table.id);
1649 
1650  schema = table_info->get_schema();
1651 
1652  // verify schema
1653  if (schema->get_generation() != scanner_table.generation) {
1654  m_scanner_map.remove(scanner_id);
1656  "RangeServer Schema generation for table '%s' is %lld but "
1657  "scanner has generation %lld", scanner_table.id,
1658  (Lld)schema->get_generation(), (Lld)scanner_table.generation);
1659  }
1660 
1661  uint32_t cell_count {};
1662 
1663  more = FillScanBlock(scanner, rbuf, &cell_count, m_scanner_buffer_size);
1664 
1665  profile_data.cells_scanned = scanner->get_input_cells();
1666  profile_data.cells_returned = scanner->get_output_cells();
1667  profile_data.bytes_scanned = scanner->get_input_bytes();
1668  profile_data.bytes_returned = scanner->get_output_bytes();
1669  profile_data.disk_read = scanner->get_disk_read();
1670 
1671  int64_t output_cells = scanner->get_output_cells();
1672 
1673  if (!more) {
1674  m_scanner_map.remove(scanner_id);
1675  scanner.reset();
1676  }
1677  else
1678  m_scanner_map.update_profile_data(scanner_id, profile_data);
1679 
// Convert cumulative counters into this call's delta.
1680  profile_data -= profile_data_before;
1681 
1682  //HT_INFOF("scanner=%d cell_count=%d %s", (int)scanner_id, (int)cell_count, profile_data.to_string().c_str());
1683 
1684  {
1685  lock_guard<LoadStatistics> lock(*Global::load_statistics);
1686  Global::load_statistics->add_scan_data(0,
1687  profile_data.cells_scanned,
1688  profile_data.cells_returned,
1689  profile_data.bytes_scanned,
1690  profile_data.bytes_returned);
1691  range->add_read_data(profile_data.cells_scanned,
1692  profile_data.cells_returned,
1693  profile_data.bytes_scanned,
1694  profile_data.bytes_returned,
1695  profile_data.disk_read);
1696  }
1697 
1701  {
1702  StaticBuffer ext(rbuf);
1703  error = cb->response(scanner_id, 0, 0, more, profile_data, ext);
1704  if (error != Error::OK)
1705  HT_ERRORF("Problem sending OK response - %s", Error::get_text(error));
1706 
1707  HT_DEBUGF("Successfully fetched %u bytes (%lld k/v pairs) of scan data",
1708  ext.size-4, (Lld)output_cells);
1709  }
1710 
1711  }
1712  catch (Hypertable::Exception &e) {
1713  HT_ERROR_OUT << e << HT_END;
1714  if (cb && (error = cb->error(e.code(), e.what())) != Error::OK)
1715  HT_ERRORF("Problem sending error response - %s", Error::get_text(error));
1716  }
1717 }
1718 
// Loads a range onto this server. Steps, in order:
//  1. Wait for the log replay barrier.
//  2. Stage the range in the live TableInfo.
//  3. Lazily create Global::metadata_table.
//  4. Queue a "range_start_row" metrics cell.
//  5. Create the per-access-group range directories.
//  6. Construct the Range.
//  7. On first use, create the ROOT/METADATA/SYSTEM commit log and its update
//     pipeline.
//  8. Record this server's location: the METADATA "Location" column, or the
//     "Location" attribute of <toplevel_dir>/root in Hyperspace for the root
//     range.
//  9. Wait out any clock skew.
// 10. Promote the staged range and respond.
//
// On exception, a staged range may be unstaged (under a condition not visible
// in this listing), and the error is sent through cb.
//
// NOTE(review): generation is held in a uint32_t here but in an int64_t in
// replay_load_range(). Confirm generations cannot exceed 32 bits.
//
// NOTE(review): the generation upgrade writes through the const reference
// `table` via a C-style cast. This is undefined behavior if the caller's
// object is actually const.
1719 void
1721  const RangeSpec &range_spec,
1722  const RangeState &range_state,
1723  bool needs_compaction) {
1724  int error = Error::OK;
1725  TableMutatorPtr mutator;
1726  KeySpec key;
1727  String metadata_key_str;
1728  String errmsg;
1729  SchemaPtr schema;
1730  TableInfoPtr table_info;
1731  RangePtr range;
1732  String table_dfsdir;
1733  String range_dfsdir;
1734  char md5DigestStr[33];
1735  String location;
1736  bool is_root;
1737  bool is_staged = false;
1738 
1739  try {
1740 
1741  if (!m_log_replay_barrier->wait(cb->event()->deadline(), table, range_spec))
1742  return;
1743 
1744  is_root = table.is_metadata() && (*range_spec.start_row == 0)
1745  && !strcmp(range_spec.end_row, Key::END_ROOT_ROW);
1746 
1747  std::stringstream sout;
1748  sout << "Loading range: "<< table <<" "<< range_spec << " " << range_state
1749  << " needs_compaction=" << boolalpha << needs_compaction;
1750  HT_INFOF("%s", sout.str().c_str());
1751 
1752  HT_MAYBE_FAIL_X("load-range-1", !table.is_system());
1753 
1754  m_context->live_map->get(table.id, table_info);
1755 
1756  // If maintenance has been disabled for the table, tell the maintenance
1757  // scheduler to not schedule maintenance for it
1758  if (table_info->maintenance_disabled())
1759  m_maintenance_scheduler->exclude(table);
1760 
1761  uint32_t generation = table_info->get_schema()->get_generation();
1762  if (generation > table.generation) {
1763  HT_WARNF("Table generation mismatch in load range request (%d < %d),"
1764  " automatically upgrading", (int)table.generation, (int)generation);
1765  ((TableIdentifier *)&table)->generation = generation;
1766  }
1767 
1768  table_info->stage_range(range_spec, cb->event()->deadline());
1769 
1770  is_staged = true;
1771 
1772  // Lazily create sys/METADATA table pointer
1773  if (!Global::metadata_table) {
1774  lock_guard<mutex> lock(Global::mutex);
1775  // double-check locking (works fine on x86 and amd64 but may fail
1776  // on other archs without using a memory barrier
1777  if (!Global::metadata_table) {
1778  uint32_t timeout_ms = m_props->get_i32("Hypertable.Request.Timeout");
1779  if (!Global::range_locator)
1780  Global::range_locator = make_shared<Hypertable::RangeLocator>(m_props,
1781  m_conn_manager, Global::hyperspace, timeout_ms);
1783  Global::metadata_table = make_shared<Table>(m_props, Global::range_locator,
1784  m_conn_manager, Global::hyperspace, aq,
1785  m_namemap, TableIdentifier::METADATA_NAME, 0, timeout_ms);
1786  }
1787  }
1788 
// Queue a "range_start_row" cell: row "<location>:<table id>",
// qualifier = end row, value = start row.
1792  {
1793  lock_guard<mutex> lock(m_pending_metrics_mutex);
1794  Cell cell;
1795 
1796  if (m_pending_metrics_updates == 0)
1797  m_pending_metrics_updates = new CellsBuilder();
1798 
1799  String row = Global::location_initializer->get() + ":" + table.id;
1800  cell.row_key = row.c_str();
1801  cell.column_family = "range_start_row";
1802  cell.column_qualifier = range_spec.end_row;
1803  cell.value = (const uint8_t *)range_spec.start_row;
1804  cell.value_len = strlen(range_spec.start_row);
1805 
1806  m_pending_metrics_updates->add(cell);
1807  }
1808 
1809  schema = table_info->get_schema();
1810 
// Create <toplevel>/tables/<id>/<access group>/<md5(end_row)> for each access group.
1815  {
1816  assert(*range_spec.end_row != 0);
1817  md5_trunc_modified_base64(range_spec.end_row, md5DigestStr);
1818  md5DigestStr[16] = 0;
1819  table_dfsdir = Global::toplevel_dir + "/tables/" + table.id;
1820 
1821  for (auto ag_spec : schema->get_access_groups()) {
1822  // notice the below variables are different "range" vs. "table"
1823  range_dfsdir = table_dfsdir + "/" + ag_spec->get_name() + "/" + md5DigestStr;
1824  Global::dfs->mkdirs(range_dfsdir);
1825  }
1826  }
1827 
1828  HT_MAYBE_FAIL_X("metadata-load-range-1", table.is_metadata());
1829 
1830  range = make_shared<Range>(m_master_client, table, schema, range_spec,
1831  table_info.get(), range_state, needs_compaction);
1832 
1833  HT_MAYBE_FAIL_X("metadata-load-range-2", table.is_metadata());
1834 
1835  // Create ROOT, METADATA, or SYSTEM log if necessary
1836  if (!table.is_user()) {
1837  lock_guard<mutex> lock(Global::mutex);
1838  if (table.is_metadata()) {
1839  if (is_root) {
1840  Global::log_dfs->mkdirs(Global::log_dir + "/root");
1841  Global::root_log = make_shared<CommitLog>(Global::log_dfs, Global::log_dir
1842  + "/root", m_props);
1843  }
1844  if (Global::metadata_log == 0) {
1845  Global::log_dfs->mkdirs(Global::log_dir + "/metadata");
1846  Global::metadata_log = make_shared<CommitLog>(Global::log_dfs,
1847  Global::log_dir + "/metadata", m_props);
1848  m_update_pipeline_metadata =
1849  make_shared<UpdatePipeline>(m_context, m_query_cache, m_timer_handler,
1850  Global::metadata_log, m_log_flush_method_meta);
1851  }
1852  }
1853  else if (table.is_system() && Global::system_log == 0) {
1854  Global::log_dfs->mkdirs(Global::log_dir + "/system");
1855  Global::system_log = make_shared<CommitLog>(Global::log_dfs,
1856  Global::log_dir + "/system", m_props);
1857  m_update_pipeline_system =
1858  make_shared<UpdatePipeline>(m_context, m_query_cache, m_timer_handler,
1859  Global::system_log, m_log_flush_method_user);
1860  }
1861  }
1862 
// Non-root ranges record their location in sys/METADATA; the root range
// records it as a Hyperspace attribute (failure there aborts the process).
1869  if (!is_root) {
1870  metadata_key_str = format("%s:%s", table.id, range_spec.end_row);
1871 
1875  mutator.reset(Global::metadata_table->create_mutator());
1876 
1877  key.row = metadata_key_str.c_str();
1878  key.row_len = strlen(metadata_key_str.c_str());
1879  key.column_family = "Location";
1880  key.column_qualifier = 0;
1881  key.column_qualifier_len = 0;
1882  location = Global::location_initializer->get();
1883  mutator->set(key, location.c_str(), location.length());
1884  mutator->flush();
1885  }
1886  else { //root
1887  uint32_t oflags = OPEN_FLAG_READ | OPEN_FLAG_WRITE | OPEN_FLAG_CREATE;
1888 
1889  HT_INFO("Loading root METADATA range");
1890 
1891  try {
1892  location = Global::location_initializer->get();
1893  m_hyperspace->attr_set(Global::toplevel_dir + "/root", oflags,
1894  "Location", location.c_str(), location.length());
1895  }
1896  catch (Exception &e) {
1897  HT_ERROR_OUT << "Problem setting attribute 'location' on Hyperspace "
1898  "file '" << Global::toplevel_dir << "/root'" << HT_END;
1899  HT_ERROR_OUT << e << HT_END;
1900  HT_ABORT;
1901  }
1902  }
1903 
1904  HT_MAYBE_FAIL_X("metadata-load-range-3", table.is_metadata());
1905 
1906  // make sure that we don't have a clock skew
1907  // timeout is in milliseconds, revision and now is in nanoseconds
1908  int64_t now = Hypertable::get_ts64();
1909  int64_t revision = range->get_scan_revision(cb->event()->header.timeout_ms);
1910  if (revision > now) {
1911  int64_t diff = (revision - now) / 1000000;
1912  HT_WARNF("Clock skew detected when loading range; waiting for %lld "
1913  "millisec", (long long int)diff);
1914  this_thread::sleep_for(chrono::milliseconds(diff));
1915  }
1916 
1917  m_context->live_map->promote_staged_range(table, range, range_state.transfer_log);
1918 
1919  HT_MAYBE_FAIL_X("user-load-range-4", !table.is_system());
1920  HT_MAYBE_FAIL_X("metadata-load-range-4", table.is_metadata());
1921 
1922  if (cb && (error = cb->response_ok()) != Error::OK)
1923  HT_ERRORF("Problem sending OK response - %s", Error::get_text(error));
1924  else
1925  HT_INFOF("Successfully loaded range %s[%s..%s]", table.id,
1926  range_spec.start_row, range_spec.end_row);
1927  }
1928  catch (Hypertable::Exception &e) {
1931  HT_ERROR_OUT << e << HT_END;
1932  if (is_staged)
1933  table_info->unstage_range(range_spec);
1934  }
1935  if (cb && (error = cb->error(e.code(), e.what())) != Error::OK)
1936  HT_ERRORF("Problem sending error response - %s", Error::get_text(error));
1937  }
1938 }
1939 
// Acknowledges the load of each range in specs and replies with a map from
// range to error code.
//
// Per-range failures are recorded in error_map instead of aborting the
// request:
//  - table not found,
//  - range not found,
//  - acknowledge_load() throwing.
// Ranges that were already acknowledged are reported as Error::OK.
//
// If any log replay barrier wait fails, the handler returns without sending
// the error_map response.
1940 void
1942  const vector<QualifiedRangeSpec> &specs) {
1943  TableInfoPtr table_info;
1944  RangePtr range;
1945  map<QualifiedRangeSpec, int> error_map;
1946 
1947  for (const auto &rr : specs) {
1948 
1949  if (!m_log_replay_barrier->wait(cb->event()->deadline(),
1950  rr.table, rr.range))
1951  return;
1952 
1953  HT_INFOF("Acknowledging range: %s[%s..%s]", rr.table.id,
1954  rr.range.start_row, rr.range.end_row);
1955 
1956  if (!m_context->live_map->lookup(rr.table.id, table_info)) {
1957  error_map[rr] = Error::TABLE_NOT_FOUND;
1958  HT_WARN_OUT << "Table " << rr.table.id << " not found" << HT_END;
1959  continue;
1960  }
1961 
1962  if (!table_info->get_range(rr.range, range)) {
1963  error_map[rr] = Error::RANGESERVER_RANGE_NOT_FOUND;
1964  HT_WARN_OUT << "Range " << rr << " not found" << HT_END;
1965  continue;
1966  }
1967 
1968  if (range->load_acknowledged()) {
1969  error_map[rr] = Error::OK;
1970  HT_WARN_OUT << "Range: " << rr << " already acknowledged" << HT_END;
1971  continue;
1972  }
1973 
1974  try {
1975  range->acknowledge_load(cb->event()->header.timeout_ms);
1976  }
1977  catch(Exception &e) {
1978  error_map[rr] = e.code();
1979  HT_ERROR_OUT << e << HT_END;
1980  continue;
1981  }
1982 
1983  HT_MAYBE_FAIL_X("metadata-acknowledge-load", rr.table.is_metadata());
1984 
1985  error_map[rr] = Error::OK;
1986  std::stringstream sout;
1987  sout << "Range: " << rr <<" acknowledged";
1988  HT_INFOF("%s", sout.str().c_str());
1989  }
1990 
1991  cb->response(error_map);
1992 }
1993 
// Parses schema_str and, if the table is present in the live map, applies it
// via TableInfo::update_schema().
// A table that is not loaded on this server is not treated as an error; the
// handler still responds OK.
// Parse or update failures are returned via cb->error().
1994 void
1996  const TableIdentifier &table, const char *schema_str) {
1997 
1998  HT_INFOF("Updating schema for: %s schema = %s", table.id, schema_str);
1999 
2000  try {
2001  SchemaPtr schema( Schema::new_instance(schema_str) );
2002  TableInfoPtr table_info;
2003  if (m_context->live_map->lookup(table.id, table_info))
2004  table_info->update_schema(schema);
2005  }
2006  catch(Exception &e) {
2007  HT_ERROR_OUT << e << HT_END;
2008  cb->error(e.code(), e.what());
2009  return;
2010  }
2011 
2012  HT_INFOF("Successfully updated schema for: %s", table.id);
2013  cb->response_ok();
2014 }
2015 
2016 
// Forces a commit-log sync for a table by pushing an empty update request
// (count 0, empty buffer) through the update pipeline matching the table
// type: user, metadata or system.
// For USER tables whose schema has a group commit interval, the sync is
// handed to group_commit_add() instead.
//
// NOTE(review): table_update is allocated with new before the barrier wait.
// It is never freed on the early-return paths (barrier wait failure, table
// not found, group commit) or when an exception is caught, which is a memory
// leak. On the normal path, ownership presumably passes to UpdateContext
// (verify). Consider holding it in a std::unique_ptr and releasing it only
// when it is handed off.
2017 void
2019  uint64_t cluster_id,
2020  const TableIdentifier &table) {
2021  String errmsg;
2022  int error = Error::OK;
2023  UpdateRecTable *table_update = new UpdateRecTable();
2024  StaticBuffer buffer(0);
2025  std::vector<UpdateRecTable *> table_update_vector;
2026 
2027  HT_DEBUG_OUT <<"received commit_log_sync request for table "<< table.id<< HT_END;
2028 
2029  if (!m_log_replay_barrier->wait_for_user(cb->event()->deadline()))
2030  return;
2031 
2032  try {
2033 
2034  if (!m_context->live_map->lookup(table.id, table_update->table_info)) {
2035  if ((error = cb->error(Error::TABLE_NOT_FOUND, table.id)) != Error::OK)
2036  HT_ERRORF("Problem sending error response - %s", Error::get_text(error));
2037  return;
2038  }
2039 
2040  // If USER table, check for group commit interval
2041  if (table.is_user()) {
2042  SchemaPtr schema = table_update->table_info->get_schema();
2043  // Check for group commit
2044  if (schema->get_group_commit_interval() > 0) {
2045  group_commit_add(cb->event(), cluster_id, schema, table, 0, buffer, 0);
2046  return;
2047  }
2048  }
2049 
2050  // normal sync...
2051  UpdateRequest *request = new UpdateRequest();
2052  table_update->id = table;
2053  table_update->commit_interval = 0;
2054  table_update->total_count = 0;
2055  table_update->total_buffer_size = 0;;
2056  table_update->flags = 0;
2057  request->buffer = buffer;
2058  request->count = 0;
2059  request->event = cb->event();
2060  table_update->requests.push_back(request);
2061 
2062  table_update_vector.push_back(table_update);
2063 
2064  UpdateContext *uc = new UpdateContext(table_update_vector, cb->event()->deadline());
2065 
2066  if (table.is_user())
2067  m_update_pipeline_user->add(uc);
2068  else if (table.is_metadata())
2069  m_update_pipeline_metadata->add(uc);
2070  else {
2071  HT_ASSERT(table.is_system());
2072  m_update_pipeline_system->add(uc);
2073  }
2074 
2075  }
2076  catch (Exception &e) {
2077  HT_ERROR_OUT << "Exception caught: " << e << HT_END;
2078  error = e.code();
2079  errmsg = e.what();
2080  if ((error = cb->error(error, errmsg)) != Error::OK)
2081  HT_ERRORF("Problem sending error response - %s", Error::get_text(error));
2082  }
2083 }
2084 
// Handles a client update request carrying `count` serialized cells for one
// table.
//
// Waits on the replay barrier for this table and looks up its TableInfo and
// schema. If the schema has a positive group-commit interval, the buffer is
// handed to group_commit_add(); this happens for any table type, unlike in
// commit_log_sync(), where only USER tables are checked. Otherwise the buffer
// is wrapped in a single UpdateRequest inside an UpdateRecTable/UpdateContext
// whose expire time is the request deadline. That context is queued on the
// USER, METADATA or SYSTEM pipeline according to the table type.
//
// NOTE(review): table_update is not freed if the replay-barrier wait returns
// false. It is also not freed on the path taken when the live-map lookup
// fails: that path's statement (original line 2103) is missing from this
// listing, presumably a throw — confirm.
2090 void
2092  const TableIdentifier &table, uint32_t count,
2093  StaticBuffer &buffer, uint32_t flags) {
2094  SchemaPtr schema;
2095  UpdateRecTable *table_update = new UpdateRecTable();
2096 
2097  HT_DEBUG_OUT <<"Update: "<< table << HT_END;
2098 
2099  if (!m_log_replay_barrier->wait(cb->event()->deadline(), table))
2100  return;
2101 
2102  if (!m_context->live_map->lookup(table.id, table_update->table_info))
2104 
2105  schema = table_update->table_info->get_schema();
2106 
2107  // Check for group commit
2108  if (schema->get_group_commit_interval() > 0) {
2109  group_commit_add(cb->event(), cluster_id, schema, table, count, buffer, flags);
2110  delete table_update;
2111  return;
2112  }
2113 
2114  // normal update ...
2115 
2116  std::vector<UpdateRecTable *> table_update_vector;
2117 
2118  table_update->id = table;
2119  table_update->total_count = count;
2120  table_update->total_buffer_size = buffer.size;
2121  table_update->flags = flags;
2122  table_update->expire_time = cb->event()->deadline();
2123 
2124  UpdateRequest *request = new UpdateRequest();
2125  request->buffer = buffer;
2126  request->count = count;
2127  request->event = cb->event();
2128 
2129  table_update->requests.push_back(request);
2130 
2131  table_update_vector.push_back(table_update);
2132 
2133  UpdateContext *uc = new UpdateContext(table_update_vector, table_update->expire_time);
2134 
// Dispatch to the pipeline that matches the table's type
2135  if (table.is_user())
2136  m_update_pipeline_user->add(uc);
2137  else if (table.is_metadata())
2138  m_update_pipeline_metadata->add(uc);
2139  else {
2140  HT_ASSERT(table.is_system());
2141  m_update_pipeline_system->add(uc);
2142  }
2143 
2144 }
2145 
2146 void
2147 Apps::RangeServer::batch_update(std::vector<UpdateRecTable *> &updates,
2148  ClockT::time_point expire_time) {
2149  UpdateContext *uc = new UpdateContext(updates, expire_time);
2150  m_update_pipeline_user->add(uc);
2151 }
2152 
2153 
// Drops a non-system table from this range server.
//
// System tables are rejected with NOT_ALLOWED. After USER log replay
// completes, the table is removed from the live map (TABLE_NOT_FOUND if it is
// absent) and the drop bit is set on every range. Each range then has
// maintenance disabled and its removal recorded in the RSML. Finally, for
// each range, "!" is written into the METADATA 'Location' cell and into a
// 'Files:<access group>' cell for every access group. The mutator is flushed
// before the request replies OK.
//
// NOTE(review): `key` is reused across ranges and column_qualifier is never
// reset before the 'Location' cell is written. For every range after the
// first, that write therefore goes to Location:<last access group name>
// instead of the unqualified Location column. Reset column_qualifier to 0 and
// column_qualifier_len to 0 before setting the Location cell.
// NOTE(review): if record_removal() throws, the table has already been
// removed from the live map and only some ranges have been processed. The
// error is returned without rollback or logging.
2154 void
2156  TableInfoPtr table_info;
2157  Ranges ranges;
2158  String metadata_prefix;
2159  String metadata_key;
2160  TableMutatorPtr mutator;
2161 
2162  HT_INFOF("drop table %s", table.id);
2163 
2164  if (table.is_system()) {
2165  cb->error(Error::NOT_ALLOWED, "system tables cannot be dropped");
2166  return;
2167  }
2168 
2169  if (!m_log_replay_barrier->wait_for_user(cb->event()->deadline()))
2170  return;
2171 
2172  if (!m_context->live_map->remove(table.id, table_info)) {
2173  HT_WARNF("drop_table '%s' - table not found", table.id);
2174  cb->error(Error::TABLE_NOT_FOUND, table.id);
2175  return;
2176  }
2177 
2178  // Set "drop" bit on all ranges
2179  ranges.array.clear();
2180  table_info->get_ranges(ranges);
2181  for (auto &rd : ranges.array)
2182  rd.range->drop();
2183 
2184  // Disable maintenance for range and remove the range from the RSML
2185  for (auto &rd : ranges.array) {
2186  rd.range->disable_maintenance();
2187  try {
2188  MetaLogEntityRangePtr entity = rd.range->metalog_entity();
2189  Global::rsml_writer->record_removal(entity);
2190  }
2191  catch (Exception &e) {
2192  cb->error(e.code(), Global::location_initializer->get());
2193  return;
2194  }
2195  }
2196 
2197  SchemaPtr schema = table_info->get_schema();
2198  AccessGroupSpecs &ag_specs = schema->get_access_groups();
2199 
2200  // create METADATA table mutator for clearing 'Location' and 'Files' columns
2201  mutator.reset(Global::metadata_table->create_mutator());
2202 
2203  KeySpec key;
2204 
2205  try {
2206  // For each range in the dropped table, write "!" into the 'Location'
2207  // column and the per-access-group 'Files' cells of its METADATA entry
2208  metadata_prefix = String("") + table.id + ":";
2209  for (auto &rd : ranges.array) {
2210  // Mark Location column
2211  metadata_key = metadata_prefix + rd.range->end_row();
2212  key.row = metadata_key.c_str();
2213  key.row_len = metadata_key.length();
2214  key.column_family = "Location";
// NOTE(review): after the first range, column_qualifier still holds the
// previous range's last access-group name at this point
2215  mutator->set(key, "!", 1);
2216  for (size_t j=0; j<ag_specs.size(); j++) {
2217  key.column_family = "Files";
// NOTE(review): safe only if get_name() returns a reference to storage that
// outlives the set() call below; a by-value return would leave this dangling
2218  key.column_qualifier = ag_specs[j]->get_name().c_str();
2219  key.column_qualifier_len = ag_specs[j]->get_name().length();
2220  mutator->set(key, (uint8_t *)"!", 1);
2221  }
2222  }
2223  mutator->flush();
2224  }
2225  catch (Hypertable::Exception &e) {
2226  HT_ERROR_OUT << "Problem clearing 'Location' columns of METADATA - " << e << HT_END;
2227  cb->error(e.code(), "Problem clearing 'Location' columns of METADATA");
2228  return;
2229  }
2230 
2231  HT_INFOF("Successfully dropped table '%s'", table.id);
2232 
2233  cb->response_ok();
2234 }
2235 
// Writes a diagnostic dump of this server's state to `outfile`.
//
// For every live range it writes the range name, the range's maintenance data
// and each access group's maintenance data. Unless `nokeys` is set, it then
// dumps the keys of every access group. These are followed by:
//   - the query cache keys (when a query cache exists);
//   - per-access-group garbage tracker statistics;
//   - the statistics of whichever ROOT, METADATA, SYSTEM and USER commit logs
//     exist, collected into `str`.
// A Hypertable::Exception is reported with its own code; any other
// std::exception is reported as LOCAL_IO_ERROR.
//
// NOTE(review): the declaration of ag_data (original line 2239) is missing
// from this listing. So is the statement preceding the METADATA get_stats()
// call (original line 2281), presumably an `if (Global::metadata_log)` guard
// like the others — confirm.
// NOTE(review): the ofstream's open state is never checked, so an unwritable
// outfile still produces an OK response.
2236 void Apps::RangeServer::dump(ResponseCallback *cb, const char *outfile,
2237  bool nokeys) {
2238  Ranges ranges;
2240  String str;
2241 
2242  HT_INFO("dump");
2243 
2244  try {
2245  std::ofstream out(outfile);
2246 
2247  m_context->live_map->get_ranges(ranges);
2248  time_t now = time(0);
2249  for (auto &rd : ranges.array) {
2250  rd.data = rd.range->get_maintenance_data(ranges.arena, now, 0);
2251  out << "RANGE " << rd.range->get_name() << "\n";
2252  out << *rd.data << "\n";
2253  for (ag_data = rd.data->agdata; ag_data; ag_data = ag_data->next)
2254  out << *ag_data << "\n";
2255  }
2256 
2257  // dump keys
2258  if (!nokeys) {
2259  for (auto &rd : ranges.array)
2260  for (ag_data = rd.data->agdata; ag_data; ag_data = ag_data->next)
2261  ag_data->ag->dump_keys(out);
2262  }
2263 
2264  // Query Cache
2265  if (m_query_cache)
2266  m_query_cache->dump_keys(out);
2267 
2268  // Dump AccessGroup garbage tracker statistics
2269  out << "\nGarbage tracker statistics:\n";
2270  for (RangeData &rd : ranges.array) {
2271  for (ag_data = rd.data->agdata; ag_data; ag_data = ag_data->next)
2272  ag_data->ag->dump_garbage_tracker_statistics(out);
2273  }
2274 
2275  out << "\nCommit Log Info\n";
2276  str = "";
2277 
2278  if (Global::root_log)
2279  Global::root_log->get_stats("ROOT", str);
2280 
2282  Global::metadata_log->get_stats("METADATA", str);
2283 
2284  if (Global::system_log)
2285  Global::system_log->get_stats("SYSTEM", str);
2286 
2287  if (Global::user_log)
2288  Global::user_log->get_stats("USER", str);
2289 
2290  out << str;
2291 
2292  }
2293  catch (Hypertable::Exception &e) {
2294  HT_ERROR_OUT << e << HT_END;
2295  cb->error(e.code(), "Problem executing dump() command");
2296  return;
2297  }
2298  catch (std::exception &e) {
2299  cb->error(Error::LOCAL_IO_ERROR, e.what());
2300  return;
2301  }
2302  cb->response_ok();
2303 }
2304 
// Dumps one pseudo table of a loaded table to `outfile`.
//
// After USER log replay completes, the table is looked up in the live map
// (TABLE_NOT_FOUND if absent). For each of its ranges, a pseudo-table scanner
// is created and every cell is written as:
//   row <TAB> family[:qualifier] <TAB> value <NEWLINE>
// The value is written after stripping its vi32 length prefix. Column family
// codes are always resolved against Global::pseudo_tables->cellstore_index,
// whichever pseudo_table was requested. Cells whose family code is unknown
// are logged and skipped.
//
// NOTE(review): the output file is opened (and so created/truncated) before
// the table lookup, so a TABLE_NOT_FOUND reply still leaves an empty file.
// NOTE(review): `scanner` is a raw owning pointer. If get() or forward()
// throws, it is never deleted; a std::unique_ptr would release it on all
// paths.
2305 void
2307  const char *pseudo_table, const char *outfile) {
2308 
2309  HT_INFOF("dump_psudo_table ID=%s pseudo-table=%s outfile=%s", table.id, pseudo_table, outfile);
2310 
2311  if (!m_log_replay_barrier->wait_for_user(cb->event()->deadline()))
2312  return;
2313 
2314  try {
2315  Ranges ranges;
2316  TableInfoPtr table_info;
2317  CellListScanner *scanner;
2318  ScanContextPtr scan_ctx = make_shared<ScanContext>();
2319  Key key;
2320  ByteString value;
2321  ColumnFamilySpec *cf_spec;
2322  const uint8_t *ptr;
2323  size_t len;
2324 
2325  std::ofstream out(outfile);
2326 
2327  if (!m_context->live_map->lookup(table.id, table_info)) {
2328  cb->error(Error::TABLE_NOT_FOUND, table.id);
2329  return;
2330  }
2331 
2332  scan_ctx->timeout_ms = cb->event()->header.timeout_ms;
2333 
2334  table_info->get_ranges(ranges);
2335  for (auto &rd : ranges.array) {
2336  scanner = rd.range->create_scanner_pseudo_table(scan_ctx, pseudo_table);
2337  while (scanner->get(key, value)) {
2338  cf_spec = Global::pseudo_tables->cellstore_index->get_column_family(key.column_family_code);
2339  if (cf_spec == 0) {
2340  HT_ERRORF("Column family code %d not found in %s pseudo table schema",
2341  key.column_family_code, pseudo_table);
2342  }
2343  else {
2344  out << key.row << "\t" << cf_spec->get_name();
2345  if (key.column_qualifier)
2346  out << ":" << key.column_qualifier;
2347  out << "\t";
// The value is length-prefixed (vi32); skip the prefix and write the payload
2348  ptr = value.ptr;
2349  len = Serialization::decode_vi32(&ptr);
2350  out.write((const char *)ptr, len);
2351  out << "\n";
2352  }
2353  scanner->forward();
2354  }
2355  delete scanner;
2356  }
2357 
2358  out << std::flush;
2359 
2360  cb->response_ok();
2361 
2362  }
2363  catch (Hypertable::Exception &e) {
2364  HT_ERROR_OUT << e << HT_END;
2365  cb->error(e.code(), e.what());
2366  }
2367  catch (std::exception &e) {
2368  cb->error(Error::LOCAL_IO_ERROR, e.what());
2369  }
2370 
2371 }
2372 
2373 void Apps::RangeServer::heapcheck(ResponseCallback *cb, const char *outfile) {
2374 
2375  HT_INFO("heapcheck");
2376 
2377 #if defined(TCMALLOC) || defined(TCMALLOC_MINIMAL)
2378  if (outfile && *outfile) {
2379  std::ofstream out(outfile);
2380  char buf[4096];
2381  MallocExtension::instance()->GetStats(buf, 4096);
2382  out << buf << std::endl;
2383  }
2384 #endif
2385 
2386 #if defined(TCMALLOC)
2387  HeapLeakChecker::NoGlobalLeaks();
2388  if (IsHeapProfilerRunning())
2389  HeapProfilerDump("heapcheck");
2390 #else
2391  HT_WARN("heapcheck not defined for current allocator");
2392 #endif
2393 
2394  cb->response_ok();
2395 }
// Applies a new generation of system-variable specs to the server state.
//
// Logs the generation and a string form of the specs, stores them with
// m_context->server_state->set(), and replies OK.
2396 
2398  const std::vector<SystemVariable::Spec> &specs,
2399  int64_t generation) {
2400  HT_INFOF("generation=%lld %s", (Lld)generation,
2401  SystemVariable::specs_to_string(specs).c_str());
2402  // Update server state
2403  m_context->server_state->set(generation, specs);
2404  cb->response_ok();
2405 }
2406 
// Re-enables maintenance for a table.
//
// Waits on the replay barrier for this table. If the table is loaded, its
// maintenance-disabled flag is cleared and enable_maintenance() is called on
// every one of its ranges. The table is included in the maintenance scheduler
// whether or not it was found in the live map, and the request always replies
// OK.
2407 void
2409  const TableIdentifier &table) {
2410  HT_INFOF("table_maintenance_enable(\"%s\"", table.id);
2411 
2412  if (!m_log_replay_barrier->wait(cb->event()->deadline(), table))
2413  return;
2414 
2415  TableInfoPtr table_info;
2416  if (m_context->live_map->lookup(table.id, table_info)) {
2417  table_info->set_maintenance_disabled(false);
2418  Ranges ranges;
2419  table_info->get_ranges(ranges);
2420  for (RangeData &rd : ranges.array)
2421  rd.range->enable_maintenance();
2422  }
2423 
2424  m_maintenance_scheduler->include(table);
2425 
2426  cb->response_ok();
2427 }
2428 
2429 
// Disables maintenance for a table.
//
// Waits on the replay barrier for this table. If the table is not loaded, the
// request replies OK without touching the maintenance scheduler. Otherwise it:
//   - sets the table's maintenance-disabled flag;
//   - excludes the table from the maintenance scheduler;
//   - calls disable_maintenance() on every range;
//   - replies OK.
//
// NOTE(review): the statement under "Clear any cached index tables" (original
// line 2455) is missing from this listing.
2430 void
2432  const TableIdentifier &table) {
2433  HT_INFOF("table_maintenance_disable(\"%s\"", table.id);
2434 
2435  if (!m_log_replay_barrier->wait(cb->event()->deadline(), table))
2436  return;
2437 
2438  TableInfoPtr table_info;
2439 
2440  if (!m_context->live_map->lookup(table.id, table_info)) {
2441  cb->response_ok();
2442  return;
2443  }
2444 
2445  table_info->set_maintenance_disabled(true);
2446 
2447  m_maintenance_scheduler->exclude(table);
2448 
2449  Ranges ranges;
2450  table_info->get_ranges(ranges);
2451  for (RangeData &rd : ranges.array)
2452  rd.range->disable_maintenance();
2453 
2454  // Clear any cached index tables
2456 
2457  cb->response_ok();
2458 }
2459 
2460 
2461 
// Collects server and per-table statistics, replies with them, and publishes
// Ganglia metrics.
//
// If another get_statistics() call is already outstanding, this returns
// immediately without replying. Otherwise, while holding m_stats_mutex, it:
//   1. applies the supplied system-variable specs to the server state;
//   2. recomputes load statistics, refreshes system stats, and accumulates
//      load average, paging and byte counters for the periodic
//      sys/RS_METRICS row;
//   3. snapshots query-cache and block-cache statistics;
//   4. once per metrics interval, opens sys/RS_METRICS on first use and
//      creates a mutator, loading any queued m_pending_metrics_updates cells
//      into it;
//   5. aggregates per-range maintenance data into one StatsTable per table id;
//   6. when a mutator exists, writes this server's "server" row to
//      sys/RS_METRICS and resets the accumulators;
//   7. replies with *m_stats;
//   8. pushes rates, cache hit rates and other gauges to the Ganglia
//      collector.
2462 void
2464  const std::vector<SystemVariable::Spec> &specs,
2465  uint64_t generation) {
2466 
2467  if (test_and_set_get_statistics_outstanding(true))
2468  return;
2469 
2471 
2472  lock_guard<mutex> lock(m_stats_mutex);
2473  RangesPtr ranges = Global::get_ranges();
2474  int64_t timestamp = Hypertable::get_ts64();
2475  time_t now = (time_t)(timestamp/1000000000LL);
2476  LoadStatistics::Bundle load_stats;
2477 
2478  HT_INFO("Entering get_statistics()");
2479 
2480  if (m_shutdown) {
2482  return;
2483  }
2484 
2485  // Update server state
2486  m_context->server_state->set(generation, specs);
2487 
2488  Global::load_statistics->recompute(&load_stats);
2489  m_stats->system.refresh();
2490 
// NOTE(review): every Ganglia rate below divides by period_seconds; confirm
// recompute() never reports a zero period_millis
2491  float period_seconds = (float)load_stats.period_millis / 1000.0;
2492 
2493  uint64_t disk_total = 0;
2494  uint64_t disk_avail = 0;
2495  for (auto &fss : m_stats->system.fs_stat) {
2496  disk_total += fss.total;
2497  disk_avail += fss.avail;
2498  }
2499 
// Accumulators averaged/rated when the next sys/RS_METRICS row is written
2500  m_loadavg_accum += m_stats->system.loadavg_stat.loadavg[0];
2501  m_page_in_accum += m_stats->system.swap_stat.page_in;
2502  m_page_out_accum += m_stats->system.swap_stat.page_out;
2503  m_load_factors.bytes_scanned += load_stats.bytes_scanned;
2504  m_load_factors.bytes_written += load_stats.update_bytes;
2505  m_metric_samples++;
2506 
2507  m_stats->set_location(Global::location_initializer->get());
2508  m_stats->set_version(version_string());
2509  m_stats->timestamp = timestamp;
2510  m_stats->scan_count = load_stats.scan_count;
2511  m_stats->scanned_cells = load_stats.cells_scanned;
2512  m_stats->scanned_bytes = load_stats.bytes_scanned;
2513  m_stats->update_count = load_stats.update_count;
2514  m_stats->updated_cells = load_stats.update_cells;
2515  m_stats->updated_bytes = load_stats.update_bytes;
2516  m_stats->sync_count = load_stats.sync_count;
2517  m_stats->tracked_memory = Global::memory_tracker->balance();
2518  m_stats->cpu_user = m_stats->system.cpu_stat.user;
2519  m_stats->cpu_sys = m_stats->system.cpu_stat.sys;
2520  m_stats->live = m_log_replay_barrier->user_complete();
2521 
// Remember the previous cumulative cache counters so per-call deltas can be
// computed for the Ganglia hit rates
2522  uint64_t previous_query_cache_accesses = m_stats->query_cache_accesses;
2523  uint64_t previous_query_cache_hits = m_stats->query_cache_hits;
2524  uint64_t previous_block_cache_accesses = m_stats->block_cache_accesses;
2525  uint64_t previous_block_cache_hits = m_stats->block_cache_hits;
2526  int32_t query_cache_waiters {};
2527 
2528  if (m_query_cache)
2529  m_query_cache->get_stats(&m_stats->query_cache_max_memory,
2530  &m_stats->query_cache_available_memory,
2531  &m_stats->query_cache_accesses,
2532  &m_stats->query_cache_hits,
2533  &query_cache_waiters);
2534 
2535  if (Global::block_cache)
2536  Global::block_cache->get_stats(&m_stats->block_cache_max_memory,
2537  &m_stats->block_cache_available_memory,
2538  &m_stats->block_cache_accesses,
2539  &m_stats->block_cache_hits);
2540 
// Once per metrics interval: open sys/RS_METRICS on first use (under
// Global::mutex), create a mutator, and load any queued metric cells into it
2541  TableMutatorPtr mutator;
2542  if (now > m_next_metrics_update) {
2543  if (!Global::rs_metrics_table) {
2544  lock_guard<mutex> lock(Global::mutex);
2545  try {
2546  uint32_t timeout_ms = m_props->get_i32("Hypertable.Request.Timeout");
2547  if (!Global::range_locator)
2548  Global::range_locator = make_shared<Hypertable::RangeLocator>(m_props, m_conn_manager,
2549  Global::hyperspace, timeout_ms);
2551  Global::rs_metrics_table = make_shared<Table>(m_props, Global::range_locator, m_conn_manager,
2552  Global::hyperspace, aq,
2553  m_namemap, "sys/RS_METRICS", 0, timeout_ms);
2554  }
2555  catch (Hypertable::Exception &e) {
2556  HT_ERRORF("Unable to open 'sys/RS_METRICS' - %s (%s)",
2557  Error::get_text(e.code()), e.what());
2558  }
2559  }
// NOTE(review): original line 2560, which opens the block closed at 2583, is
// missing from this listing. It is presumably a guard on
// Global::rs_metrics_table; without one, create_mutator() below dereferences
// a null table after a failed open — confirm.
2561  CellsBuilder *pending_metrics_updates = 0;
2562  mutator.reset(Global::rs_metrics_table->create_mutator());
2563 
2564  {
2565  lock_guard<mutex> lock(m_pending_metrics_mutex);
2566  pending_metrics_updates = m_pending_metrics_updates;
2567  m_pending_metrics_updates = 0;
2568  }
2569 
2570  if (pending_metrics_updates) {
2571  KeySpec key;
2572  Cells &cells = pending_metrics_updates->get();
2573  for (size_t i=0; i<cells.size(); i++) {
2574  key.row = cells[i].row_key;
2575  key.row_len = strlen(cells[i].row_key);
2576  key.column_family = cells[i].column_family;
2577  key.column_qualifier = cells[i].column_qualifier;
2578  key.column_qualifier_len = strlen(cells[i].column_qualifier);
2579  mutator->set(key, cells[i].value, cells[i].value_len);
2580  }
2581  delete pending_metrics_updates;
2582  }
2583  }
2584  }
2585 
2589  CstrToInt32Map table_scanner_count_map;
2590  StatsTable table_stat;
2591  StatsTableMap::iterator iter;
2592 
2593  m_stats->tables.clear();
2594 
// Build a fresh range list when a metrics mutator exists or no cached list
// was available from Global::get_ranges()
2595  if (mutator || !ranges) {
2596  ranges = make_shared<Ranges>();
2597  m_context->live_map->get_ranges(*ranges);
2598  }
2599  for (auto &rd : ranges->array) {
2600 
2601  if (rd.data == 0)
2602  rd.data = rd.range->get_maintenance_data(ranges->arena, now, 0, mutator.get());
2603 
2604  if (rd.data->table_id == 0) {
2605  HT_ERROR_OUT << "Range statistics object found without table ID" << HT_END;
2606  continue;
2607  }
2608 
// A new StatsTable is started whenever the table id changes, so this relies
// on ranges of the same table being adjacent in the array
2609  if (table_stat.table_id == "")
2610  table_stat.table_id = rd.data->table_id;
2611  else if (strcmp(table_stat.table_id.c_str(), rd.data->table_id)) {
2612  if (table_stat.disk_used > 0)
2613  table_stat.compression_ratio = (double)table_stat.disk_used / table_stat.compression_ratio;
2614  else
2615  table_stat.compression_ratio = 1.0;
2616  m_stats->tables.push_back(table_stat);
// NOTE(review): the map key is a raw pointer into table_stat.table_id, whose
// contents are overwritten by the clear()/reassignment just below. Unless
// CstrToInt32Map copies its keys, stored keys dangle or alias the current
// table id — confirm.
2617  table_scanner_count_map[table_stat.table_id.c_str()] = 0;
2618  table_stat.clear();
2619  table_stat.table_id = rd.data->table_id;
2620  }
2621 
2622  table_stat.scans += rd.data->load_factors.scans;
2623  m_load_factors.scans += rd.data->load_factors.scans;
2624  table_stat.updates += rd.data->load_factors.updates;
2625  m_load_factors.updates += rd.data->load_factors.updates;
2626  table_stat.cells_scanned += rd.data->load_factors.cells_scanned;
2627  m_load_factors.cells_scanned += rd.data->load_factors.cells_scanned;
2628  table_stat.cells_returned += rd.data->cells_returned;
2629  table_stat.cells_written += rd.data->load_factors.cells_written;
2630  m_load_factors.cells_written += rd.data->load_factors.cells_written;
2631  table_stat.bytes_scanned += rd.data->load_factors.bytes_scanned;
2632  table_stat.bytes_returned += rd.data->bytes_returned;
2633  table_stat.bytes_written += rd.data->load_factors.bytes_written;
2635  m_load_factors.disk_bytes_read += rd.data->load_factors.disk_bytes_read;
2637  table_stat.disk_used += rd.data->disk_used;
2638  table_stat.key_bytes += rd.data->key_bytes;
2639  table_stat.value_bytes += rd.data->value_bytes;
// compression_ratio temporarily accumulates disk_used / ratio per range; it
// is turned back into a ratio when the table's entry is emitted
2640  table_stat.compression_ratio += (double)rd.data->disk_used / rd.data->compression_ratio;
2641  table_stat.memory_used += rd.data->memory_used;
2642  table_stat.memory_allocated += rd.data->memory_allocated;
2643  table_stat.shadow_cache_memory += rd.data->shadow_cache_memory;
2644  table_stat.block_index_memory += rd.data->block_index_memory;
2645  table_stat.bloom_filter_memory += rd.data->bloom_filter_memory;
2647  table_stat.bloom_filter_maybes += rd.data->bloom_filter_maybes;
2648  table_stat.cell_count += rd.data->cell_count;
2649  table_stat.file_count += rd.data->file_count;
2650  table_stat.range_count++;
2651  }
2652 
2653  m_stats->range_count = ranges->array.size();
2654  if (table_stat.table_id != "") {
2655  if (table_stat.disk_used > 0)
2656  table_stat.compression_ratio = (double)table_stat.disk_used / table_stat.compression_ratio;
2657  else
2658  table_stat.compression_ratio = 1.0;
2659  m_stats->tables.push_back(table_stat);
2660  table_scanner_count_map[table_stat.table_id.c_str()] = 0;
2661  }
2662 
2663  // collect outstanding scanner count and compute server cellstore total
2664  m_stats->file_count = 0;
2665  m_scanner_map.get_counts(&m_stats->scanner_count, table_scanner_count_map);
2666  for (size_t i=0; i<m_stats->tables.size(); i++) {
2667  m_stats->tables[i].scanner_count = table_scanner_count_map[m_stats->tables[i].table_id.c_str()];
2668  m_stats->file_count += m_stats->tables[i].file_count;
2669  }
2670 
// Write this server's row to sys/RS_METRICS. rounded_time is `now` rounded to
// the nearest multiple of metrics_interval.
// NOTE(review): rounded_time is a time_t printed with %ld, which is correct
// only where time_t is long.
2674  if (mutator) {
2675  time_t rounded_time = (now+(Global::metrics_interval/2)) - ((now+(Global::metrics_interval/2))%Global::metrics_interval);
2676  if (m_last_metrics_update != 0) {
2677  double time_interval = (double)now - (double)m_last_metrics_update;
2678  String value = format("3:%ld,%.6f,%.6f,%.6f,%.6f,%.6f,%.6f,%.6f,%.6f,%.6f,%.6f:%lld:%lld",
2679  rounded_time,
2680  m_loadavg_accum / (double)(m_metric_samples * m_cores),
2681  (double)m_load_factors.disk_bytes_read / time_interval,
2682  (double)m_load_factors.bytes_written / time_interval,
2683  (double)m_load_factors.bytes_scanned / time_interval,
2684  (double)m_load_factors.updates / time_interval,
2685  (double)m_load_factors.scans / time_interval,
2686  (double)m_load_factors.cells_written / time_interval,
2687  (double)m_load_factors.cells_scanned / time_interval,
2688  (double)m_page_in_accum / (double)m_metric_samples,
2689  (double)m_page_out_accum / (double)m_metric_samples,
2690  (Lld)disk_total, (Lld)disk_avail);
2691  String location = Global::location_initializer->get();
2692  KeySpec key;
2693  key.row = location.c_str();
2694  key.row_len = location.length();
2695  key.column_family = "server";
2696  key.column_qualifier = 0;
2697  key.column_qualifier_len = 0;
2698  try {
2699  mutator->set(key, (uint8_t *)value.c_str(), value.length());
2700  mutator->flush();
2701  }
2702  catch (Exception &e) {
2703  HT_ERROR_OUT << "Problem updating sys/RS_METRICS - " << e << HT_END;
2704  }
2705  }
// NOTE(review): this advances by a single interval, so after a long gap
// several consecutive calls may each trigger a metrics update until
// m_next_metrics_update catches up with `now`
2706  m_next_metrics_update += Global::metrics_interval;
2707  m_last_metrics_update = now;
2708  m_loadavg_accum = 0.0;
2709  m_page_in_accum = 0;
2710  m_page_out_accum = 0;
2711  m_load_factors.reset();
2712  m_metric_samples = 0;
2713  }
2714 
2715  cb->response(*m_stats.get());
2716 
2717  // Ganglia metrics
2718 
2719  m_metrics_process.collect(timestamp, m_ganglia_collector.get());
2720 
2721  m_ganglia_collector->update("scans",
2722  (float)load_stats.scan_count / period_seconds);
2723  m_ganglia_collector->update("updates",
2724  (float)load_stats.update_count / period_seconds);
2725  m_ganglia_collector->update("cellsScanned",
2726  (float)load_stats.cells_scanned / period_seconds);
2727  if (load_stats.cells_scanned > 0)
2728  m_ganglia_collector->update("cellsScanYield",
2729  ((float)load_stats.cells_returned /
2730  (float)load_stats.cells_scanned) * 100.0);
2731  m_ganglia_collector->update("cellsReturned",
2732  (float)(load_stats.cells_returned +
2733  load_stats.cached_cells_returned)
2734  / period_seconds);
2735  m_ganglia_collector->update("cellsWritten",
2736  (float)load_stats.update_cells / period_seconds);
2737  m_ganglia_collector->update("bytesScanned",
2738  (float)load_stats.bytes_scanned / period_seconds);
2739  if (load_stats.bytes_scanned > 0)
2740  m_ganglia_collector->update("bytesScanYield",
2741  ((float)load_stats.bytes_returned /
2742  (float)load_stats.bytes_scanned) * 100.0);
2743  m_ganglia_collector->update("bytesReturned",
2744  (float)(load_stats.bytes_returned +
2745  load_stats.cached_bytes_returned)
2746  / period_seconds);
2747  m_ganglia_collector->update("bytesWritten",
2748  (float)load_stats.update_bytes / period_seconds);
2749 
2750  m_ganglia_collector->update("compactions.major", load_stats.compactions_major);
2751  m_ganglia_collector->update("compactions.minor", load_stats.compactions_minor);
2752  m_ganglia_collector->update("compactions.merging", load_stats.compactions_merging);
2753  m_ganglia_collector->update("compactions.gc", load_stats.compactions_gc);
2754 
2755  m_ganglia_collector->update("scanners",
2756  m_stats->scanner_count);
2757  m_ganglia_collector->update("cellstores",
2758  (int32_t)m_stats->file_count);
2759  m_ganglia_collector->update("ranges",
2760  m_stats->range_count);
2761  m_ganglia_collector->update("memory.tracked",
2762  (float)m_stats->tracked_memory / 1000000000.0);
2763 
// Cache counters are cumulative; hit rates are computed from the deltas since
// the previous call
2764  HT_ASSERT(previous_block_cache_accesses <= m_stats->block_cache_accesses &&
2765  previous_block_cache_hits <= m_stats->block_cache_hits);
2766  uint64_t block_cache_accesses = m_stats->block_cache_accesses - previous_block_cache_accesses;
2767  uint64_t block_cache_hits = m_stats->block_cache_hits - previous_block_cache_hits;
2768 
2769  if (block_cache_accesses)
2770  m_ganglia_collector->update("blockCache.hitRate",
2771  (int32_t)((block_cache_hits*100)
2772  / block_cache_accesses));
2773  else
2774  m_ganglia_collector->update("blockCache.hitRate", (int32_t)0);
2775  m_ganglia_collector->update("blockCache.memory",
2776  (float)m_stats->block_cache_max_memory / 1000000000.0);
2777  uint64_t block_cache_fill = m_stats->block_cache_max_memory -
2778  m_stats->block_cache_available_memory;
2779  m_ganglia_collector->update("blockCache.fill",
2780  (float)block_cache_fill / 1000000000.0);
2781 
2782  HT_ASSERT(previous_query_cache_accesses <= m_stats->query_cache_accesses &&
2783  previous_query_cache_hits <= m_stats->query_cache_hits);
2784  uint64_t query_cache_accesses = m_stats->query_cache_accesses - previous_query_cache_accesses;
2785  uint64_t query_cache_hits = m_stats->query_cache_hits - previous_query_cache_hits;
2786 
2787  if (query_cache_accesses)
2788  m_ganglia_collector->update("queryCache.hitRate",
2789  (int32_t)((query_cache_hits*100) /
2790  query_cache_accesses));
2791  else
2792  m_ganglia_collector->update("queryCache.hitRate", (int32_t)0);
2793  m_ganglia_collector->update("queryCache.memory",
2794  (float)m_stats->query_cache_max_memory / 1000000000.0);
2795  uint64_t query_cache_fill = m_stats->query_cache_max_memory -
2796  m_stats->query_cache_available_memory;
2797  m_ganglia_collector->update("queryCache.fill",
2798  (float)query_cache_fill / 1000000000.0);
2799  m_ganglia_collector->update("queryCache.waiters", query_cache_waiters);
2800 
2801  m_ganglia_collector->update("requestBacklog",(int32_t)m_app_queue->backlog());
2802 
2803  try {
2804  m_ganglia_collector->publish();
2805  }
2806  catch (Exception &e) {
2807  HT_INFOF("Problem publishing Ganglia metrics - %s", e.what());
2808  }
2809 
2810  m_stats_last_timestamp = timestamp;
2811 
2812  HT_INFO("Exiting get_statistics()");
2813 
2814  return;
2815 }
2816 
2817 
// Removes a range from its table's TableInfo on this server.
//
// Waits on the replay barrier for this range and throws TABLE_NOT_FOUND if
// the table is not loaded. It then removes the range with
// TableInfo::remove_range(), throwing if that fails, and replies OK. Errors
// are logged and returned through cb->error().
//
// NOTE(review): original lines 2836 and 2838 are missing from this listing.
// 2838 is the first line of the throw issued when remove_range() fails; only
// its format() argument survives as the following line.
// NOTE(review): the `cb &&` test in the handler is redundant, because
// cb->event() is already dereferenced above.
2818 void
2820  const RangeSpec &range_spec) {
2821  TableInfoPtr table_info;
2822  RangePtr range;
2823  std::stringstream sout;
2824 
2825  sout << "drop_range\n"<< table << range_spec;
2826  HT_INFOF("%s", sout.str().c_str());
2827 
2828  if (!m_log_replay_barrier->wait(cb->event()->deadline(), table, range_spec))
2829  return;
2830 
2831  try {
2832 
2833  if (!m_context->live_map->lookup(table.id, table_info))
2834  HT_THROWF(Error::TABLE_NOT_FOUND, "%s", table.id);
2835 
2837  if (!table_info->remove_range(range_spec, range))
2839  format("%s[%s..%s]", table.id, range_spec.start_row, range_spec.end_row));
2840 
2841  cb->response_ok();
2842  }
2843  catch (Hypertable::Exception &e) {
2844  HT_ERROR_OUT << e << HT_END;
2845  int error = 0;
2846  if (cb && (error = cb->error(e.code(), e.what())) != Error::OK)
2847  HT_ERRORF("Problem sending error response - %s", Error::get_text(error));
2848  }
2849 }
2850 
// Schedules a range to be relinquished by this server.
//
// Waits on the replay barrier for this range and replies TABLE_NOT_FOUND if
// the table is not loaded. Otherwise it looks the range up and throws if it is
// not present (the throw's first line, original 2871, is missing from this
// listing). It then calls schedule_relinquish() on the range, asks the timer
// handler for immediate maintenance, and replies OK. Exceptions are logged at
// INFO level and returned through cb->error().
2851 void
2853  const TableIdentifier &table, const RangeSpec &range_spec) {
2854  TableInfoPtr table_info;
2855  RangePtr range;
2856  std::stringstream sout;
2857 
2858  sout << "relinquish_range\n" << table << range_spec;
2859  HT_INFOF("%s", sout.str().c_str());
2860 
2861  if (!m_log_replay_barrier->wait(cb->event()->deadline(), table, range_spec))
2862  return;
2863 
2864  try {
2865  if (!m_context->live_map->lookup(table.id, table_info)) {
2866  cb->error(Error::TABLE_NOT_FOUND, table.id);
2867  return;
2868  }
2869 
2870  if (!table_info->get_range(range_spec, range))
2872  format("%s[%s..%s]", table.id, range_spec.start_row,
2873  range_spec.end_row));
2874 
2875  range->schedule_relinquish();
2876 
2877  // Wake up maintenance scheduler
2878  m_timer_handler->schedule_immediate_maintenance();
2879 
2880  cb->response_ok();
2881  }
2882  catch (Hypertable::Exception &e) {
2883  int error = 0;
2884  HT_INFOF("%s - %s", Error::get_text(e.code()), e.what());
2885  if (cb && (error = cb->error(e.code(), e.what())) != Error::OK)
2886  HT_ERRORF("Problem sending error response - %s", Error::get_text(error));
2887  }
2888 }
// Replays commit-log fragments of another server and sends the recovered
// key/value pairs to the receiver servers named in receiver_plan.
//
// After USER log replay completes, it replies OK before doing any work. It
// then:
//   1. connects to every receiver, failing with REQUEST_TIMEOUT only once the
//      replay_timeout timer has expired;
//   2. reads the requested fragments from
//      <toplevel_dir>/servers/<location>/log/<type>;
//   3. feeds every key/value pair into a ReplayBuffer, which is flushed
//      whenever the fragment id changes and once at the end;
//   4. reports progress with replay_status() each time the timer expires.
// The outcome is reported to the master with replay_complete(): Error::OK on
// success, otherwise the failing exception's code and message.
2889 
2891  const String &location, int32_t plan_generation,
2892  int32_t type, const vector<int32_t> &fragments,
2893  const RangeServerRecovery::ReceiverPlan &receiver_plan,
2894  int32_t replay_timeout) {
// NOTE(review): this timer is never used; the `timer` declared inside the try
// block below shadows it
2895  Timer timer(Global::failover_timeout/2, true);
2896 
2897  HT_INFOF("replay_fragments location=%s, plan_generation=%d, num_fragments=%d",
2898  location.c_str(), plan_generation, (int)fragments.size());
2899 
2900  CommitLogReaderPtr log_reader;
2901  String log_dir = Global::toplevel_dir + "/servers/" + location + "/log/" +
2902  RangeSpec::type_str(type);
2903 
2904  if (!m_log_replay_barrier->wait_for_user(cb->event()->deadline()))
2905  return;
2906 
2907  HT_INFOF("replay_fragments(id=%lld, %s, plan_generation=%d, num_fragments=%d)",
2908  (Lld)op_id, location.c_str(), plan_generation, (int)fragments.size());
2909 
// Acknowledge up front; the result is reported via replay_complete()
2910  cb->response_ok();
2911 
2912  try {
2913  log_reader = make_shared<CommitLogReader>(Global::log_dfs, log_dir, fragments);
2914  StringSet receivers;
2915  receiver_plan.get_locations(receivers);
2916  CommAddress addr;
2917  uint32_t timeout_ms = m_props->get_i32("Hypertable.Request.Timeout");
2918  Timer timer(replay_timeout, true);
2919  for (const auto &receiver : receivers) {
2920  addr.set_proxy(receiver);
2921  m_conn_manager->add(addr, timeout_ms, "RangeServer");
// NOTE(review): if the wait fails before the timer has expired, this receiver
// is skipped silently
2922  if (!m_conn_manager->wait_for_connection(addr, timer.remaining())) {
2923  if (timer.expired())
2924  HT_THROWF(Error::REQUEST_TIMEOUT, "Problem connecting to %s", receiver.c_str());
2925  }
2926  }
2927 
2928  BlockHeaderCommitLog header;
2929  uint8_t *base;
2930  size_t len;
2931  TableIdentifier table_id;
2932  const uint8_t *ptr, *end;
2933  SerializedKey key;
2934  ByteString value;
// NOTE(review): block_count is incremented below but never read in this
// function
2935  uint32_t block_count = 0;
2936  uint32_t fragment_id;
2937  uint32_t last_fragment_id = 0;
2938  bool started = false;
2939  ReplayBuffer replay_buffer(m_props, m_context->comm, receiver_plan, location, plan_generation);
2940  size_t num_kv_pairs=0;
2941 
2942  try {
2943 
2944  while (log_reader->next((const uint8_t **)&base, &len, &header)) {
2945  fragment_id = log_reader->last_fragment_id();
// Flush buffered pairs whenever the reader moves on to a new fragment
2946  if (!started) {
2947  started = true;
2948  last_fragment_id = fragment_id;
2949  replay_buffer.set_current_fragment(fragment_id);
2950  }
2951  else if (fragment_id != last_fragment_id) {
2952  replay_buffer.flush();
2953  last_fragment_id = fragment_id;
2954  replay_buffer.set_current_fragment(fragment_id);
2955  }
2956 
2957  ptr = base;
2958  end = base + len;
2959 
// Each block starts with a serialized table identifier followed by
// back-to-back serialized key/value pairs
2960  decode_table_id(&ptr, &len, &table_id);
2961 
2962  num_kv_pairs = 0;
2963  while (ptr < end) {
2964  // extract the key
2965  key.ptr = ptr;
2966  ptr += key.length();
2967  if (ptr > end)
2968  HT_THROW(Error::RANGESERVER_CORRUPT_COMMIT_LOG, "Problem decoding key");
2969  // extract the value
2970  value.ptr = ptr;
2971  ptr += value.length();
2972  if (ptr > end)
2973  HT_THROW(Error::RANGESERVER_CORRUPT_COMMIT_LOG, "Problem decoding value");
2974  ++num_kv_pairs;
2975  replay_buffer.add(table_id, key, value);
2976  }
2977  HT_INFOF("Replayed %d key/value pairs from fragment %s",
2978  (int)num_kv_pairs, log_reader->last_fragment_fname().c_str());
2979  block_count++;
2980 
2981  // report back status
2982  if (timer.expired()) {
2983  try {
2984  m_master_client->replay_status(op_id, location, plan_generation);
2985  }
2986  catch (Exception &ee) {
2987  HT_ERROR_OUT << ee << HT_END;
2988  }
2989  timer.reset(true);
2990  }
2991  }
2992 
2993  HT_MAYBE_FAIL_X("replay-fragments-user-0", type==RangeSpec::USER);
2994 
2995  }
2996  catch (Exception &e){
// Re-throw with the offending fragment's file name prefixed to the message
2997  HT_ERROR_OUT << log_reader->last_fragment_fname() << ": " << e << HT_END;
2998  HT_THROWF(e.code(), "%s: %s", log_reader->last_fragment_fname().c_str(), e.what());
2999  }
3000 
3001  replay_buffer.flush();
3002 
3003  HT_MAYBE_FAIL_X("replay-fragments-user-1", type==RangeSpec::USER);
3004 
3005  HT_INFOF("Finished playing %d fragments from %s",
3006  (int)fragments.size(), log_dir.c_str());
3007 
3008  }
3009  catch (Exception &e) {
3010  HT_ERROR_OUT << e << HT_END;
3011  try {
3012  m_master_client->replay_complete(op_id, location, plan_generation,
3013  e.code(), e.what());
3014  }
3015  catch (Exception &ee) {
3016  HT_ERROR_OUT << ee << HT_END;
3017  }
3018  return;
3019  }
3020 
3021  try {
3022  m_master_client->replay_complete(op_id, location, plan_generation,
3023  Error::OK, "");
3024  }
3025  catch (Exception &e){
3026  HT_ERROR_OUT << "Unable to call player_complete on master for op_id="
3027  << op_id << ", type=" << type << ", location=" << location
3028  << ", plan_generation=" << plan_generation << ", num_fragments="
3029  << fragments.size() << " - " << e << HT_END;
3030  }
3031 }
3032 
3034  int32_t plan_generation,
3035  const vector<int32_t> &fragments,
3036  const vector<QualifiedRangeSpec> &specs,
3037  const vector<RangeState> &states) {
3038  TableInfoPtr table_info;
3039  FailoverPhantomRangeMap::iterator failover_map_it;
3040  PhantomRangeMapPtr phantom_range_map;
3041  TableInfoMapPtr phantom_tableinfo_map;
3042  int error;
3043 
3044  HT_INFOF("phantom_load location=%s, plan_generation=%d, num_fragments=%d,"
3045  " num_ranges=%d", location.c_str(), plan_generation,
3046  (int)fragments.size(), (int)specs.size());
3047 
3048  if (!m_log_replay_barrier->wait_for_user(cb->event()->deadline()))
3049  return;
3050 
3051  HT_ASSERT(!specs.empty());
3052 
3053  HT_MAYBE_FAIL_X("phantom-load-user", specs[0].table.is_user());
3054 
3055  {
3056  lock_guard<mutex> lock(m_failover_mutex);
3057  failover_map_it = m_failover_map.find(location);
3058  if (failover_map_it == m_failover_map.end()) {
3059  phantom_range_map = make_shared<PhantomRangeMap>(plan_generation);
3060  m_failover_map[location] = phantom_range_map;
3061  }
3062  else
3063  phantom_range_map = failover_map_it->second;
3064  }
3065 
3066  {
3067  lock_guard<PhantomRangeMap> lock(*phantom_range_map);
3068 
3069  // check for out-of-order phantom_load requests
3070  if (plan_generation < phantom_range_map->get_plan_generation())
3071  return;
3072 
3073  if (plan_generation > phantom_range_map->get_plan_generation())
3074  phantom_range_map->reset(plan_generation);
3075  else if (phantom_range_map->loaded()) {
3076  cb->response_ok();
3077  return;
3078  }
3079 
3080  phantom_tableinfo_map = phantom_range_map->get_tableinfo_map();
3081 
3082  try {
3083  for (size_t i=0; i<specs.size(); i++) {
3084  const QualifiedRangeSpec &spec = specs[i];
3085  const RangeState &state = states[i];
3086 
3087  // XXX: TODO: deal with dropped tables
3088 
3089  phantom_tableinfo_map->get(spec.table.id, table_info);
3090 
3091  uint32_t generation = table_info->get_schema()->get_generation();
3092  if (generation > spec.table.generation) {
3093  HT_WARNF("Table generation mismatch in phantom load request (%d < %d),"
3094  " automatically upgrading", (int)spec.table.generation, (int)generation);
3095  ((QualifiedRangeSpec *)&spec)->table.generation = generation;
3096  }
3097 
3098  if (!live(spec))
3099  phantom_range_map->insert(spec, state, table_info->get_schema(), fragments);
3100  }
3101  }
3102  catch (Exception &e) {
3103  HT_ERROR_OUT << "Phantom load failed - " << e << HT_END;
3104  if ((error = cb->error(e.code(), e.what())))
3105  HT_ERRORF("Problem sending error response - %s", Error::get_text(error));
3106  return;
3107  }
3108 
3109  phantom_range_map->set_loaded();
3110  }
3111 
3112  cb->response_ok();
3113 }
3114 
3116  const String &location,
3117  int32_t plan_generation,
3118  const QualifiedRangeSpec &range,
3119  int32_t fragment, EventPtr &event) {
3120  std::stringstream sout;
3121 
3122  sout << "phantom_update location=" << location << ", fragment="
3123  << fragment << ", range=" << range;
3124  HT_INFOF("%s", sout.str().c_str());
3125 
3126  FailoverPhantomRangeMap::iterator failover_map_it;
3127  PhantomRangeMapPtr phantom_range_map;
3128  PhantomRangePtr phantom_range;
3129 
3130  HT_MAYBE_FAIL_X("phantom-update-user", range.table.is_user());
3131  HT_MAYBE_FAIL_X("phantom-update-metadata", range.table.is_metadata());
3132 
3133  {
3134  lock_guard<mutex> lock(m_failover_mutex);
3135  failover_map_it = m_failover_map.find(location);
3136  if (failover_map_it == m_failover_map.end()) {
3138  "no phantom range map found for recovery of " + location);
3139  }
3140  phantom_range_map = failover_map_it->second;
3141  }
3142 
3143  {
3144  lock_guard<PhantomRangeMap> lock(*phantom_range_map);
3145 
3146  // verify plan generation
3147  if (plan_generation != phantom_range_map->get_plan_generation())
3149  "supplied = %d, installed == %d", plan_generation,
3150  phantom_range_map->get_plan_generation());
3151 
3152  if (phantom_range_map->replayed()) {
3153  cb->response_ok();
3154  return;
3155  }
3156 
3157  HT_ASSERT(phantom_range_map->loaded());
3158 
3159  phantom_range_map->get(range, phantom_range);
3160  if (phantom_range && !phantom_range->replayed() && !phantom_range->add(fragment, event)) {
3161  String msg = format("fragment %d completely received for range "
3162  "%s[%s..%s]", fragment, range.table.id, range.range.start_row,
3163  range.range.end_row);
3164  HT_INFOF("%s", msg.c_str());
3166  return;
3167  }
3168 
3169  }
3170 
3171  cb->response_ok();
3172 }
3173 
3175  const String &location, int32_t plan_generation,
3176  const vector<QualifiedRangeSpec> &specs) {
3177  FailoverPhantomRangeMap::iterator failover_map_it;
3178  TableInfoMapPtr phantom_map;
3179  PhantomRangeMapPtr phantom_range_map;
3180  PhantomRangePtr phantom_range;
3181  TableInfoPtr phantom_table_info;
3182  vector<MetaLog::EntityPtr> metalog_entities;
3183 
3184  HT_INFOF("phantom_prepare_ranges op_id=%lld, location=%s, plan_generation=%d,"
3185  " num_ranges=%d", (Lld)op_id, location.c_str(), plan_generation,
3186  (int)specs.size());
3187 
3188  if (!m_log_replay_barrier->wait_for_user(cb->event()->deadline()))
3189  return;
3190 
3191  cb->response_ok();
3192 
3193  {
3194  lock_guard<mutex> lock(m_failover_mutex);
3195  failover_map_it = m_failover_map.find(location);
3196  if (failover_map_it == m_failover_map.end()) {
3197  try {
3198  String msg = format("No phantom map found for %s", location.c_str());
3199  HT_INFOF("%s", msg.c_str());
3200  m_master_client->phantom_prepare_complete(op_id, location, plan_generation,
3202  }
3203  catch (Exception &e) {
3204  HT_ERROR_OUT << e << HT_END;
3205  }
3206  return;
3207  }
3208  phantom_range_map = failover_map_it->second;
3209  }
3210 
3211  try {
3212  lock_guard<PhantomRangeMap> lock(*phantom_range_map);
3213 
3214  if (phantom_range_map->prepared()) {
3215  try {
3216  m_master_client->phantom_prepare_complete(op_id, location, plan_generation, Error::OK, "");
3217  }
3218  catch (Exception &e) {
3219  HT_ERROR_OUT << e << HT_END;
3220  }
3221  return;
3222  }
3223 
3224  HT_ASSERT(phantom_range_map->loaded());
3225 
3226  phantom_map = phantom_range_map->get_tableinfo_map();
3227 
3228  for (const auto &rr : specs) {
3229  phantom_table_info = 0;
3230  HT_ASSERT(phantom_map->lookup(rr.table.id, phantom_table_info));
3231  TableInfoPtr table_info;
3232  m_context->live_map->get(rr.table.id, table_info);
3233 
3234  // If maintenance has been disabled for the table, tell the maintenance
3235  // scheduler to not schedule maintenance for it
3236  if (table_info->maintenance_disabled())
3237  m_maintenance_scheduler->exclude(rr.table);
3238 
3239  if (rr.table.generation != table_info->get_schema()->get_generation())
3240  HT_WARNF("Table (id=%s) generation mismatch %lld != %lld", rr.table.id,
3241  (Lld)rr.table.generation,
3242  (Lld)table_info->get_schema()->get_generation());
3243 
3244  //HT_DEBUG_OUT << "Creating Range object for range " << rr << HT_END;
3245  // create a real range and its transfer log
3246  phantom_range_map->get(rr, phantom_range);
3247 
3248  // If already staged, continue with next range
3249  if (!phantom_range || phantom_range->prepared())
3250  continue;
3251 
3252  if (!Global::metadata_table) {
3253  lock_guard<mutex> lock(Global::mutex);
3254  // TODO double-check locking (works fine on x86 and amd64 but may fail
3255  // on other archs without using a memory barrier
3256  if (!Global::metadata_table) {
3257  uint32_t timeout_ms = m_props->get_i32("Hypertable.Request.Timeout");
3258  if (!Global::range_locator)
3259  Global::range_locator = make_shared<Hypertable::RangeLocator>(m_props,
3260  m_conn_manager, Global::hyperspace, timeout_ms);
3262  Global::metadata_table = make_shared<Table>(m_props, Global::range_locator,
3263  m_conn_manager, Global::hyperspace, aq,
3264  m_namemap, TableIdentifier::METADATA_NAME, 0, timeout_ms);
3265  }
3266  }
3267 
3268  // if we're about to move a root range: make sure that the location
3269  // of the metadata-table is updated
3270  if (rr.table.is_metadata()) {
3271  Global::metadata_table->get_range_locator()->invalidate(&rr.table,
3273  Global::metadata_table->get_range_locator()->set_root_stale();
3274  }
3275 
3276  phantom_range->create_range(m_master_client, table_info,
3277  Global::log_dfs);
3278  HT_DEBUG_OUT << "Range object created for range " << rr << HT_END;
3279  }
3280 
3281  CommitLogPtr log;
3282  for (const QualifiedRangeSpec &rr : specs) {
3283  bool is_empty = true;
3284 
3285  phantom_range_map->get(rr, phantom_range);
3286 
3287  // If already prepared, continue with next range
3288  if (!phantom_range || phantom_range->prepared())
3289  continue;
3290 
3291  phantom_range->populate_range_and_log(Global::log_dfs, op_id, &is_empty);
3292 
3293  HT_DEBUG_OUT << "populated range and log for range " << rr << HT_END;
3294 
3295  RangePtr range = phantom_range->get_range();
3296  if (!rr.table.is_user()) {
3297  lock_guard<mutex> lock(Global::mutex);
3298  if (rr.table.is_metadata()) {
3299  if (rr.is_root()) {
3300  if (!Global::root_log) {
3301  Global::log_dfs->mkdirs(Global::log_dir + "/root");
3302  if (!Global::root_log)
3303  Global::root_log = make_shared<CommitLog>(Global::log_dfs,
3304  Global::log_dir + "/root", m_props);
3305  }
3306  }
3307  if (!Global::metadata_log) {
3308  Global::log_dfs->mkdirs(Global::log_dir + "/metadata");
3309  Global::metadata_log = make_shared<CommitLog>(Global::log_dfs,
3310  Global::log_dir + "/metadata", m_props);
3311  m_update_pipeline_metadata =
3312  make_shared<UpdatePipeline>(m_context, m_query_cache, m_timer_handler,
3313  Global::metadata_log, m_log_flush_method_meta);
3314  }
3315  log = rr.is_root() ? Global::root_log : Global::metadata_log;
3316  }
3317  else if (rr.table.is_system()) {
3318  if (!Global::system_log) {
3319  Global::log_dfs->mkdirs(Global::log_dir + "/system");
3320  Global::system_log = make_shared<CommitLog>(Global::log_dfs,
3321  Global::log_dir + "/system", m_props);
3322  m_update_pipeline_system =
3323  make_shared<UpdatePipeline>(m_context, m_query_cache, m_timer_handler,
3324  Global::system_log, m_log_flush_method_user);
3325  }
3326  log = Global::system_log;
3327  }
3328  }
3329  else
3330  log = Global::user_log;
3331 
3332  CommitLogReaderPtr phantom_log = phantom_range->get_phantom_log();
3333  HT_ASSERT(phantom_log && log);
3334  int error = Error::OK;
3335  if (!is_empty
3336  && (error = log->link_log(ClusterId::get(), phantom_log.get())) != Error::OK) {
3337 
3338  String msg = format("Problem linking phantom log '%s' for range %s[%s..%s]",
3339  phantom_range->get_phantom_logname().c_str(),
3340  rr.table.id, rr.range.start_row, rr.range.end_row);
3341 
3342  m_master_client->phantom_prepare_complete(op_id, location, plan_generation, error, msg);
3343  return;
3344  }
3345 
3346  metalog_entities.push_back( range->metalog_entity() );
3347 
3348  HT_ASSERT(phantom_map->lookup(rr.table.id, phantom_table_info));
3349 
3350  HT_INFO("phantom adding range");
3351 
3352  phantom_table_info->add_range(range, true);
3353 
3354  phantom_range->set_prepared();
3355  }
3356 
3357  HT_MAYBE_FAIL_X("phantom-prepare-ranges-user-1", specs.back().table.is_user());
3358 
3359  HT_DEBUG_OUT << "write all range entries to rsml" << HT_END;
3360  // write metalog entities
3361  if (Global::rsml_writer)
3362  Global::rsml_writer->record_state(metalog_entities);
3363  else
3366 
3367  HT_MAYBE_FAIL_X("phantom-prepare-ranges-root-2", specs.back().is_root());
3368  HT_MAYBE_FAIL_X("phantom-prepare-ranges-user-2", specs.back().table.is_user());
3369 
3370  phantom_range_map->set_prepared();
3371 
3372  }
3373  catch (Exception &e) {
3374  HT_ERROR_OUT << e << HT_END;
3375  try {
3376  m_master_client->phantom_prepare_complete(op_id, location, plan_generation, e.code(), e.what());
3377  }
3378  catch (Exception &e) {
3379  HT_ERROR_OUT << e << HT_END;
3380  }
3381  return;
3382  }
3383 
3384  try {
3385  m_master_client->phantom_prepare_complete(op_id, location, plan_generation, Error::OK, "");
3386  }
3387  catch (Exception &e) {
3388  HT_ERROR_OUT << e << HT_END;
3389  }
3390 
3391  HT_MAYBE_FAIL("phantom-prepare-ranges-user-3");
3392 
3393 }
3394 
3396  const String &location, int32_t plan_generation,
3397  const vector<QualifiedRangeSpec> &specs) {
3398  FailoverPhantomRangeMap::iterator failover_map_it;
3399  PhantomRangeMapPtr phantom_range_map;
3400  TableInfoMapPtr phantom_map;
3401  TableMutatorPtr mutator;
3402  KeySpec key;
3403  String our_location = Global::location_initializer->get();
3404  vector<MetaLog::EntityPtr> entities;
3405  StringSet phantom_logs;
3406  map<QualifiedRangeSpec, TableInfoPtr> phantom_table_info_map;
3407  map<QualifiedRangeSpec, int> error_map;
3408  vector<RangePtr> range_vec;
3409 
3410  HT_INFOF("phantom_commit_ranges op_id=%lld, location=%s, plan_generation=%d,"
3411  " num_ranges=%d", (Lld)op_id, location.c_str(), plan_generation,
3412  (int)specs.size());
3413 
3414  if (!m_log_replay_barrier->wait_for_system(cb->event()->deadline()))
3415  return;
3416 
3417  cb->response_ok();
3418 
3419  if (live(specs)) {
3420  // Remove phantom map
3421  {
3422  lock_guard<mutex> lock(m_failover_mutex);
3423  m_failover_map.erase(location);
3424  }
3425  // Report success
3426  try {
3427  m_master_client->phantom_commit_complete(op_id, location, plan_generation, Error::OK, "");
3428  }
3429  catch (Exception &e) {
3430  String msg = format("Error during phantom_commit op_id=%lld, "
3431  "plan_generation=%d, location=%s, num ranges=%u", (Lld)op_id,
3432  plan_generation, location.c_str(), (unsigned)specs.size());
3433  HT_ERRORF("%s - %s", Error::get_text(e.code()), msg.c_str());
3434  }
3435  return;
3436  }
3437 
3438  {
3439  lock_guard<mutex> lock(m_failover_mutex);
3440  failover_map_it = m_failover_map.find(location);
3441  if (failover_map_it == m_failover_map.end()) {
3442  try {
3443  String msg = format("No phantom map found for %s plan_generation=%d",
3444  location.c_str(), plan_generation);
3445  HT_INFOF("%s", msg.c_str());
3446  m_master_client->phantom_commit_complete(op_id, location, plan_generation,
3448  }
3449  catch (Exception &e) {
3450  HT_ERROR_OUT << e << HT_END;
3451  }
3452  return;
3453  }
3454  phantom_range_map = failover_map_it->second;
3455  }
3456 
3457  try {
3458  lock_guard<PhantomRangeMap> lock(*phantom_range_map);
3459 
3460  // Double-check to see if concurrent method call flipped them live
3461  if (live(specs))
3462  return;
3463 
3464  HT_ASSERT(phantom_range_map->prepared());
3465  HT_ASSERT(!phantom_range_map->committed());
3466 
3467  phantom_map = phantom_range_map->get_tableinfo_map();
3468 
3469  for (const auto &rr : specs) {
3470 
3471  RangePtr range;
3472  PhantomRangePtr phantom_range;
3473 
3474  String range_name = format("%s[%s..%s]", rr.table.id,
3475  rr.range.start_row, rr.range.end_row);
3476 
3477  // Fetch phantom_range object
3478  phantom_range_map->get(rr, phantom_range);
3479 
3480  if (!phantom_range || phantom_range->committed())
3481  continue;
3482 
3483  range = phantom_range->get_range();
3484 
3485  HT_ASSERT(range);
3486 
3487  bool is_root = range->is_root();
3488  MetaLogEntityRangePtr entity = range->metalog_entity();
3489  entity->set_needs_compaction(true);
3490  entity->set_load_acknowledged(false);
3491  entity->clear_state_bits(RangeState::PHANTOM);
3492  entities.push_back(entity);
3493  phantom_logs.insert( phantom_range->get_phantom_logname() );
3494 
3495  HT_MAYBE_FAIL_X("phantom-commit-user-1", rr.table.is_user());
3496 
3502  {
3503  String range_str = format("%s[%s..%s]", rr.table.id, rr.range.start_row, rr.range.end_row);
3504  HT_INFOF("Taking ownership of range %s", range_str.c_str());
3505  }
3506  if (!is_root) {
3507  String metadata_key_str = format("%s:%s", rr.table.id,rr.range.end_row);
3508 
3509  if (!mutator)
3510  mutator.reset(Global::metadata_table->create_mutator());
3511 
3512  // Take ownership of the range
3513  key.row = metadata_key_str.c_str();
3514  key.row_len = strlen(metadata_key_str.c_str());
3515  key.column_family = "Location";
3516  key.column_qualifier = 0;
3517  key.column_qualifier_len = 0;
3518 
3519  // just set for now we'll do one big flush right at the end
3520  HT_DEBUG_OUT << "Update metadata location for " << key << " to "
3521  << our_location << HT_END;
3522  mutator->set(key, our_location.c_str(), our_location.length());
3523  }
3524  else { //root
3525  uint64_t handle=0;
3526  uint32_t oflags = OPEN_FLAG_READ | OPEN_FLAG_WRITE | OPEN_FLAG_CREATE;
3527  HT_INFO("Failing over root METADATA range");
3528 
3530  Global::hyperspace, &handle);
3531  String root_filename = Global::toplevel_dir + "/root";
3532  handle = m_hyperspace->open(root_filename, oflags);
3533  Global::hyperspace->attr_set(handle, "Location", our_location.c_str(),
3534  our_location.length());
3535  HT_DEBUG_OUT << "Updated attr Location of " << root_filename << " to "
3536  << our_location << HT_END;
3537  }
3538 
3539  phantom_range->set_committed();
3540  }
3541 
3542  // flush mutator
3543  if (mutator)
3544  mutator->flush();
3545 
3546  HT_MAYBE_FAIL_X("phantom-commit-user-2", specs.back().table.is_user());
3547 
3548  /*
3549  * This method atomically does the following:
3550  * 1. Adds phantom_logs to RemoveOkLogs entity
3551  * 2. Persists entities and RemoveOkLogs entity to RSML
3552  * 3. Merges phantom_map into the live map
3553  */
3554  m_context->live_map->merge(phantom_map.get(), entities, phantom_logs);
3555 
3556  HT_MAYBE_FAIL_X("phantom-commit-user-3", specs.back().table.is_user());
3557 
3558  HT_INFOF("Merging phantom map into live map for recovery of %s (ID=%lld)",
3559  location.c_str(), (Lld)op_id);
3560 
3561  {
3562  lock_guard<mutex> lock(m_failover_mutex);
3563  m_failover_map.erase(location);
3564  }
3565 
3566  phantom_range_map->set_committed();
3567 
3568  }
3569  catch (Exception &e) {
3570  HT_ERROR_OUT << e << HT_END;
3571  try {
3572  m_master_client->phantom_commit_complete(op_id, location, plan_generation, e.code(), e.what());
3573  }
3574  catch (Exception &e) {
3575  HT_ERROR_OUT << e << HT_END;
3576  }
3577  return;
3578  }
3579 
3580  try {
3581 
3582  m_master_client->phantom_commit_complete(op_id, location, plan_generation, Error::OK, "");
3583 
3584  HT_MAYBE_FAIL_X("phantom-commit-user-4", specs.back().table.is_user());
3585 
3586  HT_DEBUG_OUT << "phantom_commit_complete sent to master for num_ranges="
3587  << specs.size() << HT_END;
3588 
3589  // Wake up maintenance scheduler to handle any "in progress" operations
3590  // that were happening on the ranges just added
3591  m_timer_handler->schedule_immediate_maintenance();
3592 
3593  }
3594  catch (Exception &e) {
3595  String msg = format("Error during phantom_commit op_id=%lld, "
3596  "plan_generation=%d, location=%s, num ranges=%u", (Lld)op_id,
3597  plan_generation, location.c_str(), (unsigned)specs.size());
3598  HT_ERRORF("%s - %s", Error::get_text(e.code()), msg.c_str());
3599  // do not re-throw because this would cause an error to get sent back
3600  // that is not expected
3601  }
3602 }
3603 
3604 bool Apps::RangeServer::live(const vector<QualifiedRangeSpec> &ranges) {
3605  TableInfoPtr table_info;
3606  size_t live_count = 0;
3607  for (const auto &qrs : ranges) {
3608  if (m_context->live_map->lookup(qrs.table.id, table_info)) {
3609  if (table_info->has_range(qrs.range))
3610  live_count++;
3611  }
3612  }
3613 
3614  return live_count == ranges.size();
3615 }
3616 
3618  TableInfoPtr table_info;
3619  if (m_context->live_map->lookup(spec.table.id, table_info)) {
3620  if (table_info->has_range(spec.range))
3621  return true;
3622  }
3623  return false;
3624 }
3625 
3626 
3628  HT_INFO("wait_for_maintenance");
3629  if (!Global::maintenance_queue->wait_for_empty(cb->event()->deadline()))
3630  cb->error(Error::REQUEST_TIMEOUT, "");
3631  cb->response_ok();
3632 }
3633 
3634 
3635 void Apps::RangeServer::verify_schema(TableInfoPtr &table_info, uint32_t generation,
3636  const TableSchemaMap *table_schemas) {
3637  DynamicBuffer valbuf;
3638  SchemaPtr schema = table_info->get_schema();
3639 
3640  if (!schema || schema->get_generation() < generation) {
3641  schema.reset();
3642  TableSchemaMap::const_iterator it;
3643  if (table_schemas &&
3644  (it = table_schemas->find(table_info->identifier().id))
3645  != table_schemas->end())
3646  schema = it->second;
3647 
3648  if (!schema) {
3649  String tablefile = Global::toplevel_dir + "/tables/"
3650  + table_info->identifier().id;
3651  m_hyperspace->attr_get(tablefile, "schema", valbuf);
3652  schema.reset( Schema::new_instance((const char *)valbuf.base) );
3653  }
3654 
3655  table_info->update_schema(schema);
3656 
3657  // Generation check ...
3658  if (schema->get_generation() < generation)
3660  "Fetched Schema generation for table '%s' is %lld"
3661  " but supplied is %lld", table_info->identifier().id,
3662  (Lld)schema->get_generation(), (Lld)generation);
3663  }
3664 }
3665 
3667  m_group_commit->trigger();
3668 }
3669 
3671 
3672  HT_ASSERT(m_timer_handler);
3673 
3674  try {
3675 
3676  // Purge expired scanners
3677  m_scanner_map.purge_expired(m_scanner_ttl);
3678 
3679  // Set Low Memory mode
3680  bool low_memory_mode = m_timer_handler->low_memory_mode();
3681  m_maintenance_scheduler->set_low_memory_mode(low_memory_mode);
3682  Global::low_activity_time.enable_window(!low_memory_mode);
3683 
3684  // Schedule maintenance
3685  m_maintenance_scheduler->schedule();
3686 
3687  // Check for control files
3688  auto now = chrono::steady_clock::now();
3689  if (now - m_last_control_file_check >= chrono::milliseconds(m_control_file_check_interval)) {
3690  if (FileUtils::exists(System::install_dir + "/run/query-profile")) {
3691  if (!m_profile_query) {
3692  lock_guard<mutex> lock(m_profile_mutex);
3693  String output_fname = System::install_dir + "/run/query-profile.output";
3694  m_profile_query_out.open(output_fname.c_str(), ios_base::out|ios_base::app);
3695  m_profile_query = true;
3696  }
3697  }
3698  else {
3699  if (m_profile_query) {
3700  lock_guard<mutex> lock(m_profile_mutex);
3701  m_profile_query_out.close();
3702  m_profile_query = false;
3703  }
3704  }
3705  m_last_control_file_check = now;
3706  }
3707 
3708  }
3709  catch (Hypertable::Exception &e) {
3710  HT_ERROR_OUT << e << HT_END;
3711  }
3712 
3713  // Notify timer handler so that it can resume
3714  m_timer_handler->maintenance_scheduled_notify();
3715 
3716  HT_INFOF("Memory Usage: %llu bytes", (Llu)Global::memory_tracker->balance());
3717 }
3718 
3719 void
3720 Apps::RangeServer::group_commit_add(EventPtr &event, uint64_t cluster_id,
3721  SchemaPtr &schema, const TableIdentifier &table,
3722  uint32_t count, StaticBuffer &buffer,
3723  uint32_t flags) {
3724  lock_guard<mutex> lock(m_mutex);
3725  if (!m_group_commit) {
3726  m_group_commit = std::make_shared<GroupCommit>(this);
3727  HT_ASSERT(!m_group_commit_timer_handler);
3728  m_group_commit_timer_handler = make_shared<GroupCommitTimerHandler>(m_context->comm, this, m_app_queue);
3729  m_group_commit_timer_handler->start();
3730  }
3731  m_group_commit->add(event, cluster_id, schema, table, count, buffer, flags);
3732 }
static bool enable_shadow_cache
Definition: Global.h:107
static LocationInitializerPtr location_initializer
Definition: Global.h:83
std::set< String > StringSet
STL Set managing Strings.
Definition: StringExt.h:42
virtual void forward()=0
#define HT_THROW2F(_code_, _ex_, _fmt_,...)
Definition: Error.h:494
A memory buffer of static size.
Definition: StaticBuffer.h:45
POD-style structure to hold statistics.
bool m_verbose
Flag indicating if verbose logging is enabled.
Definition: RangeServer.h:283
Lock successfully granted.
Definition: LockSequencer.h:58
int response(int32_t id, int32_t skipped_rows, int32_t skipped_cells, bool more, ProfileDataScanner &profile_data, StaticBuffer &ext)
Declarations for MetaLog::Reader.
std::vector< Cell, CellAlloc > Cells
Definition: Cells.h:37
const char * row
Definition: Key.h:129
#define HT_WARNF(msg,...)
Definition: Logger.h:290
uint64_t bytes_scanned
Bytes scanned.
static int32_t merge_cellstore_run_length_threshold
Definition: Global.h:110
The FailureInducer simulates errors.
static const CpuInfo & cpu_info()
Retrieves updated CPU information (see SystemInfo.h)
Definition: SystemInfo.cc:322
std::shared_ptr< PhantomRangeMap > PhantomRangeMapPtr
Range specification.
Definition: RangeSpec.h:40
Holds Nagios-style program status information.
Definition: Status.h:42
uint64_t block_index_memory
Definition: StatsTable.h:91
static int32_t access_group_garbage_compaction_threshold
Definition: Global.h:87
static TablePtr metadata_table
Definition: Global.h:91
Type declarations for PseudoTables class.
static string compact_flags_to_string(uint32_t flags)
Definition: Protocol.cc:40
#define HT_NOTICEF(msg,...)
Definition: Logger.h:280
std::string String
A String is simply a typedef to std::string.
Definition: String.h:44
std::chrono::steady_clock::time_point m_last_control_file_check
Definition: RangeServer.h:341
bool test_and_set_get_statistics_outstanding(bool value)
Performs a "test and set" operation on m_get_statistics_outstanding.
Definition: RangeServer.h:251
uint32_t count
Count of serialized key/value pairs in buffer.
Definition: UpdateRequest.h:59
std::vector< UpdateRequest * > requests
Vector of corresponding client requests.
Declarations for Status.
uint64_t disk_bytes_read
Definition: StatsTable.h:83
uint64_t memory_allocated
Definition: StatsTable.h:89
Declarations for MetaLogEntityRange.
std::map< String, SchemaPtr > TableSchemaMap
Definition: RangeServer.h:230
int64_t bytes_scanned
Number of bytes scanned while executing scan.
String format(const char *fmt,...)
Returns a String using printf like format facilities Vanilla snprintf is about 1.5x faster than this...
Definition: String.cc:37
static constexpr const char * SERVER_IS_SHUTTING_DOWN
Definition: Status.h:62
std::vector< AccessGroupSpec * > AccessGroupSpecs
Vector of AccessGroupSpec pointers.
chrono::time_point< fast_clock > time_point
Definition: fast_clock.h:42
Filesystem::Flags convert(std::string str)
Converts string mnemonic to corresponding Filesystem::Flags value.
Definition: Filesystem.cc:180
virtual int response_ok()
Sends a a simple success response back to the client which is just the 4-byte error code Error::OK...
void group_commit_add(EventPtr &event, uint64_t cluster_id, SchemaPtr &schema, const TableIdentifier &table, uint32_t count, StaticBuffer &buffer, uint32_t flags)
Defines a time window.
Definition: TimeWindow.h:61
static const char * METADATA_NAME
Holds updates destined for a specific table.
void metadata_sync(ResponseCallback *, const char *, uint32_t flags, std::vector< const char * > columns)
const char * cache_key() const
Definition: ScanSpec.h:139
long long unsigned int Llu
Shortcut for printf formats.
Definition: String.h:50
Po::typed_value< String > * str(String *v=0)
Definition: Properties.h:166
const char * column_qualifier
Definition: KeySpec.h:128
Column family specification.
static int64_t memory_limit_ensure_unused_current
Definition: Global.h:105
const char * column_qualifier
Definition: Cell.h:68
static void clear_cache()
Clears both value and qualifier caches.
static int64_t memory_limit
Definition: Global.h:99
static bool exists(const String &fname)
Checks if a file or directory exists.
Definition: FileUtils.cc:420
Declarations for MetaLog::Definition.
uint64_t bloom_filter_memory
Definition: StatsTable.h:92
Filesystem::Flags m_log_flush_method_user
Flush method for USER commit log updates.
Definition: RangeServer.h:275
const uint64_t MiB
Definition: Properties.h:157
std::shared_ptr< Event > EventPtr
Smart pointer to Event.
Definition: Event.h:228
#define HT_ABORT
Definition: Logger.h:175
#define HT_INFO(msg)
Definition: Logger.h:271
LockStatus
Lock status.
Definition: LockSequencer.h:56
STL namespace.
std::string specs_to_string(const std::vector< Spec > &specs)
Returns a textual representation of variable specifications.
void set_current_fragment(uint32_t fragment_id)
Definition: ReplayBuffer.h:53
bool live(const vector< QualifiedRangeSpec > &ranges)
Hyperspace::SessionPtr m_hyperspace
Definition: RangeServer.h:301
Filesystem::Flags m_log_flush_method_meta
Flush method for METADATA commit log updates.
Definition: RangeServer.h:272
Declarations for RangeServer.
void initialize(PropertiesPtr &)
Definition: RangeServer.cc:486
uint32_t remaining()
Returns the remaining time till expiry.
Definition: Timer.h:101
static uint32_t number32(uint32_t maximum=0)
Returns a random 32-bit unsigned integer.
Definition: Random.cc:55
#define HT_ON_SCOPE_EXIT(...)
Definition: ScopeGuard.h:301
ClockT::time_point expire_time
Request expiration time.
void table_maintenance_enable(ResponseCallback *cb, const TableIdentifier &table)
Enables maintenance for a table.
size_t column_qualifier_len
Definition: KeySpec.h:129
void phantom_load(ResponseCallback *, const String &location, int32_t plan_generation, const vector< int32_t > &fragments, const vector< QualifiedRangeSpec > &specs, const vector< RangeState > &states)
bool expired()
Returns true if the timer is expired.
Definition: Timer.h:112
const void * row
Definition: KeySpec.h:125
Declarations for MetaLogEntityRemoveOkLogs.
void wait_for_maintenance(ResponseCallback *cb)
Blocks while the maintenance queue is non-empty.
void add(const Key &key, uint8_t flag, const void *value, uint32_t value_len, TableMutatorAsync *value_index_mutator, TableMutatorAsync *qualifier_index_mutator)
Definition: IndexTables.cc:34
void set(Code code, const std::string &text)
Sets status code and text.
Definition: Status.h:101
A dynamic, resizable and reference counted memory buffer.
Definition: DynamicBuffer.h:42
uint32_t update_count
Update count.
static void add_to_work_queue(MetaLog::EntityTaskPtr entity)
Definition: Global.cc:84
MetaLog entity to track transfer logs that can be safely removed
void table_maintenance_disable(ResponseCallback *cb, const TableIdentifier &table)
Disables maintenance for a table.
Code
Enumeration for status codes.
Definition: Status.h:47
static MetaLogEntityRemoveOkLogsPtr remove_ok_logs
Definition: Global.h:71
void reset(bool start_timer=false)
Resets the timer.
Definition: Timer.h:89
Tracks range server memory used.
Definition: MemoryTracker.h:42
void destroy_scanner(ResponseCallback *cb, int32_t scanner_id)
void get(Cells &cells)
Definition: Cells.h:97
void replay_log(TableInfoMap &replay_map, CommitLogReaderPtr &log_reader)
std::shared_ptr< CommitLogReader > CommitLogReaderPtr
Smart pointer to CommitLogReader.
void md5_trunc_modified_base64(const char *input, char output[17])
Get the modified base64 encoded string of the first 12 Bytes of the 16 Byte MD5 code of a null termin...
Definition: md5.cc:425
uint32_t update_cells
Cells updated.
A class managing one or more serializable ByteStrings.
Definition: ByteString.h:47
#define HT_ASSERT(_e_)
Definition: Logger.h:396
Wrapper for TableIdentifier providing member storage.
void set_state(ResponseCallback *cb, const std::vector< SystemVariable::Spec > &specs, int64_t generation)
void get_ranges(Ranges &ranges, StringSet *remove_ok_logs=0)
Gets set of live RangeData objects and corresponding transfer logs that can be safely removed...
int response(Hypertable::Status &status)
Sends response parameters back to client.
Definition: Status.cc:41
Scan predicate and control specification.
Definition: ScanSpec.h:56
Open file for locking.
Definition: Session.h:75
Declarations for RangeServerProtocol.
Response callback for status function.
Definition: Status.h:46
void commit_log_sync(ResponseCallback *cb, uint64_t cluster_id, const TableIdentifier &table)
std::shared_ptr< TableMutator > TableMutatorPtr
Smart pointer to TableMutator.
Definition: TableMutator.h:257
static int64_t memory_limit_ensure_unused
Definition: Global.h:102
StaticBuffer buffer
Update buffer containing serialized key/value pairs.
Definition: UpdateRequest.h:57
Hash key to query cache.
Definition: QueryCache.h:54
static int64_t log_prune_threshold_max
Definition: Global.h:96
File system utility functions.
bool m_startup
Flag indicating if server is starting up.
Definition: RangeServer.h:286
static Hypertable::MemoryTracker * memory_tracker
Definition: Global.h:94
const char * end_row
Definition: RangeSpec.h:60
PropertiesPtr m_props
Configuration properties.
Definition: RangeServer.h:280
std::shared_ptr< ConnectionHandlerFactory > ConnectionHandlerFactoryPtr
Smart pointer to ConnectionHandlerFactory.
static Hyperspace::SessionPtr hyperspace
Definition: Global.h:63
StatsRangeServerPtr m_stats
Definition: RangeServer.h:309
MetaLog entity for range state persisted in RSML.
Open file for writing.
Definition: Session.h:73
std::shared_ptr< Session > SessionPtr
Definition: Session.h:734
EventPtr event
Event object of originating update request.
Definition: UpdateRequest.h:61
static std::string toplevel_dir
Definition: Global.h:108
uint64_t cached_bytes_returned
Cached bytes returned.
const char * get_text(int error)
Returns a descriptive error message.
Definition: Error.cc:330
bool status(ContextPtr &context, Timer &timer, Status &status)
Runs a status check on the master.
Definition: Utility.cc:408
uint64_t update_bytes
Bytes updated.
Encapsulate an internet address.
Definition: InetAddr.h:66
std::shared_ptr< MergeScannerRange > MergeScannerRangePtr
Smart pointer to MergeScannerRange.
Declarations for ScanContext.
Declarations for MaintenanceScheduler.
ByteArena arena
Memory arena.
Definition: TableInfo.h:87
TimerHandlerPtr m_timer_handler
Smart pointer to timer handler.
Definition: RangeServer.h:325
std::shared_ptr< Client > ClientPtr
Smart pointer to Client.
Definition: Client.h:233
String get_transfer_log()
Gets transfer log.
std::vector< RangeData > array
Vector of RangeData objects.
Definition: TableInfo.h:85
static Hypertable::Lib::Master::ClientPtr master_client
Definition: Global.h:68
int64_t disk_read
Number of bytes read from disk while executing scan.
std::shared_ptr< Properties > PropertiesPtr
Definition: Properties.h:447
static CommitLogPtr root_log
Definition: Global.h:80
void batch_update(std::vector< UpdateRecTable * > &updates, ClockT::time_point expire_time)
void set_proxy(const String &str)
Sets address type to CommAddress::PROXY and proxy name to str.
Definition: CommAddress.h:76
void drop_range(ResponseCallback *, const TableIdentifier &, const RangeSpec &)
static TablePtr rs_metrics_table
Definition: Global.h:92
static uint64_t get()
Gets the cluster ID.
Definition: ClusterId.h:85
void acknowledge_load(Response::Callback::AcknowledgeLoad *cb, const vector< QualifiedRangeSpec > &specs)
static MetaLog::WriterPtr rsml_writer
Definition: Global.h:81
void close_handle_ptr(SessionPtr hyperspace, uint64_t *handlep)
Definition: Session.cc:1400
void dump_pseudo_table(ResponseCallback *cb, const TableIdentifier &table, const char *pseudo_table, const char *outfile)
Compatibility Macros for C/C++.
std::string table_id
Definition: StatsTable.h:70
const char * transfer_log
Full pathname of transfer log.
Definition: RangeState.h:111
uint32_t cells_returned
Cells returned.
static PseudoTables * instance()
Creates and/or returns singleton instance of the PseudoTables class.
Definition: PseudoTables.h:64
int64_t period_millis
Time period over which stats are computed.
ConnectionManagerPtr m_conn_manager
Definition: RangeServer.h:295
static int64_t log_prune_threshold_min
Definition: Global.h:95
std::shared_ptr< ConnectionHandler > m_master_connection_handler
Definition: RangeServer.h:299
const char * row_key
Definition: Cell.h:66
Declarations for HyperspaceTableCache.
bool load(const SerializedKey &key)
Parses the opaque key and loads the components into the member variables.
Definition: Key.cc:158
virtual int response_ok()
Sends a simple success response back to the client, which is just the 4-byte error code Error::OK...
#define HT_END
Definition: Logger.h:220
std::shared_ptr< TableInfoMap > TableInfoMapPtr
Shared smart pointer to TableInfoMap.
Definition: TableInfoMap.h:223
std::shared_ptr< ApplicationQueueInterface > ApplicationQueueInterfacePtr
Smart pointer to ApplicationQueueInterface.
void dump_garbage_tracker_statistics(std::ofstream &out)
Prints human-readable representation of garbage tracker state to an output stream.
static Hypertable::FilesystemPtr dfs
Definition: Global.h:64
void load_range(ResponseCallback *, const TableIdentifier &, const RangeSpec &, const RangeState &, bool needs_compaction)
size_t length() const
Retrieves the length of the serialized string.
Definition: ByteString.h:62
void add(const TableIdentifier &table, SerializedKey &key, ByteString &value)
Definition: ReplayBuffer.cc:57
void verify_schema(TableInfoPtr &, uint32_t generation, const TableSchemaMap *table_schemas=0)
static int32_t get_drive_count()
Returns the number of drives.
Definition: System.cc:111
TableInfoPtr table_info
TableInfo object for destination table.
void phantom_prepare_ranges(ResponseCallback *, int64_t op_id, const String &location, int32_t plan_generation, const vector< QualifiedRangeSpec > &ranges)
Declarations for IndexUpdater.
#define HT_ERROR_OUT
Definition: Logger.h:301
static Hypertable::RangeLocatorPtr range_locator
Definition: Global.h:69
void decode_table_id(const uint8_t **bufp, size_t *remainp, TableIdentifier *tid)
Definition: RangeServer.cc:957
static int64_t cellstore_target_size_min
Definition: Global.h:97
uint64_t shadow_cache_memory
Definition: StatsTable.h:90
virtual void decode(const uint8_t **bufp, size_t *remainp)
Reads serialized representation of object from a buffer.
Definition: Serializable.cc:70
NameIdMapperPtr m_namemap
Table name-to-ID mapper
Definition: RangeServer.h:319
static int64_t range_split_size
Definition: Global.h:84
Lib::Master::ClientPtr m_master_client
Definition: RangeServer.h:300
Context record for update request passed into UpdatePipeline.
Definition: UpdateContext.h:38
bool has(const String &name) const
Check whether a sub-property exists.
Definition: Properties.h:515
A structure to retrieve memory statistics (RAM size, used size, free size etc)
Definition: SystemInfo.h:103
#define HT_WARN_OUT
Definition: Logger.h:291
Hypertable library.
Definition: CellInterval.h:30
This class is used to generate and deliver standard responses back to a client.
const uint8_t * ptr
The pointer to the serialized data.
Definition: ByteString.h:121
Hypertable definitions
static Schema * new_instance(const std::string &buf)
Creates schema object from XML schema string.
Definition: Schema.cc:202
Lock exclusive mode.
Definition: LockSequencer.h:51
TableIdentifier id
Table identifier for destination table.
#define HT_FATALF(msg,...)
Definition: Logger.h:343
#define HT_DEBUGF(msg,...)
Definition: Logger.h:260
const std::string & get_name() const
Gets column family name.
static Hypertable::MaintenanceQueuePtr maintenance_queue
Definition: Global.h:67
std::shared_ptr< Reader > ReaderPtr
Smart pointer to Reader.
void replay_load_range(TableInfoMap &replay_map, MetaLogEntityRangePtr &range_entity)
Definition: RangeServer.cc:976
long long int Lld
Shortcut for printf formats.
Definition: String.h:53
Declarations for MetaLog::Writer.
static bool ignore_clock_skew_errors
Definition: Global.h:111
static std::string log_dir
Definition: Global.h:82
static void get(Status &status)
Gets persistent status.
void legacy_decode(const uint8_t **bufp, size_t *remainp, BalancePlan *plan)
void compact(ResponseCallback *, const TableIdentifier &, const char *row, int32_t flags)
ColumnFamilySpec * get_column_family(const std::string &name)
Gets a column family specification given its name.
Definition: Schema.h:262
Helper class to access parts of the properties.
Definition: Properties.h:458
std::shared_ptr< MetaLogEntityRange > MetaLogEntityRangePtr
Smart pointer to MetaLogEntityRange.
bool empty()
Determines if map is empty.
const char * start_row
Definition: RangeSpec.h:59
static CommitLogPtr system_log
Definition: Global.h:78
void drop_table(ResponseCallback *cb, const TableIdentifier &table)
virtual int error(int error, const String &msg)
Sends a standard error response back to the client.
virtual int error(int error, const String &msg)
Sends a standard error response back to the client.
static int32_t metrics_interval
Definition: Global.h:109
void get(const String &table_id, TableInfoPtr &info)
Gets the TableInfo object for a table, creating one if not found.
Definition: TableInfoMap.cc:56
std::shared_ptr< TableInfo > TableInfoPtr
Smart pointer to TableInfo.
Definition: TableInfo.h:312
Split - range shrunk.
Definition: RangeState.h:55
ApplicationQueuePtr m_app_queue
Definition: RangeServer.h:296
uint32_t cached_cells_returned
Cached cells returned.
Declarations for MaintenanceQueue. This file contains the type declarations for the MaintenanceQueue...
MetricsCollectorGangliaPtr m_ganglia_collector
Ganglia metrics collector.
Definition: RangeServer.h:348
const char * column_family
Definition: Cell.h:67
#define HT_INFOF(msg,...)
Definition: Logger.h:272
static int32_t failover_timeout
Definition: Global.h:86
String name
File or directory name.
Definition: Filesystem.h:96
Schema * cellstore_index
Schema of cellstore.index pseudo table.
Definition: PseudoTables.h:83
uint32_t cells_scanned
Cells scanned.
#define HT_THROWF(_code_, _fmt_,...)
Definition: Error.h:490
Provides access to internal components of opaque key.
Definition: Key.h:40
std::shared_ptr< Range > RangePtr
Smart pointer to Range.
Definition: Range.h:404
AccessGroup::MaintenanceData * agdata
Definition: Range.h:81
Random number generator for int32, int64, double and ascii arrays.
static int64_t range_metadata_split_size
Definition: Global.h:93
uint8_t * base
Pointer to the allocated memory buffer.
void get(Code *code, std::string &text) const
Gets status code and text.
Definition: Status.h:111
size_t fill() const
Returns the size of the used portion.
Definition: DynamicBuffer.h:70
#define HT_FAILURE_SIGNALLED(_label_)
std::shared_ptr< CommitLog > CommitLogPtr
Smart pointer to CommitLog.
Definition: CommitLog.h:223
bool FillScanBlock(MergeScannerRangePtr &scanner, DynamicBuffer &dbuf, uint32_t *cell_count, int64_t buffer_size)
Fills a block of scan results to be sent back to client.
static int64_t cellstore_target_size_max
Definition: Global.h:98
bool lookup(const String &table_id, TableInfoPtr &info)
Returns the TableInfo object for a given table.
Definition: TableInfoMap.cc:47
static int32_t access_group_max_mem
Definition: Global.h:88
std::shared_ptr< PhantomRange > PhantomRangePtr
Shared smart pointer to PhantomRange.
Definition: PhantomRange.h:120
void replay_fragments(ResponseCallback *, int64_t op_id, const String &location, int32_t plan_generation, int32_t type, const vector< int32_t > &fragments, const RangeServerRecovery::ReceiverPlan &receiver_plan, int32_t replay_timeout)
RowIntervals row_intervals
Definition: ScanSpec.h:275
A timer class to keep timeout states across AsyncComm related calls.
Definition: Timer.h:44
static String install_dir
The installation directory.
Definition: System.h:114
This is a generic exception class for Hypertable.
Definition: Error.h:314
Holds client update request and error state.
Definition: UpdateRequest.h:54
std::shared_ptr< MetaLogEntityRemoveOkLogs > MetaLogEntityRemoveOkLogsPtr
Smart pointer to MetaLogEntityRemoveOkLogs.
void get_statistics(Response::Callback::GetStatistics *cb, const std::vector< SystemVariable::Spec > &specs, uint64_t generation)
static std::mutex mutex
Definition: Global.h:62
void get_stats(uint64_t *max_memoryp, uint64_t *available_memoryp, uint64_t *accessesp, uint64_t *hitsp)
static int32_t cell_cache_scanner_cache_size
Definition: Global.h:89
void enable_window(bool enable)
Enables or disables this time window.
Definition: TimeWindow.h:97
void phantom_update(Response::Callback::PhantomUpdate *, const String &location, int32_t plan_generation, const QualifiedRangeSpec &range, int32_t fragment, EventPtr &event)
static bool range_initialization_complete
Definition: Global.h:112
static const char * END_ROOT_ROW
Definition: Key.h:50
static CommitLogPtr user_log
Definition: Global.h:77
MaintenanceSchedulerPtr m_maintenance_scheduler
Smart pointer to maintenance scheduler.
Definition: RangeServer.h:322
void clear()
Clears the map.
virtual bool get(Key &key, ByteString &value)=0
#define HT_MAYBE_FAIL_X(_label_, _exp_)
static LoadStatisticsPtr load_statistics
Definition: Global.h:72
Declarations for StatusPersister.
static Hypertable::FilesystemPtr log_dfs
Definition: Global.h:65
Qualified (with table identifier) range specification.
uint32_t value_len
Definition: Cell.h:72
std::shared_ptr< RangeServer > RangeServerPtr
Shared smart pointer to RangeServer.
Definition: RangeServer.h:355
#define HT_ERRORF(msg,...)
Definition: Logger.h:300
void status(Response::Callback::Status *cb)
Definition: RangeServer.cc:375
Declarations for CommitLog.
void dump(ResponseCallback *, const char *, bool)
static Hypertable::PseudoTables * pseudo_tables
Definition: Global.h:70
Create file if it does not exist.
Definition: Session.h:77
void phantom_commit_ranges(ResponseCallback *, int64_t op_id, const String &location, int32_t plan_generation, const vector< QualifiedRangeSpec > &ranges)
static std::string type_str(int type)
Definition: RangeSpec.cc:41
#define HT_ON_OBJ_SCOPE_EXIT(...)
Definition: ScopeGuard.h:305
std::shared_ptr< Schema > SchemaPtr
Smart pointer to Schema.
Definition: Schema.h:465
#define HT_NOTICE(msg)
Definition: Logger.h:279
static bool ignore_cells_with_clock_skew
Definition: Global.h:76
void update(Response::Callback::Update *cb, uint64_t cluster_id, const TableIdentifier &table, uint32_t count, StaticBuffer &buffer, uint32_t flags)
Inserts data into a table.
uint64_t bytes_returned
Bytes returned.
uint64_t bloom_filter_maybes
Definition: StatsTable.h:94
Range state.
Definition: RangeState.h:48
static TimeWindow low_activity_time
Definition: Global.h:116
void get_locations(StringSet &locations) const
Definition: ReceiverPlan.cc:58
int64_t cells_scanned
Number of cell scanned while executing scan.
uint64_t bloom_filter_accesses
Definition: StatsTable.h:93
CellIntervals cell_intervals
Definition: ScanSpec.h:276
int64_t cells_returned
Number of cell returned while executing scan.
Declarations for LocationInitializer.
#define HT_MAYBE_FAIL(_label_)
Open file for reading.
Definition: Session.h:71
std::shared_ptr< ConnectionManager > ConnectionManagerPtr
Smart pointer to ConnectionManager.
void dump_keys(std::ofstream &out)
#define HT_WARN(msg)
Definition: Logger.h:289
void relinquish_range(ResponseCallback *, const TableIdentifier &, const RangeSpec &)
Encapsulates decomposed key and value.
Definition: Cell.h:32
std::shared_ptr< ApplicationQueue > ApplicationQueuePtr
Shared smart pointer to ApplicationQueue object.
static Hypertable::ApplicationQueuePtr app_queue
Definition: Global.h:66
System information and statistics based on libsigar.
String extensions and helpers: sets, maps, append operators etc.
Holds pointers to a Range and associated Range::MaintenanceData.
Definition: TableInfo.h:55
static const MemStat & mem_stat()
Retrieves updated Memory statistics (see SystemInfo.h)
Definition: SystemInfo.cc:339
const char * column_family
Definition: KeySpec.h:127
#define HT_THROW(_code_, _msg_)
Definition: Error.h:478
const char * version_string()
Definition: Version.cc:37
static RangesPtr get_ranges()
Definition: Global.cc:117
EventPtr & event()
Get smart pointer to event object that triggered the request.
Manages live range map and set of log names that can be safely removed.
Definition: TableInfoMap.h:67
md5 digest routines.
Wrapper for RangeSpec providing member storage.
Definition: RangeSpec.h:89
#define HT_FATAL_OUT
Definition: Logger.h:347
void create_scanner(Response::Callback::CreateScanner *, const TableIdentifier &, const RangeSpec &, const ScanSpec &, QueryCache::Key *)
Range::MaintenanceData * data
Pointer to maintenance data for range.
Definition: TableInfo.h:66
static constexpr const char * SERVER_IS_COMING_UP
Definition: Status.h:61
uint32_t decode_vi32(const uint8_t **bufp, size_t *remainp)
Decode a variable length encoded integer up to 32-bit.
Declarations for ClusterId.
static Hypertable::FileBlockCache * block_cache
Definition: Global.h:90
Declarations for Client.
static bool row_size_unlimited
Definition: Global.h:75
int64_t balance()
Return total range server memory used.
Definition: MemoryTracker.h:70
Declaration for FillScanBlock.
int64_t bytes_returned
Number of bytes returned while executing scan.
Holds vector of RangeData objects and memory arena.
Definition: TableInfo.h:72
Address abstraction to hold either proxy name or IPv4:port address.
Definition: CommAddress.h:52
int response(const std::map< QualifiedRangeSpec, int32_t > &error_map)
void fetch_scanblock(Response::Callback::CreateScanner *, int32_t scanner_id)
#define HT_DEBUG_OUT
Definition: Logger.h:261
Declarations for MergeScannerRange.
const uint8_t * value
Definition: Cell.h:71
std::shared_ptr< Definition > DefinitionPtr
Smart pointer to Definition.
static int64_t range_maximum_size
Definition: Global.h:85
int64_t get_ts64()
Returns the current time in nanoseconds as a 64bit number.
Definition: Time.cc:40
static CommitLogPtr metadata_log
Definition: Global.h:79
int code() const
Returns the error code.
Definition: Error.h:391
std::shared_ptr< ScanContext > ScanContextPtr
Definition: ScanContext.h:169
RangePtr range
Pointer to Range.
Definition: TableInfo.h:63
LogReplayBarrierPtr m_log_replay_barrier
Definition: RangeServer.h:260
void heapcheck(ResponseCallback *, const char *)
Executes user-defined functions when leaving the current scope.
static bool encountered_upgrade
Flag indicating that old entity was encountered.
std::map< const char *, int32_t, LtCstr > CstrToInt32Map
STL map from c-style string to int32_t.
Definition: StringExt.h:55
std::shared_ptr< Ranges > RangesPtr
Smart pointer to Ranges.
Definition: TableInfo.h:91
RangeServer recovery receiver plan.
Definition: ReceiverPlan.h:48
void update_schema(ResponseCallback *, const TableIdentifier &, const char *)