0.9.8.10
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
ConnectionHandler.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2007-2015 Hypertable, Inc.
3  *
4  * This file is part of Hypertable.
5  *
6  * Hypertable is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; version 3 of the
9  * License, or any later version.
10  *
11  * Hypertable is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19  * 02110-1301, USA.
20  */
21 
28 #include <Common/Compat.h>
29 
30 #include "ConnectionHandler.h"
31 #include "LoadBalancer.h"
32 #include "OperationAlterTable.h"
33 #include "OperationBalance.h"
35 #include "OperationCompact.h"
37 #include "OperationCreateTable.h"
38 #include "OperationDropNamespace.h"
39 #include "OperationDropTable.h"
41 #include "OperationMoveRange.h"
42 #include "OperationProcessor.h"
43 #include "OperationRecover.h"
47 #include "OperationRenameTable.h"
48 #include "OperationSetState.h"
49 #include "OperationStatus.h"
50 #include "OperationStop.h"
51 #include "OperationSystemStatus.h"
52 #include "OperationTimedBarrier.h"
53 #include "RangeServerConnection.h"
54 #include "ReferenceManager.h"
55 
58 
60 
61 #include <Common/Config.h>
62 #include <Common/Error.h>
63 #include <Common/FailureInducer.h>
64 #include <Common/StringExt.h>
65 #include <Common/Serialization.h>
66 #include <Common/Time.h>
67 
68 #include <fstream>
69 #include <iostream>
70 
71 using namespace Hypertable;
72 using namespace Hypertable::Lib;
73 using namespace Serialization;
74 using namespace Error;
75 using namespace std;
76 
77 
// NOTE(review): the line that opens this definition is not visible in this
// listing (presumably the ConnectionHandler constructor -- confirm).
// The body arms a timer on the comm layer with this object as the handler.
79  int error;
// Request a timer using m_context->timer_interval, handing the comm layer a
// shared pointer to this object; any failure to set the timer is fatal.
80  if ((error = m_context->comm->set_timer(m_context->timer_interval, shared_from_this())) != Error::OK)
81  HT_FATALF("Problem setting timer - %s", Error::get_text(error));
82 }
83 
// Event handler body: processes MESSAGE events (client/server requests) and
// TIMER events (periodic maintenance); any other event type is only logged.
// NOTE(review): the signature line, several `case` labels and a few
// statements are absent from this listing, so some branches below appear
// without their label or with an unused local.
85  OperationPtr operation;
86 
87  if (event->type == Event::MESSAGE) {
88 
89  //event->display();
90 
// Every command except STATUS and SHUTDOWN is refused (early return) while
// shutdown is in progress, while startup is in progress, or while the
// master file lock has not been acquired.
// NOTE(review): the statements that use `cb` are not visible here;
// presumably they send an error response -- confirm.
91  if (event->header.command != Lib::Master::Protocol::COMMAND_STATUS &&
92  event->header.command != Lib::Master::Protocol::COMMAND_SHUTDOWN) {
93  if (m_context->shutdown_in_progress()) {
94  ResponseCallback cb(m_context->comm, event);
96  return;
97  }
98  else if (m_context->startup_in_progress() ||
99  !m_context->master_file->lock_acquired()) {
100  ResponseCallback cb(m_context->comm, event);
102  return;
103  }
104  }
105 
106  try {
107  // sanity check command code
108  if (event->header.command >= Lib::Master::Protocol::COMMAND_MAX)
109  HT_THROWF(PROTOCOL_ERROR, "Invalid command (%llu)",
110  (Llu)event->header.command);
111 
// Dispatch on the command code. Branches ending in `return` finish the
// request here; branches ending in `break` only create the operation and
// continue to the common ID-response path after the switch.
112  switch (event->header.command) {
// Status-style operations: delivery info is registered with the response
// manager before the operation is queued, and no ID response is sent.
114  operation = make_shared<OperationStatus>(m_context, event);
115  m_context->response_manager->add_delivery_info(operation->id(), event);
116  m_context->op->add_operation(operation);
117  return;
118  case Lib::Master::Protocol::COMMAND_SYSTEM_STATUS:
119  operation = make_shared<OperationSystemStatus>(m_context, event);
120  m_context->response_manager->add_delivery_info(operation->id(), event);
121  m_context->op->add_operation(operation);
122  return;
124  operation = make_shared<OperationCompact>(m_context, event);
125  break;
127  operation = make_shared<OperationCreateTable>(m_context, event);
128  break;
130  operation = make_shared<OperationDropTable>(m_context, event);
131  break;
133  operation = make_shared<OperationAlterTable>(m_context, event);
134  break;
136  operation = make_shared<OperationRenameTable>(m_context, event);
137  break;
// Server registration is queued directly; no ID response is sent here.
138  case Lib::Master::Protocol::COMMAND_REGISTER_SERVER:
139  operation = make_shared<OperationRegisterServer>(m_context, event);
140  m_context->op->add_operation(operation);
141  return;
// A move-range request that add_move_operation() rejects is answered with
// MASTER_OPERATION_IN_PROGRESS instead of being queued.
142  case Lib::Master::Protocol::COMMAND_MOVE_RANGE:
143  operation = make_shared<OperationMoveRange>(m_context, event);
144  if (!m_context->add_move_operation(operation)) {
145  HT_INFOF("Skipping %s because already in progress",
146  operation->label().c_str());
147  send_error_response(event, Error::MASTER_OPERATION_IN_PROGRESS, "");
148  return;
149  }
150  // Add to reference manager
151  m_context->reference_manager->add(operation);
152  HT_MAYBE_FAIL("connection-handler-move-range");
153  m_context->op->add_operation(operation);
154  return;
155  case Lib::Master::Protocol::COMMAND_RELINQUISH_ACKNOWLEDGE:
156  operation = make_shared<OperationRelinquishAcknowledge>(m_context, event);
157  break;
159  operation = make_shared<OperationBalance>(m_context, event);
160  break;
161  case Lib::Master::Protocol::COMMAND_SET_STATE:
162  operation = make_shared<OperationSetState>(m_context, event);
163  break;
165  operation = make_shared<OperationStop>(m_context, event);
166  break;
// Shutdown: start shutdown on the context and acknowledge with `silent`
// set, so a failed acknowledgement is not logged.
168  HT_INFO("Received shutdown command");
169  m_context->start_shutdown();
170  send_ok_response(event, true);
171  return;
173  operation = make_shared<OperationCreateNamespace>(m_context, event);
174  break;
176  operation = make_shared<OperationDropNamespace>(m_context, event);
177  break;
178  case Lib::Master::Protocol::COMMAND_RECREATE_INDEX_TABLES:
179  operation = make_shared<OperationRecreateIndexTables>(m_context, event);
180  break;
181 
// Fetch-result: decode the request parameters from the payload and register
// this event as delivery info for the operation ID they carry.
// NOTE(review): the declaration of `params` is not visible in this listing.
182  case Lib::Master::Protocol::COMMAND_FETCH_RESULT:
183  {
184  const uint8_t *ptr = event->payload;
185  size_t remain = event->payload_len;
187  params.decode(&ptr, &remain);
188  m_context->response_manager->add_delivery_info(params.get_id(), event);
189  }
190  return;
// Recovery notifications: forwarded to the context, then acknowledged with
// an OK response.
191  case Lib::Master::Protocol::COMMAND_REPLAY_STATUS:
192  m_context->replay_status(event);
193  send_ok_response(event);
194  return;
195  case Lib::Master::Protocol::COMMAND_REPLAY_COMPLETE:
196  m_context->replay_complete(event);
197  send_ok_response(event);
198  return;
199  case Lib::Master::Protocol::COMMAND_PHANTOM_PREPARE_COMPLETE:
200  m_context->prepare_complete(event);
201  send_ok_response(event);
202  return;
203  case Lib::Master::Protocol::COMMAND_PHANTOM_COMMIT_COMPLETE:
204  m_context->commit_complete(event);
205  send_ok_response(event);
206  return;
207  default:
208  HT_THROWF(PROTOCOL_ERROR, "Unimplemented command (%llu)",
209  (Llu)event->header.command);
210  }
// Common path for `break` branches: send the operation ID back first and
// queue the operation only if that send succeeded.
211  if (operation) {
212  HT_MAYBE_FAIL_X("connection-handler-before-id-response",
213  event->header.command != Lib::Master::Protocol::COMMAND_STATUS &&
214  event->header.command != Lib::Master::Protocol::COMMAND_RELINQUISH_ACKNOWLEDGE);
215  if (send_id_response(event, operation) != Error::OK)
216  return;
217  m_context->op->add_operation(operation);
218  }
219  else {
// NOTE(review): the call that consumes the formatted message below is not
// visible in this listing.
220  ResponseCallback cb(m_context->comm, event);
222  format("Unimplemented command (%llu)",
223  (Llu)event->header.command));
224  }
225  }
226  catch (Exception &e) {
// NOTE(review): the `if` condition choosing between warning and error
// logging is not visible in this listing.
228  HT_WARNF("%s", e.what());
229  else
230  HT_ERROR_OUT << e << HT_END;
// If an operation had already been created, mark it ephemeral, complete it
// with the exception's code and message, and hand it to the response manager.
231  if (operation) {
232  operation->set_ephemeral();
233  operation->complete_error(e.code(), e.what());
234  m_context->response_manager->add_operation(operation);
235  }
236  }
237  }
// Timer event: queue periodic maintenance operations, then re-arm the timer.
238  else if (event->type == Hypertable::Event::TIMER) {
239  OperationPtr operation;
240  int error;
241  time_t now = time(0);
242 
// Nothing is scheduled and the timer is not re-armed once shutdown started.
243  if (m_context->shutdown_in_progress())
244  return;
245 
246  try {
247 
248  maybe_dump_op_statistics();
249 
// Maintenance is only queued while the Hyperspace session reports STATE_SAFE.
250  if (m_context->hyperspace->get_state() == Hyperspace::Session::STATE_SAFE) {
251  if (m_context->next_monitoring_time <= now) {
252  operation = make_shared<OperationGatherStatistics>(m_context);
253  m_context->op->add_operation(operation);
// presumably monitoring_interval is in milliseconds (divided by 1000 and
// added to a time_t) -- TODO confirm
254  m_context->next_monitoring_time = now + (m_context->monitoring_interval/1000) - 1;
255  }
256 
257  if (m_context->next_gc_time <= now) {
258  operation = make_shared<OperationCollectGarbage>(m_context);
259  m_context->op->add_operation(operation);
// presumably gc_interval is in milliseconds as well -- TODO confirm
260  m_context->next_gc_time = now + (m_context->gc_interval/1000) - 1;
261  }
262 
263  if (m_context->balancer->balance_needed()) {
264  operation = make_shared<OperationBalance>(m_context);
265  m_context->op->add_operation(operation);
266  }
267  }
268  }
269  catch (Exception &e) {
// NOTE(review): the `if` condition choosing between warning and error
// logging is not visible in this listing.
271  HT_WARNF("%s", e.what());
272  else
273  HT_ERROR_OUT << e << HT_END;
274  }
275 
// Re-arm the timer for the next tick; failing to do so is fatal.
276  if ((error = m_context->comm->set_timer(m_context->timer_interval, shared_from_this())) != Error::OK)
277  HT_FATALF("Problem setting timer - %s", Error::get_text(error));
278 
279  }
// Any other event type is only logged.
280  else {
281  HT_INFOF("%s", event->to_str().c_str());
282  }
283 
284 }
285 
// Sends the operation's ID back to the requester and returns the status of
// the send.
// NOTE(review): the signature line is not visible in this listing; the
// tooltip text elsewhere in it declares
// `int32_t send_id_response(EventPtr &event, OperationPtr &operation)`.
287  int error = Error::OK;
288  CommHeader header;
289  header.initialize_from_request_header(event->header);
// 12-byte payload: a 32-bit Error::OK code followed by the 64-bit operation ID.
290  CommBufPtr cbp(new CommBuf(header, 12));
291  cbp->append_i32(Error::OK);
292  cbp->append_i64(operation->id());
293  error = m_context->comm->send_response(event->addr, cbp);
// A failed send is logged here and also reported to the caller via the
// return value.
294  if (error != Error::OK)
295  HT_ERRORF("Problem sending ID response back for %s operation (id=%lld) - %s",
296  operation->label().c_str(), (Lld)operation->id(), Error::get_text(error));
297  return error;
298 }
299 
300 int32_t ConnectionHandler::send_ok_response(EventPtr &event, bool silent) {
301  CommHeader header;
302  header.initialize_from_request_header(event->header);
303  CommBufPtr cbp(new CommBuf(header, 4));
304  cbp->append_i32(Error::OK);
305  int ret = m_context->comm->send_response(event->addr, cbp);
306  if (!silent && ret != Error::OK)
307  HT_ERRORF("Problem sending error response back to %s - %s",
308  event->addr.format().c_str(), Error::get_text(ret));
309  return ret;
310 }
311 
312 int32_t ConnectionHandler::send_error_response(EventPtr &event, int32_t error, const String &msg) {
313  CommHeader header;
314  header.initialize_from_request_header(event->header);
315  CommBufPtr cbp(new CommBuf(header, 4 + Serialization::encoded_length_vstr(msg)));
316  cbp->append_i32(error);
317  cbp->append_vstr(msg);
318  int ret = m_context->comm->send_response(event->addr, cbp);
319  if (ret != Error::OK)
320  HT_ERRORF("Problem sending error response back to %s - %s",
321  event->addr.format().c_str(), Error::get_text(error));
322  return error;
323 }
324 
326 
// Dumps the operation processor's state description to a file when asked to.
// NOTE(review): the signature line is not visible in this listing; the
// tooltip text elsewhere in it declares `void maybe_dump_op_statistics()`.
// The trigger is the existence of <install_dir>/run/debug-op.
327  if (FileUtils::exists(System::install_dir + "/run/debug-op")) {
328  String description;
329  String output_fname = System::install_dir + "/run/op.output";
330  ofstream out;
331  out.open(output_fname.c_str());
// Ask the operation processor to fill in the description, then write it out.
332  m_context->op->state_description(description);
333  out << description;
334  out.close();
// Remove the trigger file so the dump is not repeated until it is re-created.
335  FileUtils::unlink(System::install_dir + "/run/debug-op");
336  }
337 
338 }
Declarations for FetchResult request parameters.
void initialize_from_request_header(CommHeader &req_header)
Initializes header from req_header.
Definition: CommHeader.h:128
#define HT_WARNF(msg,...)
Definition: Logger.h:290
Request parameters for fetch result request.
Definition: FetchResult.h:46
The FailureInducer simulates errors.
std::string String
A String is simply a typedef to std::string.
Definition: String.h:44
Declarations for OperationDropTable.
static bool unlink(const String &fname)
Unlinks (deletes) a file or directory.
Definition: FileUtils.cc:427
String format(const char *fmt,...)
Returns a String using printf-like format facilities. Vanilla snprintf is about 1.5x faster than this...
Definition: String.cc:37
Declarations for OperationProcessor.
Declarations for Protocol.
Declarations for OperationCreateTable.
int32_t send_error_response(EventPtr &event, int32_t error, const String &msg)
Sends error response message back to client.
long long unsigned int Llu
Shortcut for printf formats.
Definition: String.h:50
static bool exists(const String &fname)
Checks if a file or directory exists.
Definition: FileUtils.cc:420
std::shared_ptr< Event > EventPtr
Smart pointer to Event.
Definition: Event.h:228
#define HT_INFO(msg)
Definition: Logger.h:271
STL namespace.
Declarations for OperationRecreateIndexTables.
size_t encoded_length_vstr(size_t len)
Computes the encoded length of vstr (vint64, data, null)
Declarations for ConnectionHandler.
Declarations for ReferenceManager.
Declarations for OperationCompact.
const char * get_text(int error)
Returns a descriptive error message.
Definition: Error.cc:330
std::shared_ptr< CommBuf > CommBufPtr
Smart pointer to CommBuf.
Definition: CommBuf.h:305
Compatibility Macros for C/C++.
#define HT_END
Definition: Logger.h:220
Functions to serialize/deserialize primitives to/from a memory buffer.
#define HT_ERROR_OUT
Definition: Logger.h:301
Time related declarations.
virtual void decode(const uint8_t **bufp, size_t *remainp)
Reads serialized representation of object from a buffer.
Definition: Serializable.cc:70
Hypertable library.
Definition: CellInterval.h:30
This class is used to generate and deliver standard responses back to a client.
Hypertable definitions
#define HT_FATALF(msg,...)
Definition: Logger.h:343
long long int Lld
Shortcut for printf formats.
Definition: String.h:53
Header for messages transmitted via AsyncComm.
Definition: CommHeader.h:40
virtual int error(int error, const String &msg)
Sends a standard error response back to the client.
#define HT_INFOF(msg,...)
Definition: Logger.h:272
Declarations for OperationSetState.
Declarations for OperationGatherStatistics.
#define HT_THROWF(_code_, _fmt_,...)
Definition: Error.h:490
Request/response message event.
Definition: Event.h:63
Timer event
Definition: Event.h:65
static String install_dir
The installation directory.
Definition: System.h:114
This is a generic exception class for Hypertable.
Definition: Error.h:314
Message buffer for holding data to be transmitted over a network.
Definition: CommBuf.h:79
#define HT_MAYBE_FAIL_X(_label_, _exp_)
virtual void handle(EventPtr &event)
Responds to Master request events.
#define HT_ERRORF(msg,...)
Definition: Logger.h:300
int32_t send_id_response(EventPtr &event, OperationPtr &operation)
Sends operation ID back to client.
int32_t send_ok_response(EventPtr &event, bool silent=false)
Sends OK response message back to client.
Configuration settings.
#define HT_MAYBE_FAIL(_label_)
Declarations for OperationAlterTable.
std::shared_ptr< Operation > OperationPtr
Smart pointer to Operation.
Definition: Operation.h:609
Declarations for ResponseCallback.
String extensions and helpers: sets, maps, append operators etc.
Error codes, Exception handling, error logging.
void maybe_dump_op_statistics()
Maybe dumps OperationProcessor statistics.
int code() const
Returns the error code.
Definition: Error.h:391