0.9.8.10
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
ConnectionManager.h
Go to the documentation of this file.
1 /* -*- c++ -*-
2  * Copyright (C) 2007-2015 Hypertable, Inc.
3  *
4  * This file is part of Hypertable.
5  *
6  * Hypertable is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; either version 3
9  * of the License, or any later version.
10  *
11  * Hypertable is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19  * 02110-1301, USA.
20  */
21 
28 #ifndef AsyncComm_ConnectionManager_h
29 #define AsyncComm_ConnectionManager_h
30 
31 #include <AsyncComm/Comm.h>
32 #include <AsyncComm/CommAddress.h>
35 
36 #include <Common/SockAddrMap.h>
37 #include <Common/Timer.h>
38 
39 #include <chrono>
40 #include <condition_variable>
41 #include <mutex>
42 #include <queue>
43 #include <string>
44 #include <thread>
45 #include <unordered_map>
46 
47 namespace Hypertable {
48 
53  class Event;
54 
61 
/// Connection state enumeration.
/// DISCONNECTED is explicitly 0; the remaining enumerators take consecutive
/// values in declaration order.
/// NOTE(review): the listing numbering skips from 65 to 67 inside this enum,
/// so one line of the original header is not shown here - confirm against
/// the real file whether an enumerator is missing.
62  enum class State {
63  DISCONNECTED = 0,
64  CONNECTED,
65  READY,
67  };
68 
72  public:
80  uint32_t timeout_ms;
90  std::condition_variable cond;
92  std::chrono::steady_clock::time_point next_retry;
94  std::string service_name;
95  };
97  typedef std::shared_ptr<ConnectionState> ConnectionStatePtr;
98 
/// Compares two connection states by their next retry time.
/// @param cs1 First connection state
/// @param cs2 Second connection state
/// @return <i>true</i> if <code>cs1->next_retry</code> is later than
/// <code>cs2->next_retry</code>, <i>false</i> otherwise
102  bool operator()(const ConnectionStatePtr &cs1,
103  const ConnectionStatePtr &cs2) const {
104  return std::chrono::operator>(cs1->next_retry, cs2->next_retry);
105  }
106  };
107 
/// Connection manager state shared between Connection manager objects.
/// NOTE(review): the listing numbering has gaps inside this class, so some
/// lines of the original header are not shown (including the header of the
/// function whose body appears at listing lines 116-120 and the tail of the
/// priority queue declaration at listing line 135).
110  class SharedImpl {
111  public:
112 
// Function body fragment (its header line is not shown in the listing):
// sets the shutdown flag, wakes one waiter on retry_cond, then joins the
// thread if it is joinable.
116  shutdown = true;
117  retry_cond.notify_one();
118  if (thread.joinable())
119  thread.join();
120  }
121 
/// Condition variable to signal if anything is on the retry heap
127  std::condition_variable retry_cond;
/// Connection manager thread object
129  std::thread thread;
/// Proxy-to-ConnectionState map
133  std::unordered_map<String, ConnectionStatePtr> conn_map_proxy;
/// Connect retry heap (the rest of this declaration is on a listing line
/// that is not shown)
135  std::priority_queue<ConnectionStatePtr, std::vector<ConnectionStatePtr>,
/// Set to true to signal shutdown in progress
140  bool shutdown;
141  };
142 
144  typedef std::shared_ptr<SharedImpl> SharedImplPtr;
145 
146  public:
147 
/// Constructor.
/// Allocates the shared state object and initializes it: the comm member is
/// set to <code>comm</code> when it is non-null and to Comm::instance()
/// otherwise, and the quiet_mode and shutdown flags are both set to
/// <i>false</i>.
/// @param comm Pointer to the Comm object to use, or 0 to use the Comm
/// singleton
153  ConnectionManager(Comm *comm = 0) {
154  m_impl = std::make_shared<SharedImpl>();
155  m_impl->comm = comm ? comm : Comm::instance();
156  m_impl->quiet_mode = false;
157  m_impl->shutdown = false;
158  }
159 
160  ConnectionManager(const ConnectionManager &cm) = delete;
161 
/// Destructor (virtual, empty body).
164  virtual ~ConnectionManager() { }
165 
/// Adds a connection.
/// @param addr Connection address
/// @param timeout_ms Timeout in milliseconds (presumably the retry interval
/// stored in the connection state - confirm in the implementation)
/// @param service_name Service name (presumably used in log messages -
/// confirm in the implementation)
182  void add(const CommAddress &addr, uint32_t timeout_ms,
183  const char *service_name);
184 
/// Overload of add() that additionally takes a dispatch handler.
/// NOTE(review): how the handler is used is defined in the implementation
/// file, which is not shown here.
/// @param addr Connection address
/// @param timeout_ms Timeout in milliseconds
/// @param service_name Service name
/// @param handler Smart pointer to a DispatchHandler
209  void add(const CommAddress &addr, uint32_t timeout_ms,
210  const char *service_name, DispatchHandlerPtr &handler);
211 
/// Adds a connection with a dispatch handler and connection initializer.
/// @param addr Connection address
/// @param timeout_ms Timeout in milliseconds
/// @param service_name Service name
/// @param handler Smart pointer to a DispatchHandler
/// @param initializer Smart pointer to a ConnectionInitializer
238  void add_with_initializer(const CommAddress &addr, uint32_t timeout_ms,
239  const char *service_name,
240  DispatchHandlerPtr &handler,
241  ConnectionInitializerPtr &initializer);
242 
/// Overload of add() that additionally takes a local address.
/// NOTE(review): presumably the local address to bind to - confirm in the
/// implementation.
/// @param addr Connection address
/// @param local_addr Local address
/// @param timeout_ms Timeout in milliseconds
/// @param service_name Service name
260  void add(const CommAddress &addr, const CommAddress &local_addr,
261  uint32_t timeout_ms, const char *service_name);
262 
/// Overload of add() that takes both a local address and a dispatch handler.
/// NOTE(review): behavior is defined in the implementation file, which is
/// not shown here.
/// @param addr Connection address
/// @param local_addr Local address
/// @param timeout_ms Timeout in milliseconds
/// @param service_name Service name
/// @param handler Smart pointer to a DispatchHandler
283  void add(const CommAddress &addr, const CommAddress &local_addr,
284  uint32_t timeout_ms, const char *service_name,
285  DispatchHandlerPtr &handler);
286 
/// NOTE(review): presumably removes the connection for <code>addr</code> and
/// returns an error code - the implementation is not shown here, confirm.
/// @param addr Connection address
292  int remove(const CommAddress &addr);
293 
/// Blocks until the connection to the given address is established.
/// @param addr Connection address
/// @param max_wait_ms Presumably the maximum time to wait, in milliseconds -
/// confirm in the implementation
/// @return Boolean result (presumably <i>true</i> if connected - confirm)
304  bool wait_for_connection(const CommAddress &addr, uint32_t max_wait_ms);
305 
/// Overload of wait_for_connection() that takes a Timer instead of a
/// millisecond count.
/// @param addr Connection address
/// @param timer Timer object (taken by non-const reference)
/// @return Boolean result (presumably <i>true</i> if connected - confirm)
315  bool wait_for_connection(const CommAddress &addr, Timer &timer);
316 
/// Returns the Comm object associated with this connection manager.
/// @return The <code>comm</code> pointer held in the shared state
321  Comm *get_comm() { return m_impl->comm; }
322 
/// Sets the SharedImpl::quiet_mode flag; when the flag is <i>true</i> the
/// connect failure log message is suppressed.
/// @param mode New value for the quiet_mode flag
329  void set_quiet_mode(bool mode) { m_impl->quiet_mode = mode; }
330 
/// Primary dispatch handler method.
/// @param event Smart pointer to the Event to handle
339  virtual void handle(EventPtr &event);
340 
/// Connect retry loop.
/// NOTE(review): presumably the function run by SharedImpl::thread - confirm
/// in the implementation.
344  void connect_retry_loop();
345 
346  private:
347 
/// Called by the add methods to add a connection.
/// @param addr Connection address
/// @param local_addr Local address
/// @param timeout_ms Timeout in milliseconds
/// @param service_name Service name
/// @param handler Smart pointer to a DispatchHandler
/// @param initializer Smart pointer to a ConnectionInitializer
364  void add_internal(const CommAddress &addr, const CommAddress &local_addr,
365  uint32_t timeout_ms, const char *service_name,
366  DispatchHandlerPtr &handler,
367  ConnectionInitializerPtr &initializer);
368 
/// Private overload of wait_for_connection() that takes a connection state
/// object instead of an address.
/// NOTE(review): presumably the common implementation behind the public
/// overloads - confirm in the implementation.
/// @param conn_state Smart pointer to the connection state
/// @param timer Timer object (taken by non-const reference)
/// @return Boolean result (presumably <i>true</i> if connected - confirm)
379  bool wait_for_connection(ConnectionStatePtr &conn_state, Timer &timer);
380 
/// Calls Comm::connect to establish a connection.
/// @param conn_state Smart pointer to the connection state
389  void send_connect_request(ConnectionStatePtr &conn_state);
390 
/// Sends an initialization request.
/// @param conn_state Smart pointer to the connection state
394  void send_initialization_request(ConnectionStatePtr &conn_state);
395 
/// Schedules a connection retry attempt.
/// @param conn_state Smart pointer to the connection state
/// @param message Message string (presumably used for logging - confirm in
/// the implementation)
403  void schedule_retry(ConnectionStatePtr &conn_state, const std::string &message);
404 
406  SharedImplPtr m_impl;
407 
408  };
410  typedef std::shared_ptr<ConnectionManager> ConnectionManagerPtr;
411 
413 }
414 
415 #endif // AsyncComm_ConnectionManager_h
static Comm * instance()
Creates/returns singleton instance of the Comm class.
Definition: Comm.h:72
void add(const CommAddress &addr, uint32_t timeout_ms, const char *service_name)
Adds a connection.
SharedImplPtr m_impl
Smart pointer to connection manager state.
static std::mutex mutex
Definition: Logger.cc:43
uint32_t timeout_ms
Retry connection attempt after this many milliseconds.
std::mutex mutex
Mutex to serialize concurrent access.
std::unordered_map< String, ConnectionStatePtr > conn_map_proxy
Proxy-to-ConnectionState map.
void add_internal(const CommAddress &addr, const CommAddress &local_addr, uint32_t timeout_ms, const char *service_name, DispatchHandlerPtr &handler, ConnectionInitializerPtr &initializer)
Called by the add methods to add a connection.
void add_with_initializer(const CommAddress &addr, uint32_t timeout_ms, const char *service_name, DispatchHandlerPtr &handler, ConnectionInitializerPtr &initializer)
Adds a connection with a dispatch handler and connection initializer.
Strict weak ordering for connection retry heap.
std::shared_ptr< ConnectionState > ConnectionStatePtr
Smart pointer to ConnectionState.
void schedule_retry(ConnectionStatePtr &conn_state, const std::string &message)
Schedules a connection retry attempt.
ConnectionManager(Comm *comm=0)
Constructor.
Establishes and maintains a set of TCP connections.
std::condition_variable retry_cond
Condition variable to signal if anything is on the retry heap.
Abstract base class for application dispatch handlers registered with AsyncComm.
CommAddress local_addr
Local address to bind to.
Declarations for CommAddress.
std::shared_ptr< Event > EventPtr
Smart pointer to Event.
Definition: Event.h:228
Declarations for ConnectionInitializer.
void connect_retry_loop()
Connect retry loop.
std::thread thread
Connection manager thread object.
bool shutdown
Set to true to signal shutdown in progress.
bool quiet_mode
Set to true to prevent connect failure log message.
std::mutex mutex
Mutex to serialize concurrent access.
virtual void handle(EventPtr &event)
Primary dispatch handler method.
Declarations for DispatchHandler.
std::condition_variable cond
Condition variable used to signal connection state change.
std::chrono::steady_clock::time_point next_retry
Absolute time of next connect attempt.
Encapsulate an internet address.
Definition: InetAddr.h:66
A timer class to keep timeout states across AsyncComm related calls.
ConnectionInitializerPtr initializer
Connection initializer.
std::shared_ptr< SharedImpl > SharedImplPtr
Smart pointer to SharedImpl object.
Connection manager state shared between Connection manager objects.
std::string service_name
Service name of connection for log messages.
SockAddrMap< ConnectionStatePtr > conn_map
InetAddr-to-ConnectionState map.
Declarations for SockAddrMap.
CommAddress addr
Connection address supplied to the add methods.
bool wait_for_connection(const CommAddress &addr, uint32_t max_wait_ms)
Blocks until the connection to the given address is established.
Hypertable definitions
bool operator()(const ConnectionStatePtr &cs1, const ConnectionStatePtr &cs2) const
Entry point to AsyncComm service.
Definition: Comm.h:61
InetAddr inet_addr
Address initialized from Event object.
Declarations for Comm.
DispatchHandlerPtr handler
Registered connection handler.
A timer class to keep timeout states across AsyncComm related calls.
Definition: Timer.h:44
std::shared_ptr< DispatchHandler > DispatchHandlerPtr
Smart pointer to DispatchHandler.
void send_initialization_request(ConnectionStatePtr &conn_state)
Sends an initialization request.
void set_quiet_mode(bool mode)
Sets the SharedImpl::quiet_mode flag which will disable the generation of log messages upon failed connection attempts.
void send_connect_request(ConnectionStatePtr &conn_state)
Calls Comm::connect to establish a connection.
std::shared_ptr< ConnectionManager > ConnectionManagerPtr
Smart pointer to ConnectionManager.
std::shared_ptr< ConnectionInitializer > ConnectionInitializerPtr
Smart pointer to ConnectionInitializer.
Comm * get_comm()
Returns the Comm object associated with this connection manager.
std::priority_queue< ConnectionStatePtr, std::vector< ConnectionStatePtr >, LtConnectionState > retry_queue
Connect retry heap.
bool operator>(const directory< _Key, _Tp, _Compare, _Allocator > &__x, const directory< _Key, _Tp, _Compare, _Allocator > &__y)
Definition: directory.h:852
Address abstraction to hold either proxy name or IPv4:port address.
Definition: CommAddress.h:52
virtual ~ConnectionManager()
Destructor.