0.9.8.10
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
ConnectionManager.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2007-2015 Hypertable, Inc.
3  *
4  * This file is part of Hypertable.
5  *
6  * Hypertable is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; either version 3
9  * of the License, or any later version.
10  *
11  * Hypertable is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19  * 02110-1301, USA.
20  */
21 
28 #include <Common/Compat.h>
29 #include "ConnectionManager.h"
30 
31 #include <AsyncComm/Comm.h>
32 #include <AsyncComm/Protocol.h>
33 
34 #include <Common/Error.h>
35 #include <Common/Logger.h>
36 #include <Common/Random.h>
37 #include <Common/Serialization.h>
38 #include <Common/System.h>
39 #include <Common/Time.h>
40 
41 #include <chrono>
42 #include <cstdlib>
43 #include <iostream>
44 #include <unordered_map>
45 
46 extern "C" {
47 #include <poll.h>
48 #include <sys/types.h>
49 #include <sys/socket.h>
50 #include <netinet/in.h>
51 #include <arpa/inet.h>
52 }
53 
54 using namespace Hypertable;
55 using namespace std;
56 
57 void
58 ConnectionManager::add(const CommAddress &addr, uint32_t timeout_ms,
59  const char *service_name, DispatchHandlerPtr &handler) {
60  CommAddress null_addr;
61  ConnectionInitializerPtr null_initializer;
62  add_internal(addr, null_addr, timeout_ms, service_name, handler, null_initializer);
63 }
64 
66  uint32_t timeout_ms, const char *service_name,
67  DispatchHandlerPtr &handler, ConnectionInitializerPtr &initializer) {
68  CommAddress null_addr;
69  add_internal(addr, null_addr, timeout_ms, service_name, handler, initializer);
70 }
71 
72 
73 void
74 ConnectionManager::add(const CommAddress &addr, uint32_t timeout_ms,
75  const char *service_name) {
76  DispatchHandlerPtr null_disp_handler;
77  add(addr, timeout_ms, service_name, null_disp_handler);
78 }
79 
80 
81 void
82 ConnectionManager::add(const CommAddress &addr, const CommAddress &local_addr,
83  uint32_t timeout_ms, const char *service_name,
84  DispatchHandlerPtr &handler) {
85  ConnectionInitializerPtr null_initializer;
86  add_internal(addr, local_addr, timeout_ms, service_name, handler, null_initializer);
87 }
88 
89 
90 void
91 ConnectionManager::add(const CommAddress &addr, const CommAddress &local_addr,
92  uint32_t timeout_ms, const char *service_name) {
93  DispatchHandlerPtr null_disp_handler;
94  add(addr, local_addr, timeout_ms, service_name, null_disp_handler);
95 }
96 
97 void
99  const CommAddress &local_addr, uint32_t timeout_ms,
100  const char *service_name, DispatchHandlerPtr &handler,
101  ConnectionInitializerPtr &initializer) {
102  lock_guard<mutex> lock(m_impl->mutex);
103  ConnectionStatePtr conn_state;
104 
106  if (!m_impl->thread.joinable())
107  m_impl->thread = std::thread([=](){ this->connect_retry_loop(); });
108 
109  HT_ASSERT(addr.is_set());
110 
111  if (addr.is_proxy()) {
112  auto iter = m_impl->conn_map_proxy.find(addr.proxy);
113  if (iter != m_impl->conn_map_proxy.end() && iter->second->state != State::DECOMMISSIONED)
114  return;
115  }
116  else if (addr.is_inet()) {
118  m_impl->conn_map.find(addr.inet);
119  if (iter != m_impl->conn_map.end() && iter->second->state != State::DECOMMISSIONED)
120  return;
121  }
122 
123  conn_state = make_shared<ConnectionState>();
124  conn_state->addr = addr;
125  conn_state->local_addr = local_addr;
126  conn_state->timeout_ms = timeout_ms;
127  conn_state->handler = handler;
128  conn_state->initializer = initializer;
129  conn_state->service_name = (service_name) ? service_name : "";
130  conn_state->next_retry = std::chrono::steady_clock::now();
131 
132  if (addr.is_proxy())
133  m_impl->conn_map_proxy[addr.proxy] = conn_state;
134  else
135  m_impl->conn_map[addr.inet] = conn_state;
136 
137  {
138  lock_guard<mutex> conn_lock(conn_state->mutex);
139  send_connect_request(conn_state);
140  }
141 }
142 
143 
144 bool
146  uint32_t max_wait_ms) {
147  Timer timer(max_wait_ms, true);
148  return wait_for_connection(addr, timer);
149 }
150 
151 
152 bool
154  Timer &timer) {
155  ConnectionStatePtr conn_state_ptr;
156 
157  {
158  lock_guard<mutex> lock(m_impl->mutex);
159  if (addr.is_inet()) {
161  m_impl->conn_map.find(addr.inet);
162  if (iter == m_impl->conn_map.end())
163  return false;
164  conn_state_ptr = (*iter).second;
165  }
166  else if (addr.is_proxy()) {
167  auto iter = m_impl->conn_map_proxy.find(addr.proxy);
168  if (iter == m_impl->conn_map_proxy.end())
169  return false;
170  conn_state_ptr = (*iter).second;
171  }
172  }
173 
174  return wait_for_connection(conn_state_ptr, timer);
175 }
176 
177 
179  Timer &timer) {
180 
181  timer.start();
182 
183  {
184  unique_lock<mutex> conn_lock(conn_state->mutex);
185 
186  auto duration = std::chrono::milliseconds(timer.remaining());
187 
188  if (!conn_state->cond.wait_for(conn_lock, duration, [&conn_state](){ return conn_state->state == State::READY; }))
189  return false;
190 
191  if (conn_state->state == State::DECOMMISSIONED)
192  return false;
193 
194  }
195 
196  return true;
197 }
198 
199 
208 void
210  int error;
211 
212  if (conn_state->state == State::DECOMMISSIONED)
213  HT_FATALF("Attempt to connect decommissioned connection to service='%s'",
214  conn_state->service_name.c_str());
215 
216  if (!conn_state->local_addr.is_set())
217  error = m_impl->comm->connect(conn_state->addr, shared_from_this());
218  else
219  error = m_impl->comm->connect(conn_state->addr, conn_state->local_addr,
220  shared_from_this());
221 
222  if (error == Error::OK)
223  return;
224  else if (error == Error::COMM_ALREADY_CONNECTED) {
225  if (conn_state->state == State::DISCONNECTED)
226  conn_state->state = State::READY;
227  conn_state->cond.notify_all();
228  }
229  else if (error == Error::COMM_INVALID_PROXY) {
230  m_impl->conn_map.erase(conn_state->inet_addr);
231  m_impl->conn_map_proxy.erase(conn_state->addr.proxy);
232  conn_state->state = State::DECOMMISSIONED;
233  conn_state->cond.notify_all();
234  }
235  else if (error != Error::COMM_BROKEN_CONNECTION) {
236  if (!m_impl->quiet_mode) {
237  if (conn_state->service_name != "")
238  HT_INFOF("Connection attempt to %s at %s failed - %s. Will retry "
239  "again in %d milliseconds...", conn_state->service_name.c_str(),
240  conn_state->addr.to_str().c_str(), Error::get_text(error),
241  (int)conn_state->timeout_ms);
242  else
243  HT_INFOF("Connection attempt to service at %s failed - %s. Will retry "
244  "again in %d milliseconds...", conn_state->addr.to_str().c_str(),
245  Error::get_text(error), (int)conn_state->timeout_ms);
246  }
247 
248  // Reschedule (throw in a little randomness)
249  conn_state->next_retry = std::chrono::steady_clock::now() +
250  std::chrono::milliseconds(conn_state->timeout_ms);
251 
252  if (Random::number32() & 1)
253  conn_state->next_retry -= Random::duration_millis(2000);
254  else
255  conn_state->next_retry += Random::duration_millis(2000);
256 
257  // add to retry heap
258  m_impl->retry_queue.push(conn_state);
259  m_impl->retry_cond.notify_one();
260  }
261 
262 }
263 
264 
266  bool check_inet_addr = false;
267  InetAddr inet_addr;
268  bool do_close = false;
269  int error = Error::OK;
270 
271  HT_ASSERT(addr.is_set());
272 
273  {
274  lock_guard<mutex> lock(m_impl->mutex);
275 
276  if (addr.is_proxy()) {
277  auto iter = m_impl->conn_map_proxy.find(addr.proxy);
278  if (iter != m_impl->conn_map_proxy.end()) {
279  {
280  lock_guard<mutex> conn_lock((*iter).second->mutex);
281  check_inet_addr = true;
282  inet_addr = (*iter).second->inet_addr;
283  if ((*iter).second->state == State::CONNECTED ||
284  (*iter).second->state == State::READY)
285  do_close = true;
286  (*iter).second->state = State::DECOMMISSIONED;
287  (*iter).second->cond.notify_all();
288  }
289  m_impl->conn_map_proxy.erase(iter);
290  }
291  }
292  else if (addr.is_inet()) {
293  check_inet_addr = true;
294  inet_addr = addr.inet;
295  }
296 
297  if (check_inet_addr) {
299  m_impl->conn_map.find(inet_addr);
300  if (iter != m_impl->conn_map.end()) {
301  {
302  lock_guard<mutex> conn_lock((*iter).second->mutex);
303  if ((*iter).second->state == State::CONNECTED ||
304  (*iter).second->state == State::READY)
305  do_close = true;
306  (*iter).second->state = State::DECOMMISSIONED;
307  (*iter).second->cond.notify_all();
308  }
309  m_impl->conn_map.erase(iter);
310  }
311 
312  }
313  }
314 
315  if (do_close)
316  m_impl->comm->close_socket(addr);
317 
318  return error;
319 }
320 
321 
322 
333 void
335  lock_guard<mutex> lock(m_impl->mutex);
336  ConnectionStatePtr conn_state;
337 
338  {
339  auto iter = m_impl->conn_map.find(event->addr);
340  if (iter != m_impl->conn_map.end())
341  conn_state = (*iter).second;
342  }
343 
344  if (!conn_state && event->proxy) {
345  auto iter = m_impl->conn_map_proxy.find(event->proxy);
346  if (iter != m_impl->conn_map_proxy.end()) {
347  conn_state = (*iter).second;
349  m_impl->conn_map[event->addr] = conn_state;
350  }
351  }
352 
353  if (conn_state) {
354  lock_guard<mutex> conn_lock(conn_state->mutex);
355 
356  if (event->type == Event::CONNECTION_ESTABLISHED) {
357  conn_state->inet_addr = event->addr;
358  if (conn_state->initializer) {
359  conn_state->state = State::CONNECTED;
360  send_initialization_request(conn_state);
361  return;
362  }
363  else {
364  conn_state->state = State::READY;
365  conn_state->cond.notify_all();
366  }
367  }
368  else if (event->type == Event::ERROR ||
369  event->type == Event::DISCONNECT) {
370  if (event->proxy && !m_impl->comm->translate_proxy(event->proxy, 0)) {
371  m_impl->conn_map.erase(conn_state->inet_addr);
372  m_impl->conn_map_proxy.erase(conn_state->addr.proxy);
373  conn_state->state = State::DECOMMISSIONED;
374  conn_state->cond.notify_all();
375  }
376  else {
377  if (!m_impl->quiet_mode)
378  HT_INFOF("Received event %s", event->to_str().c_str());
379  string message = (event->type == Event::DISCONNECT) ?
380  "Disconnected" : Error::get_text(event->error);
381  conn_state->state = State::DISCONNECTED;
382  schedule_retry(conn_state, message);
383  }
384  }
385  else if (event->type == Event::MESSAGE) {
386  if (conn_state->initializer && conn_state->state == State::CONNECTED) {
389  return;
390  }
391  else if (event->header.command != conn_state->initializer->initialization_command()) {
392  String err_msg = "Connection initialization not yet complete";
393  CommHeader header;
394  header.initialize_from_request_header(event->header);
395  CommBufPtr cbuf( new CommBuf(header, 4 + Serialization::encoded_length_str16(err_msg)) );
396  cbuf->append_i32(Error::CONNECTION_NOT_INITIALIZED);
397  cbuf->append_str16(err_msg);
398  m_impl->comm->send_response(event->addr, cbuf);
399  return;
400  }
401  if (!conn_state->initializer->process_initialization_response(event.get()))
402  HT_FATALF("Unable to initialize connection to %s, exiting ...",
403  conn_state->service_name.c_str());
404  conn_state->state = State::READY;
405  conn_state->cond.notify_all();
406  return;
407  }
408  }
409 
410  // Chain event to application supplied handler
411  if (conn_state->handler)
412  conn_state->handler->handle(event);
413  }
414  else {
415  HT_WARNF("Unable to find connection for %s in map.",
416  InetAddr::format(event->addr).c_str());
417  }
418 }
419 
421  CommBufPtr cbuf(conn_state->initializer->create_initialization_request());
422  int error = m_impl->comm->send_request(conn_state->inet_addr, 60000, cbuf, this);
423  if (error == Error::COMM_BROKEN_CONNECTION ||
424  error == Error::COMM_NOT_CONNECTED ||
425  error == Error::COMM_INVALID_PROXY) {
426  if (!m_impl->quiet_mode)
427  HT_INFOF("Received error %d", error);
428  conn_state->state = State::DISCONNECTED;
429  schedule_retry(conn_state, Error::get_text(error));
430  }
431  else if (error != Error::OK)
432  HT_FATALF("Problem initializing connection to %s - %s",
433  conn_state->service_name.c_str(), Error::get_text(error));
434 }
435 
436 
438  const string &message) {
439  if (!m_impl->quiet_mode)
440  HT_INFOF("%s: Problem connecting to %s, will retry in %d "
441  "milliseconds...", message.c_str(),
442  conn_state->service_name.c_str(), (int)conn_state->timeout_ms);
443 
444  // this logic could proably be smarter. For example, if the last
445  // connection attempt was a long time ago, then schedule immediately
446  // otherwise, if this event is the result of an immediately prior connect
447  // attempt, then do the following
448  conn_state->next_retry = std::chrono::steady_clock::now() +
449  std::chrono::milliseconds(conn_state->timeout_ms);
450 
451  // add to retry heap
452  m_impl->retry_queue.push(conn_state);
453  m_impl->retry_cond.notify_one();
454 }
455 
456 
461  unique_lock<mutex> lock(m_impl->mutex);
462  ConnectionStatePtr conn_state;
463 
464  while (!m_impl->shutdown) {
465 
466  while (m_impl->retry_queue.empty()) {
467  m_impl->retry_cond.wait(lock);
468  if (m_impl->shutdown)
469  break;
470  }
471 
472  if (m_impl->shutdown)
473  break;
474 
475  conn_state = m_impl->retry_queue.top();
476 
477  {
478  lock_guard<mutex> conn_lock(conn_state->mutex);
479  if (conn_state->state == State::DISCONNECTED) {
480  if (conn_state->next_retry <= std::chrono::steady_clock::now()) {
481  m_impl->retry_queue.pop();
482  send_connect_request(conn_state);
483  continue;
484  }
485  }
486  else if (conn_state->state == State::CONNECTED && conn_state->initializer) {
487  if (conn_state->next_retry <= std::chrono::steady_clock::now()) {
488  m_impl->retry_queue.pop();
489  send_initialization_request(conn_state);
490  continue;
491  }
492  }
493  else {
494  m_impl->retry_queue.pop();
495  continue;
496  }
497  }
498  m_impl->retry_cond.wait_until(lock, conn_state->next_retry);
499  }
500 }
501 
502 
Retrieves system information (hardware, installation directory, etc)
void add(const CommAddress &addr, uint32_t timeout_ms, const char *service_name)
Adds a connection.
SharedImplPtr m_impl
Smart pointer to connection manager state.
int remove(const CommAddress &addr)
Removes a connection from the connection manager.
void initialize_from_request_header(CommHeader &req_header)
Initializes header from req_header.
Definition: CommHeader.h:128
#define HT_WARNF(msg,...)
Definition: Logger.h:290
String proxy
Proxy name.
Definition: CommAddress.h:175
void add_internal(const CommAddress &addr, const CommAddress &local_addr, uint32_t timeout_ms, const char *service_name, DispatchHandlerPtr &handler, ConnectionInitializerPtr &initializer)
Called by the add methods to add a connection.
static int32_t response_code(const Event *event)
Returns the response code from an event event generated in response to a request message.
Definition: Protocol.cc:39
void add_with_initializer(const CommAddress &addr, uint32_t timeout_ms, const char *service_name, DispatchHandlerPtr &handler, ConnectionInitializerPtr &initializer)
Adds a connection with a dispatch handler and connection initializer.
std::string String
A String is simply a typedef to std::string.
Definition: String.h:44
std::shared_ptr< ConnectionState > ConnectionStatePtr
Smart pointer to ConnectionState.
void schedule_retry(ConnectionStatePtr &conn_state, const std::string &message)
Schedules a connection retry attempt.
std::shared_ptr< Event > EventPtr
Smart pointer to Event.
Definition: Event.h:228
STL namespace.
uint32_t remaining()
Returns the remaining time till expiry.
Definition: Timer.h:101
void connect_retry_loop()
Connect retry loop.
static uint32_t number32(uint32_t maximum=0)
Returns a random 32-bit unsigned integer.
Definition: Random.cc:55
Error event
Definition: Event.h:64
Connection established event.
Definition: Event.h:61
void add(const Key &key, uint8_t flag, const void *value, uint32_t value_len, TableMutatorAsync *value_index_mutator, TableMutatorAsync *qualifier_index_mutator)
Definition: IndexTables.cc:34
#define HT_ASSERT(_e_)
Definition: Logger.h:396
virtual void handle(EventPtr &event)
Primary dispatch handler method.
Unordered map specialization for InetAddr keys.
Definition: SockAddrMap.h:57
size_t encoded_length_str16(const char *str)
Computes the encoded length of a string16 encoding.
const char * get_text(int error)
Returns a descriptive error message.
Definition: Error.cc:330
Encapsulate an internet address.
Definition: InetAddr.h:66
std::shared_ptr< CommBuf > CommBufPtr
Smart pointer to CommBuf.
Definition: CommBuf.h:305
Logging routines and macros.
Compatibility Macros for C/C++.
Functions to serialize/deserialize primitives to/from a memory buffer.
Connection disconnected event.
Definition: Event.h:62
Time related declarations.
String format(int sep= ':') const
Returns a string with a dotted notation ("127.0.0.1:8080") including the port.
Definition: InetAddr.h:132
bool is_set() const
Returns true if address has been initialized.
Definition: CommAddress.h:159
bool is_inet() const
Returns true if address is of type CommAddress::INET.
Definition: CommAddress.h:153
bool wait_for_connection(const CommAddress &addr, uint32_t max_wait_ms)
Blocks until the connection to the given address is established.
Hypertable definitions
#define HT_FATALF(msg,...)
Definition: Logger.h:343
Header for messages transmitted via AsyncComm.
Definition: CommHeader.h:40
Declarations for ConnectionManager.
void start()
Starts the timer.
Definition: Timer.h:64
Declarations for Comm.
Declarations for Protocol.
#define HT_INFOF(msg,...)
Definition: Logger.h:272
Random number generator for int32, int64, double and ascii arrays.
Request/response message event.
Definition: Event.h:63
A timer class to keep timeout states across AsyncComm related calls.
Definition: Timer.h:44
std::shared_ptr< DispatchHandler > DispatchHandlerPtr
Smart pointer to DispatchHandler.
Message buffer for holding data to be transmitted over a network.
Definition: CommBuf.h:79
void send_initialization_request(ConnectionStatePtr &conn_state)
Sends an initialization request.
void send_connect_request(ConnectionStatePtr &conn_state)
Calls Comm::connect to establish a connection.
InetAddr inet
IPv4:port address.
Definition: CommAddress.h:176
bool is_proxy() const
Returns true if address is of type CommAddress::PROXY.
Definition: CommAddress.h:147
std::shared_ptr< ConnectionInitializer > ConnectionInitializerPtr
Smart pointer to ConnectionInitializer.
Error codes, Exception handling, error logging.
static std::chrono::milliseconds duration_millis(uint32_t maximum)
Returns a random millisecond duration.
Definition: Random.cc:77
Address abstraction to hold either proxy name or IPv4:port address.
Definition: CommAddress.h:52