0.9.8.10
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
ReactorRunner.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2007-2015 Hypertable, Inc.
3  *
4  * This file is part of Hypertable.
5  *
6  * Hypertable is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; either version 3
9  * of the License, or any later version.
10  *
11  * Hypertable is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19  * 02110-1301, USA.
20  */
21 
28 #include <Common/Compat.h>
29 
30 #define HT_DISABLE_LOG_DEBUG 1
31 
32 #include "Clock.h"
33 #include "HandlerMap.h"
34 #include "IOHandler.h"
35 #include "IOHandlerData.h"
36 #include "ReactorFactory.h"
37 #include "ReactorRunner.h"
38 
39 #include <Common/Config.h>
40 #include <Common/FileUtils.h>
41 #include <Common/Logger.h>
42 #include <Common/Time.h>
43 
44 #include <chrono>
45 #include <thread>
46 
47 extern "C" {
48 #include <errno.h>
49 #include <poll.h>
50 #include <stdlib.h>
51 #include <stdio.h>
52 #include <sys/types.h>
53 #include <sys/time.h>
54 #if defined(__APPLE__) || defined(__FreeBSD__)
55 #include <sys/event.h>
56 #endif
57 }
58 
59 using namespace Hypertable;
60 using namespace std;
61 
65 
66 
68  int n;
69  IOHandler *handler;
70  std::set<IOHandler *> removed_handlers;
71  PollTimeout timeout;
72  bool did_delay = false;
73  ClockT::time_point arrival_time;
74  bool got_arrival_time = false;
75  std::vector<struct pollfd> pollfds;
76  std::vector<IOHandler *> handlers;
77 
79 
80  uint32_t dispatch_delay {};
81 
82  if (Config::properties->has("Comm.DispatchDelay"))
83  dispatch_delay = Config::properties->get_i32("Comm.DispatchDelay");
84 
86 
87  m_reactor->fetch_poll_array(pollfds, handlers);
88 
89  while ((n = poll(&pollfds[0], pollfds.size(),
90  timeout.get_millis())) >= 0 || errno == EINTR) {
91 
92  if (record_arrival_time)
93  got_arrival_time = false;
94 
95  if (dispatch_delay)
96  did_delay = false;
97 
98  m_reactor->get_removed_handlers(removed_handlers);
99  if (!shutdown)
100  HT_DEBUGF("poll returned %d events", n);
101  for (size_t i=0; i<pollfds.size(); i++) {
102 
103  if (pollfds[i].revents == 0)
104  continue;
105 
106  if (pollfds[i].fd == m_reactor->interrupt_sd()) {
107  char buf[8];
108  int nread;
109  errno = 0;
110  if ((nread = FileUtils::recv(pollfds[i].fd, buf, 8)) == -1 &&
111  errno != EAGAIN && errno != EINTR) {
112  HT_ERRORF("recv(interrupt_sd) failed - %s", strerror(errno));
113  exit(EXIT_FAILURE);
114  }
115  }
116 
117  if (handlers[i] && removed_handlers.count(handlers[i]) == 0) {
118  // dispatch delay for testing
119  if (dispatch_delay && !did_delay && (pollfds[i].revents & POLLIN)) {
120  this_thread::sleep_for(chrono::milliseconds((int)dispatch_delay));
121  did_delay = true;
122  }
123  if (record_arrival_time && !got_arrival_time
124  && (pollfds[i].revents & POLLIN)) {
125  arrival_time = ClockT::now();
126  got_arrival_time = true;
127  }
128  if (handlers[i]->handle_event(&pollfds[i], arrival_time))
129  removed_handlers.insert(handlers[i]);
130  }
131  }
132  if (!removed_handlers.empty())
133  cleanup_and_remove_handlers(removed_handlers);
134  m_reactor->handle_timeouts(timeout);
135  if (shutdown)
136  return;
137 
138  m_reactor->fetch_poll_array(pollfds, handlers);
139  }
140 
141  if (!shutdown)
142  HT_ERRORF("poll() failed : %s", strerror(errno));
143 
144  return;
145  }
146 
147 #if defined(__linux__)
148  struct epoll_event events[256];
149 
150  while ((n = epoll_wait(m_reactor->poll_fd, events, 256,
151  timeout.get_millis())) >= 0 || errno == EINTR) {
152 
153  if (record_arrival_time)
154  got_arrival_time = false;
155 
156  if (dispatch_delay)
157  did_delay = false;
158 
159  m_reactor->get_removed_handlers(removed_handlers);
160 
161  if (!shutdown)
162  HT_DEBUGF("epoll_wait returned %d events", n);
163  for (int i=0; i<n; i++) {
164  handler = (IOHandler *)events[i].data.ptr;
165  if (handler && removed_handlers.count(handler) == 0) {
166  // dispatch delay for testing
167  if (dispatch_delay && !did_delay && (events[i].events & EPOLLIN)) {
168  this_thread::sleep_for(chrono::milliseconds((int)dispatch_delay));
169  did_delay = true;
170  }
171  if (record_arrival_time && !got_arrival_time
172  && (events[i].events & EPOLLIN)) {
173  arrival_time = ClockT::now();
174  got_arrival_time = true;
175  }
176  if (handler->handle_event(&events[i], arrival_time))
177  removed_handlers.insert(handler);
178  }
179  }
180  if (!removed_handlers.empty())
181  cleanup_and_remove_handlers(removed_handlers);
182  m_reactor->handle_timeouts(timeout);
183  if (shutdown)
184  return;
185  }
186 
187  if (!shutdown)
188  HT_ERRORF("epoll_wait(%d) failed : %s", m_reactor->poll_fd,
189  strerror(errno));
190 
191 #elif defined(__sun__)
192 
193  int ret;
194  unsigned nget = 1;
195  port_event_t *events;
196 
197  (void)n;
198 
199  events = (port_event_t *)calloc(33, sizeof (port_event_t));
200 
201  while ((ret = port_getn(m_reactor->poll_fd, events, 32,
202  &nget, timeout.get_timespec())) >= 0 ||
203  errno == EINTR || errno == EAGAIN || errno == ETIME) {
204 
205  //HT_INFOF("port_getn returned with %d", nget);
206 
207  if (record_arrival_time)
208  got_arrival_time = false;
209 
210  if (dispatch_delay)
211  did_delay = false;
212 
213  m_reactor->get_removed_handlers(removed_handlers);
214  for (unsigned i=0; i<nget; i++) {
215 
216  // handle interrupt
217  if (events[i].portev_source == PORT_SOURCE_ALERT)
218  break;
219 
220  handler = (IOHandler *)events[i].portev_user;
221  if (handler && removed_handlers.count(handler) == 0) {
222  // dispatch delay for testing
223  if (dispatch_delay && !did_delay && events[i].portev_events == POLLIN) {
224  this_thread::sleep_for(chrono::milliseconds((int)dispatch_delay));
225  did_delay = true;
226  }
227  if (record_arrival_time && !got_arrival_time && events[i].portev_events == POLLIN) {
228  arrival_time = ClockT::now();
229  got_arrival_time = true;
230  }
231  if (handler->handle_event(&events[i], arrival_time))
232  removed_handlers.insert(handler);
233  else if (removed_handlers.count(handler) == 0)
234  handler->reset_poll_interest();
235  }
236  }
237  if (!removed_handlers.empty())
238  cleanup_and_remove_handlers(removed_handlers);
239  m_reactor->handle_timeouts(timeout);
240  if (shutdown)
241  return;
242  nget=1;
243  }
244 
245  if (!shutdown) {
246  HT_ERRORF("port_getn(%d) failed : %s", m_reactor->poll_fd,
247  strerror(errno));
248  if (timeout.get_timespec() == 0)
249  HT_ERROR("timespec is null");
250 
251  }
252 
253 #elif defined(__APPLE__) || defined(__FreeBSD__)
254  struct kevent events[32];
255 
256  while ((n = kevent(m_reactor->kqd, NULL, 0, events, 32,
257  timeout.get_timespec())) >= 0 || errno == EINTR) {
258 
259  if (record_arrival_time)
260  got_arrival_time = false;
261 
262  if (dispatch_delay)
263  did_delay = false;
264 
265  m_reactor->get_removed_handlers(removed_handlers);
266  for (int i=0; i<n; i++) {
267  handler = (IOHandler *)events[i].udata;
268  if (handler && removed_handlers.count(handler) == 0) {
269  // dispatch delay for testing
270  if (dispatch_delay && !did_delay && events[i].filter == EVFILT_READ) {
271  this_thread::sleep_for(chrono::milliseconds((int)dispatch_delay));
272  did_delay = true;
273  }
274  if (record_arrival_time && !got_arrival_time && events[i].filter == EVFILT_READ) {
275  arrival_time = ClockT::now();
276  got_arrival_time = true;
277  }
278  if (handler->handle_event(&events[i], arrival_time))
279  removed_handlers.insert(handler);
280  }
281  }
282  if (!removed_handlers.empty())
283  cleanup_and_remove_handlers(removed_handlers);
284  m_reactor->handle_timeouts(timeout);
285  if (shutdown)
286  return;
287  }
288 
289  if (!shutdown)
290  HT_ERRORF("kevent(%d) failed : %s", m_reactor->kqd, strerror(errno));
291 
292 #else
293  ImplementMe;
294 #endif
295 }
296 
297 
298 
299 void
300 ReactorRunner::cleanup_and_remove_handlers(std::set<IOHandler *> &handlers) {
301 
302  for (auto handler : handlers) {
303 
304  HT_ASSERT(handler);
305 
306  if (!handler_map->destroy_ok(handler))
307  continue;
308 
309  m_reactor->cancel_requests(handler);
310 
312  m_reactor->remove_poll_interest(handler->get_sd());
313  else {
314 #if defined(__linux__)
315  struct epoll_event event;
316  memset(&event, 0, sizeof(struct epoll_event));
317  if (epoll_ctl(m_reactor->poll_fd, EPOLL_CTL_DEL, handler->get_sd(), &event) < 0) {
318  if (!shutdown)
319  HT_ERRORF("epoll_ctl(EPOLL_CTL_DEL, %d) failure, %s", handler->get_sd(),
320  strerror(errno));
321  }
322 #elif defined(__APPLE__) || defined(__FreeBSD__)
323  struct kevent devents[2];
324  EV_SET(&devents[0], handler->get_sd(), EVFILT_READ, EV_DELETE, 0, 0, 0);
325  EV_SET(&devents[1], handler->get_sd(), EVFILT_WRITE, EV_DELETE, 0, 0, 0);
326  if (kevent(m_reactor->kqd, devents, 2, NULL, 0, NULL) == -1
327  && errno != ENOENT) {
328  if (!shutdown)
329  HT_ERRORF("kevent(%d) : %s", handler->get_sd(), strerror(errno));
330  }
331 #elif !defined(__sun__)
332  ImplementMe;
333 #endif
334  }
335  handler_map->purge_handler(handler);
336  }
337 }
int reset_poll_interest()
Resets poll interest by adding m_poll_interest to the polling interface for this handler.
Definition: IOHandler.h:220
PropertiesPtr properties
This singleton map stores all options.
Definition: Config.cc:47
chrono::time_point< fast_clock > time_point
Definition: fast_clock.h:42
Declarations for ReactorRunner.
void operator()()
Primary thread entry point.
Maintains next timeout for event polling loop.
Definition: PollTimeout.h:44
STL namespace.
bool has(const String &name)
Check existence of a configuration value.
Definition: Config.h:57
#define HT_EXPECT(_e_, _code_)
Definition: Logger.h:388
#define HT_ASSERT(_e_)
Definition: Logger.h:396
void cleanup_and_remove_handlers(std::set< IOHandler * > &handlers)
Cleans up and removes a set of handlers.
static bool record_arrival_time
If set to true arrival time is recorded and passed into IOHandler::handle.
Definition: ReactorRunner.h:67
Declaration of ClockT.
int get_millis()
Gets duration until next timeout in the form of milliseconds.
Definition: PollTimeout.h:74
static HandlerMapPtr handler_map
Smart pointer to HandlerMap.
Definition: ReactorRunner.h:70
ImplementMe
Definition: IOHandler.cc:440
File system utility functions.
std::shared_ptr< HandlerMap > HandlerMapPtr
Smart pointer to HandlerMap.
Definition: HandlerMap.h:437
static time_point now() noexcept
Definition: fast_clock.cc:37
Logging routines and macros.
Compatibility Macros for C/C++.
Base class for socket descriptor I/O handlers.
Definition: IOHandler.h:76
Declarations for HandlerMap.
Time related declarations.
Declarations for IOHandlerData.
Hypertable definitions
#define HT_DEBUGF(msg,...)
Definition: Logger.h:260
static bool shutdown
Flag indicating that reactor thread is being shut down.
Definition: ReactorRunner.h:63
#define HT_ERROR(msg)
Definition: Logger.h:299
virtual bool handle_event(struct pollfd *event, ClockT::time_point arrival_time)=0
Event handler method for Unix poll interface.
Declarations for ReactorFactory.
#define HT_ERRORF(msg,...)
Definition: Logger.h:300
Configuration settings.
static ssize_t recv(int fd, void *vptr, size_t n)
Receives data from a network connection.
Definition: FileUtils.cc:242
struct timespec * get_timespec()
Gets duration until next timeout in the form of a pointer to timespec.
Definition: PollTimeout.h:79
Declarations for IOHandler.