0.9.8.10
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
ServerKeepaliveHandler.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2007-2015 Hypertable, Inc.
3  *
4  * This file is part of Hypertable.
5  *
6  * Hypertable is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; either version 3
9  * of the License, or any later version.
10  *
11  * Hypertable is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19  * 02110-1301, USA.
20  */
21 
22 #include <Common/Compat.h>
23 
26 #include "ServerKeepaliveHandler.h"
27 #include "Master.h"
28 #include "Protocol.h"
29 #include "SessionData.h"
30 
31 #include <Common/Error.h>
32 #include <Common/InetAddr.h>
33 #include <Common/StringExt.h>
34 #include <Common/System.h>
35 
36 using namespace Hypertable;
37 using namespace Hyperspace;
38 using namespace Serialization;
39 using namespace std;
40 
42  ApplicationQueuePtr &app_queue)
43  : m_comm(comm), m_master(master),
44  m_app_queue_ptr(app_queue), m_shutdown(false) {
45 
47 
48 }
49 
51  int error;
52 
53  if ((error = m_comm->set_timer(Master::TIMER_INTERVAL_MS, shared_from_this()))
54  != Error::OK) {
55  HT_ERRORF("Problem setting timer - %s", Error::get_text(error));
56  exit(EXIT_FAILURE);
57  }
58 
59  m_master->tick();
60 }
61 
62 
64  int error;
65 
66  {
67  lock_guard<mutex> lock(m_mutex);
68  if (m_shutdown)
69  return;
70  }
71 
72  if (event->type == Hypertable::Event::MESSAGE) {
73  const uint8_t *decode_ptr = event->payload;
74  size_t decode_remain = event->payload_len;
75 
76  try {
77 
78  // sanity check command code
79  if (event->header.command >= Protocol::COMMAND_MAX)
80  HT_THROWF(Error::PROTOCOL_ERROR, "Invalid command (%llu)",
81  (Llu)event->header.command);
82 
83  switch (event->header.command) {
84  case Protocol::COMMAND_KEEPALIVE: {
85  uint64_t session_id = decode_i64(&decode_ptr, &decode_remain);
86  uint32_t delivered_event_count;
87  std::set<uint64_t> delivered_events;
88  delivered_event_count = decode_i32(&decode_ptr, &decode_remain);
89  for (uint32_t i=0; i<delivered_event_count; i++)
90  delivered_events.insert( decode_i64(&decode_ptr, &decode_remain) );
91  bool shutdown = decode_bool(&decode_ptr, &decode_remain);
92 
94  m_master, session_id, delivered_events, shutdown, event,
95  &m_send_addr ) );
96  }
97  break;
98  default:
99  HT_THROWF(Error::PROTOCOL_ERROR, "Unimplemented command (%llu)",
100  (Llu)event->header.command);
101  }
102  }
103  catch (Exception &e) {
104  HT_ERROR_OUT << e << HT_END;
105  }
106  }
107  else if (event->type == Hypertable::Event::TIMER) {
108 
109  m_master->tick();
110 
111  try {
113  }
114  catch (Exception &e) {
115  HT_ERROR_OUT << e << HT_END;
116  }
117 
118  if ((error = m_comm->set_timer(Master::TIMER_INTERVAL_MS, shared_from_this()))
119  != Error::OK) {
120  HT_ERRORF("Problem setting timer - %s", Error::get_text(error));
121  exit(EXIT_FAILURE);
122  }
123 
124  }
125  else {
126  HT_INFOF("%s", event->to_str().c_str());
127  }
128 }
129 
130 
132  int error = 0;
133  SessionDataPtr session_ptr;
134 
135  {
136  lock_guard<mutex> lock(m_mutex);
137  if (m_shutdown)
138  return;
139  }
140 
141  //HT_INFOF("Delivering event notifications for session %lld", session_id);
142 
143  if (!m_master->get_session(session_id, session_ptr)) {
144  HT_ERRORF("Unable to find data for session %llu", (Llu)session_id);
145  return;
146  }
147 
148  /*
149  HT_INFOF("Sending Keepalive request to %s",
150  InetAddr::format(session_ptr->addr));
151  **/
152 
153  CommBufPtr cbp(Protocol::create_server_keepalive_request(session_ptr));
154 
155  if ((error = m_comm->send_datagram(session_ptr->get_addr(), m_send_addr, cbp))
156  != Error::OK) {
157  HT_ERRORF("Comm::send_datagram returned %s", Error::get_text(error));
158  }
159 }
160 
162  lock_guard<mutex> lock(m_mutex);
163  m_shutdown = true;
164  m_app_queue_ptr->shutdown();
165 }
Retrieves system information (hardware, installation directory, etc)
ServerKeepaliveHandler(Comm *comm, Master *master, ApplicationQueuePtr &app_queue_ptr)
void deliver_event_notifications(uint64_t session_id)
Declarations for Protocol.
long long unsigned int Llu
Shortcut for printf formats.
Definition: String.h:50
bool get_session(uint64_t session_id, SessionDataPtr &session_data)
Definition: Master.cc:291
std::shared_ptr< SessionData > SessionDataPtr
Definition: SessionData.h:156
std::shared_ptr< Event > EventPtr
Smart pointer to Event.
Definition: Event.h:228
STL namespace.
int send_datagram(const CommAddress &addr, const CommAddress &send_addr, CommBufPtr &cbuf)
Sends a datagram to a remote address.
Definition: Comm.cc:437
uint32_t decode_i32(const uint8_t **bufp, size_t *remainp)
Decode a 32-bit integer in little-endian order.
Hyperspace definitions
void get_datagram_send_address(struct sockaddr_in *addr)
Definition: Master.h:162
uint64_t decode_i64(const uint8_t **bufp, size_t *remainp)
Decode a 64-bit integer in little-endian order.
bool decode_bool(const uint8_t **bufp, size_t *remainp)
Decodes a boolean value from the given buffer.
Definition: Serialization.h:96
const char * get_text(int error)
Returns a descriptive error message.
Definition: Error.cc:330
std::shared_ptr< CommBuf > CommBufPtr
Smart pointer to CommBuf.
Definition: CommBuf.h:305
Compatibility Macros for C/C++.
#define HT_END
Definition: Logger.h:220
#define HT_ERROR_OUT
Definition: Logger.h:301
Hypertable definitions
Entry point to AsyncComm service.
Definition: Comm.h:61
#define HT_INFOF(msg,...)
Definition: Logger.h:272
#define HT_THROWF(_code_, _fmt_,...)
Definition: Error.h:490
Internet address wrapper classes and utility functions.
Request/response message event.
Definition: Event.h:63
Timer event
Definition: Event.h:65
This is a generic exception class for Hypertable.
Definition: Error.h:314
#define HT_ERRORF(msg,...)
Definition: Logger.h:300
int set_timer(uint32_t duration_millis, const DispatchHandlerPtr &handler)
Sets a timer for duration_millis milliseconds in the future.
Definition: Comm.cc:465
std::shared_ptr< ApplicationQueue > ApplicationQueuePtr
Shared smart pointer to ApplicationQueue object.
String extensions and helpers: sets, maps, append operators etc.
Error codes, Exception handling, error logging.
virtual void handle(Hypertable::EventPtr &event_ptr)
Callback method.