0.9.8.10
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
sampleClient.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2007-2015 Hypertable, Inc.
3  *
4  * This file is part of Hypertable.
5  *
6  * Hypertable is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; either version 3
9  * of the License, or any later version.
10  *
11  * Hypertable is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19  * 02110-1301, USA.
20  */
21 
22 #include <Common/Compat.h>
23 
24 #include "DispatchHandler.h"
25 #include "Comm.h"
26 #include "CommHeader.h"
27 #include "Event.h"
28 
29 #include <Common/Init.h>
30 #include <Common/Error.h>
31 #include <Common/InetAddr.h>
32 #include <Common/Logger.h>
33 #include <Common/System.h>
34 #include <Common/Usage.h>
35 #include <Common/Serialization.h>
36 
37 #include <boost/thread/thread.hpp>
38 
39 #include <chrono>
40 #include <condition_variable>
41 #include <cstdio>
42 #include <fstream>
43 #include <iostream>
44 #include <mutex>
45 #include <queue>
46 #include <string>
47 #include <thread>
48 
49 extern "C" {
50 #include <netdb.h>
51 #include <errno.h>
52 #include <pthread.h>
53 #include <stdint.h>
54 #include <string.h>
55 }
56 
57 using namespace Hypertable;
58 using namespace Serialization;
59 using namespace std;
60 
61 namespace {
62  const char *DEFAULT_HOST = "localhost";
63  const int DEFAULT_PORT = 11255;
64  const int DEFAULT_TIMEOUT = 10000;
65  const char *usage[] = {
66  "usage: sampleClient [OPTIONS] <input-file>",
67  "",
68  "OPTIONS:",
69  " --host=<name> Specifies the host to connect to (default = localhost)",
70  " --port=<n> Specifies the port to connect to (default = 11255)",
71  " --recv-addr=<addr> Let the server connect to us by listening for",
72  " connection request on <addr> (host:port). The address",
73  " that the server is connecting from should be the same",
74  " as in --host and --port or the defaults.",
75  " (TCP only)",
76  " --timeout=<t> Connection timeout in milliseconds (default=10000)",
77  " --verbose Generate verbose output",
78  " --udp Operate in UDP mode instead of TCP",
79  "",
80  "This is a sample program to test the AsyncComm library. It establishes",
81  "a connection with the sampleServer and sends each line of the input file",
82  "to the server. Each reply from the server is echoed to stdout.",
83  (const char *)0
84  };
85  bool g_verbose = false;
86 }
87 
88 
89 
96 
97 public:
98 
100  virtual ~ResponseHandler() { }
101 
102  virtual void handle(EventPtr &event_ptr) = 0;
103 
104  virtual bool get_response(EventPtr &event_ptr) = 0;
105 
106 protected:
107  std::queue<EventPtr> m_queue;
109  std::condition_variable m_cond;
110 };
111 
112 
113 
114 
115 
123 
124 public:
125 
127 
128  virtual void handle(EventPtr &event_ptr) {
129  std::lock_guard<std::mutex> lock(m_mutex);
130  if (event_ptr->type == Event::CONNECTION_ESTABLISHED) {
131  if (g_verbose)
132  HT_INFOF("Connection Established - %s", event_ptr->to_str().c_str());
133  m_connected = true;
134  m_cond.notify_one();
135  }
136  else if (event_ptr->type == Event::DISCONNECT) {
137  if (event_ptr->error != 0) {
138  HT_INFOF("Disconnect : %s", Error::get_text(event_ptr->error));
139  }
140  else {
141  HT_INFO("Disconnect.");
142  }
143  m_connected = false;
144  m_cond.notify_one();
145  }
146  else if (event_ptr->type == Event::ERROR) {
147  HT_INFOF("Error : %s", Error::get_text(event_ptr->error));
148  //exit(EXIT_FAILURE);
149  }
150  else if (event_ptr->type == Event::MESSAGE) {
151  m_queue.push(event_ptr);
152  m_cond.notify_one();
153  }
154  }
155 
157  std::unique_lock<std::mutex> lock(m_mutex);
158  if (m_connected)
159  return true;
160  m_cond.wait(lock);
161  return m_connected;
162  }
163 
164  virtual bool get_response(EventPtr &event_ptr) {
165  std::unique_lock<std::mutex> lock(m_mutex);
166  while (m_queue.empty()) {
167  if (m_connected == false)
168  return false;
169  m_cond.wait(lock);
170  }
171  event_ptr = m_queue.front();
172  m_queue.pop();
173  return true;
174  }
175 
176 private:
177  bool m_connected {};
178 };
179 
180 
181 
187 public:
189  m_dispatch_handler_ptr = dhp;
190  }
191  virtual void get_instance(DispatchHandlerPtr &dhp) {
192  dhp = m_dispatch_handler_ptr;
193  }
194 private:
196 };
197 
198 
199 
200 
207 
208 public:
209 
211 
212  virtual void handle(EventPtr &event_ptr) {
213  std::lock_guard<std::mutex> lock(m_mutex);
214  if (event_ptr->type == Event::MESSAGE) {
215  m_queue.push(event_ptr);
216  m_cond.notify_one();
217  }
218  else {
219  HT_INFOF("%s", event_ptr->to_str().c_str());
220  //exit(EXIT_FAILURE);
221  }
222  }
223 
224  virtual bool get_response(EventPtr &event_ptr) {
225  std::unique_lock<std::mutex> lock(m_mutex);
226  m_cond.wait(lock, [this](){ return !m_queue.empty(); });
227  event_ptr = m_queue.front();
228  m_queue.pop();
229  return true;
230  }
231 };
232 
233 
234 
235 
236 
240 int main(int argc, char **argv) {
241  Comm *comm;
242  int rval;
243  const char *host = DEFAULT_HOST;
244  struct sockaddr_in addr;
245  uint16_t port = DEFAULT_PORT;
246  time_t timeout = DEFAULT_TIMEOUT;
247  const char *in_file = 0;
248  int error;
249  EventPtr event_ptr;
250  DispatchHandlerPtr dhp;
251  ResponseHandler *resp_handler;
252  bool udp_mode = false;
253  string line;
254  int outstanding = 0;
255  int max_outstanding = 50;
256  const char *str;
257  CommAddress udp_send_addr;
258  sockaddr_in inet_addr;
259 
260  Config::init(0, 0);
261 
262  memset(&inet_addr, 0, sizeof(inet_addr));
263 
264  if (argc == 1)
265  Usage::dump_and_exit(usage);
266 
268 
269  for (int i=1; i<argc; i++) {
270  if (!strncmp(argv[i], "--host=", 7))
271  host = &argv[i][7];
272  else if (!strncmp(argv[i], "--port=", 7)) {
273  rval = atoi(&argv[i][7]);
274  if (rval <= 1024 || rval > 65535) {
275  cerr << "Invalid port. Must be in the range of 1024-65535." << endl;
276  exit(EXIT_FAILURE);
277  }
278  port = rval;
279  }
280  else if (!strncmp(argv[i], "--timeout=", 10))
281  timeout = (time_t)atoi(&argv[i][10]);
282  else if (!strcmp(argv[i], "--udp"))
283  udp_mode = true;
284  else if (!strncmp(argv[i], "--recv-addr=", 12)) {
285  if (!InetAddr::initialize(&inet_addr, &argv[i][12]))
286  HT_ABORT;
287  }
288  else if (!strcmp(argv[i], "--verbose")) {
289  g_verbose = true;
290  }
291  else if (in_file == 0)
292  in_file = argv[i];
293  else
294  Usage::dump_and_exit(usage);
295  }
296 
297  if (in_file == 0)
298  Usage::dump_and_exit(usage);
299 
300  if (!InetAddr::initialize(&addr, host, port))
301  exit(EXIT_FAILURE);
302 
303  comm = Comm::instance();
304 
305  ifstream myfile(in_file);
306 
307  if (!myfile.is_open()) {
308  HT_ERRORF("Unable to open file '%s' : %s", in_file, strerror(errno));
309  return 0;
310  }
311 
312  if (udp_mode) {
313  assert(inet_addr.sin_port == 0);
314  dhp = make_shared<ResponseHandlerUDP>();
315  resp_handler = static_cast<ResponseHandler *>(dhp.get());
316  port++;
317  InetAddr::initialize(&inet_addr, INADDR_ANY, port);
318  udp_send_addr.set_inet(inet_addr);
319  comm->create_datagram_receive_socket(udp_send_addr, 0, dhp);
320  }
321  else {
322  dhp = make_shared<ResponseHandlerTCP>();
323  resp_handler = static_cast<ResponseHandler *>(dhp.get());
324 
325  if (inet_addr.sin_port == 0) {
326  if ((error = comm->connect(addr, dhp)) != Error::OK) {
327  HT_ERRORF("Comm::connect error - %s", Error::get_text(error));
328  exit(EXIT_FAILURE);
329  }
330  }
331  else {
332  ConnectionHandlerFactoryPtr handler_factory = make_shared<HandlerFactory>(dhp);
333  comm->listen(inet_addr, handler_factory, dhp);
334  }
335  if (!((ResponseHandlerTCP *)resp_handler)->wait_for_connection())
336  exit(EXIT_FAILURE);
337 
338  }
339 
340  CommHeader header;
341  const uint8_t *decode_ptr;
342  size_t decode_remain;
343 
344  while (!myfile.eof()) {
345  getline (myfile,line);
346  if (line.length() > 0) {
347  CommBufPtr cbp(new CommBuf(header, encoded_length_str16(line)));
348  cbp->append_str16(line);
349  int retries = 0;
350 
351  if (udp_mode) {
352  if ((error = comm->send_datagram(addr, udp_send_addr, cbp)) != Error::OK) {
353  HT_ERRORF("Problem sending datagram - %s", Error::get_text(error));
354  return 1;
355  }
356  }
357  else {
358  while ((error = comm->send_request(addr, timeout, cbp, resp_handler))
359  != Error::OK) {
360  if (error == Error::COMM_NOT_CONNECTED) {
361  if (retries == 5) {
362  HT_ERROR("Connection timeout.");
363  return 1;
364  }
365  this_thread::sleep_for(chrono::milliseconds(1000));
366  retries++;
367  }
368  else {
369  HT_ERRORF("CommEngine::send_message returned '%s'",
370  Error::get_text(error));
371  return 1;
372  }
373  }
374  }
375  outstanding++;
376 
377  if (outstanding > max_outstanding) {
378  if (!resp_handler->get_response(event_ptr))
379  break;
380  try {
381  decode_ptr = event_ptr->payload;
382  decode_remain = event_ptr->payload_len;
383  str = decode_str16(&decode_ptr, &decode_remain);
384  if (*str != 0)
385  cout << "ECHO: " << str << endl;
386  else
387  cout << "[NULL]" << endl;
388  }
389  catch (Exception &e) {
390  cout <<"Error: "<< e << endl;
391  }
392  outstanding--;
393  }
394  }
395  }
396 
397  while (outstanding > 0 && resp_handler->get_response(event_ptr)) {
398  try {
399  decode_ptr = event_ptr->payload;
400  decode_remain = event_ptr->payload_len;
401  str = decode_str16(&decode_ptr, &decode_remain);
402  if (str != 0)
403  cout << "ECHO: " << str << endl;
404  else
405  cout << "[NULL]" << endl;
406  }
407  catch (Exception &e) {
408  cout <<"Error: "<< e << endl;
409  }
410  //cout << "out = " << outstanding << endl;
411  outstanding--;
412  }
413 
414 
415  myfile.close();
416 
418 
419  return 0;
420 }
421 
422 
static Comm * instance()
Creates/returns singleton instance of the Comm class.
Definition: Comm.h:72
Retrieves system information (hardware, installation directory, etc)
static std::mutex mutex
Definition: Logger.cc:43
virtual void get_instance(DispatchHandlerPtr &dhp)
Creates a connection dispatch handler.
DispatchHandlerPtr m_dispatch_handler_ptr
std::queue< EventPtr > m_queue
static void initialize(uint16_t reactor_count)
Initializes I/O reactors.
int connect(const CommAddress &addr, const DispatchHandlerPtr &default_handler)
Establishes a TCP connection and attaches a default dispatch handler.
Definition: Comm.cc:134
HandlerFactory(DispatchHandlerPtr &dhp)
Helper class for printing usage banners on the command line.
Abstract base class for application dispatch handlers registered with AsyncComm.
void init(int argc, char *argv[], const Desc *desc=NULL)
Initialize with default policy.
Definition: Init.h:95
Abstract class for creating default application dispatch handlers.
Po::typed_value< String > * str(String *v=0)
Definition: Properties.h:166
virtual bool get_response(EventPtr &event_ptr)=0
std::shared_ptr< Event > EventPtr
Smart pointer to Event.
Definition: Event.h:228
#define HT_ABORT
Definition: Logger.h:175
#define HT_INFO(msg)
Definition: Logger.h:271
int main(int argc, char **argv)
main function
STL namespace.
int send_datagram(const CommAddress &addr, const CommAddress &send_addr, CommBufPtr &cbuf)
Sends a datagram to a remote address.
Definition: Comm.cc:437
Error event
Definition: Event.h:64
Connection established event.
Definition: Event.h:61
Declarations for Event.
void set_inet(sockaddr_in addr)
Sets address type to CommAddress::INET and inet value to addr.
Definition: CommAddress.h:82
Declarations for DispatchHandler.
static void destroy()
This method shuts down the reactors.
std::shared_ptr< ConnectionHandlerFactory > ConnectionHandlerFactoryPtr
Smart pointer to ConnectionHandlerFactory.
size_t encoded_length_str16(const char *str)
Computes the encoded length of a string16 encoding.
const char * get_text(int error)
Returns a descriptive error message.
Definition: Error.cc:330
std::shared_ptr< CommBuf > CommBufPtr
Smart pointer to CommBuf.
Definition: CommBuf.h:305
virtual bool get_response(EventPtr &event_ptr)
Logging routines and macros.
std::mutex m_mutex
Compatibility Macros for C/C++.
Initialization helper for applications.
Functions to serialize/deserialize primitives to/from a memory buffer.
const char * decode_str16(const uint8_t **bufp, size_t *remainp)
Decodes a c-style string from the given buffer.
Connection disconnected event.
Definition: Event.h:62
(somewhat) Abstract base class for response handlers; Defines the message queue and the mutex and con...
Definition: sampleClient.cc:95
Hypertable definitions
static bool initialize(sockaddr_in *addr, const char *host, uint16_t port)
Initialize a sockaddr_in structure from host:port.
Definition: InetAddr.cc:68
Entry point to AsyncComm service.
Definition: Comm.h:61
Header for messages transmitted via AsyncComm.
Definition: CommHeader.h:40
#define HT_ERROR(msg)
Definition: Logger.h:299
virtual void handle(EventPtr &event_ptr)
Callback method.
Declarations for Comm.
#define HT_INFOF(msg,...)
Definition: Logger.h:272
Internet address wrapper classes and utility functions.
virtual void handle(EventPtr &event_ptr)
Callback method.
Request/response message event.
Definition: Event.h:63
void create_datagram_receive_socket(CommAddress &addr, int tos, const DispatchHandlerPtr &handler)
Creates a socket for receiving datagrams and attaches handler as the default dispatch handler...
Definition: Comm.cc:367
virtual bool get_response(EventPtr &event_ptr)
This is a generic exception class for Hypertable.
Definition: Error.h:314
std::shared_ptr< DispatchHandler > DispatchHandlerPtr
Smart pointer to DispatchHandler.
Message buffer for holding data to be transmitted over a network.
Definition: CommBuf.h:79
virtual ~ResponseHandler()
#define HT_ERRORF(msg,...)
Definition: Logger.h:300
void listen(const CommAddress &addr, ConnectionHandlerFactoryPtr &chf)
Creates listen (accept) socket on addr.
Definition: Comm.cc:209
This is the dispatch handler that gets installed as the default handler for the TCP connection...
Declarations for CommHeader.
static void dump_and_exit(const char **usage, int rcode=1)
Same as dump, but performs _exit(rcode) afterwards.
Definition: Usage.cc:41
This is the dispatch handler that gets installed as the default handler for UDP mode.
Error codes, Exception handling, error logging.
int send_request(const CommAddress &addr, uint32_t timeout_ms, CommBufPtr &cbuf, DispatchHandler *response_handler)
Sends a request message over a connection, expecting a response.
Definition: Comm.cc:300
Address abstraction to hold either proxy name or IPv4:port address.
Definition: CommAddress.h:52
std::condition_variable m_cond