0.9.8.10
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
ClientKeepaliveHandler.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2007-2015 Hypertable, Inc.
3  *
4  * This file is part of Hypertable.
5  *
6  * Hypertable is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; either version 3
9  * of the License, or any later version.
10  *
11  * Hypertable is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19  * 02110-1301, USA.
20  */
21 
22 #include <Common/Compat.h>
23 
24 #include "ClientKeepaliveHandler.h"
25 #include "Master.h"
26 #include "Protocol.h"
27 #include "Session.h"
28 
29 #include <Common/Error.h>
30 #include <Common/InetAddr.h>
31 #include <Common/StringExt.h>
32 #include <Common/Time.h>
33 
34 #include <chrono>
35 #include <thread>
36 
37 using namespace std;
38 using namespace Hypertable;
39 using namespace Hyperspace;
40 using namespace Serialization;
41 
42 ClientKeepaliveHandler::ClientKeepaliveHandler(Comm *comm, PropertiesPtr &cfg,
43  Session *session)
44  : m_comm(comm), m_session(session) {
45 
46  HT_TRY("getting config values",
47  m_verbose = cfg->get_bool("Hypertable.Verbose");
48  m_hyperspace_port = cfg->get_i16("Hyperspace.Replica.Port");
49  m_datagram_send_port = cfg->get_i16("Hyperspace.Client.Datagram.SendPort");
50  m_lease_interval = cfg->get_i32("Hyperspace.Lease.Interval");
51  m_keep_alive_interval = cfg->get_i32("Hyperspace.KeepAlive.Interval");
52  m_reconnect = cfg->get_bool("Hyperspace.Session.Reconnect"));
53 
54  auto now = chrono::steady_clock::now();
56  m_jeopardy_time = now + chrono::milliseconds(m_lease_interval);
57 
58  for (const auto &replica : cfg->get_strs("Hyperspace.Replica.Host")) {
59  m_hyperspace_replicas.push_back(replica);
60  }
61 
62  HT_DEBUG_OUT << "Looking for Hyperspace master at " << m_hyperspace_replicas[0]
63  << ":" << m_hyperspace_port << HT_END;
66 
68 
69 }
70 
71 
73  int error;
74 
76 
77  m_comm->create_datagram_receive_socket(m_local_addr, 0x10, shared_from_this());
78 
79  CommBufPtr cbp(Hyperspace::Protocol::create_client_keepalive_request(m_session_id,
81 
82  if ((error = m_comm->send_datagram(m_master_addr, m_local_addr, cbp)
83  != Error::OK)) {
84  HT_ERRORF("Unable to send datagram - %s", Error::get_text(error));
85  exit(EXIT_FAILURE);
86  }
87 
88  if ((error = m_comm->set_timer(m_keep_alive_interval, shared_from_this()))
89  != Error::OK) {
90  HT_ERRORF("Problem setting timer - %s", Error::get_text(error));
91  exit(EXIT_FAILURE);
92  }
93 
94 }
95 
96 
98  lock_guard<recursive_mutex> lock(m_mutex);
99  int error;
100 
101  if (m_dead)
102  return;
103  else if (m_destroying) {
104  destroy();
105  m_cond_destroyed.notify_all();
106  return;
107  }
108 
109  /*
110  if (m_verbose) {
111  HT_INFOF("%s", event->to_str().c_str());
112  }
113  **/
114 
115  if (event->type == Hypertable::Event::MESSAGE) {
116  const uint8_t *decode_ptr = event->payload;
117  size_t decode_remain = event->payload_len;
118 
119  try {
120 
121  // sanity check command code
122  if (event->header.command >= Protocol::COMMAND_MAX)
123  HT_THROWF(Error::PROTOCOL_ERROR, "Invalid command (%llu)",
124  (Llu)event->header.command);
125 
126  switch (event->header.command) {
127  case Protocol::COMMAND_REDIRECT:
128  {
129  const char *host;
130  host = decode_vstr(&decode_ptr, &decode_remain);
131 
132  // if we know who the new master is then send a keepalive to it
133  // else send a keepalive to whichever replica we sent it to last time
134  if (strlen(host) != 0) {
135  HT_DEBUG_OUT << "Received COMMAND_REDIRECT looking for master at "
136  << host << HT_END;
140  }
141 
142  CommBufPtr cbp(Hyperspace::Protocol::create_client_keepalive_request(m_session_id,
144 
145  if ((error = m_comm->send_datagram(m_master_addr, m_local_addr, cbp) != Error::OK)) {
146  HT_ERRORF("Unable to send datagram - %s", Error::get_text(error));
147  exit(EXIT_FAILURE);
148  }
149 
150  if ((error = m_comm->set_timer(m_keep_alive_interval, shared_from_this())) != Error::OK) {
151  HT_ERRORF("Problem setting timer - %s", Error::get_text(error));
152  exit(EXIT_FAILURE);
153  }
154  break;
155  }
156  case Protocol::COMMAND_KEEPALIVE:
157  {
158  uint64_t session_id;
159  uint32_t notifications;
160  uint64_t handle, event_id;
161  uint32_t event_mask;
162  const char *name;
163  const uint8_t *post_notification_buf;
164  size_t post_notification_size;
165 
167  return;
168 
169  // update jeopardy time
171  chrono::milliseconds(m_lease_interval);
172 
173  session_id = decode_i64(&decode_ptr, &decode_remain);
174  error = decode_i32(&decode_ptr, &decode_remain);
175 
176  if (error != Error::OK) {
177  HT_ERRORF("Master session (%llu) error - %s", (Llu)session_id, Error::get_text(error));
178  expire_session();
179  return;
180  }
181 
182  if (m_session_id == 0) {
183  m_session_id = session_id;
184  if (!m_conn_handler) {
185  m_conn_handler = make_shared<ClientConnectionHandler>(m_comm, m_session, m_lease_interval);
186  m_conn_handler->set_verbose_mode(m_verbose);
187  m_conn_handler->set_session_id(m_session_id);
188  }
189  }
190 
191  notifications = decode_i32(&decode_ptr, &decode_remain);
192 
193  // Start Issue 313 instrumentation
194  post_notification_buf = decode_ptr;
195  post_notification_size = decode_remain;
196 
197  std::set<uint64_t> delivered_events;
198 
199  for (uint32_t i=0; i<notifications; i++) {
200  handle = decode_i64(&decode_ptr, &decode_remain);
201  event_id = decode_i64(&decode_ptr, &decode_remain);
202  event_mask = decode_i32(&decode_ptr, &decode_remain);
203 
204  if (m_delivered_events.count(event_id) > 0)
205  delivered_events.insert(event_id);
206 
207  HandleMap::iterator iter = m_handle_map.find(handle);
208  if (iter == m_handle_map.end()) {
209  // We have a bad notification, ie. a notification for a handle not in m_handle_map
210  auto now = chrono::steady_clock::now();
211 
212  HT_ERROR_OUT << "[Issue 313] Received bad notification session=" << m_session_id
213  << ", handle=" << handle << ", event_id=" << event_id
214  << ", event_mask=" << event_mask << HT_END;
215  // ignore all notifications in this keepalive message (don't kick up to
216  // application) this avoids multiple notifications being sent to app (Issue 314)
217  notifications=0;
218 
219  // Check if we already have a pending bad notification for this handle
220  BadNotificationHandleMap::iterator uiter = m_bad_handle_map.find(handle);
221  if (uiter == m_bad_handle_map.end()) {
222  // if not then store
223  m_bad_handle_map[handle] = now;
224  }
225  else {
226  // if we do then check against grace period
227  uint64_t time_diff = chrono::duration_cast<chrono::milliseconds>(now - (*uiter).second).count();
228  if (time_diff > ms_bad_notification_grace_period) {
229  HT_ERROR_OUT << "[Issue 313] Still receiving bad notification after grace "
230  << "period=" << ms_bad_notification_grace_period
231  << "ms, session=" << m_session_id
232  << ", handle=" << handle << ", event_id=" << event_id
233  << ", event_mask=" << event_mask << HT_END;
234  HT_ASSERT(false);
235  }
236  }
237  break;
238  }
239  else {
240  // This is a good notification, clear any prev bad notifications for this handle
241  BadNotificationHandleMap::iterator uiter = m_bad_handle_map.find(handle);
242  if (uiter != m_bad_handle_map.end()) {
243  HT_ERROR_OUT << "[Issue 313] Previously bad notification cleared within grace "
244  << "period=" << ms_bad_notification_grace_period
245  << "ms, session=" << m_session_id
246  << ", handle=" << handle << ", event_id=" << event_id
247  << ", event_mask=" << event_mask << HT_END;
248  m_bad_handle_map.erase(uiter);
249  }
250  }
251 
252  if (event_mask == EVENT_MASK_ATTR_SET ||
253  event_mask == EVENT_MASK_ATTR_DEL ||
254  event_mask == EVENT_MASK_CHILD_NODE_ADDED ||
255  event_mask == EVENT_MASK_CHILD_NODE_REMOVED)
256  decode_vstr(&decode_ptr, &decode_remain);
257  else if (event_mask == EVENT_MASK_LOCK_ACQUIRED)
258  decode_i32(&decode_ptr, &decode_remain);
259  else if (event_mask == EVENT_MASK_LOCK_GRANTED) {
260  decode_i32(&decode_ptr, &decode_remain);
261  decode_i64(&decode_ptr, &decode_remain);
262  }
263  }
264 
265  m_delivered_events.swap(delivered_events);
266 
267  decode_ptr = post_notification_buf;
268  decode_remain = post_notification_size;
269  // End Issue 313 instrumentation
270 
271  for (uint32_t i=0; i<notifications; i++) {
272  handle = decode_i64(&decode_ptr, &decode_remain);
273  event_id = decode_i64(&decode_ptr, &decode_remain);
274  event_mask = decode_i32(&decode_ptr, &decode_remain);
275 
276  HandleMap::iterator iter = m_handle_map.find(handle);
277  if (iter == m_handle_map.end()) {
278  HT_ERROR_OUT << "[Issue 313] this should never happen bad notification session="
279  << m_session_id << ", handle=" << handle << ", event_id=" << event_id
280  << ", event_mask=" << event_mask << HT_END;
281  assert(false);
282  }
283 
284  ClientHandleStatePtr handle_state = (*iter).second;
285 
286  if (event_mask == EVENT_MASK_ATTR_SET ||
287  event_mask == EVENT_MASK_ATTR_DEL ||
288  event_mask == EVENT_MASK_CHILD_NODE_ADDED ||
289  event_mask == EVENT_MASK_CHILD_NODE_REMOVED) {
290  name = decode_vstr(&decode_ptr, &decode_remain);
291 
292  if (!m_delivered_events.insert(event_id).second)
293  continue;
294 
295  if (handle_state->callback) {
296  if (event_mask == EVENT_MASK_ATTR_SET)
297  handle_state->callback->attr_set(name);
298  else if (event_mask == EVENT_MASK_ATTR_DEL)
299  handle_state->callback->attr_del(name);
300  else if (event_mask == EVENT_MASK_CHILD_NODE_ADDED)
301  handle_state->callback->child_node_added(name);
302  else
303  handle_state->callback->child_node_removed(name);
304  }
305  }
306  else if (event_mask == EVENT_MASK_LOCK_ACQUIRED) {
307  uint32_t mode = decode_i32(&decode_ptr, &decode_remain);
308  if (!m_delivered_events.insert(event_id).second)
309  continue;
310  if (handle_state->callback)
311  handle_state->callback->lock_acquired(mode);
312  }
313  else if (event_mask == EVENT_MASK_LOCK_RELEASED) {
314  if (!m_delivered_events.insert(event_id).second)
315  continue;
316  if (handle_state->callback)
317  handle_state->callback->lock_released();
318  }
319  else if (event_mask == EVENT_MASK_LOCK_GRANTED) {
320  uint32_t mode = decode_i32(&decode_ptr, &decode_remain);
321  handle_state->lock_generation = decode_i64(&decode_ptr,
322  &decode_remain);
323  if (!m_delivered_events.insert(event_id).second)
324  continue;
325  handle_state->lock_status = LOCK_STATUS_GRANTED;
326  handle_state->sequencer->generation =
327  handle_state->lock_generation;
328  handle_state->sequencer->mode = mode;
329  handle_state->cond.notify_all();
330  }
331 
332  }
333  /*
334  if (m_verbose) {
335  HT_INFOF("session_id = %lld", m_session_id);
336  }
337  **/
338 
339  if (m_conn_handler->disconnected())
340  m_conn_handler->initiate_connection(m_master_addr);
341 
342  if (notifications > 0) {
343  CommBufPtr cbp(Protocol::create_client_keepalive_request(
345  m_last_keep_alive_send_time = chrono::steady_clock::now();
346  if ((error = m_comm->send_datagram(m_master_addr, m_local_addr, cbp)
347  != Error::OK)) {
348  HT_ERRORF("Unable to send datagram - %s", Error::get_text(error));
349  exit(EXIT_FAILURE);
350  }
351  }
352 
354 
355  assert(m_session_id == session_id);
356  }
357  break;
358  default:
359  HT_THROWF(Error::PROTOCOL_ERROR, "Unimplemented command (%llu)",
360  (Llu)event->header.command);
361  }
362  }
363  catch (Exception &e) {
364  HT_ERROR_OUT << e << HT_END;
365  }
366  }
367  else if (event->type == Hypertable::Event::TIMER) {
368  int state;
369 
370  if ((state = m_session->get_state()) == Session::STATE_EXPIRED)
371  return;
372 
373  if (state == Session::STATE_SAFE) {
374  if (m_jeopardy_time < chrono::steady_clock::now() && !m_reconnect)
376  }
377  else if (m_session->expired()) {
378  expire_session();
379  return;
380  }
381 
382  CommBufPtr cbp(Hyperspace::Protocol::create_client_keepalive_request(
384 
385  m_last_keep_alive_send_time = chrono::steady_clock::now();
386 
387  if ((error = m_comm->send_datagram(m_master_addr, m_local_addr, cbp)
388  != Error::OK)) {
389  HT_ERRORF("Unable to send datagram - %s", Error::get_text(error));
390  exit(EXIT_FAILURE);
391  }
392 
393  if ((error = m_comm->set_timer(m_keep_alive_interval, shared_from_this()))
394  != Error::OK) {
395  HT_ERRORF("Problem setting timer - %s", Error::get_text(error));
396  exit(EXIT_FAILURE);
397  }
398  }
399  else {
400  HT_INFOF("%s", event->to_str().c_str());
401  }
402 }
403 
404 
407 
408  if (m_conn_handler)
409  m_conn_handler->close();
410  this_thread::sleep_for(chrono::milliseconds(2000));
411  m_conn_handler = 0;
412  m_handle_map.clear();
413  m_bad_handle_map.clear();
414  m_session_id = 0;
415 
416  if (m_reconnect) {
417  auto now = chrono::steady_clock::now();
419  m_jeopardy_time = now + chrono::milliseconds(m_lease_interval);
420 
422 
423  m_comm->create_datagram_receive_socket(m_local_addr, 0x10, shared_from_this());
424 
425  CommBufPtr cbp(Hyperspace::Protocol::create_client_keepalive_request(
427 
428  int error;
429  if ((error = m_comm->send_datagram(m_master_addr, m_local_addr, cbp)
430  != Error::OK)) {
431  HT_ERRORF("Unable to send datagram - %s", Error::get_text(error));
432  exit(EXIT_FAILURE);
433  }
434 
435  if ((error = m_comm->set_timer(m_keep_alive_interval, shared_from_this()))
436  != Error::OK) {
437  HT_ERRORF("Problem setting timer - %s", Error::get_text(error));
438  exit(EXIT_FAILURE);
439  }
440  }
441 }
442 
443 
445  int error;
446 
447  {
448  lock_guard<recursive_mutex> lock(m_mutex);
449  if (m_dead || m_destroying)
450  return;
451  m_destroying = true;
452  if (m_conn_handler)
453  m_conn_handler->disable_callbacks();
454  }
455 
456  CommBufPtr cbp(Hyperspace::Protocol::create_client_keepalive_request(
458 
459  if ((error = m_comm->send_datagram(m_master_addr, m_local_addr, cbp)
460  != Error::OK))
461  HT_ERRORF("Unable to send datagram - %s", Error::get_text(error));
462 
464 
465 }
466 
468  unique_lock<recursive_mutex> lock(m_mutex);
469  if (m_dead)
470  return;
471 
472  m_destroying = true;
473  if (m_cond_destroyed.wait_for(lock, chrono::seconds(2)) == cv_status::timeout)
474  destroy();
475 }
476 
478  if (m_dead)
479  return;
480  m_dead = true;
481  if (m_conn_handler)
482  m_conn_handler->close();
483  m_conn_handler = 0;
484  m_handle_map.clear();
485  m_bad_handle_map.clear();
486  m_session_id = 0;
488 }
489 
char * decode_vstr(const uint8_t **bufp, size_t *remainp)
Decode a vstr (vint64, data, null).
Lock successfully granted.
Definition: LockSequencer.h:58
int get_state()
Returns current state (internal method)
Definition: Session.cc:1223
Declarations for Protocol.
static const uint64_t ms_bad_notification_grace_period
void advance_expire_time(std::chrono::steady_clock::time_point now)
Definition: Session.h:672
long long unsigned int Llu
Shortcut for printf formats.
Definition: String.h:50
attempting to reconnect session
Definition: Session.h:163
std::shared_ptr< Event > EventPtr
Smart pointer to Event.
Definition: Event.h:228
STL namespace.
int send_datagram(const CommAddress &addr, const CommAddress &send_addr, CommBufPtr &cbuf)
Sends a datagram to a remote address.
Definition: Comm.cc:437
session has expired
Definition: Session.h:157
uint32_t decode_i32(const uint8_t **bufp, size_t *remainp)
Decode a 32-bit integer in little-endian order.
virtual void handle(Hypertable::EventPtr &event)
Callback method.
Hyperspace definitions
#define HT_EXPECT(_e_, _code_)
Definition: Logger.h:388
#define HT_ASSERT(_e_)
Definition: Logger.h:396
uint64_t decode_i64(const uint8_t **bufp, size_t *remainp)
Decode a 64-bit integer in little-endian order.
const char * get_text(int error)
Returns a descriptive error message.
Definition: Error.cc:330
Encapsulate an internet address.
Definition: InetAddr.h:66
void close_socket(const CommAddress &addr)
Closes the socket specified by the addr argument.
Definition: Comm.cc:488
std::shared_ptr< CommBuf > CommBufPtr
Smart pointer to CommBuf.
Definition: CommBuf.h:305
std::shared_ptr< Properties > PropertiesPtr
Definition: Properties.h:447
ClientConnectionHandlerPtr m_conn_handler
Compatibility Macros for C/C++.
#define HT_END
Definition: Logger.h:220
#define HT_ERROR_OUT
Definition: Logger.h:301
Time related declarations.
std::chrono::steady_clock::time_point m_jeopardy_time
Hyperspace session.
Definition: Session.h:148
Hypertable definitions
bool expired()
Checks for session expiration (internal method)
Definition: Session.cc:1229
static bool initialize(sockaddr_in *addr, const char *host, uint16_t port)
Initialize a sockaddr_in structure from host:port.
Definition: InetAddr.cc:68
Entry point to AsyncComm service.
Definition: Comm.h:61
int state_transition(int state)
Transitions state (internal method)
Definition: Session.cc:1181
std::chrono::steady_clock::time_point m_last_keep_alive_send_time
#define HT_INFOF(msg,...)
Definition: Logger.h:272
void update_master_addr(const String &host)
Definition: Session.cc:92
#define HT_THROWF(_code_, _fmt_,...)
Definition: Error.h:490
std::condition_variable_any m_cond_destroyed
Internet address wrapper classes and utility functions.
Request/response message event.
Definition: Event.h:63
Timer event
Definition: Event.h:65
void create_datagram_receive_socket(CommAddress &addr, int tos, const DispatchHandlerPtr &handler)
Creates a socket for receiving datagrams and attaches handler as the default dispatch handler...
Definition: Comm.cc:367
This is a generic exception class for Hypertable.
Definition: Error.h:314
#define HT_ERRORF(msg,...)
Definition: Logger.h:300
session is in jeopardy
Definition: Session.h:159
std::shared_ptr< ClientHandleState > ClientHandleStatePtr
int set_timer(uint32_t duration_millis, const DispatchHandlerPtr &handler)
Sets a timer for duration_millis milliseconds in the future.
Definition: Comm.cc:465
#define HT_TRY(_s_, _code_)
Definition: Error.h:517
String extensions and helpers: sets, maps, append operators etc.
Error codes, Exception handling, error logging.
#define HT_DEBUG_OUT
Definition: Logger.h:261