0.9.8.10
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
OperationRegisterServer.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2007-2015 Hypertable, Inc.
3  *
4  * This file is part of Hypertable.
5  *
6  * Hypertable is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; version 3 of the
9  * License, or any later version.
10  *
11  * Hypertable is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19  * 02110-1301, USA.
20  */
21 
22 #include <Common/Compat.h>
23 
25 
30 
34 
35 #include <Hyperspace/Session.h>
36 
37 #include <Common/Error.h>
38 #include <Common/FailureInducer.h>
39 #include <Common/Serialization.h>
40 #include <Common/StringExt.h>
41 #include <Common/Time.h>
42 #include <Common/md5.h>
43 
44 #include <boost/algorithm/string.hpp>
45 
46 #include <cmath>
47 
48 using namespace Hypertable;
49 using namespace std;
50 
52  EventPtr &event)
53  : OperationEphemeral(context, event,
54  MetaLog::EntityType::OPERATION_REGISTER_SERVER) {
55  const uint8_t *ptr = event->payload;
56  size_t remaining = event->payload_len;
57  m_params.decode(&ptr, &remaining);
58 
60 
61  if (!m_location.empty()) {
62  add_exclusivity(String("RegisterServer ") + m_location);
63  add_dependency(String("RegisterServerBlocker ") + m_location);
64  }
65 
66  m_local_addr = InetAddr(event->addr);
70 }
71 
72 
74  std::vector<SystemVariable::Spec> specs;
75  uint64_t generation {};
76  uint64_t handle {};
77  RangeServerHyperspaceCallback *hyperspace_callback {};
78  bool newly_created {};
79 
80  if (!m_location.empty()) {
81 
82  // This shouldn't be necessary, however we've seen Hyperspace declare a
83  // server to be dead when it was in fact still alive (see issue 1346).
84  // We'll leave this in place until Hyperspace gets overhauled.
85  if (m_context->recovered_servers->contains(m_location)) {
88  HT_WARNF("Attempt to register server %s that has been removed",
89  m_location.c_str());
90  complete_ok();
91  return;
92  }
93 
94  m_context->rsc_manager->find_server_by_location(m_location, m_rsc);
95 
96  }
97  else {
98  uint64_t id = m_context->master_file->next_server_id();
99  if (m_context->location_hash.empty())
100  m_location = format("rs%llu", (Llu)id);
101  else
102  m_location = format("rs-%s-%llu", m_context->location_hash.c_str(),
103  (Llu)id);
104  newly_created = true;
105 
106  String fname = m_context->toplevel_dir + "/servers/";
107  // !!! wrap in try/catch
108  m_context->hyperspace->mkdirs(fname);
109  fname += m_location;
111  // !!! wrap in try/catch
112  handle = m_context->hyperspace->open(fname, oflags);
113  m_context->hyperspace->close(handle);
114  }
115 
116  if (!m_rsc)
117  m_rsc = make_shared<RangeServerConnection>(m_location, m_params.system_stats().net_info.host_name, m_public_addr);
118  else
119  m_context->rsc_manager->disconnect_server(m_rsc);
120 
121  if (!m_rsc->get_hyperspace_handle(&handle, &hyperspace_callback)) {
122  String fname = m_context->toplevel_dir + "/servers/" + m_location;
123  hyperspace_callback
125  Hyperspace::HandleCallbackPtr cb(hyperspace_callback);
126  handle = m_context->hyperspace->open(fname, Hyperspace::OPEN_FLAG_READ|
128  HT_ASSERT(handle);
129  m_rsc->set_hyperspace_handle(handle, hyperspace_callback);
130  }
131 
132  if (m_context->rsc_manager->is_connected(m_location)) {
134  format("%s already connected", m_location.c_str()));
136  header.initialize_from_request_header(m_event->header);
138  m_context->system_state->get(specs, &generation);
139  CommBufPtr cbp(new CommBuf(header, encoded_response_length(generation,
140  specs)));
141  encode_response(generation, specs, cbp->get_data_ptr_address());
142  int error = m_context->comm->send_response(m_local_addr, cbp);
143  if (error != Error::OK)
144  HT_ERRORF("Problem sending response (location=%s) back to %s",
145  m_location.c_str(), m_local_addr.format().c_str());
146  return;
147  }
148 
149  int32_t difference = (int32_t)abs((m_received_ts - m_params.now()) / 1000LL);
150  if (difference > (3000000 + m_context->max_allowable_skew)) {
151  String errstr = format("Detected clock skew while registering server "
152  "%s (%s), as location %s register_ts=%llu, received_ts=%llu, "
153  "difference=%d > allowable skew %d",
155  m_public_addr.format().c_str(), m_location.c_str(),
156  (Llu)m_params.now(), (Llu)m_received_ts, difference,
157  m_context->max_allowable_skew);
158  String subject = format("Clock skew detected while registering %s (%s)",
160  m_location.c_str());
161  m_context->notification_hook(subject, errstr);
163  HT_ERROR_OUT << errstr << HT_END;
164  // clock skew detected by master
166  header.initialize_from_request_header(m_event->header);
168  m_context->system_state->get(specs, &generation);
169  CommBufPtr cbp(new CommBuf(header, encoded_response_length(generation,
170  specs)));
171 
172  encode_response(generation, specs, cbp->get_data_ptr_address());
173  int error = m_context->comm->send_response(m_local_addr, cbp);
174  if (error != Error::OK)
175  HT_ERRORF("Problem sending response (location=%s) back to %s",
176  m_location.c_str(), m_local_addr.format().c_str());
177  m_context->op->unblock(m_location);
178  m_context->op->unblock(Dependency::SERVERS);
180  HT_INFOF("%lld Leaving RegisterServer %s",
181  (Lld)header.id, m_rsc->location().c_str());
182  return;
183  }
184  else {
185  m_context->monitoring->add_server(m_location, m_params.system_stats());
186  String message = format("Registering server %s\nhost = %s\n"
187  "public addr = %s\nlocal addr = %s\n",
188  m_rsc->location().c_str(),
190  m_public_addr.format().c_str(),
191  m_local_addr.format().c_str());
192 
193  if (newly_created) {
194  String subject
195  = format("Registering server %s - %s (%s)", m_location.c_str(),
197  m_public_addr.format().c_str());
198  m_context->notification_hook(subject, message);
199  }
200  boost::replace_all(message, "\n", " ");
201  HT_INFOF("%lld %s", (Lld)header.id, message.c_str());
202 
204  {
206  header.initialize_from_request_header(m_event->header);
208  m_context->system_state->get(specs, &generation);
209  CommBufPtr cbp(new CommBuf(header, encoded_response_length(generation,
210  specs)));
211  encode_response(generation, specs, cbp->get_data_ptr_address());
212  int error = m_context->comm->send_response(m_local_addr, cbp);
213  if (error != Error::OK)
214  HT_ERRORF("Problem sending response (location=%s) back to %s",
215  m_location.c_str(), m_local_addr.format().c_str());
216  }
217  }
218 
219  // Wait for server to acquire lock on Hyperspace file
220  if (!m_params.lock_held()) {
221  if (!hyperspace_callback->wait_for_lock_acquisition(chrono::seconds(120))) {
222  String notification_body = format("Timed out waiting for %s to acquire "
223  "lock on Hyperspace file",
224  m_location.c_str());
225  HT_WARNF("%s, sending shutdown request...", notification_body.c_str());
226 
229  m_context->rsc_manager->disconnect_server(m_rsc);
230  String notification_subject =
231  format("Server registration error for %s (%s)", m_location.c_str(),
232  m_rsc->hostname().c_str());
233  m_context->notification_hook(notification_subject, notification_body);
234  OperationPtr operation = make_shared<OperationRecover>(m_context, m_rsc);
235  try {
236  m_context->op->add_operation(operation);
237  }
238  catch (Exception &e) {
239  // Only exception thrown is Error::MASTER_OPERATION_IN_PROGRESS
240  }
241  complete_ok();
242  return;
243  }
244  }
245 
246  // At this point, any pending Recover operations should be irrelevant
247  OperationPtr operation(m_context->op->remove_operation(md5_hash("OperationRecover") ^ md5_hash(m_location.c_str())));
248  if (operation)
249  operation->complete_ok();
250 
251  string hostname(m_params.system_stats().net_info.host_name);
252  m_context->comm->set_alias(m_local_addr, m_public_addr);
253  m_context->comm->add_proxy(m_rsc->location(), hostname, m_public_addr);
254  m_context->rsc_manager->connect_server(m_rsc, hostname, m_local_addr, m_public_addr);
255 
256  m_context->add_available_server(m_location);
257 
258  // Prevent subsequent registration until lock is released
259  operation = make_shared<OperationRegisterServerBlocker>(m_context,
260  m_rsc->location());
261  m_context->op->add_operation(operation);
262 
263  if (!m_rsc->get_balanced())
264  m_context->balancer->signal_new_server();
265 
266  m_context->mml_writer->record_state(m_rsc);
267 
268  complete_ok();
269  m_context->op->unblock(m_location);
270  m_context->op->unblock(Dependency::SERVERS);
271  m_context->op->unblock(Dependency::RECOVERY_BLOCKER);
272  HT_INFOF("%lld Leaving RegisterServer %s",
273  (Lld)header.id, m_rsc->location().c_str());
274 }
275 
277  os << " location=" << m_location << " host="
279  os << " register_ts=" << m_params.now();
280  os << " local_addr=" << m_rsc->local_addr().format();
281  os << " public_addr=" << m_rsc->public_addr().format() << " ";
282 }
283 
285  return "OperationRegisterServer";
286 }
287 
289  return String("RegisterServer ") + m_location;
290 }
291 
293  std::vector<SystemVariable::Spec> &specs) const {
294  size_t length = 4;
295  if (m_error == Error::OK) {
297  length += params.encoded_length();
298  }
299  else
301  return length;
302 }
303 
305  std::vector<SystemVariable::Spec> &specs,
306  uint8_t **bufp) const {
308  if (m_error == Error::OK) {
310  params.encode(bufp);
311  }
312  else
314 }
int64_t id()
Operation identifier.
Definition: Operation.h:326
void initialize_from_request_header(CommHeader &req_header)
Initializes header from req_header.
Definition: CommHeader.h:128
#define HT_WARNF(msg,...)
Definition: Logger.h:290
The FailureInducer simulates errors.
ContextPtr m_context
Pointer to Master context.
Definition: Operation.h:553
int64_t md5_hash(const char *input)
Returns a 64-bit hash checksum of a null terminated input buffer.
Definition: md5.cc:388
Abstract base class for ephemeral operations.
String m_error_msg
Result error message.
Definition: Operation.h:574
std::string String
A String is simply a typedef to std::string.
Definition: String.h:44
struct NetInfo net_info
Network information (host name, primary interface, gateway etc)
Definition: StatsSystem.h:112
String format(const char *fmt,...)
Returns a String using printf-like format facilities. Vanilla snprintf is about 1.5x faster than this...
Definition: String.cc:37
const char * SERVERS
Definition: Operation.cc:46
virtual size_t encoded_length() const
Returns serialized object length.
Definition: Serializable.cc:37
int64_t id
Unique ID of entity.
Declarations for OperationProcessor.
long long unsigned int Llu
Shortcut for printf formats.
Definition: String.h:50
uint32_t id
Request ID.
Definition: CommHeader.h:141
void add_exclusivity(const String &exclusivity)
Definition: Operation.h:461
void execute() override
Executes (carries out) the operation.
std::shared_ptr< Event > EventPtr
Smart pointer to Event.
Definition: Event.h:228
virtual void encode_response(uint64_t generation, std::vector< SystemVariable::Spec > &specs, uint8_t **bufp) const
STL namespace.
OperationRegisterServer(ContextPtr &context, EventPtr &event)
EntityHeader header
Entity header
Declarations for RangeServerClient.
virtual size_t encoded_response_length(uint64_t generation, std::vector< SystemVariable::Spec > &specs) const
EventPtr m_event
Pointer to client event (if any) that originated the operation.
Definition: Operation.h:556
std::shared_ptr< Context > ContextPtr
Smart pointer to Context.
Definition: Context.h:265
#define HT_ASSERT(_e_)
Definition: Logger.h:396
const String label() override
Human readable label for operation.
Callback class for range server lock files.
Declarations for RangeServerProtocol.
const StatsSystem & system_stats() const
Gets system stats.
virtual void encode(uint8_t **bufp) const
Writes serialized representation of object to a buffer.
Definition: Serializable.cc:64
size_t encoded_length_str16(const char *str)
Computes the encoded length of a string16 encoding.
Encapsulate an internet address.
Definition: InetAddr.h:66
int32_t m_error
Result error code.
Definition: Operation.h:571
void add_dependency(const String &dependency)
Definition: Operation.h:462
std::shared_ptr< CommBuf > CommBufPtr
Smart pointer to CommBuf.
Definition: CommBuf.h:305
void encode_i32(uint8_t **bufp, uint32_t val)
Encode a 32-bit integer in little-endian order.
void display_state(std::ostream &os) override
Write human readable operation state to output stream.
Compatibility Macros for C/C++.
Lib::Master::Request::Parameters::RegisterServer m_params
Request parameters.
Declarations for RangeServerHyperspaceCallback.
#define HT_END
Definition: Logger.h:220
Functions to serialize/deserialize primitives to/from a memory buffer.
#define HT_ERROR_OUT
Definition: Logger.h:301
Time related declarations.
virtual void decode(const uint8_t **bufp, size_t *remainp)
Reads serialized representation of object from a buffer.
Definition: Serializable.cc:70
String format(int sep= ':') const
Returns a string with a dotted notation ("127.0.0.1:8080") including the port.
Definition: InetAddr.h:132
std::shared_ptr< HandleCallback > HandleCallbackPtr
const char * RECOVERY_BLOCKER
Definition: Operation.cc:52
Hypertable definitions
long long int Lld
Shortcut for printf formats.
Definition: String.h:53
Declarations for general-purpose utility functions.
Header for messages transmitted via AsyncComm.
Definition: CommHeader.h:40
#define HT_INFOF(msg,...)
Definition: Logger.h:272
const string & location() const
Gets location (proxy name)
This is a generic exception class for Hypertable.
Definition: Error.h:314
Message buffer for holding data to be transmitted over a network.
Definition: CommBuf.h:79
Response parameters for register server operation.
void encode_str16(uint8_t **bufp, const void *str, uint16_t len)
Encodes a string buffer into the given buffer.
#define HT_ERRORF(msg,...)
Definition: Logger.h:300
Create file if it does not exist.
Definition: Session.h:77
void shutdown_rangeserver(ContextPtr &context, CommAddress &addr)
Sends a shutdown command to a rangeserver.
Definition: Utility.cc:447
void complete_ok(std::vector< MetaLog::EntityPtr > &additional)
Definition: Operation.cc:436
Open file for reading.
Definition: Session.h:71
Declarations for RegisterServer response parameters.
const String name() override
Name of operation used for exclusivity.
std::shared_ptr< Operation > OperationPtr
Smart pointer to Operation.
Definition: Operation.h:609
String extensions and helpers: sets, maps, append operators etc.
Error codes, Exception handling, error logging.
void complete_error(int error, const String &msg, std::vector< MetaLog::EntityPtr > &additional)
Completes operation with error.
Definition: Operation.cc:400
md5 digest routines.
uint64_t command
Request command number.
Definition: CommHeader.h:146
Address abstraction to hold either proxy name or IPv4:port address.
Definition: CommAddress.h:52
int64_t get_ts64()
Returns the current time in nanoseconds as a 64-bit number.
Definition: Time.cc:40