0.9.8.10
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
testServer.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2007-2015 Hypertable, Inc.
3  *
4  * This file is part of Hypertable.
5  *
6  * Hypertable is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; either version 3
9  * of the License, or any later version.
10  *
11  * Hypertable is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19  * 02110-1301, USA.
20  */
21 
22 #include <Common/Compat.h>
23 
24 #include "DispatchHandler.h"
25 #include "Comm.h"
26 #include "Event.h"
27 
32 
33 #include <Common/Init.h>
34 #include <Common/Error.h>
35 #include <Common/InetAddr.h>
36 #include <Common/System.h>
37 #include <Common/SockAddrMap.h>
38 #include <Common/Usage.h>
39 
40 #include <chrono>
41 #include <iostream>
42 #include <queue>
43 #include <string>
44 #include <thread>
45 
46 extern "C" {
47 #include <arpa/inet.h>
48 #include <pthread.h>
49 #include <stdint.h>
50 #include <string.h>
51 #include <sys/socket.h>
52 #include <sys/types.h>
53 }
54 
55 using namespace Hypertable;
56 using namespace std;
57 
58 namespace {
59 
60  int g_delay = 0;
61  bool g_verbose = false;
62  const int DEFAULT_PORT = 11255;
63  const char *usage[] = {
64  "usage: sampleServer [OPTIONS]",
65  "",
66  "OPTIONS:",
67  " --connect-to=<addr> Connect to a client listening on <addr> (TCP only)."
68  " --help Display this help text and exit",
69  " --port=<n> Specifies the port to listen on (default=11255)",
70  " --app-queue Use an application queue for handling requests",
71  " --reactors=<n> Specifies the number of reactors (default=1)",
72  " --delay=<ms> Milliseconds to wait before echoing message (default=0)",
73  " --udp Operate in UDP mode instead of TCP",
74  " --verbose,-v Generate verbose output",
75  ""
76  "This is a sample program to test the AsyncComm library. It establishes",
77  "a connection with the sampleServer and sends each line of the input file",
78  "to the server. Each reply from the server is echoed to stdout.",
79  (const char *)0
80  };
81 
87  class RequestHandler : public ApplicationHandler {
88  public:
89 
90  RequestHandler(Comm *comm, EventPtr &event_ptr)
91  : ApplicationHandler(event_ptr), m_comm(comm) { return; }
92 
93  virtual void run() {
94  CommHeader header;
95  header.initialize_from_request_header(m_event->header);
96  CommBufPtr cbp(new CommBuf(header, m_event->payload_len));
97  cbp->append_bytes((uint8_t *)m_event->payload,
98  m_event->payload_len);
99  int error = m_comm->send_response(m_event->addr, cbp);
100  if (error != Error::OK) {
101  HT_ERRORF("Comm::send_response returned %s", Error::get_text(error));
102  }
103  }
104  private:
105  Comm *m_comm;
106  };
107 
108 
113  class Dispatcher : public DispatchHandler {
114 
115  public:
116 
117  Dispatcher(Comm *comm, ApplicationQueue *app_queue)
118  : m_comm(comm), m_app_queue(app_queue) { return; }
119 
120  virtual void handle(EventPtr &event_ptr) {
121  if (g_verbose && event_ptr->type == Event::CONNECTION_ESTABLISHED) {
122  HT_INFO("Connection Established.");
123  }
124  else if (g_verbose && event_ptr->type == Event::DISCONNECT) {
125  if (event_ptr->error != 0) {
126  HT_INFOF("Disconnect : %s", Error::get_text(event_ptr->error));
127  }
128  else {
129  HT_INFO("Disconnect.");
130  }
131  }
132  else if (event_ptr->type == Event::ERROR) {
133  HT_WARNF("Error : %s", Error::get_text(event_ptr->error));
134  }
135  else if (event_ptr->type == Event::MESSAGE) {
136  if (m_app_queue == 0) {
137  CommHeader header;
138  header.initialize_from_request_header(event_ptr->header);
139  CommBufPtr cbp(new CommBuf(header, event_ptr->payload_len));
140  cbp->append_bytes((uint8_t *)event_ptr->payload,
141  event_ptr->payload_len);
142  if (g_delay > 0)
143  this_thread::sleep_for(chrono::milliseconds(g_delay));
144  int error = m_comm->send_response(event_ptr->addr, cbp);
145  if (error != Error::OK) {
146  HT_ERRORF("Comm::send_response returned %s",
147  Error::get_text(error));
148  }
149  }
150  else
151  m_app_queue->add(new RequestHandler(m_comm, event_ptr));
152  }
153  }
154 
155  private:
156  Comm *m_comm;
157  ApplicationQueue *m_app_queue;
158  };
159 
160 
161 
166  class UdpDispatcher : public DispatchHandler {
167  public:
168 
169  UdpDispatcher(Comm *comm) : m_comm(comm) { return; }
170 
171  virtual void handle(EventPtr &event_ptr) {
172  if (event_ptr->type == Event::MESSAGE) {
173  CommHeader header;
174  header.initialize_from_request_header(event_ptr->header);
175  CommBufPtr cbp(new CommBuf(header, event_ptr->payload_len));
176  cbp->append_bytes((uint8_t *)event_ptr->payload,
177  event_ptr->payload_len);
178  if (g_delay > 0)
179  this_thread::sleep_for(chrono::milliseconds(g_delay));
180  int error = m_comm->send_datagram(event_ptr->addr,
181  event_ptr->local_addr, cbp);
182  if (error != Error::OK) {
183  HT_ERRORF("Comm::send_response returned %s", Error::get_text(error));
184  }
185  }
186  else {
187  HT_ERRORF("Error : %s", event_ptr->to_str().c_str());
188  }
189  }
190 
191  private:
192  Comm *m_comm;
193  };
194 
195 
201  public:
203  : m_dispatch_handler_ptr(dhp) { return; }
204 
205  virtual void get_instance(DispatchHandlerPtr &dhp) {
206  dhp = m_dispatch_handler_ptr;
207  }
208 
209  private:
210  DispatchHandlerPtr m_dispatch_handler_ptr;
211  };
212 
213 }
214 
215 
219 int main(int argc, char **argv) {
220  Comm *comm;
221  int rval, error;
222  uint16_t port = DEFAULT_PORT;
223  int reactor_count = 2;
224  ApplicationQueue *app_queue = 0;
225  bool udp = false;
226  DispatchHandlerPtr dhp;
227  CommAddress local_addr;
228  InetAddr client_addr;
229 
230  Config::init(0, 0);
231 
232  memset(&client_addr, 0, sizeof(client_addr));
233 
234  for (int i=1; i<argc; i++) {
235  if (!strcmp(argv[i], "--help"))
236  Usage::dump_and_exit(usage);
237  else if (!strcmp(argv[i], "--app-queue")) {
238  app_queue = new ApplicationQueue(5);
239  }
240  else if (!strncmp(argv[i], "--connect-to=", 13)) {
241  if (!InetAddr::initialize(&client_addr, &argv[i][13]))
242  HT_ABORT;
243  }
244  else if (!strncmp(argv[i], "--port=", 7)) {
245  rval = atoi(&argv[i][7]);
246  if (rval <= 1024 || rval > 65535) {
247  cerr << "Invalid port. Must be in the range of 1024-65535." << endl;
248  exit(EXIT_FAILURE);
249  }
250  port = (uint16_t)rval;
251  }
252  else if (!strncmp(argv[i], "--reactors=", 11))
253  reactor_count = atoi(&argv[i][11]);
254  else if (!strncmp(argv[i], "--delay=", 8))
255  g_delay = atoi(&argv[i][8]);
256  else if (!strcmp(argv[i], "--udp"))
257  udp = true;
258  else if (!strcmp(argv[i], "--verbose") || !strcmp(argv[i], "-v"))
259  g_verbose = true;
260  else
261  Usage::dump_and_exit(usage);
262  }
263 
264  try {
265 
266  ReactorFactory::initialize(reactor_count);
267 
268  comm = Comm::instance();
269 
270  if (g_verbose) {
271  cout << "Listening on port " << port << endl;
272  if (g_delay)
273  cout << "Delay = " << g_delay << endl;
274  }
275 
276  {
277  InetAddr addr;
278  InetAddr::initialize(&addr, "localhost", port);
279  local_addr = addr;
280  }
281 
282  if (!udp) {
283  dhp = std::make_shared<Dispatcher>(comm, app_queue);
284 
285  if (client_addr.sin_port != 0) {
286  if ((error = comm->connect(client_addr, local_addr, dhp)) != Error::OK) {
287  HT_ERRORF("Comm::connect error - %s", Error::get_text(error));
288  exit(EXIT_FAILURE);
289  }
290  }
291  else {
292  ConnectionHandlerFactoryPtr handler_factory = make_shared<HandlerFactory>(dhp);
293  comm->listen(local_addr, handler_factory, dhp);
294  }
295  }
296  else {
297  assert(client_addr.sin_port == 0);
298  dhp = std::make_shared<UdpDispatcher>(comm);
299  comm->create_datagram_receive_socket(local_addr, 0, dhp);
300  }
301 
303 
304  }
305  catch (Hypertable::Exception &e) {
306  HT_ERROR_OUT << e << HT_END;
307  _exit(EXIT_FAILURE);
308  }
309 
310  _exit(EXIT_SUCCESS);
311 }
static Comm * instance()
Creates/returns singleton instance of the Comm class.
Definition: Comm.h:72
Declarations for ConnectionHandlerFactory.
Retrieves system information (hardware, installation directory, etc)
void initialize_from_request_header(CommHeader &req_header)
Initializes header from req_header.
Definition: CommHeader.h:128
#define HT_WARNF(msg,...)
Definition: Logger.h:290
Declarations of ApplicationHandler.
static void initialize(uint16_t reactor_count)
Initializes I/O reactors.
Helper class for printing usage banners on the command line.
Abstract base class for application dispatch handlers registered with AsyncComm.
void init(int argc, char *argv[], const Desc *desc=NULL)
Initialize with default policy.
Definition: Init.h:95
Abstract class for creating default application dispatch handlers.
std::shared_ptr< Event > EventPtr
Smart pointer to Event.
Definition: Event.h:228
#define HT_ABORT
Definition: Logger.h:175
#define HT_INFO(msg)
Definition: Logger.h:271
STL namespace.
Error event
Definition: Event.h:64
Connection established event.
Definition: Event.h:61
Declarations for Event.
Declarations for DispatchHandler.
static void join()
Joins with reactor threads.
std::shared_ptr< ConnectionHandlerFactory > ConnectionHandlerFactoryPtr
Smart pointer to ConnectionHandlerFactory.
const char * get_text(int error)
Returns a descriptive error message.
Definition: Error.cc:330
Encapsulate an internet address.
Definition: InetAddr.h:66
std::shared_ptr< CommBuf > CommBufPtr
Smart pointer to CommBuf.
Definition: CommBuf.h:305
Compatibility Macros for C/C++.
Initialization helper for applications.
#define HT_END
Definition: Logger.h:220
int main(int argc, char **argv)
main function
Definition: testServer.cc:219
Connection disconnected event.
Definition: Event.h:62
#define HT_ERROR_OUT
Definition: Logger.h:301
Declarations for SockAddrMap.
Hypertable definitions
static bool initialize(sockaddr_in *addr, const char *host, uint16_t port)
Initialize a sockaddr_in structure from host:port.
Definition: InetAddr.cc:68
Entry point to AsyncComm service.
Definition: Comm.h:61
Header for messages transmitted via AsyncComm.
Definition: CommHeader.h:40
Declarations for Comm.
#define HT_INFOF(msg,...)
Definition: Logger.h:272
Internet address wrapper classes and utility functions.
Request/response message event.
Definition: Event.h:63
void create_datagram_receive_socket(CommAddress &addr, int tos, const DispatchHandlerPtr &handler)
Creates a socket for receiving datagrams and attaches handler as the default dispatch handler...
Definition: Comm.cc:367
Declarations for ReactorFactory.
This is a generic exception class for Hypertable.
Definition: Error.h:314
Base clase for application handlers.
std::shared_ptr< DispatchHandler > DispatchHandlerPtr
Smart pointer to DispatchHandler.
Message buffer for holding data to be transmitted over a network.
Definition: CommBuf.h:79
#define HT_ERRORF(msg,...)
Definition: Logger.h:300
Declarations for ApplicationQueue.
static void dump_and_exit(const char **usage, int rcode=1)
Same as dump, but performs _exit(rcode) afterwards.
Definition: Usage.cc:41
Error codes, Exception handling, error logging.
Address abstraction to hold either proxy name or IPv4:port address.
Definition: CommAddress.h:52