0.9.8.10
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
main.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2007-2015 Hypertable, Inc.
3  *
4  * This file is part of Hypertable.
5  *
6  * Hypertable is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; version 3 of the
9  * License, or any later version.
10  *
11  * Hypertable is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19  * 02110-1301, USA.
20  */
21 
28 #include <Common/Compat.h>
29 
47 
49 #include <Hypertable/Lib/Config.h>
51 
52 #include <AsyncComm/Comm.h>
53 
54 #include <Common/FailureInducer.h>
55 #include <Common/Init.h>
56 #include <Common/System.h>
57 
58 #include <sstream>
59 
60 extern "C" {
61 #include <poll.h>
62 }
63 
64 using namespace Hypertable;
65 using namespace Config;
66 using namespace std;
67 
68 namespace {
69 
70  struct AppPolicy : Config::Policy { // Application config policy: registers short command-line option names for this program.
71  static void init_options() { // Each alias() call pairs a command-line option (1st arg) with a config-file property (2nd arg).
72  alias("port", "Hypertable.Master.Port");
73  alias("workers", "Hypertable.Master.Workers");
74  alias("reactors", "Hypertable.Master.Reactors");
75  }
76  };
77 
78  typedef Meta::list<GenericServerPolicy, FsClientPolicy,
80 
82  public:
83  HandlerFactory(ContextPtr &context) : m_context(context) { // Keeps a copy of the context pointer and builds the factory's handler up front.
84  m_handler = make_shared<ConnectionHandler>(m_context); // Exactly one ConnectionHandler is created per factory; it is never replaced afterwards.
85  }
86 
87  virtual void get_instance(DispatchHandlerPtr &dhp) { // Out-parameter dhp receives the handler; nothing new is allocated per call.
88  dhp = m_handler; // Every caller gets a pointer to the same shared handler object.
89  }
90 
91  void start_timer() { // Forwards to ConnectionHandler::start_timer() on the handler owned by this factory.
92  static_cast<ConnectionHandler*>(m_handler.get())->start_timer(); // Unchecked downcast: relies on m_handler having been created as a ConnectionHandler in the constructor.
93  }
94 
95  private:
96  ContextPtr m_context;
97  DispatchHandlerPtr m_handler;
98  };
99 
100  struct ltrsc { // "Less-than" comparator for RangeServerConnectionPtr; used as the ordering of the std::set in main().
101  bool operator()(const RangeServerConnectionPtr &rsc1, const RangeServerConnectionPtr &rsc2) const { // Both pointers are dereferenced without a null check.
102  return strcmp(rsc1->location().c_str(), rsc2->location().c_str()) < 0; // Orders by location string only, so two connections with the same location compare as equivalent.
103  }
104  };
105 
106 
107 } // local namespace
108 
123 int main(int argc, char **argv) { // Entry point: builds the Context, reads the metalog, queues operations, then waits in context->op->join(). Returns exit_status.
124  int exit_status {}; // 0 unless the catch block below sets it to 1
125  InetAddr listen_addr;
126  ContextPtr context; // remains null until it is assigned inside the try block (listing line 139)
127 
128  // Register ourselves as the Comm-layer proxy master
130  // NOTE(review): the listing skips source line 129, so the statement that performs this registration is not shown here.
131  while (true) { // NOTE(review): every visible exit is a break (listing lines 150 and 317), so this body runs at most once.
132 
133  try {
134  init_with_policies<Policies>(argc, argv);
135  uint16_t port = get_i16("port");
136  listen_addr = InetAddr(INADDR_ANY, port);
137 
138  Hyperspace::SessionPtr hyperspace = make_shared<Hyperspace::Session>(Comm::instance(), properties);
139  context = make_shared<Context>(properties, hyperspace);
140  context->monitoring = make_shared<Monitoring>(context.get());
141  context->op = make_unique<OperationProcessor>(context, get_i32("workers"));
142 
143  ConnectionHandlerFactoryPtr connection_handler_factory(new HandlerFactory(context));
144  context->comm->listen(listen_addr, connection_handler_factory); // listening starts before the master lock is requested
145 
146  auto func = [&context](bool status){context->set_startup_status(status);}; // callback that forwards a status flag to the context
147  if (!context->master_file->obtain_master_lock(context->toplevel_dir, func)){ // lock not obtained: shut down and leave without touching the metalog
148  context->start_shutdown();
149  context->op->join();
150  break;
151  }
152 
153  // Load (and possibly generate) the cluster ID
154  ClusterId cluster_id(context->hyperspace, ClusterId::GENERATE_IF_NOT_FOUND);
155  HT_INFOF("Cluster id is %llu", (Llu)ClusterId::get());
156 
157  context->mml_definition =
158  make_shared<MetaLog::DefinitionMaster>(context, format("%s_%u", "master", port).c_str()); // definition name is "master_<port>"
159 
160  if (has("induce-failure")) {
161  if (FailureInducer::instance == 0) // NOTE(review): the listing skips source line 162, so the statement this if guards is not shown; as printed it would guard parse_option() - confirm against the real file.
163  FailureInducer::instance->parse_option(get_str("induce-failure"));
164  }
165 
169  std::vector<MetaLog::EntityPtr> entities;
170  std::vector<OperationPtr> operations; // operations to hand to the processor in one batch (listing line 301)
171  std::map<String, OperationPtr> recovery_ops; // location -> OperationRecover read from the metalog
172  MetaLog::ReaderPtr mml_reader;
173  OperationPtr operation;
175  String log_dir = context->toplevel_dir + "/servers/master/log/"
176  + context->mml_definition->name();
177  BalancePlanAuthority *bpa {}; // non-owning; stays null if no BalancePlanAuthority entity is read
178 
179  mml_reader = make_shared<MetaLog::Reader>(context->dfs, context->mml_definition,
180  log_dir);
181  mml_reader->get_entities(entities);
182 
183  // Uniq-ify the RangeServerConnection and BalancePlanAuthority objects
184  {
185  std::vector<MetaLog::EntityPtr> entities2;
186  std::set<RangeServerConnectionPtr, ltrsc> rsc_set; // keyed by location via ltrsc
187 
188  entities2.reserve(entities.size());
189  for (auto &entity : entities) {
190  if (dynamic_cast<RangeServerConnection *>(entity.get())) {
191  RangeServerConnectionPtr rsc {dynamic_pointer_cast<RangeServerConnection>(entity)};
192  if (rsc_set.count(rsc) > 0) // same location already seen: drop the earlier entry so the one read later wins
193  rsc_set.erase(rsc);
194  rsc_set.insert(rsc);
195  }
196  else {
197  if (dynamic_cast<BalancePlanAuthority *>(entity.get())) {
198  bpa = dynamic_cast<BalancePlanAuthority *>(entity.get());
199  context->set_balance_plan_authority(entity);
200  }
201  else if (dynamic_cast<SystemState *>(entity.get()))
202  context->system_state = dynamic_pointer_cast<SystemState>(entity);
203  else if (dynamic_cast<RecoveredServers *>(entity.get()))
204  context->recovered_servers = dynamic_pointer_cast<RecoveredServers>(entity);
205  entities2.push_back(entity); // runs for every non-RangeServerConnection entity, whichever branch above matched
206  }
207  }
208  // Insert uniq'ed RangeServerConnections
209  for (const auto &rsc : rsc_set) // appended after all other entities, in rsc_set (location) order
210  entities2.push_back(rsc);
211  entities.swap(entities2);
212  }
213 
214  if (!context->system_state) // none found among the entities: start from a default-constructed one
215  context->system_state = make_shared<SystemState>();
216 
217  if (!context->recovered_servers) // same fallback for the recovered-servers entity
218  context->recovered_servers = make_shared<RecoveredServers>();
219 
220  context->mml_writer =
221  make_shared<MetaLog::Writer>(context->dfs, context->mml_definition,
222  log_dir, entities); // the writer is given the de-duplicated entity list
223 
224  if (bpa) {
225  bpa->set_mml_writer(context->mml_writer);
226  std::stringstream sout;
227  sout << "Loading BalancePlanAuthority: " << *bpa;
228  HT_INFOF("%s", sout.str().c_str());
229  }
230 
231  context->response_manager->set_mml_writer(context->mml_writer);
232 
233  // First do System Upgrade
234  operation = make_shared<OperationSystemUpgrade>(context);
235  context->op->add_operation(operation);
236  context->op->wait_for_empty(); // presumably blocks until the upgrade operation has been processed - confirm in OperationProcessor
237 
238  // Then reconstruct state and start execution
239  for (auto &entity : entities) {
240  Operation *op = dynamic_cast<Operation *>(entity.get());
241  if (op) {
242  operation = dynamic_pointer_cast<Operation>(entity);
243  if (op->get_remove_approval_mask() && !op->removal_approved()) // approvals still outstanding for this operation
244  context->reference_manager->add(operation);
245 
246  if (dynamic_cast<OperationMoveRange *>(op))
247  context->add_move_operation(operation);
248 
249  // master was interrupted in the middle of rangeserver failover
250  if (dynamic_cast<OperationRecover *>(op)) {
251  HT_INFO("Recovery was interrupted; continuing");
252  OperationRecover *recover_op = dynamic_cast<OperationRecover *>(op);
253  if (!recover_op->location().empty()) { // a recover op with an empty location is not queued at all
254  operations.push_back(operation);
255  recovery_ops[recover_op->location()] = operation;
256  }
257  }
258  else // every operation that is not an OperationRecover is queued as-is
259  operations.push_back(operation);
260  }
261  else if (dynamic_cast<RangeServerConnection *>(entity.get())) {
262  rsc = dynamic_pointer_cast<RangeServerConnection>(entity); // NOTE(review): no declaration of this rsc is visible; the listing skips source line 174, which presumably declares it.
263  context->rsc_manager->add_server(rsc);
264  }
265  }
266  context->balancer = new LoadBalancer(context); // NOTE(review): raw new; what releases this allocation is not visible in this file.
267 
268  // For each RangeServerConnection that doesn't already have an
269  // outstanding OperationRecover, create and add one
270  for (auto &entity : entities) {
271  if (dynamic_cast<RangeServerConnection *>(entity.get())) {
272  rsc = dynamic_pointer_cast<RangeServerConnection>(entity);
273  if (recovery_ops.find(rsc->location()) == recovery_ops.end())
274  operations.push_back(make_shared<OperationRecover>(context, rsc, OperationRecover::RESTART));
275  }
276  }
277  recovery_ops.clear();
278 
279  if (operations.empty()) { // nothing to resume and no known servers: queue an OperationInitialize
280  OperationPtr init_op = make_shared<OperationInitialize>(context);
281  if (context->namemap->exists_mapping("/sys/METADATA", 0)) // mapping already exists: start the op in the CREATE_RS_METRICS state
282  init_op->set_state(OperationState::CREATE_RS_METRICS);
283  operations.push_back(init_op);
284  }
285  else { // resuming: make sure the two table handles exist before operations run
286  if (!context->metadata_table)
287  context->metadata_table = context->new_table(TableIdentifier::METADATA_NAME);
288 
289  if (!context->rs_metrics_table)
290  context->rs_metrics_table = context->new_table("sys/RS_METRICS");
291  }
292 
293  // Add PERPETUAL operations
294  operation = make_shared<OperationWaitForServers>(context);
295  operations.push_back(operation);
296  context->recovery_barrier_op =
297  make_shared<OperationTimedBarrier>(context, Dependency::RECOVERY, Dependency::RECOVERY_BLOCKER); // stored on the context only; not pushed onto operations here
298  operation = make_shared<OperationRecoveryBlocker>(context);
299  operations.push_back(operation);
300 
301  context->op->add_operations(operations);
302 
303  context->master_file->write_master_address();

306  static_cast<HandlerFactory*>(connection_handler_factory.get())->start_timer(); // unchecked downcast; the factory was created as a HandlerFactory at listing line 143
307 
308  if (!context->set_startup_status(false)) // a false return triggers shutdown; the meaning of the return value is defined in Context - confirm there
309  context->start_shutdown();
310 
311  context->op->join();
312  }
313  catch (Exception &e) { // logs the exception, marks failure, then falls through to the break below
314  HT_ERROR_OUT << e << HT_END;
315  exit_status = 1;
316  }
317  break;
318  }
319 
320  context->comm->close_socket(listen_addr); // NOTE(review): context is dereferenced unconditionally; if an Exception was thrown before listing line 139 assigned it, context is still null here - confirm and guard.
321  context.reset();
322 
323  if (has("pidfile")) // remove the pid file only when the option is set
324  FileUtils::unlink(get_str("pidfile"));
325 
326  Comm::destroy();
327 
328  return exit_status;
329 }
330 
static Comm * instance()
Creates/returns singleton instance of the Comm class.
Definition: Comm.h:72
Retrieves system information (hardware, installation directory, etc)
Interface and base of config policy.
Definition: Config.h:149
Cons< DefaultPolicy, CommPolicy > DefaultCommPolicy
Default comm layer config policy.
Definition: Config.h:84
Declarations for MetaLog::Reader.
Meta::list< AppPolicy, FsBrokerPolicy, DefaultCommPolicy > Policies
Definition: main.cc:69
The FailureInducer simulates errors.
std::shared_ptr< RangeServerConnection > RangeServerConnectionPtr
PropertiesPtr properties
This singleton map stores all options.
Definition: Config.cc:47
std::string String
A String is simply a typedef to std::string.
Definition: String.h:44
static bool unlink(const String &fname)
Unlinks (deletes) a file or directory.
Definition: FileUtils.cc:427
String format(const char *fmt,...)
Returns a String using printf-like format facilities. Vanilla snprintf is about 1.5x faster than this...
Definition: String.cc:37
Declarations for OperationProcessor.
Abstract class for creating default application dispatch handlers.
static const char * METADATA_NAME
long long unsigned int Llu
Shortcut for printf formats.
Definition: String.h:50
#define HT_INFO(msg)
Definition: Logger.h:271
STL namespace.
static FailureInducer * instance
This is a singleton class.
Declarations for ConnectionHandler.
Carries out recovery operation for a range server.
static void destroy()
Destroys singleton instance of the Comm class.
Definition: Comm.cc:102
uint16_t get_remove_approval_mask()
Gets the remove approvals bit mask.
Definition: Operation.h:353
int main(int argc, char **argv)
Definition: main.cc:71
Declarations for ReferenceManager.
Provides access to the cluster ID.
Definition: ClusterId.h:45
bool has(const String &name)
Check existence of a configuration value.
Definition: Config.h:57
std::shared_ptr< Context > ContextPtr
Smart pointer to Context.
Definition: Context.h:265
Declarations for SystemState.
std::shared_ptr< ConnectionHandlerFactory > ConnectionHandlerFactoryPtr
Smart pointer to ConnectionHandlerFactory.
std::shared_ptr< Session > SessionPtr
Definition: Session.h:734
bool status(ContextPtr &context, Timer &timer, Status &status)
Runs a status check on the master.
Definition: Utility.cc:408
Encapsulate an internet address.
Definition: InetAddr.h:66
Holds persistent global system state.
Definition: SystemState.h:45
static uint64_t get()
Gets the cluster ID.
Definition: ClusterId.h:85
Compatibility Macros for C/C++.
bool removal_approved()
Checks if all remove approvals have been received.
Definition: Operation.cc:362
void parse_option(String spec)
Parses a spec string (as explained above) and stores it in an internal structure. ...
Set of recovered servers.
Config policy for generic Comm layer server.
Definition: Config.h:78
Initialization helper for applications.
#define HT_END
Definition: Logger.h:220
#define HT_ERROR_OUT
Definition: Logger.h:301
const char * RECOVERY_BLOCKER
Definition: Operation.cc:52
Hypertable definitions
std::shared_ptr< Reader > ReaderPtr
Smart pointer to Reader.
Central authority for balance plans.
Declarations for Comm.
#define HT_INFOF(msg,...)
Definition: Logger.h:272
const String & location() const
Abstract base class for master operations.
Definition: Operation.h:124
const char * RECOVERY
Definition: Operation.cc:53
This is a generic exception class for Hypertable.
Definition: Error.h:314
std::shared_ptr< DispatchHandler > DispatchHandlerPtr
Smart pointer to DispatchHandler.
Handles incoming Master requests.
Passed to constructor to tell it to generate cluster ID if not found.
Definition: ClusterId.h:52
Declarations for BalancePlanAuthority.
void alias(const String &cmdline_opt, const String &file_opt, bool overwrite)
Setup command line option alias for config file option.
Definition: Config.cc:607
std::shared_ptr< Operation > OperationPtr
Smart pointer to Operation.
Definition: Operation.h:609
Declarations for Context.
Declarations for ClusterId.
Declarations for ResponseManager.
static bool proxy_master
Set to true if this process is acting as "Proxy Master".