0.9.8.10
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
Reactor.h
Go to the documentation of this file.
1 /* -*- c++ -*-
2  * Copyright (C) 2007-2015 Hypertable, Inc.
3  *
4  * This file is part of Hypertable.
5  *
6  * Hypertable is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; either version 3
9  * of the License, or any later version.
10  *
11  * Hypertable is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19  * 02110-1301, USA.
20  */
21 
28 #ifndef AsyncComm_Reactor_h
29 #define AsyncComm_Reactor_h
30 
31 #include "Clock.h"
32 #include "PollTimeout.h"
33 #include "RequestCache.h"
34 #include "ExpireTimer.h"
35 
36 #include <boost/thread/thread.hpp>
37 
38 #include <memory>
39 #include <mutex>
40 #include <queue>
41 #include <set>
42 #include <vector>
43 
44 extern "C" {
45 #include <poll.h>
46 }
47 
48 namespace Hypertable {
49 
56  typedef struct {
58  struct pollfd pollfd;
62 
66  class Reactor {
67 
68  friend class ReactorFactory;
69 
70  public:
71 
/// Enumeration for reactor priority.
/// HIGH is explicitly 0; NORMAL takes the next enumerator value.
73  enum class Priority {
74  HIGH = 0, //< High
75  NORMAL //< Normal
76  };
77 
/// Constructor (declared here; defined out of line in Reactor.cc).
88  Reactor();
89 
94  }
95 
102  void add_request(uint32_t id, IOHandler *handler, DispatchHandler *dh,
103  ClockT::time_point expire) {
104  std::lock_guard<std::mutex> lock(m_mutex);
105  m_request_cache.insert(id, handler, dh, expire);
106  if (m_next_wakeup == ClockT::time_point() || expire < m_next_wakeup)
108  }
109 
/// Removes the request associated with <code>id</code> from the request cache.
/// Holds #m_mutex for the duration of the call and forwards to
/// RequestCache::remove() on #m_request_cache.
/// @param id Request ID
/// @param handler Passed by reference through to RequestCache::remove();
/// presumably receives the dispatch handler of the removed request --
/// TODO confirm against RequestCache.cc
/// @return The value returned by RequestCache::remove()
115  bool remove_request(uint32_t id, DispatchHandler *&handler) {
116  std::lock_guard<std::mutex> lock(m_mutex);
117  return m_request_cache.remove(id, handler);
118  }
119 
126  std::lock_guard<std::mutex> lock(m_mutex);
127  m_request_cache.purge_requests(handler, error);
128  }
129 
135  void add_timer(ExpireTimer &timer) {
136  std::lock_guard<std::mutex> lock(m_mutex);
137  m_timer_heap.push(timer);
139  }
140 
/// Cancels timers associated with <code>handler</code>.
/// std::priority_queue offers no way to erase an arbitrary element, so the
/// heap is drained completely while #m_mutex is held: every timer whose
/// dispatch handler differs from <code>handler</code> is set aside, and the
/// survivors are pushed back once the heap is empty.
/// @param handler Dispatch handler whose timers are to be dropped
145  void cancel_timer(const DispatchHandlerPtr &handler) {
146  std::lock_guard<std::mutex> lock(m_mutex);
147  TimerHeap::container_type survivors;
148  survivors.reserve(m_timer_heap.size());
149  for (; !m_timer_heap.empty(); m_timer_heap.pop()) {
150  const ExpireTimer &candidate = m_timer_heap.top();
151  // Keep every timer that belongs to some other handler
152  if (candidate.handler != handler)
153  survivors.push_back(candidate);
154  }
155  for (const auto &kept : survivors)
156  m_timer_heap.push(kept);
157  }
160 
171  void schedule_removal(IOHandler *handler) {
172  std::lock_guard<std::mutex> lock(m_mutex);
173  m_removed_handlers.insert(handler);
174  ExpireTimer timer;
175  timer.expire_time = ClockT::now() + std::chrono::milliseconds(200);
176  timer.handler = 0;
177  m_timer_heap.push(timer);
179  }
180 
/// Returns set of I/O handlers scheduled for removal.
/// While holding #m_mutex, hands the contents of #m_removed_handlers to
/// <code>dst</code> and leaves #m_removed_handlers empty.  Whatever
/// <code>dst</code> held on entry is discarded.
/// @param dst Output parameter that receives the handlers scheduled for
/// removal
186  void get_removed_handlers(std::set<IOHandler *> &dst) {
187  std::lock_guard<std::mutex> lock(m_mutex);
// Swap rather than copy-assign: the source set is emptied immediately
// afterwards, so an O(1) exchange of contents gives the same result without
// duplicating every node while the mutex is held.
188  dst.swap(m_removed_handlers);
189  m_removed_handlers.clear();
190  }
191 
/// Processes request timeouts and timers (defined in Reactor.cc).
/// @param next_timeout Poll timeout object; PollTimeout maintains the next
/// timeout for the event polling loop, so this is presumably updated by the
/// call -- TODO confirm in Reactor.cc
201  void handle_timeouts(PollTimeout &next_timeout);
202 
203 #if defined(__linux__) || defined (__sun__)
204  int poll_fd;
206 #elif defined (__APPLE__) || defined(__FreeBSD__)
207  int kqd;
209 #endif
210 
/// Add poll interest for socket (POSIX poll only).  Defined in Reactor.cc.
/// @param sd Socket descriptor
/// @param events Poll events of interest (presumably a poll(2) event mask
/// -- TODO confirm)
/// @param handler I/O handler to associate with the descriptor
/// @return Integer status; its meaning is established in Reactor.cc
221  int add_poll_interest(int sd, short events, IOHandler *handler);
222 
/// Remove poll interest for socket (POSIX poll only).  Defined in Reactor.cc.
/// @param sd Socket descriptor
/// @return Integer status; its meaning is established in Reactor.cc
230  int remove_poll_interest(int sd);
231 
/// Modify poll interest for socket (POSIX poll only).  Defined in Reactor.cc.
/// @param sd Socket descriptor
/// @param events New poll events of interest (presumably a poll(2) event
/// mask -- TODO confirm)
/// @return Integer status; its meaning is established in Reactor.cc
240  int modify_poll_interest(int sd, short events);
241 
/// Fetches poll state vectors (POSIX poll only).  Defined in Reactor.cc.
/// @param fdarray Vector of pollfd structures; presumably an output
/// parameter -- TODO confirm in Reactor.cc
/// @param handlers Vector of I/O handler pointers; presumably an output
/// parameter parallel to <code>fdarray</code> -- TODO confirm in Reactor.cc
247  void fetch_poll_array(std::vector<struct pollfd> &fdarray,
248  std::vector<IOHandler *> &handlers);
249 
/// Forces polling interface wait call to return.  Defined in Reactor.cc.
/// @return Integer status; its meaning is established in Reactor.cc
253  int poll_loop_interrupt();
254 
/// Reset state after call to poll_loop_interrupt().  Defined in Reactor.cc.
/// @return Integer status; its meaning is established in Reactor.cc
260  int poll_loop_continue();
261 
/// Returns interrupt socket.
/// @return Value of #m_interrupt_sd
265  int interrupt_sd() { return m_interrupt_sd; }
266 
267  protected:
268 
/// Priority queue for timers: ExpireTimer records held in a std::vector and
/// ordered by the LtTimerHeap comparison functor.
271  typedef std::priority_queue<ExpireTimer,
272  std::vector<ExpireTimer>, LtTimerHeap> TimerHeap;
273 
/// ExpireTimer heap
277  TimerHeap m_timer_heap;
279 
282 
/// Vector of poll descriptor state structures for use with POSIX poll()
285  std::vector<PollDescriptorT> m_polldata;
286 
289 
/// Set of IOHandler objects scheduled for removal
291  std::set<IOHandler *> m_removed_handlers;
292  };
293 
/// Shared smart pointer to Reactor
295  typedef std::shared_ptr<Reactor> ReactorPtr;
297 }
298 
299 #endif // AsyncComm_Reactor_h
void purge_requests(IOHandler *handler, int32_t error)
Purges all requests associated with handler.
static std::mutex mutex
Definition: Logger.cc:43
Reactor()
Constructor.
Definition: Reactor.cc:63
void handle_timeouts(PollTimeout &next_timeout)
Processes request timeouts and timers.
Definition: Reactor.cc:170
int remove_poll_interest(int sd)
Remove poll interest for socket (POSIX poll only).
Definition: Reactor.cc:412
int modify_poll_interest(int sd, short events)
Modify poll interest for socket (POSIX poll only).
Definition: Reactor.cc:433
State record for timer.
Definition: ExpireTimer.h:42
Declarations for PollTimeout.
Class used to hold pending request callback handlers.
Definition: RequestCache.h:50
RequestCache m_request_cache
Request cache.
Definition: Reactor.h:276
void schedule_removal(IOHandler *handler)
Schedules handler for removal.
Definition: Reactor.h:171
void add_timer(ExpireTimer &timer)
Adds a timer.
Definition: Reactor.h:135
std::set< IOHandler * > m_removed_handlers
Set of IOHandler objects scheduled for removal.
Definition: Reactor.h:291
IOHandler * handler
I/O handler associated with descriptor.
Definition: Reactor.h:60
chrono::time_point< fast_clock > time_point
Definition: fast_clock.h:42
Abstract base class for application dispatch handlers registered with AsyncComm.
Comparison function (functor) for timer heap.
Definition: ExpireTimer.h:49
Maintains next timeout for event polling loop.
Definition: PollTimeout.h:44
TimerHeap m_timer_heap
ExpireTimer heap.
Definition: Reactor.h:277
int poll_loop_continue()
Reset state after call to poll_loop_interrupt.
Definition: Reactor.cc:331
bool remove(uint32_t id, DispatchHandler *&handler)
Removes a request from the cache.
Definition: RequestCache.cc:70
int poll_loop_interrupt()
Forces polling interface wait call to return.
Definition: Reactor.cc:255
int add_poll_interest(int sd, short events, IOHandler *handler)
Add poll interest for socket (POSIX poll only).
Definition: Reactor.cc:383
Declarations for ExpireTimer.
ClockT::time_point expire_time
Absolute expiration time.
Definition: ExpireTimer.h:43
int m_interrupt_sd
Interrupt socket.
Definition: Reactor.h:278
void cancel_timer(const DispatchHandlerPtr &handler)
Cancels timers associated with handler.
Definition: Reactor.h:145
Static class used to setup and manage I/O reactors.
void get_removed_handlers(std::set< IOHandler * > &dst)
Returns set of I/O handlers scheduled for removal.
Definition: Reactor.h:186
bool remove_request(uint32_t id, DispatchHandler *&handler)
Removes request associated with id.
Definition: Reactor.h:115
Declaration of ClockT.
Socket descriptor poll state for use with POSIX poll()
Definition: Reactor.h:56
DispatchHandlerPtr handler
Dispatch handler to receive TIMER event.
Definition: ExpireTimer.h:44
static time_point now() noexcept
Definition: fast_clock.cc:37
Manages reactor (polling thread) state including poll interest, request cache, and timers...
Definition: Reactor.h:66
void fetch_poll_array(std::vector< struct pollfd > &fdarray, std::vector< IOHandler * > &handlers)
Fetches poll state vectors (POSIX poll only).
Definition: Reactor.cc:444
~Reactor()
Destructor.
Definition: Reactor.h:92
int interrupt_sd()
Returns interrupt socket.
Definition: Reactor.h:265
Base class for socket descriptor I/O handlers.
Definition: IOHandler.h:76
Declarations for RequestCache.
std::shared_ptr< Reactor > ReactorPtr
Shared smart pointer to Reactor.
Definition: Reactor.h:295
std::vector< PollDescriptorT > m_polldata
Vector of poll descriptor state structures for use with POSIX poll().
Definition: Reactor.h:285
void cancel_requests(IOHandler *handler, int32_t error=Error::COMM_BROKEN_CONNECTION)
Cancels outstanding requests associated with handler.
Definition: Reactor.h:125
Hypertable definitions
void insert(uint32_t id, IOHandler *handler, DispatchHandler *dh, ClockT::time_point &expire)
Inserts pending request callback handler into cache.
Definition: RequestCache.cc:43
std::priority_queue< ExpireTimer, std::vector< ExpireTimer >, LtTimerHeap > TimerHeap
Priority queue for timers.
Definition: Reactor.h:272
std::mutex m_mutex
Mutex to protect members.
Definition: Reactor.h:274
Priority
Enumeration for reactor priority.
Definition: Reactor.h:73
std::shared_ptr< DispatchHandler > DispatchHandlerPtr
Smart pointer to DispatchHandler.
ClockT::time_point m_next_wakeup
Next polling interface wait timeout (absolute).
Definition: Reactor.h:288
bool m_interrupt_in_progress
Set to true if poll loop interrupt in progress.
Definition: Reactor.h:281
std::mutex m_polldata_mutex
Mutex to protect m_polldata member.
Definition: Reactor.h:275
void add_request(uint32_t id, IOHandler *handler, DispatchHandler *dh, ClockT::time_point expire)
Adds a request to request cache and adjusts poll timeout if necessary.
Definition: Reactor.h:102