0.9.8.10
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
IOHandlerDatagram.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2007-2015 Hypertable, Inc.
3  *
4  * This file is part of Hypertable.
5  *
6  * Hypertable is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; either version 3
9  * of the License, or any later version.
10  *
11  * Hypertable is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19  * 02110-1301, USA.
20  */
21 
28 #include <Common/Compat.h>
29 
30 #define HT_DISABLE_LOG_DEBUG 1
31 
32 #include "IOHandlerDatagram.h"
33 #include "ReactorRunner.h"
34 
35 #include <Common/Error.h>
36 #include <Common/FileUtils.h>
37 
38 #include <cassert>
39 #include <iostream>
40 
41 extern "C" {
42 #include <arpa/inet.h>
43 #include <errno.h>
44 #include <netinet/in.h>
45 #include <sys/socket.h>
46 #include <sys/types.h>
47 #if defined(__APPLE__) || defined(__FreeBSD__)
48 #include <sys/event.h>
49 #endif
50 }
51 
52 using namespace Hypertable;
53 using namespace std;
54 
55 bool
56 IOHandlerDatagram::handle_event(struct pollfd *event,
57  ClockT::time_point arrival_time) {
58  int error;
59 
60  //DisplayEvent(event);
61 
62  if (event->revents & POLLOUT) {
63  if ((error = handle_write_readiness()) != Error::OK) {
64  test_and_set_error(error);
65  ReactorRunner::handler_map->decomission_handler(this);
66  EventPtr event_ptr = make_shared<Event>(Event::ERROR, m_addr, error);
67  deliver_event(event_ptr);
68  return true;
69  }
70  }
71 
72  if (event->revents & POLLIN) {
73  ssize_t nread, payload_len;
74  struct sockaddr_in addr;
75  socklen_t fromlen = sizeof(struct sockaddr_in);
76 
77  while ((nread = FileUtils::recvfrom(m_sd, m_message, 65536,
78  (struct sockaddr *)&addr, &fromlen)) != (ssize_t)-1) {
79 
80  EventPtr event_ptr = make_shared<Event>(Event::MESSAGE, addr, Error::OK);
81  event_ptr->load_message_header(m_message, (size_t)m_message[1]);
82 
83  payload_len = nread - (ssize_t)event_ptr->header.header_len;
84  event_ptr->payload_len = payload_len;
85  event_ptr->payload = new uint8_t [payload_len];
86  event_ptr->arrival_time = arrival_time;
87  memcpy((void *)event_ptr->payload, m_message + event_ptr->header.header_len,
88  payload_len);
89  deliver_event( event_ptr );
90  fromlen = sizeof(struct sockaddr_in);
91  }
92 
93  if (errno != EAGAIN) {
94  HT_ERRORF("FileUtils::recvfrom(%d) failure : %s", m_sd, strerror(errno));
95  EventPtr event_ptr = make_shared<Event>(Event::ERROR, addr,
97  deliver_event(event_ptr);
98  ReactorRunner::handler_map->decomission_handler(this);
99  }
100 
101  return false;
102  }
103 
104  if (event->events & POLLERR) {
105  HT_WARN_OUT << "Received EPOLLERR on descriptor " << m_sd << " ("
106  << m_addr.format() << ")" << HT_END;
107  EventPtr event_ptr = make_shared<Event>(Event::ERROR, m_addr, Error::COMM_POLL_ERROR);
108  deliver_event(event_ptr);
109  ReactorRunner::handler_map->decomission_handler(this);
110  return true;
111  }
112 
113  HT_ASSERT((event->revents & POLLNVAL) == 0);
114 
115  return false;
116 }
117 
118 #if defined(__linux__)
119 
120 bool IOHandlerDatagram::handle_event(struct epoll_event *event,
121  ClockT::time_point arrival_time) {
122  int error;
123 
124  //DisplayEvent(event);
125 
126  if (event->events & EPOLLOUT) {
127  if ((error = handle_write_readiness()) != Error::OK) {
128  EventPtr event_ptr = make_shared<Event>(Event::ERROR, m_addr, error);
129  deliver_event(event_ptr);
130  ReactorRunner::handler_map->decomission_handler(this);
131  return true;
132  }
133  }
134 
135  if (event->events & EPOLLIN) {
136  ssize_t nread, payload_len;
137  InetAddr addr;
138  socklen_t fromlen = sizeof(struct sockaddr_in);
139 
140  while ((nread = FileUtils::recvfrom(m_sd, m_message, 65536,
141  (struct sockaddr *)&addr, &fromlen)) != (ssize_t)-1) {
142 
143  EventPtr event_ptr = make_shared<Event>(Event::MESSAGE, addr, Error::OK);
144 
145  try {
146  event_ptr->load_message_header(m_message, (size_t)m_message[1]);
147  }
148  catch (Hypertable::Exception &e) {
149  HT_ERROR_OUT << e << " - from " << addr.format() << HT_END;
150  continue;
151  }
152 
153  payload_len = nread - (ssize_t)event_ptr->header.header_len;
154  event_ptr->payload_len = payload_len;
155  event_ptr->payload = new uint8_t [payload_len];
156  event_ptr->arrival_time = arrival_time;
157  memcpy((void *)event_ptr->payload, m_message + event_ptr->header.header_len,
158  payload_len);
159  deliver_event( event_ptr );
160  fromlen = sizeof(struct sockaddr_in);
161  }
162 
163  if (errno != EAGAIN) {
164  HT_ERRORF("FileUtils::recvfrom(%d) failure : %s", m_sd, strerror(errno));
165  EventPtr event_ptr = make_shared<Event>(Event::ERROR, addr,
167  deliver_event(event_ptr);
168  ReactorRunner::handler_map->decomission_handler(this);
169  return true;
170  }
171 
172  return false;
173  }
174 
175  if (event->events & EPOLLERR) {
176  HT_WARN_OUT << "Received EPOLLERR on descriptor " << m_sd << " ("
177  << m_addr.format() << ")" << HT_END;
178  EventPtr event_ptr = make_shared<Event>(Event::ERROR, m_addr, Error::COMM_POLL_ERROR);
179  deliver_event(event_ptr);
180  ReactorRunner::handler_map->decomission_handler(this);
181  return true;
182  }
183 
184  return false;
185 }
186 
187 #elif defined(__sun__)
188 bool
189 IOHandlerDatagram::handle_event(port_event_t *event,
190  ClockT::time_point arrival_time) {
191  int error;
192 
193  //DisplayEvent(event);
194 
195  try {
196 
197  if (event->portev_events == POLLOUT) {
198  if ((error = handle_write_readiness()) != Error::OK) {
199  EventPtr event_ptr = make_shared<Event>(Event::ERROR, m_addr, error);
200  deliver_event(event_ptr);
201  ReactorRunner::handler_map->decomission_handler(this);
202  return true;
203  }
204  }
205 
206  if (event->portev_events == POLLIN) {
207  ssize_t nread, payload_len;
208  struct sockaddr_in addr;
209  socklen_t fromlen = sizeof(struct sockaddr_in);
210 
211  while ((nread = FileUtils::recvfrom(m_sd, m_message, 65536,
212  (struct sockaddr *)&addr, &fromlen)) != (ssize_t)-1) {
213 
214  EventPtr event_ptr = make_shared<Event>(Event::MESSAGE, addr, Error::OK);
215 
216  event_ptr->load_message_header(m_message, (size_t)m_message[1]);
217 
218  payload_len = nread - (ssize_t)event_ptr->header.header_len;
219  event_ptr->payload_len = payload_len;
220  event_ptr->payload = new uint8_t [payload_len];
221  event_ptr->arrival_time = arrival_time;
222  memcpy((void *)event_ptr->payload, m_message + event_ptr->header.header_len,
223  payload_len);
224  deliver_event(event_ptr);
225  fromlen = sizeof(struct sockaddr_in);
226  }
227 
228  if (errno != EAGAIN) {
229  HT_ERRORF("FileUtils::recvfrom(%d) failure : %s", m_sd, strerror(errno));
230  EventPtr event_ptr = make_shared<Event>(Event::ERROR, addr,
232  deliver_event(event_ptr);
233  ReactorRunner::handler_map->decomission_handler(this);
234  return true;
235  }
236 
237  return false;
238  }
239 
240  if (event->portev_events == POLLERR) {
241  HT_WARN_OUT << "Received EPOLLERR on descriptor " << m_sd << " ("
242  << m_addr.format() << ")" << HT_END;
243  EventPtr event_ptr = make_shared<Event>(Event::ERROR, m_addr, Error::COMM_POLL_ERROR);
244  deliver_event(event_ptr);
245  ReactorRunner::handler_map->decomission_handler(this);
246  return true;
247  }
248 
249  if (event->portev_events == POLLREMOVE) {
250  HT_DEBUGF("Received POLLREMOVE on descriptor %d (%s:%d)", m_sd,
251  inet_ntoa(m_addr.sin_addr), ntohs(m_addr.sin_port));
252  ReactorRunner::handler_map->decomission_handler(this);
253  return true;
254  }
255 
256  }
257  catch (Hypertable::Exception &e) {
258  HT_ERROR_OUT << e << HT_END;
259  ReactorRunner::handler_map->decomission_handler(this);
260  return true;
261  }
262 
263  return false;
264 
265 }
266 
267 #elif defined(__APPLE__) || defined(__FreeBSD__)
268 
269 bool
270 IOHandlerDatagram::handle_event(struct kevent *event,
271  ClockT::time_point arrival_time) {
272  int error;
273 
274  //DisplayEvent(event);
275 
276  assert(m_sd == (int)event->ident);
277 
278  assert((event->flags & EV_EOF) == 0);
279 
280  if (event->filter == EVFILT_WRITE) {
281  if ((error = handle_write_readiness()) != Error::OK) {
282  EventPtr event_ptr = make_shared<Event>(Event::ERROR, m_addr, error);
283  deliver_event(event_ptr);
284  ReactorRunner::handler_map->decomission_handler(this);
285  return true;
286  }
287  }
288 
289  if (event->filter == EVFILT_READ) {
290  size_t available = (size_t)event->data;
291  ssize_t nread, payload_len;
292  struct sockaddr_in addr;
293  socklen_t fromlen = sizeof(struct sockaddr_in);
294 
295  if ((nread = FileUtils::recvfrom(m_sd, m_message, 65536,
296  (struct sockaddr *)&addr, &fromlen)) == (ssize_t)-1) {
297  HT_ERRORF("FileUtils::recvfrom(%d, len=%d) failure : %s", m_sd,
298  (int)available, strerror(errno));
299  EventPtr event_ptr = make_shared<Event>(Event::ERROR, addr,
301  deliver_event(event_ptr);
302  ReactorRunner::handler_map->decomission_handler(this);
303  return true;
304  }
305 
306  EventPtr event_ptr = make_shared<Event>(Event::MESSAGE, addr, Error::OK);
307 
308  event_ptr->load_message_header(m_message, (size_t)m_message[1]);
309 
310  payload_len = nread - (ssize_t)event_ptr->header.header_len;
311  event_ptr->payload_len = payload_len;
312  event_ptr->payload = new uint8_t [payload_len];
313  event_ptr->arrival_time = arrival_time;
314  memcpy((void *)event_ptr->payload, m_message + event_ptr->header.header_len,
315  payload_len);
316  deliver_event(event_ptr);
317  }
318 
319  return false;
320 
321 }
322 #else
324 #endif
325 
326 
328  lock_guard<mutex> lock(m_mutex);
329  int error;
330 
331  if ((error = flush_send_queue()) != Error::OK) {
332  if (m_error == Error::OK)
333  m_error = error;
334  return error;
335  }
336 
337  // is this necessary?
338  if (m_send_queue.empty())
339  error = remove_poll_interest(PollEvent::WRITE);
340 
341  if (error != Error::OK && m_error == Error::OK)
342  m_error = error;
343 
344  return error;
345 }
346 
347 
348 
350  lock_guard<mutex> lock(m_mutex);
351  int error = Error::OK;
352  bool initially_empty = m_send_queue.empty() ? true : false;
353 
354  HT_LOG_ENTER;
355 
356  //HT_INFOF("Pushing message destined for %s:%d onto send queue",
357  //inet_ntoa(addr.sin_addr), ntohs(addr.sin_port));
358 
359  m_send_queue.push_back(SendRec(addr, cbp));
360 
361  if ((error = flush_send_queue()) != Error::OK) {
362  if (m_error == Error::OK)
363  m_error = error;
364  return error;
365  }
366 
367  if (initially_empty && !m_send_queue.empty()) {
368  error = add_poll_interest(PollEvent::WRITE);
369  //HT_INFO("Adding Write interest");
370  }
371  else if (!initially_empty && m_send_queue.empty()) {
372  error = remove_poll_interest(PollEvent::WRITE);
373  //HT_INFO("Removing Write interest");
374  }
375 
376  if (error != Error::OK && m_error == Error::OK)
377  m_error = error;
378 
379  return error;
380 }
381 
382 
384  ssize_t nsent, tosend;
385 
386  while (!m_send_queue.empty()) {
387 
388  SendRec &send_rec = m_send_queue.front();
389 
390  tosend = send_rec.second->data.size - (send_rec.second->data_ptr
391  - send_rec.second->data.base);
392  assert(tosend > 0);
393  assert(send_rec.second->ext.base == 0);
394 
395  nsent = FileUtils::sendto(m_sd, send_rec.second->data_ptr, tosend,
396  (sockaddr *)&send_rec.first,
397  sizeof(struct sockaddr_in));
398 
399  if (nsent == (ssize_t)-1) {
400  HT_WARNF("FileUtils::sendto(%d, len=%d, addr=%s:%d) failed : %s", m_sd,
401  (int)tosend, inet_ntoa(send_rec.first.sin_addr),
402  ntohs(send_rec.first.sin_port), strerror(errno));
403  return Error::COMM_SEND_ERROR;
404  }
405  else if (nsent < tosend) {
406  HT_WARNF("Only sent %d bytes", (int)nsent);
407  if (nsent == 0)
408  break;
409  send_rec.second->data_ptr += nsent;
410  break;
411  }
412 
413  // buffer written successfully, now remove from queue (destroys buffer)
414  m_send_queue.pop_front();
415  }
416 
417  return Error::OK;
418 }
#define HT_WARNF(msg,...)
Definition: Logger.h:290
int handle_write_readiness()
Handles write readiness.
int send_message(const InetAddr &addr, CommBufPtr &cbp)
Sends a message.
#define HT_LOG_ENTER
Definition: Logger.h:241
chrono::time_point< fast_clock > time_point
Definition: fast_clock.h:42
Declarations for ReactorRunner.
Declarations for IOHandlerDatagram.
int flush_send_queue()
Flushes send queue.
std::shared_ptr< Event > EventPtr
Smart pointer to Event.
Definition: Event.h:228
STL namespace.
static ssize_t recvfrom(int fd, void *vptr, size_t n, struct sockaddr *from, socklen_t *fromlen)
Receives data from a network connection and returns the sender's address.
Definition: FileUtils.cc:227
Error event
Definition: Event.h:64
static ssize_t sendto(int fd, const void *vptr, size_t n, const sockaddr *to, socklen_t tolen)
Sends data through a network connection; if the socket is TCP then the address is ignored...
Definition: FileUtils.cc:178
#define HT_ASSERT(_e_)
Definition: Logger.h:396
static HandlerMapPtr handler_map
Smart pointer to HandlerMap.
Definition: ReactorRunner.h:70
File system utility functions.
Encapsulate an internet address.
Definition: InetAddr.h:66
std::shared_ptr< CommBuf > CommBufPtr
Smart pointer to CommBuf.
Definition: CommBuf.h:305
Compatibility Macros for C/C++.
#define HT_END
Definition: Logger.h:220
#define HT_ERROR_OUT
Definition: Logger.h:301
#define HT_WARN_OUT
Definition: Logger.h:291
Hypertable definitions
#define HT_DEBUGF(msg,...)
Definition: Logger.h:260
Writing can be performed without blocking.
Definition: PollEvent.h:46
Request/response message event.
Definition: Event.h:63
This is a generic exception class for Hypertable.
Definition: Error.h:314
#define HT_ERRORF(msg,...)
Definition: Logger.h:300
std::pair< struct sockaddr_in, CommBufPtr > SendRec
Send queue message record.
bool handle_event(struct pollfd *event, ClockT::time_point arrival_time) override
Handle poll() interface events.
Error codes, Exception handling, error logging.