0.9.8.10
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
Context.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2007-2015 Hypertable, Inc.
3  *
4  * This file is part of Hypertable.
5  *
6  * Hypertable is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; version 3 of the
9  * License, or any later version.
10  *
11  * Hypertable is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19  * 02110-1301, USA.
20  */
21 
28 #include <Common/Compat.h>
29 
30 #include "BalancePlanAuthority.h"
31 #include "Context.h"
32 #include "LoadBalancer.h"
33 #include "Operation.h"
34 #include "OperationProcessor.h"
35 #include "OperationRecover.h"
36 #include "OperationTimedBarrier.h"
37 #include "ReferenceManager.h"
38 
43 
44 #include <FsBroker/Lib/Client.h>
45 
46 #include <Common/FailureInducer.h>
47 #include <Common/SystemInfo.h>
48 #include <Common/md5.h>
49 
50 #include <chrono>
51 #include <memory>
52 
53 using namespace Hypertable;
54 using namespace Hypertable::Lib;
55 using namespace std;
56 
/**
 * Constructs the context from configuration properties and a Hyperspace
 * session.
 *
 * Reads cluster, directory, disk-threshold, timeout and interval settings from
 * the properties, creates the communication, connection, reference, response,
 * filesystem-broker, range-server-connection, metrics and application-queue
 * objects, and computes the first monitoring and GC times.
 *
 * NOTE(review): the listing's own line numbering skips 68, 98 and 100 inside
 * this body, so statements may be missing from this extract -- confirm against
 * the original file before relying on it.
 *
 * @param p  Configuration properties; stored in #props.
 * @param hs Hyperspace session; stored in #hyperspace. When it is null,
 *           #namemap and #master_file are not created here.
 */
57 Context::Context(PropertiesPtr &p, Hyperspace::SessionPtr hs) : props(p), hyperspace(hs), op(nullptr) {
58 
// The cluster name is optional; when present, spaces and quote characters are
// trimmed from both ends.
59  if (props->has("Hypertable.Cluster.Name")) {
60  cluster_name = props->get_str("Hypertable.Cluster.Name");
61  boost::trim_if(cluster_name, boost::is_any_of(" '\""));
62  }
63 
64  disk_threshold = props->get_i32("Hypertable.Master.DiskThreshold.Percentage");
65 
// Trim '/' characters from both ends of the configured top-level directory.
66  toplevel_dir = props->get_str("Hypertable.Directory");
67  boost::trim_if(toplevel_dir, boost::is_any_of("/"));
69 
// Create the collaborating objects; metrics collection is started right away.
70  comm = Comm::instance();
71  conn_manager = make_shared<ConnectionManager>(comm);
72  reference_manager = make_unique<ReferenceManager>();
73  response_manager = make_unique<ResponseManager>();
74  response_manager_thread = make_unique<Thread>(*response_manager);
75  dfs = std::make_shared<FsBroker::Lib::Client>(conn_manager, props);
76  rsc_manager = make_shared<RangeServerConnectionManager>();
77  metrics_handler = std::make_shared<MetricsHandler>(props);
78  metrics_handler->start_collecting();
79 
// The application queue is sized from the Hypertable.Client.Workers property.
80  int worker_count = props->get_i32("Hypertable.Client.Workers");
81  app_queue = make_shared<ApplicationQueue>(worker_count);
82 
// These two objects need a Hyperspace session, so they are only created when
// one was supplied.
83  if (hyperspace) {
84  namemap = make_shared<NameIdMapper>(hyperspace, toplevel_dir);
85  master_file = make_unique<HyperspaceMasterFile>(props, hyperspace);
86  }
87 
// The configured timeout is divided by 1000 (presumably milliseconds to
// seconds -- TODO confirm the property's unit).
88  request_timeout = (time_t)(props->get_i32("Hypertable.Request.Timeout") / 1000);
// Optionally derive location_hash: the first 8 characters of the MD5 hex
// string of "<host_name>:<port>".
89  if (props->get_bool("Hypertable.Master.Locations.IncludeMasterHash")) {
90  char hash[33];
91  uint16_t port = props->get_i16("port");
92  md5_string(format("%s:%u", System::net_info().host_name.c_str(), port).c_str(), hash);
93  location_hash = String(hash).substr(0, 8);
94  }
95  max_allowable_skew = props->get_i32("Hypertable.RangeServer.ClockSkew.Max");
96  monitoring_interval = props->get_i32("Hypertable.Monitoring.Interval");
97  gc_interval = props->get_i32("Hypertable.Master.Gc.Interval");
99 
// The GC interval must exceed 1000; it is divided by 1000 below.
101  HT_ASSERT(gc_interval > 1000);
102 
// First monitoring and GC times: now plus the interval divided by 1000,
// minus one.
103  time_t now = time(0);
104  next_monitoring_time = now + (monitoring_interval/1000) - 1;
105  next_gc_time = now + (gc_interval/1000) - 1;
106 }
107 
108 
110  if (mml_writer)
111  mml_writer->close();
112  metrics_handler->stop_collecting();
113  delete balancer;
114 }
115 
117  {
118  lock_guard<std::mutex> lock(mutex);
119  m_startup = status;
120  if (status == false && m_shutdown)
121  return false;
122  }
123  return true;
124 }
125 
127  return m_startup;
128 }
129 
131  {
132  lock_guard<std::mutex> lock(mutex);
133  if (m_shutdown)
134  return;
135  m_shutdown = true;
136  if (m_startup)
137  return;
138  }
139  metrics_handler->stop_collecting();
140  master_file->shutdown();
142  recovery_barrier_op->shutdown();
143  op->wait_for_idle(chrono::seconds(15));
144  op->shutdown();
145  response_manager->shutdown();
146  response_manager_thread->join();
147 }
148 
150  return m_shutdown;
151 }
152 
153 
154 
155 
156 void Context::notification_hook(const String &subject, const String &message) {
157 
158  String cmd;
159 
160  if (!cluster_name.empty())
161  cmd = format("%s/conf/notification-hook.sh '[Hypertable %s] %s' '%s'",
162  System::install_dir.c_str(), cluster_name.c_str(),
163  subject.c_str(), message.c_str());
164  else
165  cmd = format("%s/conf/notification-hook.sh '[Hypertable] %s' '%s'",
166  System::install_dir.c_str(), subject.c_str(),
167  message.c_str());
168 
169  int ret = ::system(cmd.c_str());
170  HT_INFOF("notification-hook returned: %d", ret);
171  if (ret != 0) {
172  HT_WARNF("shell script conf/notification-hook.sh ('%s') returned "
173  "error %d", cmd.c_str(), ret);
174  }
175 }
176 
178  lock_guard<std::mutex> lock(mutex);
179  HT_ASSERT(dynamic_cast<BalancePlanAuthority *>(bpa.get()));
181 }
182 
184  lock_guard<std::mutex> lock(mutex);
186  m_balance_plan_authority = make_shared<BalancePlanAuthority>(shared_from_this(), mml_writer);
187  return static_cast<BalancePlanAuthority *>(m_balance_plan_authority.get());
188 }
189 
191  lock_guard<std::mutex> lock(mutex);
193  m_balance_plan_authority = make_shared<BalancePlanAuthority>(shared_from_this(), mml_writer);
194  entity = m_balance_plan_authority;
195 }
196 
197 TablePtr Context::new_table(const std::string &name) {
198  if (!range_locator)
199  range_locator = make_shared<RangeLocator>(props, conn_manager, hyperspace, request_timeout * 1000);
201  return make_shared<Table>(props, range_locator, conn_manager,
202  hyperspace, aq, namemap, name);
203 }
204 
207  const uint8_t *decode_ptr = event->payload;
208  size_t decode_remain = event->payload_len;
209 
210  params.decode(&decode_ptr, &decode_remain);
211 
212  String proxy;
213  if (event->proxy == 0) {
215  if (!rsc_manager->find_server_by_local_addr(event->addr, rsc)) {
216  HT_WARNF("Unable to determine proxy for replay_status(id=%lld, %s, "
217  "plan_generation=%d) from %s", (Lld)params.op_id(),
218  params.location().c_str(), params.plan_generation(),
219  event->addr.format().c_str());
220  return;
221  }
222  proxy = rsc->location();
223  }
224  else
225  proxy = event->proxy;
226 
227  HT_INFOF("replay_status(id=%lld, %s, plan_generation=%d) from %s",
228  (Lld)params.op_id(), params.location().c_str(),
229  params.plan_generation(), proxy.c_str());
230 
232 
233  if (future)
234  future->status(proxy, params.plan_generation());
235  else
236  HT_WARN_OUT << "No Recovery replay step future found for operation=" << params.op_id() << HT_END;
237 
238 }
239 
242  const uint8_t *decode_ptr = event->payload;
243  size_t decode_remain = event->payload_len;
244 
245  params.decode(&decode_ptr, &decode_remain);
246 
247  String proxy;
248  if (event->proxy == 0) {
250  if (!rsc_manager->find_server_by_local_addr(event->addr, rsc)) {
251  HT_WARNF("Unable to determine proxy for replay_complete(id=%lld, %s, "
252  "plan_generation=%d) from %s", (Lld)params.op_id(),
253  params.location().c_str(), params.plan_generation(),
254  event->addr.format().c_str());
255  return;
256  }
257  proxy = rsc->location();
258  }
259  else
260  proxy = event->proxy;
261 
262  HT_INFOF("from %s replay_complete(id=%lld, %s, plan_generation=%d) = %s",
263  proxy.c_str(), (Lld)params.op_id(), params.location().c_str(),
264  params.plan_generation(), Error::get_text(params.error()));
265 
266  RecoveryStepFuturePtr future
268 
269  if (future) {
270  if (params.error() == Error::OK)
271  future->success(proxy, params.plan_generation());
272  else
273  future->failure(proxy, params.plan_generation(), params.error(),
274  params.message());
275  }
276  else
277  HT_WARN_OUT << "No Recovery replay step future found for operation="
278  << params.op_id() << HT_END;
279 
280 }
281 
284  const uint8_t *decode_ptr = event->payload;
285  size_t decode_remain = event->payload_len;
286 
287  params.decode(&decode_ptr, &decode_remain);
288 
289  HT_INFOF("prepare_complete(id=%lld, %s, plan_generation=%d) = %s",
290  (Lld)params.op_id(), params.location().c_str(),
291  params.plan_generation(), Error::get_text(params.error()));
292 
293  RecoveryStepFuturePtr future
295 
296  String proxy;
297  if (event->proxy == 0) {
299  if (!rsc_manager->find_server_by_local_addr(event->addr, rsc)) {
300  HT_WARNF("Unable to determine proxy for prepare_complete(id=%lld, %s, "
301  "plan_generation=%d) from %s", (Lld)params.op_id(),
302  params.location().c_str(), params.plan_generation(),
303  event->addr.format().c_str());
304  return;
305  }
306  proxy = rsc->location();
307  }
308  else
309  proxy = event->proxy;
310 
311  if (future) {
312  if (params.error() == Error::OK)
313  future->success(proxy, params.plan_generation());
314  else
315  future->failure(proxy, params.plan_generation(), params.error(),
316  params.message());
317  }
318  else
319  HT_WARN_OUT << "No Recovery prepare step future found for operation="
320  << params.op_id() << HT_END;
321 }
322 
325  const uint8_t *decode_ptr = event->payload;
326  size_t decode_remain = event->payload_len;
327 
328  params.decode(&decode_ptr, &decode_remain);
329 
330  HT_INFOF("commit_complete(id=%lld, %s, plan_generation=%d) = %s",
331  (Lld)params.op_id(), params.location().c_str(),
332  params.plan_generation(), Error::get_text(params.error()));
333 
334  RecoveryStepFuturePtr future
336 
337  String proxy;
338  if (event->proxy == 0) {
340  if (!rsc_manager->find_server_by_local_addr(event->addr, rsc)) {
341  HT_WARNF("Unable to determine proxy for commit_complete(id=%lld, %s, "
342  "plan_generation=%d) from %s", (Lld)params.op_id(),
343  params.location().c_str(), params.plan_generation(),
344  event->addr.format().c_str());
345  return;
346  }
347  proxy = rsc->location();
348  }
349  else
350  proxy = event->proxy;
351 
352  if (future) {
353  if (params.error() == Error::OK)
354  future->success(proxy, params.plan_generation());
355  else
356  future->failure(proxy, params.plan_generation(), params.error(),
357  params.message());
358  }
359  else
360  HT_WARN_OUT << "No Recovery commit step future found for operation="
361  << params.op_id() << HT_END;
362 }
363 
365  lock_guard<std::mutex> lock(m_outstanding_move_ops_mutex);
366  auto iter = m_outstanding_move_ops.find(operation->hash_code());
367  if (iter != m_outstanding_move_ops.end())
368  return false;
369  m_outstanding_move_ops[operation->hash_code()] = operation->id();
370  return true;
371 }
372 
374  lock_guard<std::mutex> lock(m_outstanding_move_ops_mutex);
375  auto iter = m_outstanding_move_ops.find(operation->hash_code());
376  if (iter == m_outstanding_move_ops.end())
377  return;
378  m_outstanding_move_ops.erase(iter);
379 }
380 
382  lock_guard<std::mutex> lock(m_outstanding_move_ops_mutex);
383  auto iter = m_outstanding_move_ops.find(hash_code);
384  if (iter != m_outstanding_move_ops.end()) {
385  OperationPtr operation = reference_manager->get(iter->second);
386  HT_ASSERT(operation);
387  return operation;
388  }
389  return OperationPtr();
390 }
391 
392 void Context::add_available_server(const String &location) {
393  lock_guard<std::mutex> lock(mutex);
394  available_servers.insert(location);
395 }
396 
398  lock_guard<std::mutex> lock(mutex);
399  available_servers.erase(location);
400 }
401 
403  lock_guard<std::mutex> lock(mutex);
404  return available_servers.size();
405 }
406 
408  lock_guard<std::mutex> lock(mutex);
409  servers = available_servers;
410 }
411 
412 
414 {
415 
416  // system info was not yet initialized; assume that the disks are available
417  if (!stats.stats) {
418  HT_WARNF("RangeServer %s: no disk usage statistics available",
419  stats.location.c_str());
420  return true;
421  }
422 
423  // accept new ranges if there's at least one disk below the threshold
424  double numerator=0.0, denominator=0.0;
425  for (const auto &fs : stats.stats->system.fs_stat) {
426  numerator += fs.total - fs.avail;
427  denominator += fs.total;
428  }
429 
430  if (denominator == 0.0 ||
431  (int32_t)((numerator/denominator)*100.00) < disk_threshold)
432  return true;
433 
434  HT_WARNF("RangeServer %s: disk use %d%% exceeds threshold; will not assign "
435  "ranges", stats.location.c_str(),
436  (int32_t)((numerator/denominator)*100.00));
437 
438  return false;
439 }
440 
441 
442 void
444  RecoveryStepFuturePtr &future) {
445  lock_guard<std::mutex> lock(m_mutex);
446  m_replay_map[id] = future;
447 }
448 
451  lock_guard<std::mutex> lock(m_mutex);
452  RecoveryStepFuturePtr future;
453  if (m_replay_map.find(id) != m_replay_map.end())
454  future = m_replay_map[id];
455  return future;
456 }
457 
458 void
460  lock_guard<std::mutex> lock(m_mutex);
461  m_replay_map.erase(id);
462 }
463 
464 
465 void
467  RecoveryStepFuturePtr &future) {
468  lock_guard<std::mutex> lock(m_mutex);
469  m_prepare_map[id] = future;
470 }
471 
474  lock_guard<std::mutex> lock(m_mutex);
475  RecoveryStepFuturePtr future;
476  if (m_prepare_map.find(id) != m_prepare_map.end())
477  future = m_prepare_map[id];
478  return future;
479 }
480 
481 void
483  lock_guard<std::mutex> lock(m_mutex);
484  m_prepare_map.erase(id);
485 }
486 
487 
488 void
490  RecoveryStepFuturePtr &future) {
491  lock_guard<std::mutex> lock(m_mutex);
492  m_commit_map[id] = future;
493 }
494 
497  lock_guard<std::mutex> lock(m_mutex);
498  RecoveryStepFuturePtr future;
499  if (m_commit_map.find(id) != m_commit_map.end())
500  future = m_commit_map[id];
501  return future;
502 }
503 
504 void
506  lock_guard<std::mutex> lock(m_mutex);
507  m_commit_map.erase(id);
508 }
509 
static Comm * instance()
Creates/returns singleton instance of the Comm class.
Definition: Comm.h:72
std::set< String > StringSet
STL Set managing Strings.
Definition: StringExt.h:42
String cluster_name
Name of cluster.
Definition: Context.h:176
StringSet available_servers
Definition: Context.h:146
ApplicationQueuePtr app_queue
Definition: Context.h:151
void install_prepare_future(int64_t id, RecoveryStepFuturePtr &future)
Definition: Context.cc:466
#define HT_WARNF(msg,...)
Definition: Logger.h:290
The FailureInducer simulates errors.
bool add_move_operation(std::shared_ptr< Operation > operation)
Adds operation to active move range operation map.
Definition: Context.cc:364
std::shared_ptr< RangeServerConnection > RangeServerConnectionPtr
std::string String
A String is simply a typedef to std::string.
Definition: String.h:44
std::shared_ptr< Entity > EntityPtr
Smart pointer to Entity.
int32_t plan_generation()
Gets recovery plan generation.
Definition: ReplayStatus.h:74
const string & location() const
Gets proxy name of RangeServer whose log is being replayed.
RecoveryStepFuturePtr get_commit_future(int64_t id)
Definition: Context.cc:496
String format(const char *fmt,...)
Returns a String using printf-like format facilities. Vanilla snprintf is about 1.5x faster than this...
Definition: String.cc:37
std::mutex m_outstanding_move_ops_mutex
Mutex for serializing access to m_outstanding_move_ops
Definition: Context.h:252
Declarations for OperationProcessor.
time_t request_timeout
Definition: Context.h:168
Declarations for Operation.
ConnectionManagerPtr conn_manager
Definition: Context.h:148
LoadBalancer * balancer
Definition: Context.h:157
int32_t max_allowable_skew
Definition: Context.h:179
void remove_move_operation(std::shared_ptr< Operation > operation)
Removes operation from active move range operation map.
Definition: Context.cc:373
bool set_startup_status(bool status)
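Sets the startup flag to the given status; returns false only when the flag is being cleared while shutdown is already in progress.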
Definition: Context.cc:116
TablePtr new_table(const std::string &name)
Definition: Context.cc:197
std::shared_ptr< Event > EventPtr
Smart pointer to Event.
Definition: Event.h:228
STL namespace.
std::unique_ptr< ResponseManager > response_manager
Definition: Context.h:162
RecoveryState m_recovery_state
Definition: Context.h:246
std::unique_ptr< Thread > response_manager_thread
Definition: Context.h:161
void start_shutdown()
Start shutdown sequence.
Definition: Context.cc:130
void erase_prepare_future(int64_t id)
Definition: Context.cc:482
bool can_accept_ranges(const RangeServerStatistics &stats)
Definition: Context.cc:413
Declarations for ReferenceManager.
#define HT_ASSERT(_e_)
Definition: Logger.h:396
Request parameters for replay status operation.
Definition: ReplayStatus.h:46
Declarations for ReplayStatus request parameters.
int32_t disk_threshold
Disk use threshold percentage.
Definition: Context.h:178
Request parameters for replay complete operation.
PropertiesPtr props
Configuration properties.
Definition: Context.h:147
uint32_t monitoring_interval
Definition: Context.h:170
std::shared_ptr< RecoveryStepFuture > RecoveryStepFuturePtr
RangeLocatorPtr range_locator
Definition: Context.h:152
void erase_replay_future(int64_t id)
Definition: Context.cc:459
std::shared_ptr< Session > SessionPtr
Definition: Session.h:734
int32_t plan_generation()
Gets recovery plan generation.
std::unique_ptr< OperationProcessor > op
Definition: Context.h:174
const char * get_text(int error)
Returns a descriptive error message.
Definition: Error.cc:330
const string & location() const
Gets proxy name of RangeServer whose log is being recovered.
bool status(ContextPtr &context, Timer &timer, Status &status)
Runs a status check on the master.
Definition: Utility.cc:408
Hyperspace::SessionPtr hyperspace
Definition: Context.h:150
void prepare_complete(EventPtr &event)
Definition: Context.cc:282
const string & message() const
Gets error message.
std::shared_ptr< Properties > PropertiesPtr
Definition: Properties.h:447
Declarations for PhantomPrepareComplete request parameters.
Compatibility Macros for C/C++.
Declarations for PhantomCommitComplete request parameters.
void remove_available_server(const String &location)
Definition: Context.cc:397
#define HT_END
Definition: Logger.h:220
std::shared_ptr< ApplicationQueueInterface > ApplicationQueueInterfacePtr
Smart pointer to ApplicationQueueInterface.
String location_hash
Definition: Context.h:177
void replay_status(EventPtr &event)
Definition: Context.cc:205
String toplevel_dir
Definition: Context.h:153
void md5_string(const char *input, char output[33])
Calculates the hex string of MD5 of null terminated input.
Definition: md5.cc:384
FilesystemPtr dfs
Definition: Context.h:165
void install_replay_future(int64_t id, RecoveryStepFuturePtr &future)
Definition: Context.cc:443
std::unique_ptr< HyperspaceMasterFile > master_file
Hyperspace master file handle
Definition: Context.h:156
virtual void decode(const uint8_t **bufp, size_t *remainp)
Reads serialized representation of object from a buffer.
Definition: Serializable.cc:70
bool shutdown_in_progress()
Gets flag indicating if server is shutting down.
Definition: Context.cc:149
#define HT_WARN_OUT
Definition: Logger.h:291
Hypertable library.
Definition: CellInterval.h:30
Context(PropertiesPtr &p, Hyperspace::SessionPtr hs)
Context.
Definition: Context.cc:57
Hypertable definitions
RecoveryStepFuturePtr get_prepare_future(int64_t id)
Definition: Context.cc:473
std::shared_ptr< OperationTimedBarrier > recovery_barrier_op
Definition: Context.h:175
std::unordered_map< int64_t, int64_t > m_outstanding_move_ops
Map of outstanding move range operations.
Definition: Context.h:255
long long int Lld
Shortcut for printf formats.
Definition: String.h:53
uint32_t timer_interval
Definition: Context.h:169
Central authority for balance plans.
MetaLog::WriterPtr mml_writer
Definition: Context.h:163
Request parameters for phantom prepare complete operation.
void set_balance_plan_authority(MetaLog::EntityPtr bpa)
Sets the BalancePlanAuthority.
Definition: Context.cc:177
#define HT_INFOF(msg,...)
Definition: Logger.h:272
void install_commit_future(int64_t id, RecoveryStepFuturePtr &future)
Definition: Context.cc:489
Comm * comm
Comm layer.
Definition: Context.h:143
static String install_dir
The installation directory.
Definition: System.h:114
Declarations for ReplayComplete request parameters.
MetaLog::EntityPtr m_balance_plan_authority
BalancePlanAuthority entity.
Definition: Context.h:249
uint32_t gc_interval
Definition: Context.h:171
bool m_startup
Flag indicating that server is starting up.
Definition: Context.h:258
bool startup_in_progress()
Gets flag indicating if server is starting up.
Definition: Context.cc:126
std::unique_ptr< ReferenceManager > reference_manager
Definition: Context.h:160
NameIdMapperPtr namemap
Definition: Context.h:154
void erase_commit_future(int64_t id)
Definition: Context.cc:505
const string & location() const
Gets proxy name of RangeServer whose log is being replayed.
Definition: ReplayStatus.h:70
RecoveryStepFuturePtr get_replay_future(int64_t id)
Definition: Context.cc:450
void commit_complete(EventPtr &event)
Definition: Context.cc:323
std::mutex mutex
Definition: Context.h:141
Declarations for BalancePlanAuthority.
std::shared_ptr< Operation > OperationPtr
Smart pointer to Operation.
Definition: Operation.h:609
int64_t op_id()
Gets recovery operation ID.
Definition: ReplayStatus.h:66
time_t next_monitoring_time
Definition: Context.h:172
System information and statistics based on libsigar.
size_t available_server_count()
Definition: Context.cc:402
void get_available_servers(StringSet &servers)
Definition: Context.cc:407
void replay_complete(EventPtr &event)
Definition: Context.cc:240
std::shared_ptr< Operation > get_move_operation(int64_t hash_code)
Gets operation from active move range operation map.
Definition: Context.cc:381
BalancePlanAuthority * get_balance_plan_authority()
Definition: Context.cc:183
void notification_hook(const String &subject, const String &message)
Invoke notification hook.
Definition: Context.cc:156
md5 digest routines.
Declarations for Context.
static const NetInfo & net_info()
Retrieves updated Network information (see SystemInfo.h)
Definition: SystemInfo.cc:360
MetricsHandlerPtr metrics_handler
Definition: Context.h:149
void add_available_server(const String &location)
Definition: Context.cc:392
Declarations for Client.
std::shared_ptr< Table > TablePtr
Definition: Table.h:53
RangeServerConnectionManagerPtr rsc_manager
Definition: Context.h:145
time_t next_gc_time
Definition: Context.h:173
bool m_shutdown
Flag indicating that server is shutting down.
Definition: Context.h:261
~Context()
Destructor.
Definition: Context.cc:109