0.9.8.10
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
Reactor.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2007-2015 Hypertable, Inc.
3  *
4  * This file is part of Hypertable.
5  *
6  * Hypertable is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; either version 3
9  * of the License, or any later version.
10  *
11  * Hypertable is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19  * 02110-1301, USA.
20  */
21 
28 #include <Common/Compat.h>
29 
30 #include "IOHandlerData.h"
31 #include "Reactor.h"
32 #include "ReactorFactory.h"
33 #include "ReactorRunner.h"
34 
35 #include <Common/Error.h>
36 #include <Common/FileUtils.h>
37 #include <Common/Logger.h>
38 #include <Common/Time.h>
39 
40 #include <cassert>
41 #include <cstdio>
42 #include <iostream>
43 #include <set>
44 
45 extern "C" {
46 #include <arpa/inet.h>
47 #include <errno.h>
48 #include <netinet/in.h>
49 #include <stdio.h>
50 #include <stdlib.h>
51 #include <sys/socket.h>
52 #include <sys/time.h>
53 #include <sys/types.h>
54 #include <fcntl.h>
55 #if defined(__APPLE__) || defined(__FreeBSD__)
56 #include <sys/event.h>
57 #endif
58 }
59 
60 using namespace Hypertable;
61 using namespace std;
62 
64  struct sockaddr_in addr;
65 
67 #if defined(__linux__)
68  if ((poll_fd = epoll_create(256)) < 0) {
69  perror("epoll_create");
70  exit(EXIT_FAILURE);
71  }
72 #elif defined(__sun__)
73  if ((poll_fd = port_create()) < 0) {
74  perror("creation of event port failed");
75  exit(EXIT_FAILURE);
76  }
77 #elif defined(__APPLE__) || defined(__FreeBSD__)
78  kqd = kqueue();
79 #endif
80  }
81 
82  while (true) {
83 
89  if ((m_interrupt_sd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
90  HT_ERRORF("socket() failure: %s", strerror(errno));
91  exit(EXIT_FAILURE);
92  }
93 
94  // Set to non-blocking (are we sure we should do this?)
95  FileUtils::set_flags(m_interrupt_sd, O_NONBLOCK);
96 
97  // create address structure to bind to - any available port - any address
98  memset(&addr, 0 , sizeof(sockaddr_in));
99  addr.sin_family = AF_INET;
100  addr.sin_addr.s_addr = inet_addr("127.0.0.1");
101  // Arbitray ephemeral port that won't conflict with our reserved ports
102  uint16_t port = (uint16_t)(49152 + std::uniform_int_distribution<>(0, 16382)(ReactorFactory::rng));
103  addr.sin_port = htons(port);
104 
105  // bind socket
106  if ((::bind(m_interrupt_sd, (sockaddr *)&addr, sizeof(sockaddr_in))) < 0) {
107  if (errno == EADDRINUSE) {
108  ::close(m_interrupt_sd);
109  continue;
110  }
111  HT_FATALF("bind(%s) failure: %s",
112  InetAddr::format(addr).c_str(), strerror(errno));
113  }
114  break;
115  }
116 
117  // Connect to ourself
118  // NOTE: Here we assume that any error returned by connect implies
119  // that it will be carried out asynchronously
120  if (connect(m_interrupt_sd, (sockaddr *)&addr, sizeof(addr)) < 0)
121  HT_INFOF("connect(interrupt_sd) to port %d failed - %s",
122  (int)ntohs(addr.sin_port), strerror(errno));
123 
125  lock_guard<mutex> lock(m_polldata_mutex);
126  if ((size_t)m_interrupt_sd >= m_polldata.size()) {
127  size_t i = m_polldata.size();
128  m_polldata.resize(m_interrupt_sd+1);
129  for (; i<m_polldata.size(); i++) {
130  m_polldata[i].pollfd.fd = -1;
131  m_polldata[i].pollfd.events = 0;
132  m_polldata[i].pollfd.revents = 0;
133  m_polldata[i].handler = 0;
134  }
135  }
136  m_polldata[m_interrupt_sd].pollfd.fd = m_interrupt_sd;
137  m_polldata[m_interrupt_sd].pollfd.events = POLLIN;
138  HT_ASSERT(poll_loop_interrupt() == Error::OK);
139  }
140  else {
141 #if defined(__linux__)
143 
144  struct epoll_event event;
145  memset(&event, 0, sizeof(struct epoll_event));
146  event.events = EPOLLIN | EPOLLOUT | POLLRDHUP | EPOLLET;
147  if (epoll_ctl(poll_fd, EPOLL_CTL_ADD, m_interrupt_sd, &event) < 0) {
148  HT_ERRORF("epoll_ctl(%d, EPOLL_CTL_ADD, %d, EPOLLIN|EPOLLOUT|POLLRDHUP|"
149  "EPOLLET) failed : %s", poll_fd, m_interrupt_sd,
150  strerror(errno));
151  exit(EXIT_FAILURE);
152  }
153  }
154  else {
155  struct epoll_event event;
156  memset(&event, 0, sizeof(struct epoll_event));
157  if (epoll_ctl(poll_fd, EPOLL_CTL_ADD, m_interrupt_sd, &event) < 0) {
158  HT_ERRORF("epoll_ctl(%d, EPOLL_CTL_ADD, %d, 0) failed : %s",
159  poll_fd, m_interrupt_sd, strerror(errno));
160  exit(EXIT_FAILURE);
161  }
162  }
163 #endif
164  }
165 
166  m_next_wakeup = ClockT::time_point();
167 }
168 
169 
171  vector<ExpireTimer> expired_timers;
172  EventPtr event;
173  ClockT::time_point now, next_req_timeout;
174  ExpireTimer timer;
175 
176  while(true) {
177  {
178  lock_guard<mutex> lock(m_mutex);
179  IOHandler *handler;
180  DispatchHandler *dh;
181 
182  now = ClockT::now();
183 
184  while (m_request_cache.get_next_timeout(now, handler, dh,
185  &next_req_timeout)) {
186  event = make_shared<Event>(Event::ERROR, ((IOHandlerData *)handler)->get_address(), Error::REQUEST_TIMEOUT);
187  event->set_proxy(((IOHandlerData *)handler)->get_proxy());
188  handler->deliver_event(event, dh);
189  }
190 
191  if (next_req_timeout != ClockT::time_point()) {
192  next_timeout.set(now, next_req_timeout);
193  m_next_wakeup = next_req_timeout;
194  }
195  else {
196  next_timeout.set_indefinite();
197  m_next_wakeup = ClockT::time_point();
198  }
199 
200  if (!m_timer_heap.empty()) {
201  ExpireTimer timer;
202 
203  while (!m_timer_heap.empty()) {
204  timer = m_timer_heap.top();
205  if (timer.expire_time > now) {
206  if (next_req_timeout == ClockT::time_point() ||
207  timer.expire_time < next_req_timeout) {
208  next_timeout.set(now, timer.expire_time);
209  m_next_wakeup = timer.expire_time;
210  }
211  break;
212  }
213  expired_timers.push_back(timer);
214  m_timer_heap.pop();
215  }
216 
217  }
218  }
219 
223  for (size_t i=0; i<expired_timers.size(); i++) {
224  event = make_shared<Event>(Event::TIMER, Error::OK);
225  if (expired_timers[i].handler)
226  expired_timers[i].handler->handle(event);
227  }
228 
229  {
230  lock_guard<mutex> lock(m_mutex);
231 
232  if (!m_timer_heap.empty()) {
233  timer = m_timer_heap.top();
234 
235  if (now > timer.expire_time)
236  continue;
237 
238  if (next_req_timeout == ClockT::time_point()
239  || timer.expire_time < next_req_timeout) {
240  next_timeout.set(now, timer.expire_time);
241  m_next_wakeup = timer.expire_time;
242  }
243  }
244 
245  poll_loop_continue();
246  }
247 
248  break;
249  }
250 
251 }
252 
253 
254 
256 
257  m_interrupt_in_progress = true;
258 
260  ssize_t n;
261 
262  // Send 1 byte to ourselves to cause epoll_wait to return
263  if ((n = FileUtils::send(m_interrupt_sd, "1", 1)) < 0) {
264  HT_ERRORF("send(interrupt_sd) failed - %s", strerror(errno));
265  return Error::COMM_SEND_ERROR;
266  }
267  return Error::OK;
268  }
269 
270 #if defined(__linux__)
271 
273 
274  char buf[4];
275  ssize_t n;
276 
277  // Send and receive 1 byte to ourself to cause epoll_wait to return
278 
279  if (FileUtils::send(m_interrupt_sd, "1", 1) < 0) {
280  HT_ERRORF("send(interrupt_sd) failed - %s", strerror(errno));
281  return Error::COMM_SEND_ERROR;
282  }
283 
284  if ((n = FileUtils::recv(m_interrupt_sd, buf, 1)) == -1) {
285  HT_ERRORF("recv(interrupt_sd) failed - %s", strerror(errno));
287  }
288 
289  }
290  else {
291 
292  struct epoll_event event;
293  memset(&event, 0, sizeof(struct epoll_event));
294  event.events = EPOLLOUT;
295  if (epoll_ctl(poll_fd, EPOLL_CTL_MOD, m_interrupt_sd, &event) < 0) {
301  return Error::COMM_POLL_ERROR;
302  }
303  }
304 
305 #elif defined(__sun__)
306 
307  if (port_alert(poll_fd, PORT_ALERT_SET, 1, NULL) < 0) {
308  HT_ERRORF("port_alert(%d, PORT_ALERT_SET, 1, 0) failed - %s",
309  poll_fd, strerror(errno));
310  return Error::COMM_POLL_ERROR;
311  }
312 
313 #elif defined(__APPLE__) || defined(__FreeBSD__)
314  struct kevent event;
315 
316  EV_SET(&event, m_interrupt_sd, EVFILT_WRITE, EV_ADD | EV_ENABLE, 0, 0, 0);
317 
318  if (kevent(kqd, &event, 1, 0, 0, 0) == -1) {
319  HT_ERRORF("kevent(sd=%d) : %s", m_interrupt_sd, strerror(errno));
320  return Error::COMM_POLL_ERROR;
321  }
322 #endif
323  return Error::OK;
324 }
325 
326 
327 
332 
333  if (!m_interrupt_in_progress || ReactorFactory::use_poll) {
334  m_interrupt_in_progress = false;
335  return Error::OK;
336  }
337 
338 #if defined(__linux__)
339 
341  struct epoll_event event;
342  char buf[8];
343 
344  // Receive message(s) we sent to ourself in poll_loop_interrupt()
345  while (FileUtils::recv(m_interrupt_sd, buf, 8) > 0)
346  ;
347 
348  memset(&event, 0, sizeof(struct epoll_event));
349  event.events = EPOLLERR | EPOLLHUP;
350 
351  if (epoll_ctl(poll_fd, EPOLL_CTL_MOD, m_interrupt_sd, &event) < 0) {
352  HT_ERRORF("epoll_ctl(EPOLL_CTL_MOD, sd=%d) : %s", m_interrupt_sd,
353  strerror(errno));
354  return Error::COMM_POLL_ERROR;
355  }
356  }
357 
358 #elif defined(__sun__)
359 
360  if (port_alert(poll_fd, PORT_ALERT_SET, 0, NULL) < 0) {
361  HT_ERRORF("port_alert(%d, PORT_ALERT_SET, 0, 0) failed - %s",
362  poll_fd, strerror(errno));
363  return Error::COMM_POLL_ERROR;
364  }
365 
366 #elif defined(__APPLE__) || defined(__FreeBSD__)
367  struct kevent devent;
368 
369  EV_SET(&devent, m_interrupt_sd, EVFILT_WRITE, EV_DELETE, 0, 0, 0);
370 
371  if (kevent(kqd, &devent, 1, 0, 0, 0) == -1 && errno != ENOENT) {
372  HT_ERRORF("kevent(sd=%d) : %s", m_interrupt_sd, strerror(errno));
373  return Error::COMM_POLL_ERROR;
374  }
375 #else
376  ImplementMe();
377 #endif
378  m_interrupt_in_progress = false;
379  return Error::OK;
380 }
381 
382 
383 int Reactor::add_poll_interest(int sd, short events, IOHandler *handler) {
384  lock_guard<mutex> lock(m_polldata_mutex);
385  int error;
386 
387  if (m_polldata.size() <= (size_t)sd) {
388  size_t i = m_polldata.size();
389  m_polldata.resize(sd+1);
390  for (; i<m_polldata.size(); i++) {
391  memset(&m_polldata[i], 0, sizeof(PollDescriptorT));
392  m_polldata[i].pollfd.fd = -1;
393  }
394  }
395 
396  m_polldata[sd].pollfd.fd = sd;
397  m_polldata[sd].pollfd.events = events;
398  m_polldata[sd].handler = handler;
399 
400  {
401  lock_guard<mutex> lock(m_mutex);
402  error = poll_loop_interrupt();
403  }
404  if (error != Error::OK) {
405  m_polldata[sd].pollfd.fd = -1;
406  m_polldata[sd].pollfd.events = 0;
407  m_polldata[sd].handler = 0;
408  }
409  return error;
410 }
411 
413  {
414  lock_guard<mutex> lock(m_polldata_mutex);
415 
416  HT_ASSERT(m_polldata.size() > (size_t)sd);
417  if ((size_t)sd == m_polldata.size()-1) {
418  int last_entry = sd;
419  do {
420  last_entry--;
421  } while (last_entry > 0 && m_polldata[last_entry].pollfd.fd == -1);
422  m_polldata.resize(last_entry+1);
423  }
424  else {
425  m_polldata[sd].pollfd.fd = -1;
426  m_polldata[sd].handler = 0;
427  }
428  }
429  lock_guard<mutex> lock(m_mutex);
430  return poll_loop_interrupt();
431 }
432 
433 int Reactor::modify_poll_interest(int sd, short events) {
434  {
435  lock_guard<mutex> lock(m_polldata_mutex);
436  HT_ASSERT(m_polldata.size() > (size_t)sd);
437  m_polldata[sd].pollfd.events = events;
438  }
439  lock_guard<mutex> lock(m_mutex);
440  return poll_loop_interrupt();
441 }
442 
443 
444 void Reactor::fetch_poll_array(std::vector<struct pollfd> &fdarray,
445  std::vector<IOHandler *> &handlers) {
446  lock_guard<mutex> lock(m_polldata_mutex);
447 
448  fdarray.clear();
449  handlers.clear();
450 
451  for (size_t i=0; i<m_polldata.size(); i++) {
452  if (m_polldata[i].pollfd.fd != -1 && m_polldata[i].pollfd.events) {
453  fdarray.push_back(m_polldata[i].pollfd);
454  handlers.push_back(m_polldata[i].handler);
455  }
456  }
457 }
Reactor()
Constructor.
Definition: Reactor.cc:63
void handle_timeouts(PollTimeout &next_timeout)
Processes request timeouts and timers.
Definition: Reactor.cc:170
int remove_poll_interest(int sd)
Remove poll interest for socket (POSIX poll only).
Definition: Reactor.cc:412
int modify_poll_interest(int sd, short events)
Modify poll interest for socket (POSIX poll only).
Definition: Reactor.cc:433
State record for timer.
Definition: ExpireTimer.h:42
chrono::time_point< fast_clock > time_point
Definition: fast_clock.h:42
Declarations for ReactorRunner.
Abstract base class for application dispatch handlers registered with AsyncComm.
static bool ms_epollet
Use "edge triggered" epoll.
Maintains next timeout for event polling loop.
Definition: PollTimeout.h:44
int poll_loop_continue()
Reset state after call to poll_loop_interrupt.
Definition: Reactor.cc:331
static std::default_random_engine rng
Pseudo random number generator.
int poll_loop_interrupt()
Forces polling interface wait call to return.
Definition: Reactor.cc:255
std::shared_ptr< Event > EventPtr
Smart pointer to Event.
Definition: Event.h:228
int add_poll_interest(int sd, short events, IOHandler *handler)
Add poll interest for socket (POSIX poll only).
Definition: Reactor.cc:383
STL namespace.
ClockT::time_point expire_time
Absolute expiration time.
Definition: ExpireTimer.h:43
Error event
Definition: Event.h:64
static ssize_t send(int fd, const void *vptr, size_t n)
Sends data through a network connection.
Definition: FileUtils.cc:203
void set(ClockT::time_point now, ClockT::time_point expire)
Sets the next timeout.
Definition: PollTimeout.h:55
#define HT_ASSERT(_e_)
Definition: Logger.h:396
Socket descriptor poll state for use with POSIX poll()
Definition: Reactor.h:56
ImplementMe
Definition: IOHandler.cc:440
File system utility functions.
static time_point now() noexcept
Definition: fast_clock.cc:37
void fetch_poll_array(std::vector< struct pollfd > &fdarray, std::vector< IOHandler * > &handlers)
Fetches poll state vectors (POSIX poll only).
Definition: Reactor.cc:444
Logging routines and macros.
Compatibility Macros for C/C++.
Base class for socket descriptor I/O handlers.
Definition: IOHandler.h:76
I/O handler for TCP sockets.
Definition: IOHandlerData.h:51
Time related declarations.
Declarations for IOHandlerData.
String format(int sep= ':') const
Returns a string with a dotted notation ("127.0.0.1:8080") including the port.
Definition: InetAddr.h:132
static bool set_flags(int fd, int flags)
Sets fcntl flags of a socket.
Definition: FileUtils.cc:256
Declarations for Reactor.
Hypertable definitions
#define HT_FATALF(msg,...)
Definition: Logger.h:343
#define HT_INFOF(msg,...)
Definition: Logger.h:272
Timer event
Definition: Event.h:65
Declarations for ReactorFactory.
void set_indefinite()
Sets the next timeout to be an indefinite time in the future.
Definition: PollTimeout.h:66
void deliver_event(EventPtr &event, DispatchHandler *dh=0)
Convenience method for delivering event to application.
Definition: IOHandler.h:169
#define HT_ERRORF(msg,...)
Definition: Logger.h:300
static ssize_t recv(int fd, void *vptr, size_t n)
Receives data from a network connection.
Definition: FileUtils.cc:242
Error codes, Exception handling, error logging.