0.9.8.10
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
Comm.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2007-2015 Hypertable, Inc.
3  *
4  * This file is part of Hypertable.
5  *
6  * Hypertable is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; either version 3
9  * of the License, or any later version.
10  *
11  * Hypertable is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19  * 02110-1301, USA.
20  */
21 
28 //#define HT_DISABLE_LOG_DEBUG
29 
30 #include <Common/Compat.h>
31 
32 #include "ReactorFactory.h"
33 #include "ReactorRunner.h"
34 #include "Comm.h"
35 #include "IOHandlerAccept.h"
36 #include "IOHandlerData.h"
37 
38 #include <Common/Config.h>
39 #include <Common/Error.h>
40 #include <Common/InetAddr.h>
41 #include <Common/FileUtils.h>
42 #include <Common/ScopeGuard.h>
43 #include <Common/SystemInfo.h>
44 #include <Common/Time.h>
45 
46 #include <cassert>
47 #include <chrono>
48 #include <iostream>
49 #include <thread>
50 
51 extern "C" {
52 #if defined(__APPLE__) || defined(__sun__) || defined(__FreeBSD__)
53 #include <arpa/inet.h>
54 #include <netinet/ip.h>
55 #endif
56 #include <errno.h>
57 #include <fcntl.h>
58 #include <netdb.h>
59 #include <netinet/in.h>
60 #include <netinet/tcp.h>
61 #include <sys/socket.h>
62 #include <sys/types.h>
63 #include <arpa/inet.h>
64 }
65 
66 using namespace Hypertable;
67 using namespace std;
68 
69 atomic<uint32_t> Comm::ms_next_request_id(1);
70 
71 Comm *Comm::ms_instance = NULL;
73 
75  if (ReactorFactory::ms_reactors.size() == 0) {
76  HT_ERROR("ReactorFactory::initialize must be called before creating "
77  "AsyncComm::comm object");
78  HT_ABORT;
79  }
80 
81  m_verbose = Config::properties->has("verbose") && Config::properties->get_bool("verbose");
82  ReactorFactory::verbose = m_verbose;
83 
84  InetAddr::initialize(&m_local_addr, System::net_info().primary_addr.c_str(), 0);
85 
86  ReactorFactory::get_timer_reactor(m_timer_reactor);
87  m_handler_map = ReactorRunner::handler_map;
88 }
89 
90 
92  m_handler_map->decomission_all();
93 
94  // wait for all decomissioned handlers to get purged by Reactor
95  m_handler_map->wait_for_empty();
96 
97  // Since Comm is a singleton, this is OK
99 }
100 
101 
103  if (ms_instance) {
104  delete ms_instance;
105  ms_instance = 0;
106  }
107 }
108 
109 int Comm::register_socket(int sd, const CommAddress &addr,
110  RawSocketHandler *handler) {
111  IOHandlerRaw *io_handler;
112 
113  if (m_handler_map->checkout_handler(addr, &io_handler) == Error::OK) {
114  m_handler_map->decrement_reference_count(io_handler);
115  return Error::ALREADY_EXISTS;
116  }
117 
118  HT_ASSERT(addr.is_inet());
119  io_handler = new IOHandlerRaw(sd, addr.inet, handler);
120  m_handler_map->insert_handler(io_handler);
121 
122  int32_t error;
123  if ((error = io_handler->start_polling(PollEvent::READ|PollEvent::WRITE)) != Error::OK) {
124  delete io_handler;
125  HT_THROWF(error, "Problem polling on raw socket bound to %s",
126  addr.to_str().c_str());
127  }
128 
129  return Error::OK;
130 }
131 
132 
133 int
134 Comm::connect(const CommAddress &addr, const DispatchHandlerPtr &default_handler) {
135  int sd;
136  int error = m_handler_map->contains_data_handler(addr);
137  uint16_t port;
138 
139  if (error == Error::OK)
141  else if (error != Error::COMM_NOT_CONNECTED)
142  return error;
143 
144  while (true) {
145 
146  if ((sd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) {
147  if (m_verbose)
148  HT_ERRORF("socket: %s", strerror(errno));
150  }
151 
152  // Get arbitray ephemeral port that won't conflict with our reserved ports
153  port = (uint16_t)(49152 + std::uniform_int_distribution<>(0, 16382)(ReactorFactory::rng));
154  m_local_addr.sin_port = htons(port);
155 
156  // bind socket to local address
157  if ((::bind(sd, (const sockaddr *)&m_local_addr, sizeof(sockaddr_in))) < 0) {
158  if (errno == EADDRINUSE) {
159  ::close(sd);
160  continue;
161  }
162  if (m_verbose)
163  HT_ERRORF( "bind: %s: %s", m_local_addr.format().c_str(), strerror(errno));
164  return Error::COMM_BIND_ERROR;
165  }
166  break;
167  }
168 
169  return connect_socket(sd, addr, default_handler);
170 }
171 
172 
173 
174 int
175 Comm::connect(const CommAddress &addr, const CommAddress &local_addr,
176  const DispatchHandlerPtr &default_handler) {
177  int sd;
178  int error = m_handler_map->contains_data_handler(addr);
179 
180  HT_ASSERT(local_addr.is_inet());
181 
182  if (error == Error::OK)
184  else if (error != Error::COMM_NOT_CONNECTED)
185  return error;
186 
187  if ((sd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) {
188  if (m_verbose)
189  HT_ERRORF("socket: %s", strerror(errno));
191  }
192 
193  // bind socket to local address
194  if ((::bind(sd, (const sockaddr *)&local_addr.inet, sizeof(sockaddr_in))) < 0) {
195  if (m_verbose)
196  HT_ERRORF( "bind: %s: %s", local_addr.to_str().c_str(), strerror(errno));
197  return Error::COMM_BIND_ERROR;
198  }
199 
200  return connect_socket(sd, addr, default_handler);
201 }
202 
203 
204 int Comm::set_alias(const InetAddr &addr, const InetAddr &alias) {
205  return m_handler_map->set_alias(addr, alias);
206 }
207 
208 
210  DispatchHandlerPtr null_handler(0);
211  listen(addr, chf, null_handler);
212 }
213 
214 
215 int Comm::add_proxy(const String &proxy, const String &hostname, const InetAddr &addr) {
217  return m_handler_map->add_proxy(proxy, hostname, addr);
218 }
219 
220 int Comm::remove_proxy(const String &proxy) {
222  return m_handler_map->remove_proxy(proxy);
223 }
224 
225 bool Comm::translate_proxy(const String &proxy, InetAddr *addr) {
226  CommAddress proxy_addr;
227  proxy_addr.set_proxy(proxy);
228  return m_handler_map->translate_proxy_address(proxy_addr, addr);
229 }
230 
231 void Comm::get_proxy_map(ProxyMapT &proxy_map) {
232  m_handler_map->get_proxy_map(proxy_map);
233 }
234 
236  return m_handler_map->wait_for_proxy_map(timer);
237 }
238 
239 
240 void
242  const DispatchHandlerPtr &default_handler) {
243  IOHandlerAccept *handler;
244  int one = 1;
245  int sd;
246 
247  HT_ASSERT(addr.is_inet());
248 
249  if ((sd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0)
250  HT_THROW(Error::COMM_SOCKET_ERROR, strerror(errno));
251 
252  // Set to non-blocking
253  FileUtils::set_flags(sd, O_NONBLOCK);
254 
255 #if defined(__linux__)
256  if (setsockopt(sd, SOL_TCP, TCP_NODELAY, &one, sizeof(one)) < 0 && m_verbose)
257  HT_ERRORF("setting TCP_NODELAY: %s", strerror(errno));
258 #elif defined(__sun__)
259  if (setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, (char*)&one, sizeof(one)) < 0 && m_verbose)
260  HT_ERRORF("setting TCP_NODELAY: %s", strerror(errno));
261 #elif defined(__APPLE__) || defined(__FreeBSD__)
262  if (setsockopt(sd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one)) < 0 && m_verbose)
263  HT_WARNF("setsockopt(SO_NOSIGPIPE) failure: %s", strerror(errno));
264  if (setsockopt(sd, SOL_SOCKET, SO_REUSEPORT, &one, sizeof(one)) < 0 && m_verbose)
265  HT_WARNF("setsockopt(SO_REUSEPORT) failure: %s", strerror(errno));
266 #endif
267 
268  if (setsockopt(sd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) < 0 && m_verbose)
269  HT_ERRORF("setting SO_REUSEADDR: %s", strerror(errno));
270 
271  int bind_attempts = 0;
272  while ((::bind(sd, (const sockaddr *)&addr.inet, sizeof(sockaddr_in))) < 0) {
273  if (bind_attempts == 24)
274  HT_THROWF(Error::COMM_BIND_ERROR, "binding to %s: %s",
275  addr.to_str().c_str(), strerror(errno));
276  if (m_verbose)
277  HT_INFOF("Unable to bind to %s: %s, will retry in 10 seconds...",
278  addr.to_str().c_str(), strerror(errno));
279  this_thread::sleep_for(chrono::milliseconds(10000));
280  bind_attempts++;
281  }
282 
283  if (::listen(sd, 1000) < 0)
284  HT_THROWF(Error::COMM_LISTEN_ERROR, "listening: %s", strerror(errno));
285 
286  handler = new IOHandlerAccept(sd, default_handler, m_handler_map, chf);
287  m_handler_map->insert_handler(handler);
288 
289  int32_t error;
290  if ((error = handler->start_polling()) != Error::OK) {
291  delete handler;
292  HT_THROWF(error, "Problem polling on listen socket bound to %s",
293  addr.to_str().c_str());
294  }
295 }
296 
297 
298 
299 int
300 Comm::send_request(const CommAddress &addr, uint32_t timeout_ms,
301  CommBufPtr &cbuf, DispatchHandler *resp_handler) {
302  IOHandlerData *data_handler;
303  int error;
304 
305  if ((error = m_handler_map->checkout_handler(addr, &data_handler)) != Error::OK) {
306  if (m_verbose)
307  HT_WARNF("No connection for %s - %s", addr.to_str().c_str(), Error::get_text(error));
308  return error;
309  }
310 
311  HT_ON_OBJ_SCOPE_EXIT(*m_handler_map.get(), &HandlerMap::decrement_reference_count, data_handler);
312 
313  return send_request(data_handler, timeout_ms, cbuf, resp_handler);
314 }
315 
316 
317 
318 int Comm::send_request(IOHandlerData *data_handler, uint32_t timeout_ms,
319  CommBufPtr &cbuf, DispatchHandler *resp_handler) {
320 
321  cbuf->header.flags |= CommHeader::FLAGS_BIT_REQUEST;
322  if (resp_handler == 0) {
323  cbuf->header.flags |= CommHeader::FLAGS_BIT_IGNORE_RESPONSE;
324  cbuf->header.id = 0;
325  }
326  else {
327  cbuf->header.id = ms_next_request_id++;
328  if (cbuf->header.id == 0)
329  cbuf->header.id = ms_next_request_id++;
330  }
331 
332  cbuf->header.timeout_ms = timeout_ms;
333  cbuf->write_header_and_reset();
334 
335  int error = data_handler->send_message(cbuf, timeout_ms, resp_handler);
336  if (error != Error::OK)
337  m_handler_map->decomission_handler(data_handler);
338  return error;
339 }
340 
341 
342 
343 int Comm::send_response(const CommAddress &addr, CommBufPtr &cbuf) {
344  IOHandlerData *data_handler;
345  int error;
346 
347  if ((error = m_handler_map->checkout_handler(addr, &data_handler)) != Error::OK) {
348  if (m_verbose)
349  HT_ERRORF("No connection for %s - %s", addr.to_str().c_str(), Error::get_text(error));
350  return error;
351  }
352 
353  HT_ON_OBJ_SCOPE_EXIT(*m_handler_map.get(), &HandlerMap::decrement_reference_count, data_handler);
354 
355  cbuf->header.flags &= CommHeader::FLAGS_MASK_REQUEST;
356 
357  cbuf->write_header_and_reset();
358 
359  error = data_handler->send_message(cbuf);
360  if (error != Error::OK)
361  m_handler_map->decomission_handler(data_handler);
362  return error;
363 }
364 
365 
366 void
368  const DispatchHandlerPtr &dhp) {
369  IOHandlerDatagram *handler;
370  int sd;
371 
372  HT_ASSERT(addr.is_inet());
373 
374  if ((sd = socket(AF_INET, SOCK_DGRAM, 0)) < 0)
375  HT_THROWF(Error::COMM_SOCKET_ERROR, "%s", strerror(errno));
376 
377  // Set to non-blocking
378  FileUtils::set_flags(sd, O_NONBLOCK);
379 
380  int bufsize = 4*32768;
381 
382  if (setsockopt(sd, SOL_SOCKET, SO_SNDBUF, (char *)&bufsize, sizeof(bufsize))
383  < 0) {
384  if (m_verbose)
385  HT_ERRORF("setsockopt(SO_SNDBUF) failed - %s", strerror(errno));
386  }
387  if (setsockopt(sd, SOL_SOCKET, SO_RCVBUF, (char *)&bufsize, sizeof(bufsize))
388  < 0) {
389  if (m_verbose)
390  HT_ERRORF("setsockopt(SO_RCVBUF) failed - %s", strerror(errno));
391  }
392 
393  Reactor::Priority reactor_priority {Reactor::Priority::NORMAL};
394 
395  if (tos) {
396  int opt;
397 #if defined(__linux__)
398  opt = tos;
399  setsockopt(sd, SOL_IP, IP_TOS, &opt, sizeof(opt));
400  opt = tos;
401  setsockopt(sd, SOL_SOCKET, SO_PRIORITY, &opt, sizeof(opt));
402 #elif defined(__APPLE__) || defined(__sun__) || defined(__FreeBSD__)
403  opt = IPTOS_LOWDELAY; /* see <netinet/in.h> */
404  setsockopt(sd, IPPROTO_IP, IP_TOS, &opt, sizeof(opt));
405 #endif
406  reactor_priority = Reactor::Priority::HIGH;
407  }
408 
409  int bind_attempts = 0;
410  while ((::bind(sd, (const sockaddr *)&addr.inet, sizeof(sockaddr_in))) < 0) {
411  if (bind_attempts == 24)
412  HT_THROWF(Error::COMM_BIND_ERROR, "binding to %s: %s",
413  addr.to_str().c_str(), strerror(errno));
414  if (m_verbose)
415  HT_INFOF("Unable to bind to %s: %s, will retry in 10 seconds...",
416  addr.to_str().c_str(), strerror(errno));
417  this_thread::sleep_for(chrono::milliseconds(10000));
418  bind_attempts++;
419  }
420 
421  handler = new IOHandlerDatagram(sd, dhp, reactor_priority);
422 
423  addr.set_inet( handler->get_address() );
424 
425  m_handler_map->insert_handler(handler);
426 
427  int32_t error;
428  if ((error = handler->start_polling()) != Error::OK) {
429  delete handler;
430  HT_THROWF(error, "Problem polling on datagram socket bound to %s",
431  addr.to_str().c_str());
432  }
433 }
434 
435 
436 int
437 Comm::send_datagram(const CommAddress &addr, const CommAddress &send_addr,
438  CommBufPtr &cbuf) {
439  IOHandlerDatagram *handler;
440  int error;
441 
442  HT_ASSERT(addr.is_inet());
443 
444  if ((error = m_handler_map->checkout_handler(send_addr, &handler)) != Error::OK) {
445  if (m_verbose)
446  HT_ERRORF("Datagram send/local address %s not registered",
447  send_addr.to_str().c_str());
448  return error;
449  }
450 
451  HT_ON_OBJ_SCOPE_EXIT(*m_handler_map.get(), &HandlerMap::decrement_reference_count, handler);
452 
453  cbuf->header.flags |= (CommHeader::FLAGS_BIT_REQUEST |
455 
456  cbuf->write_header_and_reset();
457 
458  error = handler->send_message(addr.inet, cbuf);
459  if (error != Error::OK)
460  m_handler_map->decomission_handler(handler);
461  return error;
462 }
463 
464 
465 int Comm::set_timer(uint32_t duration_millis, const DispatchHandlerPtr &handler) {
466  ExpireTimer timer;
467  timer.expire_time = ClockT::now() + chrono::milliseconds(duration_millis);
468  timer.handler = handler;
469  m_timer_reactor->add_timer(timer);
470  return Error::OK;
471 }
472 
473 
474 int
476  ExpireTimer timer;
477  timer.expire_time = expire_time;
478  timer.handler = handler;
479  m_timer_reactor->add_timer(timer);
480  return Error::OK;
481 }
482 
484  m_timer_reactor->cancel_timer(handler);
485 }
486 
487 
488 void Comm::close_socket(const CommAddress &addr) {
489  IOHandler *handler = 0;
490  IOHandlerAccept *accept_handler;
491  IOHandlerData *data_handler;
492  IOHandlerDatagram *datagram_handler;
493  IOHandlerRaw *raw_handler;
494 
495  if (m_handler_map->checkout_handler(addr, &data_handler) == Error::OK)
496  handler = data_handler;
497  else if (m_handler_map->checkout_handler(addr, &datagram_handler) == Error::OK)
498  handler = datagram_handler;
499  else if (m_handler_map->checkout_handler(addr, &accept_handler) == Error::OK)
500  handler = accept_handler;
501  else if (m_handler_map->checkout_handler(addr, &raw_handler) == Error::OK)
502  handler = raw_handler;
503  else
504  return;
505 
506  HT_ON_OBJ_SCOPE_EXIT(*m_handler_map.get(), &HandlerMap::decrement_reference_count, handler);
507 
508  m_handler_map->decomission_handler(handler);
509 }
510 
512  int one = 1;
513  int sd;
514  InetAddr check_addr;
515  uint16_t starting_port = ntohs(addr.sin_port);
516 
517  for (size_t i=0; i<15; i++) {
518 
519  if ((sd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0)
520  HT_FATALF("socket(AF_INET, SOCK_STREAM, IPPROTO_TCP) failure: %s",
521  strerror(errno));
522 
523  if (setsockopt(sd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) < 0)
524  HT_FATALF("setting TCP socket SO_REUSEADDR: %s", strerror(errno));
525 
526 #if defined(__APPLE__) || defined(__FreeBSD__)
527  if (setsockopt(sd, SOL_SOCKET, SO_REUSEPORT, &one, sizeof(one)) < 0 && m_verbose)
528  HT_WARNF("setsockopt(SO_REUSEPORT) failure: %s", strerror(errno));
529 #endif
530 
531  check_addr = addr;
532  check_addr.sin_port = htons(starting_port+i);
533 
534  if (::bind(sd, (const sockaddr *)&check_addr, sizeof(sockaddr_in)) == 0) {
535  ::close(sd);
536  addr.sin_port = check_addr.sin_port;
537  return;
538  }
539  ::close(sd);
540  }
541 
542  HT_FATALF("Unable to find available TCP port in range [%d..%d]",
543  (int)addr.sin_port, (int)addr.sin_port+14);
544 
545 }
546 
548  int one = 1;
549  int sd;
550  InetAddr check_addr;
551  uint16_t starting_port = ntohs(addr.sin_port);
552 
553  for (size_t i=0; i<15; i++) {
554 
555  if ((sd = socket(AF_INET, SOCK_DGRAM, 0)) < 0)
556  HT_FATALF("socket(AF_INET, SOCK_DGRAM, 0) failure: %s", strerror(errno));
557 
558  if (setsockopt(sd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) < 0)
559  HT_FATALF("setting UDP socket SO_REUSEADDR: %s", strerror(errno));
560 
561 #if defined(__APPLE__) || defined(__FreeBSD__)
562  if (setsockopt(sd, SOL_SOCKET, SO_REUSEPORT, &one, sizeof(one)) < 0 && m_verbose)
563  HT_WARNF("setsockopt(SO_REUSEPORT) failure: %s", strerror(errno));
564 #endif
565 
566  check_addr = addr;
567  check_addr.sin_port = htons(starting_port+i);
568 
569  if (::bind(sd, (const sockaddr *)&addr, sizeof(sockaddr_in)) == 0) {
570  ::close(sd);
571  addr.sin_port = check_addr.sin_port;
572  return;
573  }
574  ::close(sd);
575  }
576 
577  HT_FATALF("Unable to find available UDP port in range [%d..%d]",
578  (int)addr.sin_port, (int)addr.sin_port+14);
579 
580 }
581 
582 
583 
588 int
589 Comm::connect_socket(int sd, const CommAddress &addr,
590  const DispatchHandlerPtr &default_handler) {
591  IOHandlerData *handler;
592  int32_t error;
593  int one = 1;
594  CommAddress connectable_addr;
595 
596  if (addr.is_proxy()) {
597  InetAddr inet_addr;
598  if (!m_handler_map->translate_proxy_address(addr, &inet_addr))
600  connectable_addr.set_inet(inet_addr);
601  }
602  else
603  connectable_addr = addr;
604 
605  // Set to non-blocking
606  FileUtils::set_flags(sd, O_NONBLOCK);
607 
608 #if defined(__linux__)
609  if (setsockopt(sd, SOL_TCP, TCP_NODELAY, &one, sizeof(one)) < 0 && m_verbose)
610  HT_ERRORF("setsockopt(TCP_NODELAY) failure: %s", strerror(errno));
611 #elif defined(__sun__)
612  if (setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one)) < 0 && m_verbose)
613  HT_ERRORF("setsockopt(TCP_NODELAY) failure: %s", strerror(errno));
614 #elif defined(__APPLE__) || defined(__FreeBSD__)
615  if (setsockopt(sd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one)) < 0 && m_verbose)
616  HT_WARNF("setsockopt(SO_NOSIGPIPE) failure: %s", strerror(errno));
617 #endif
618 
619  handler = new IOHandlerData(sd, connectable_addr.inet, default_handler);
620  if (addr.is_proxy())
621  handler->set_proxy(addr.proxy);
622  m_handler_map->insert_handler(handler);
623 
624  while (::connect(sd, (struct sockaddr *)&connectable_addr.inet, sizeof(struct sockaddr_in))
625  < 0) {
626  if (errno == EINTR) {
627  this_thread::sleep_for(chrono::milliseconds(1000));
628  continue;
629  }
630  else if (errno == EINPROGRESS) {
631  //HT_INFO("connect() in progress starting to poll");
633  if (error == Error::COMM_POLL_ERROR) {
634  if (m_verbose)
635  HT_ERRORF("Polling problem on connection to %s: %s",
636  connectable_addr.to_str().c_str(), strerror(errno));
637  m_handler_map->remove_handler(handler);
638  delete handler;
639  }
640  return error;
641  }
642  m_handler_map->remove_handler(handler);
643  delete handler;
644  if (m_verbose)
645  HT_ERRORF("connecting to %s: %s", connectable_addr.to_str().c_str(),
646  strerror(errno));
648  }
649 
651  if (error != Error::OK) {
652  if (m_verbose)
653  HT_ERRORF("Polling problem on connection to %s: %s (%s)",
654  connectable_addr.to_str().c_str(),
655  Error::get_text(error), strerror(errno));
656  m_handler_map->remove_handler(handler);
657  delete handler;
658  }
659  return error;
660 }
int start_polling(int mode=PollEvent::READ)
Start polling on the handler with the poll interest specified in mode.
Definition: IOHandler.cc:91
static std::mutex mutex
Definition: Logger.cc:43
static bool verbose
Verbose mode.
int register_socket(int sd, const CommAddress &addr, RawSocketHandler *handler)
Registers an externally managed socket with comm event loop.
Definition: Comm.cc:109
void set_proxy(const String &proxy)
Sets the proxy name for this connection.
Definition: IOHandler.h:239
static std::atomic< uint32_t > ms_next_request_id
Atomic integer used for assinging request IDs.
Definition: Comm.h:504
#define HT_WARNF(msg,...)
Definition: Logger.h:290
State record for timer.
Definition: ExpireTimer.h:42
int send_message(const InetAddr &addr, CommBufPtr &cbp)
Sends a message.
String proxy
Proxy name.
Definition: CommAddress.h:175
PropertiesPtr properties
This singleton map stores all options.
Definition: Config.cc:47
std::string String
A String is simply a typedef to std::string.
Definition: String.h:44
I/O handler for datagram (UDP) sockets.
int connect(const CommAddress &addr, const DispatchHandlerPtr &default_handler)
Establishes a TCP connection and attaches a default dispatch handler.
Definition: Comm.cc:134
chrono::time_point< fast_clock > time_point
Definition: fast_clock.h:42
Declarations for ReactorRunner.
Abstract base class for application dispatch handlers registered with AsyncComm.
bool wait_for_proxy_load(Timer &timer)
Waits until a CommHeader::FLAGS_BIT_PROXY_MAP_UPDATE message is received from the proxy master...
Definition: Comm.cc:235
int set_alias(const InetAddr &addr, const InetAddr &alias)
Sets an alias for a TCP connection.
Definition: Comm.cc:204
bool translate_proxy(const String &proxy, InetAddr *addr)
Translates a proxy name to an IP address.
Definition: Comm.cc:225
static std::vector< ReactorPtr > ms_reactors
Vector of reactors (last position is timer reactor)
InetAddr get_address()
Gets the handler socket address.
Definition: IOHandler.h:229
static std::default_random_engine rng
Pseudo random number generator.
#define HT_ABORT
Definition: Logger.h:175
STL namespace.
~Comm()
Destructor.
Definition: Comm.cc:91
ClockT::time_point expire_time
Absolute expiration time.
Definition: ExpireTimer.h:43
int send_message(CommBufPtr &cbp, uint32_t timeout_ms=0, DispatchHandler *disp_handler=nullptr)
Sends message pointed to by cbp over socket associated with this I/O handler.
int send_datagram(const CommAddress &addr, const CommAddress &send_addr, CommBufPtr &cbuf)
Sends a datagram to a remote address.
Definition: Comm.cc:437
static void destroy()
Destroys singleton instance of the Comm class.
Definition: Comm.cc:102
Abstract base class for application raw socket handlers registered with AsyncComm.
Data available to read.
Definition: PollEvent.h:42
#define HT_ASSERT(_e_)
Definition: Logger.h:396
void decrement_reference_count()
Decrement reference count.
Definition: IOHandler.h:355
void set_inet(sockaddr_in addr)
Sets address type to CommAddress::INET and inet value to addr.
Definition: CommAddress.h:82
DispatchHandlerPtr handler
Dispatch handler to receive TIMER event.
Definition: ExpireTimer.h:44
static HandlerMapPtr handler_map
Smart pointer to HandlerMap.
Definition: ReactorRunner.h:70
File system utility functions.
static void destroy()
This method shuts down the reactors.
std::shared_ptr< ConnectionHandlerFactory > ConnectionHandlerFactoryPtr
Smart pointer to ConnectionHandlerFactory.
static time_point now() noexcept
Definition: fast_clock.cc:37
Declarations for IOHandlerAccept.
void find_available_tcp_port(InetAddr &addr)
Finds an unused TCP port starting from addr.
Definition: Comm.cc:511
const char * get_text(int error)
Returns a descriptive error message.
Definition: Error.cc:330
Encapsulate an internet address.
Definition: InetAddr.h:66
void close_socket(const CommAddress &addr)
Closes the socket specified by the addr argument.
Definition: Comm.cc:488
std::shared_ptr< CommBuf > CommBufPtr
Smart pointer to CommBuf.
Definition: CommBuf.h:305
void set_proxy(const String &str)
Sets address type to CommAddress::PROXY and proxy name to p.
Definition: CommAddress.h:76
std::unordered_map< String, ProxyAddressInfo > ProxyMapT
Forward mapping hash type from proxy name to ProxyAddressInfo.
Definition: ProxyMap.h:57
Compatibility Macros for C/C++.
Base class for socket descriptor I/O handlers.
Definition: IOHandler.h:76
int connect_socket(int sd, const CommAddress &addr, const DispatchHandlerPtr &default_handler)
Creates a TCP socket connection.
Definition: Comm.cc:589
I/O handler for TCP sockets.
Definition: IOHandlerData.h:51
String to_str() const
Returns string representation of address.
Definition: CommAddress.cc:34
Time related declarations.
Declarations for IOHandlerData.
static bool set_flags(int fd, int flags)
Sets fcntl flags of a socket.
Definition: FileUtils.cc:256
Comm()
Private constructor (prevent non-singleton usage).
Definition: Comm.cc:74
int send_response(const CommAddress &addr, CommBufPtr &cbuf)
Sends a response message back over a connection.
Definition: Comm.cc:343
static Comm * ms_instance
Pointer to singleton instance of this class.
Definition: Comm.h:501
bool is_inet() const
Returns true if address is of type CommAddress::INET.
Definition: CommAddress.h:153
Hypertable definitions
#define HT_FATALF(msg,...)
Definition: Logger.h:343
void find_available_udp_port(InetAddr &addr)
Finds an unused UDP port starting from addr.
Definition: Comm.cc:547
Writing can be performed without blocking.
Definition: PollEvent.h:46
static bool initialize(sockaddr_in *addr, const char *host, uint16_t port)
Initialize a sockaddr_in structure from host:port.
Definition: InetAddr.cc:68
Entry point to AsyncComm service.
Definition: Comm.h:61
static std::mutex ms_mutex
Mutex for serializing access to ms_instance
Definition: Comm.h:507
#define HT_ERROR(msg)
Definition: Logger.h:299
void get_proxy_map(ProxyMapT &proxy_map)
Returns the proxy map.
Definition: Comm.cc:231
Declarations for Comm.
#define HT_INFOF(msg,...)
Definition: Logger.h:272
#define HT_THROWF(_code_, _fmt_,...)
Definition: Error.h:490
Internet address wrapper classes and utility functions.
void create_datagram_receive_socket(CommAddress &addr, int tos, const DispatchHandlerPtr &handler)
Creates a socket for receiving datagrams and attaches handler as the default dispatch handler...
Definition: Comm.cc:367
Priority
Enumeration for reactor priority.
Definition: Reactor.h:73
Declarations for ReactorFactory.
A timer class to keep timeout states across AsyncComm related calls.
Definition: Timer.h:44
std::shared_ptr< DispatchHandler > DispatchHandlerPtr
Smart pointer to DispatchHandler.
void decrement_reference_count(IOHandler *handler)
Decrements the reference count of handler.
Definition: HandlerMap.cc:132
#define HT_ERRORF(msg,...)
Definition: Logger.h:300
void listen(const CommAddress &addr, ConnectionHandlerFactoryPtr &chf)
Creates listen (accept) socket on addr.
Definition: Comm.cc:209
int set_timer_absolute(ClockT::time_point expire_time, const DispatchHandlerPtr &handler)
Sets a timer for absolute time expire_time.
Definition: Comm.cc:475
#define HT_ON_OBJ_SCOPE_EXIT(...)
Definition: ScopeGuard.h:305
InetAddr inet
IPv4:port address.
Definition: CommAddress.h:176
int set_timer(uint32_t duration_millis, const DispatchHandlerPtr &handler)
Sets a timer for duration_millis milliseconds in the future.
Definition: Comm.cc:465
void cancel_timer(const DispatchHandlerPtr &handler)
Cancels all scheduled timers registered with the dispatch handler handler.
Definition: Comm.cc:483
Configuration settings.
I/O handler for accept (listen) sockets.
void alias(const String &cmdline_opt, const String &file_opt, bool overwrite)
Setup command line option alias for config file option.
Definition: Config.cc:607
bool is_proxy() const
Returns true if address is of type CommAddress::PROXY.
Definition: CommAddress.h:147
System information and statistics based on libsigar.
Error codes, Exception handling, error logging.
#define HT_THROW(_code_, _msg_)
Definition: Error.h:478
int send_request(const CommAddress &addr, uint32_t timeout_ms, CommBufPtr &cbuf, DispatchHandler *response_handler)
Sends a request message over a connection, expecting a response.
Definition: Comm.cc:300
int add_proxy(const String &proxy, const String &hostname, const InetAddr &addr)
Adds a proxy name for a TCP connection.
Definition: Comm.cc:215
I/O handler for raw sockets.
Definition: IOHandlerRaw.h:42
static const NetInfo & net_info()
Retrieves updated Network information (see SystemInfo.h)
Definition: SystemInfo.cc:360
Address abstraction to hold either proxy name or IPv4:port address.
Definition: CommAddress.h:52
Executes user-defined functions when leaving the current scope.
static bool proxy_master
Set to true if this process is acting as "Proxy Master".
static void get_timer_reactor(ReactorPtr &reactor)
This method returns the timer reactor.
int remove_proxy(const String &proxy)
Removes a proxy name for a TCP connection.
Definition: Comm.cc:220