0.9.8.10
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
Client.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2007-2015 Hypertable, Inc.
3  *
4  * This file is part of Hypertable.
5  *
6  * Hypertable is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; either version 3
9  * of the License, or any later version.
10  *
11  * Hypertable is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19  * 02110-1301, USA.
20  */
21 
26 
27 #include <Common/Compat.h>
28 
29 #include "Client.h"
30 
56 #include "Response/Parameters/Status.h"
57 
58 #include <AsyncComm/Comm.h>
59 #include <AsyncComm/CommBuf.h>
60 #include <AsyncComm/CommHeader.h>
61 #include <AsyncComm/Protocol.h>
62 
63 #include <Common/Error.h>
64 #include <Common/Filesystem.h>
65 #include <Common/Logger.h>
66 #include <Common/Serialization.h>
67 
68 using namespace Hypertable;
69 using namespace Serialization;
70 using namespace Hypertable::FsBroker;
71 using namespace Hypertable::FsBroker::Lib;
72 using namespace std;
73 
74 Client::Client(ConnectionManagerPtr &conn_mgr, const sockaddr_in &addr,
75  uint32_t timeout_ms)
76  : m_conn_mgr(conn_mgr), m_addr(addr), m_timeout_ms(timeout_ms) {
77  m_comm = conn_mgr->get_comm();
78  conn_mgr->add(m_addr, m_timeout_ms, "FS Broker");
79 }
80 
81 
83  : m_conn_mgr(conn_mgr) {
84  m_comm = conn_mgr->get_comm();
85  uint16_t port = cfg->get_i16("FsBroker.Port");
86  String host = cfg->get_str("FsBroker.Host");
87  if (cfg->has("FsBroker.Timeout"))
88  m_timeout_ms = cfg->get_i32("FsBroker.Timeout");
89  else
90  m_timeout_ms = cfg->get_i32("Hypertable.Request.Timeout");
91 
92  // Backward compatibility
93  if (cfg->has("DfsBroker.Host"))
94  host = cfg->get_str("DfsBroker.Host");
95  if (cfg->has("DfsBroker.Port"))
96  port = cfg->get_i16("DfsBroker.Port");
97 
98  InetAddr::initialize(&m_addr, host.c_str(), port);
99 
100  conn_mgr->add(m_addr, m_timeout_ms, "FS Broker");
101 }
102 
103 Client::Client(Comm *comm, const sockaddr_in &addr, uint32_t timeout_ms)
104  : m_comm(comm), m_conn_mgr(0), m_addr(addr), m_timeout_ms(timeout_ms) {
105 }
106 
107 Client::Client(const String &host, int port, uint32_t timeout_ms)
108  : m_timeout_ms(timeout_ms) {
109  InetAddr::initialize(&m_addr, host.c_str(), port);
111  m_conn_mgr = make_shared<ConnectionManager>(m_comm);
112  m_conn_mgr->add(m_addr, timeout_ms, "FS Broker");
113  if (!m_conn_mgr->wait_for_connection(m_addr, timeout_ms))
115  "Timed out waiting for connection to FS Broker");
116 }
117 
118 
124 }
125 
126 
127 void
128 Client::open(const String &name, uint32_t flags, DispatchHandler *handler) {
130  Request::Parameters::Open params(name, flags, 0);
131  CommBufPtr cbuf( new CommBuf(header, params.encoded_length()) );
132  params.encode(cbuf->get_data_ptr_address());
133 
134  try {
135  send_message(cbuf, handler);
136  }
137  catch (Exception &e) {
138  HT_THROW2F(e.code(), e, "Error opening FS file: %s", name.c_str());
139  }
140 }
141 
142 
143 int
144 Client::open(const String &name, uint32_t flags) {
145  DispatchHandlerSynchronizer sync_handler;
146  EventPtr event;
148  Request::Parameters::Open params(name, flags, 0);
149  CommBufPtr cbuf( new CommBuf(header, params.encoded_length()) );
150  params.encode(cbuf->get_data_ptr_address());
151 
152  try {
153  send_message(cbuf, &sync_handler);
154 
155  if (!sync_handler.wait_for_reply(event))
156  HT_THROW(Protocol::response_code(event.get()),
157  Protocol::string_format_message(event).c_str());
158 
159  int32_t fd;
160  decode_response_open(event, &fd);
161  return fd;
162  }
163  catch (Exception &e) {
164  HT_THROW2F(e.code(), e, "Error opening FS file: %s", name.c_str());
165  }
166 }
167 
168 
169 int
170 Client::open_buffered(const String &name, uint32_t flags, uint32_t buf_size,
171  uint32_t outstanding, uint64_t start_offset,
172  uint64_t end_offset) {
173  try {
174  HT_ASSERT((flags & Filesystem::OPEN_FLAG_DIRECTIO) == 0 ||
175  (HT_IO_ALIGNED(buf_size) &&
176  HT_IO_ALIGNED(start_offset) &&
177  HT_IO_ALIGNED(end_offset)));
178  int fd = open(name, flags|OPEN_FLAG_VERIFY_CHECKSUM);
179  {
180  lock_guard<mutex> lock(m_mutex);
183  new ClientBufferedReaderHandler(this, fd, buf_size, outstanding,
184  start_offset, end_offset);
185  }
186  return fd;
187  }
188  catch (Exception &e) {
189  HT_THROW2F(e.code(), e, "Error opening buffered FS file=%s buf_size=%u "
190  "outstanding=%u start_offset=%llu end_offset=%llu", name.c_str(),
191  buf_size, outstanding, (Llu)start_offset, (Llu)end_offset);
192  }
193 }
194 
195 void Client::decode_response_open(EventPtr &event, int32_t *fd) {
196  int error = Protocol::response_code(event);
197  if (error != Error::OK)
199 
200  const uint8_t *ptr = event->payload + 4;
201  size_t remain = event->payload_len - 4;
202 
204  params.decode(&ptr, &remain);
205  *fd = params.get_fd();
206 }
207 
208 
209 void
210 Client::create(const String &name, uint32_t flags, int32_t bufsz,
211  int32_t replication, int64_t blksz,
212  DispatchHandler *handler) {
214  Request::Parameters::Create params(name, flags, bufsz, replication, blksz);
215  CommBufPtr cbuf( new CommBuf(header, params.encoded_length()) );
216  params.encode(cbuf->get_data_ptr_address());
217 
218  try {
219  send_message(cbuf, handler);
220  }
221  catch (Exception &e) {
222  HT_THROW2F(e.code(), e, "Error creating FS file: %s:", name.c_str());
223  }
224 }
225 
226 
227 int
228 Client::create(const String &name, uint32_t flags, int32_t bufsz,
229  int32_t replication, int64_t blksz) {
230  DispatchHandlerSynchronizer sync_handler;
231  EventPtr event;
233  Request::Parameters::Create params(name, flags, bufsz, replication, blksz);
234  CommBufPtr cbuf( new CommBuf(header, params.encoded_length()) );
235  params.encode(cbuf->get_data_ptr_address());
236 
237  try {
238  send_message(cbuf, &sync_handler);
239 
240  if (!sync_handler.wait_for_reply(event))
241  HT_THROW(Protocol::response_code(event.get()),
242  Protocol::string_format_message(event).c_str());
243 
244  int32_t fd;
245  decode_response_create(event, &fd);
246  return fd;
247  }
248  catch (Exception &e) {
249  HT_THROW2F(e.code(), e, "Error creating FS file: %s", name.c_str());
250  }
251 }
252 
253 
254 void Client::decode_response_create(EventPtr &event, int32_t *fd) {
255  decode_response_open(event, fd);
256 }
257 
258 
259 void
260 Client::close(int32_t fd, DispatchHandler *handler) {
261  ClientBufferedReaderHandler *reader_handler = 0;
263  header.gid = fd;
264  Request::Parameters::Close params(fd);
265  CommBufPtr cbuf( new CommBuf(header, params.encoded_length()) );
266  params.encode(cbuf->get_data_ptr_address());
267 
268  {
269  lock_guard<mutex> lock(m_mutex);
270  auto iter = m_buffered_reader_map.find(fd);
271  if (iter != m_buffered_reader_map.end()) {
272  reader_handler = (*iter).second;
273  m_buffered_reader_map.erase(iter);
274  }
275  }
276  delete reader_handler;
277 
278  try {
279  send_message(cbuf, handler);
280  }
281  catch (Exception &e) {
282  HT_THROW2F(e.code(), e, "Error closing FS fd: %d", (int)fd);
283  }
284 }
285 
286 
287 void
288 Client::close(int32_t fd) {
289  ClientBufferedReaderHandler *reader_handler = 0;
290  DispatchHandlerSynchronizer sync_handler;
291  EventPtr event;
293  header.gid = fd;
294  Request::Parameters::Close params(fd);
295  CommBufPtr cbuf( new CommBuf(header, params.encoded_length()) );
296  params.encode(cbuf->get_data_ptr_address());
297  {
298  lock_guard<mutex> lock(m_mutex);
299  auto iter = m_buffered_reader_map.find(fd);
300  if (iter != m_buffered_reader_map.end()) {
301  reader_handler = (*iter).second;
302  m_buffered_reader_map.erase(iter);
303  }
304  }
305  delete reader_handler;
306 
307  try {
308  send_message(cbuf, &sync_handler);
309 
310  if (!sync_handler.wait_for_reply(event))
311  HT_THROW(Protocol::response_code(event.get()),
312  Protocol::string_format_message(event).c_str());
313  }
314  catch(Exception &e) {
315  HT_THROW2F(e.code(), e, "Error closing FS fd: %d", (int)fd);
316  }
317 }
318 
319 
320 void
321 Client::read(int32_t fd, size_t len, DispatchHandler *handler) {
323  header.gid = fd;
324  Request::Parameters::Read params(fd, len);
325  CommBufPtr cbuf( new CommBuf(header, params.encoded_length()) );
326  params.encode(cbuf->get_data_ptr_address());
327 
328  try {
329  send_message(cbuf, handler);
330  }
331  catch (Exception &e) {
332  HT_THROW2F(e.code(), e, "Error sending read request for %u bytes "
333  "from FS fd: %d", (unsigned)len, (int)fd);
334  }
335 }
336 
337 
338 size_t
339 Client::read(int32_t fd, void *dst, size_t len) {
340  ClientBufferedReaderHandler *reader_handler = 0;
341  {
342  lock_guard<mutex> lock(m_mutex);
343  auto iter = m_buffered_reader_map.find(fd);
344  if (iter != m_buffered_reader_map.end())
345  reader_handler = (*iter).second;
346  }
347  try {
348  if (reader_handler)
349  return reader_handler->read(dst, len);
350 
351  DispatchHandlerSynchronizer sync_handler;
352  EventPtr event;
354  header.gid = fd;
355  Request::Parameters::Read params(fd, len);
356  CommBufPtr cbuf( new CommBuf(header, params.encoded_length()) );
357  params.encode(cbuf->get_data_ptr_address());
358  send_message(cbuf, &sync_handler);
359 
360  if (!sync_handler.wait_for_reply(event))
361  HT_THROW(Protocol::response_code(event.get()),
362  Protocol::string_format_message(event).c_str());
363 
364  uint32_t length;
365  uint64_t offset;
366  const void *data;
367  decode_response_read(event, &data, &offset, &length);
368  HT_ASSERT(length <= len);
369  memcpy(dst, data, length);
370  return length;
371  }
372  catch (Exception &e) {
373  HT_THROW2F(e.code(), e, "Error reading %u bytes from FS fd %d",
374  (unsigned)len, (int)fd);
375  }
376 }
377 
378 void Client::decode_response_read(EventPtr &event, const void **buffer,
379  uint64_t *offset, uint32_t *length) {
380  int error = Protocol::response_code(event);
381  if (error != Error::OK)
383 
384  const uint8_t *ptr = event->payload + 4;
385  size_t remain = event->payload_len - 4;
386 
388  params.decode(&ptr, &remain);
389  *offset = params.get_offset();
390  *length = params.get_amount();
391 
392  if (*length == (uint32_t)-1) {
393  *length = 0;
394  return;
395  }
396 
397  if (remain < (size_t)*length)
398  HT_THROWF(Error::RESPONSE_TRUNCATED, "%lu < %lu", (Lu)remain, (Lu)*length);
399 
400  *buffer = ptr;
401 }
402 
403 void
404 Client::append(int32_t fd, StaticBuffer &buffer, Flags flags,
405  DispatchHandler *handler) {
407  header.gid = fd;
409  CommBuf *cbuf = new CommBuf(header, HT_DIRECT_IO_ALIGNMENT, buffer);
410  Request::Parameters::Append params(fd, buffer.size, static_cast<uint8_t>(flags));
411  uint8_t *base = (uint8_t *)cbuf->get_data_ptr();
412  params.encode(cbuf->get_data_ptr_address());
413  size_t padding = HT_DIRECT_IO_ALIGNMENT -
414  (((uint8_t *)cbuf->get_data_ptr()) - base);
415  memset(cbuf->get_data_ptr(), 0, padding);
416  cbuf->advance_data_ptr(padding);
417 
418  CommBufPtr cbp(cbuf);
419 
420  try {
421  send_message(cbp, handler);
422  }
423  catch (Exception &e) {
424  HT_THROW2F(e.code(), e, "Error appending %u bytes to FS fd %d",
425  (unsigned)buffer.size, (int)fd);
426  }
427 }
428 
429 
430 size_t Client::append(int32_t fd, StaticBuffer &buffer, Flags flags) {
431  DispatchHandlerSynchronizer sync_handler;
432  EventPtr event;
433 
435  header.gid = fd;
437  CommBuf *cbuf = new CommBuf(header, HT_DIRECT_IO_ALIGNMENT, buffer);
438  Request::Parameters::Append params(fd, buffer.size, static_cast<uint8_t>(flags));
439  uint8_t *base = (uint8_t *)cbuf->get_data_ptr();
440  params.encode(cbuf->get_data_ptr_address());
441  size_t padding = HT_DIRECT_IO_ALIGNMENT -
442  (((uint8_t *)cbuf->get_data_ptr()) - base);
443  memset(cbuf->get_data_ptr(), 0, padding);
444  cbuf->advance_data_ptr(padding);
445 
446  CommBufPtr cbp(cbuf);
447 
448  try {
449  send_message(cbp, &sync_handler);
450 
451  if (!sync_handler.wait_for_reply(event))
452  HT_THROW(Protocol::response_code(event.get()),
453  Protocol::string_format_message(event).c_str());
454 
455  uint64_t offset;
456  uint32_t amount;
457  decode_response_append(event, &offset, &amount);
458 
459  if (buffer.size != amount)
460  HT_THROWF(Error::FSBROKER_IO_ERROR, "tried to append %u bytes but got "
461  "%u", (unsigned)buffer.size, (unsigned)amount);
462  return (size_t)amount;
463  }
464  catch (Exception &e) {
465  HT_THROW2F(e.code(), e, "Error appending %u bytes to FS fd %d",
466  (unsigned)buffer.size, (int)fd);
467  }
468 }
469 
470 
471 void Client::decode_response_append(EventPtr &event, uint64_t *offset,
472  uint32_t *length) {
473  int error = Protocol::response_code(event);
474  if (error != Error::OK)
476 
477  const uint8_t *ptr = event->payload + 4;
478  size_t remain = event->payload_len - 4;
479 
481  params.decode(&ptr, &remain);
482  *offset = params.get_offset();
483  *length = params.get_amount();
484 }
485 
486 
487 void
488 Client::seek(int32_t fd, uint64_t offset, DispatchHandler *handler) {
490  header.gid = fd;
491  Request::Parameters::Seek params(fd, offset);
492  CommBufPtr cbuf( new CommBuf(header, params.encoded_length()) );
493  params.encode(cbuf->get_data_ptr_address());
494 
495  try { send_message(cbuf, handler); }
496  catch (Exception &e) {
497  HT_THROW2F(e.code(), e, "Error seeking to %llu on FS fd %d",
498  (Llu)offset, (int)fd);
499  }
500 }
501 
502 
503 void
504 Client::seek(int32_t fd, uint64_t offset) {
505  DispatchHandlerSynchronizer sync_handler;
506  EventPtr event;
508  header.gid = fd;
509  Request::Parameters::Seek params(fd, offset);
510  CommBufPtr cbuf( new CommBuf(header, params.encoded_length()) );
511  params.encode(cbuf->get_data_ptr_address());
512 
513  try {
514  send_message(cbuf, &sync_handler);
515 
516  if (!sync_handler.wait_for_reply(event))
517  HT_THROW(Protocol::response_code(event.get()),
518  Protocol::string_format_message(event).c_str());
519  }
520  catch (Exception &e) {
521  HT_THROW2F(e.code(), e, "Error seeking to %llu on FS fd %d",
522  (Llu)offset, (int)fd);
523  }
524 }
525 
526 
527 void
528 Client::remove(const String &name, DispatchHandler *handler) {
530  Request::Parameters::Remove params(name);
531  CommBufPtr cbuf( new CommBuf(header, params.encoded_length()) );
532  params.encode(cbuf->get_data_ptr_address());
533 
534  try { send_message(cbuf, handler); }
535  catch (Exception &e) {
536  HT_THROW2F(e.code(), e, "Error removing FS file: %s", name.c_str());
537  }
538 }
539 
540 
541 void
542 Client::remove(const String &name, bool force) {
543  DispatchHandlerSynchronizer sync_handler;
544  EventPtr event;
546  Request::Parameters::Remove params(name);
547  CommBufPtr cbuf( new CommBuf(header, params.encoded_length()) );
548  params.encode(cbuf->get_data_ptr_address());
549 
550  try {
551  send_message(cbuf, &sync_handler);
552 
553  if (!sync_handler.wait_for_reply(event)) {
554  int error = Protocol::response_code(event.get());
555  if (!force || error != Error::FSBROKER_FILE_NOT_FOUND)
556  HT_THROW(error, Protocol::string_format_message(event).c_str());
557  }
558  }
559  catch (Exception &e) {
560  HT_THROW2F(e.code(), e, "Error removing FS file: %s", name.c_str());
561  }
562 }
563 
564 
565 void
566 Client::shutdown(uint16_t flags, DispatchHandler *handler) {
568  Request::Parameters::Shutdown params(flags);
569  CommBufPtr cbuf( new CommBuf(header, params.encoded_length()) );
570  params.encode(cbuf->get_data_ptr_address());
571 
572  try { send_message(cbuf, handler); }
573  catch (Exception &e) {
574  HT_THROW2F(e.code(), e, "sending FS shutdown (flags=%d)", (int)flags);
575  }
576 }
577 
578 
580  DispatchHandlerSynchronizer sync_handler;
581  EventPtr event;
583  CommBufPtr cbuf( new CommBuf(header) );
584 
585  try {
586  send_message(cbuf, &sync_handler, timer);
587 
588  if (!sync_handler.wait_for_reply(event))
589  HT_THROW(Protocol::response_code(event.get()),
590  Protocol::string_format_message(event).c_str());
591 
592  decode_response_status(event, status);
593  }
594  catch (Exception &e) {
596  format("%s - %s", Error::get_text(e.code()), e.what()));
597  }
598 }
599 
601  int error = Protocol::response_code(event);
602  if (error != Error::OK)
604 
605  const uint8_t *ptr = event->payload + 4;
606  size_t remain = event->payload_len - 4;
607 
609  params.decode(&ptr, &remain);
610  status = params.status();
611 }
612 
613 
614 
615 void Client::length(const String &name, bool accurate,
616  DispatchHandler *handler) {
618  Request::Parameters::Length params(name, accurate);
619  CommBufPtr cbuf( new CommBuf(header, params.encoded_length()) );
620  params.encode(cbuf->get_data_ptr_address());
621 
622  try { send_message(cbuf, handler); }
623  catch (Exception &e) {
624  HT_THROW2F(e.code(), e, "Error sending length request for FS file: %s",
625  name.c_str());
626  }
627 }
628 
629 
630 int64_t Client::length(const String &name, bool accurate) {
631  DispatchHandlerSynchronizer sync_handler;
632  EventPtr event;
634  Request::Parameters::Length params(name, accurate);
635  CommBufPtr cbuf( new CommBuf(header, params.encoded_length()) );
636  params.encode(cbuf->get_data_ptr_address());
637 
638  try {
639  send_message(cbuf, &sync_handler);
640 
641  if (!sync_handler.wait_for_reply(event))
642  HT_THROW(Protocol::response_code(event.get()),
643  Protocol::string_format_message(event).c_str());
644 
645  return decode_response_length(event);
646  }
647  catch (Exception &e) {
648  HT_THROW2F(e.code(), e, "Error getting length of FS file: %s",
649  name.c_str());
650  }
651 }
652 
654  int error = Protocol::response_code(event);
655  if (error != Error::OK)
657 
658  const uint8_t *ptr = event->payload + 4;
659  size_t remain = event->payload_len - 4;
660 
662  params.decode(&ptr, &remain);
663  return params.get_length();
664 }
665 
666 
667 void
668 Client::pread(int32_t fd, size_t len, uint64_t offset,
669  bool verify_checksum, DispatchHandler *handler) {
671  header.gid = fd;
672  Request::Parameters::Pread params(fd, offset, len, verify_checksum);
673  CommBufPtr cbuf( new CommBuf(header, params.encoded_length()) );
674  params.encode(cbuf->get_data_ptr_address());
675 
676  try { send_message(cbuf, handler); }
677  catch (Exception &e) {
678  HT_THROW2F(e.code(), e, "Error sending pread request at byte %llu "
679  "on FS fd %d", (Llu)offset, (int)fd);
680  }
681 }
682 
683 
684 size_t
685 Client::pread(int32_t fd, void *dst, size_t len, uint64_t offset, bool verify_checksum) {
686  DispatchHandlerSynchronizer sync_handler;
687  EventPtr event;
689  header.gid = fd;
690  Request::Parameters::Pread params(fd, offset, len, verify_checksum);
691  CommBufPtr cbuf( new CommBuf(header, params.encoded_length()) );
692  params.encode(cbuf->get_data_ptr_address());
693 
694  try {
695  send_message(cbuf, &sync_handler);
696 
697  if (!sync_handler.wait_for_reply(event))
698  HT_THROW(Protocol::response_code(event.get()),
699  Protocol::string_format_message(event).c_str());
700 
701  uint32_t length;
702  uint64_t off;
703  const void *data;
704  decode_response_pread(event, &data, &off, &length);
705  HT_ASSERT(length <= len);
706  memcpy(dst, data, length);
707  return length;
708  }
709  catch (Exception &e) {
710  HT_THROW2F(e.code(), e, "Error preading at byte %llu on FS fd %d",
711  (Llu)offset, (int)fd);
712  }
713 }
714 
715 void Client::decode_response_pread(EventPtr &event, const void **buffer,
716  uint64_t *offset, uint32_t *length) {
717  decode_response_read(event, buffer, offset, length);
718 }
719 
720 void Client::mkdirs(const String &name, DispatchHandler *handler) {
722  Request::Parameters::Mkdirs params(name);
723  CommBufPtr cbuf( new CommBuf(header, params.encoded_length()) );
724  params.encode(cbuf->get_data_ptr_address());
725 
726  try { send_message(cbuf, handler); }
727  catch (Exception &e) {
728  HT_THROW2F(e.code(), e, "Error sending mkdirs request for FS "
729  "directory: %s", name.c_str());
730  }
731 }
732 
733 
734 void
735 Client::mkdirs(const String &name) {
736  DispatchHandlerSynchronizer sync_handler;
737  EventPtr event;
739  Request::Parameters::Mkdirs params(name);
740  CommBufPtr cbuf( new CommBuf(header, params.encoded_length()) );
741  params.encode(cbuf->get_data_ptr_address());
742 
743  try {
744  send_message(cbuf, &sync_handler);
745 
746  if (!sync_handler.wait_for_reply(event))
747  HT_THROW(Protocol::response_code(event.get()),
748  Protocol::string_format_message(event).c_str());
749  }
750  catch (Exception &e) {
751  HT_THROW2F(e.code(), e, "Error mkdirs FS directory %s", name.c_str());
752  }
753 }
754 
755 
756 void
757 Client::flush(int32_t fd, DispatchHandler *handler) {
759  header.gid = fd;
760  Request::Parameters::Flush params(fd);
761  CommBufPtr cbuf( new CommBuf(header, params.encoded_length()) );
762  params.encode(cbuf->get_data_ptr_address());
763 
764  try { send_message(cbuf, handler); }
765  catch (Exception &e) {
766  HT_THROW2F(e.code(), e, "Error flushing FS fd %d", (int)fd);
767  }
768 }
769 
770 
771 void
772 Client::flush(int32_t fd) {
773  DispatchHandlerSynchronizer sync_handler;
774  EventPtr event;
776  header.gid = fd;
777  Request::Parameters::Flush params(fd);
778  CommBufPtr cbuf( new CommBuf(header, params.encoded_length()) );
779  params.encode(cbuf->get_data_ptr_address());
780 
781  try {
782  send_message(cbuf, &sync_handler);
783 
784  if (!sync_handler.wait_for_reply(event))
785  HT_THROW(Protocol::response_code(event.get()),
786  Protocol::string_format_message(event).c_str());
787  }
788  catch (Exception &e) {
789  HT_THROW2F(e.code(), e, "Error flushing FS fd %d", (int)fd);
790  }
791 }
792 
793 
794 void
795 Client::sync(int32_t fd) {
796  DispatchHandlerSynchronizer sync_handler;
797  EventPtr event;
799  header.gid = fd;
800  Request::Parameters::Sync params(fd);
801  CommBufPtr cbuf( new CommBuf(header, params.encoded_length()) );
802  params.encode(cbuf->get_data_ptr_address());
803 
804  try {
805  send_message(cbuf, &sync_handler);
806 
807  if (!sync_handler.wait_for_reply(event))
808  HT_THROW(Protocol::response_code(event.get()),
809  Protocol::string_format_message(event).c_str());
810  }
811  catch (Exception &e) {
812  HT_THROW2F(e.code(), e, "Error syncing FS fd %d", (int)fd);
813  }
814 }
815 
816 
817 void Client::rmdir(const String &name, DispatchHandler *handler) {
819  Request::Parameters::Rmdir params(name);
820  CommBufPtr cbuf( new CommBuf(header, params.encoded_length()) );
821  params.encode(cbuf->get_data_ptr_address());
822 
823  try { send_message(cbuf, handler); }
824  catch (Exception &e) {
825  HT_THROW2F(e.code(), e, "Error sending rmdir request for FS directory: "
826  "%s", name.c_str());
827  }
828 }
829 
830 
831 void
832 Client::rmdir(const String &name, bool force) {
833  DispatchHandlerSynchronizer sync_handler;
834  EventPtr event;
836  Request::Parameters::Rmdir params(name);
837  CommBufPtr cbuf( new CommBuf(header, params.encoded_length()) );
838  params.encode(cbuf->get_data_ptr_address());
839 
840  try {
841  send_message(cbuf, &sync_handler);
842 
843  if (!sync_handler.wait_for_reply(event)) {
844  int error = Protocol::response_code(event.get());
845 
846  if (!force || error != Error::FSBROKER_FILE_NOT_FOUND)
847  HT_THROW(error, Protocol::string_format_message(event).c_str());
848  }
849  }
850  catch (Exception &e) {
851  HT_THROW2F(e.code(), e, "Error removing FS directory: %s", name.c_str());
852  }
853 }
854 
855 
856 void Client::readdir(const String &name, DispatchHandler *handler) {
858  Request::Parameters::Readdir params(name);
859  CommBufPtr cbuf( new CommBuf(header, params.encoded_length()) );
860  params.encode(cbuf->get_data_ptr_address());
861 
862  try { send_message(cbuf, handler); }
863  catch (Exception &e) {
864  HT_THROW2F(e.code(), e, "Error sending readdir request for FS directory"
865  ": %s", name.c_str());
866  }
867 }
868 
869 
870 void Client::readdir(const String &name, std::vector<Dirent> &listing) {
871  DispatchHandlerSynchronizer sync_handler;
872  EventPtr event;
874  Request::Parameters::Readdir params(name);
875  CommBufPtr cbuf( new CommBuf(header, params.encoded_length()) );
876  params.encode(cbuf->get_data_ptr_address());
877 
878  try {
879  send_message(cbuf, &sync_handler);
880 
881  if (!sync_handler.wait_for_reply(event))
882  HT_THROW(Protocol::response_code(event.get()),
883  Protocol::string_format_message(event).c_str());
884 
885  decode_response_readdir(event, listing);
886  }
887  catch (Exception &e) {
888  HT_THROW2F(e.code(), e, "Error reading directory entries for FS "
889  "directory: %s", name.c_str());
890  }
891 }
892 
893 
895  std::vector<Dirent> &listing) {
896  int error = Protocol::response_code(event);
897  if (error != Error::OK)
899 
900  const uint8_t *ptr = event->payload + 4;
901  size_t remain = event->payload_len - 4;
902 
904  params.decode(&ptr, &remain);
905  params.get_listing(listing);
906 }
907 
908 
909 void Client::exists(const String &name, DispatchHandler *handler) {
911  Request::Parameters::Exists params(name);
912  CommBufPtr cbuf( new CommBuf(header, params.encoded_length()) );
913  params.encode(cbuf->get_data_ptr_address());
914 
915  try { send_message(cbuf, handler); }
916  catch (Exception &e) {
917  HT_THROW2F(e.code(), e, "sending 'exists' request for FS path: %s",
918  name.c_str());
919  }
920 }
921 
922 
923 bool Client::exists(const String &name) {
924  DispatchHandlerSynchronizer sync_handler;
925  EventPtr event;
927  Request::Parameters::Exists params(name);
928  CommBufPtr cbuf( new CommBuf(header, params.encoded_length()) );
929  params.encode(cbuf->get_data_ptr_address());
930 
931  try {
932  send_message(cbuf, &sync_handler);
933 
934  if (!sync_handler.wait_for_reply(event))
935  HT_THROW(Protocol::response_code(event.get()),
936  Protocol::string_format_message(event).c_str());
937 
938  return decode_response_exists(event);
939  }
940  catch (Exception &e) {
941  HT_THROW2F(e.code(), e, "Error checking existence of FS path: %s",
942  name.c_str());
943  }
944 }
945 
946 
948  int error = Protocol::response_code(event);
949  if (error != Error::OK)
951 
952  const uint8_t *ptr = event->payload + 4;
953  size_t remain = event->payload_len - 4;
954 
956  params.decode(&ptr, &remain);
957  return params.get_exists();
958 }
959 
960 
961 
962 void
963 Client::rename(const String &src, const String &dst, DispatchHandler *handler) {
965  Request::Parameters::Rename params(src, dst);
966  CommBufPtr cbuf( new CommBuf(header, params.encoded_length()) );
967  params.encode(cbuf->get_data_ptr_address());
968 
969  try { send_message(cbuf, handler); }
970  catch (Exception &e) {
971  HT_THROW2F(e.code(), e, "Error sending 'rename' request for FS "
972  "path: %s -> %s", src.c_str(), dst.c_str());
973  }
974 }
975 
976 
977 void
978 Client::rename(const String &src, const String &dst) {
979  DispatchHandlerSynchronizer sync_handler;
980  EventPtr event;
982  Request::Parameters::Rename params(src, dst);
983  CommBufPtr cbuf( new CommBuf(header, params.encoded_length()) );
984  params.encode(cbuf->get_data_ptr_address());
985 
986  try {
987  send_message(cbuf, &sync_handler);
988 
989  if (!sync_handler.wait_for_reply(event))
990  HT_THROW(Protocol::response_code(event.get()),
991  Protocol::string_format_message(event).c_str());
992  }
993  catch (Exception &e) {
994  HT_THROW2F(e.code(), e, "Error renaming of FS path: %s -> %s",
995  src.c_str(), dst.c_str());
996  }
997 }
998 
999 
1000 void
1001 Client::debug(int32_t command, StaticBuffer &serialized_parameters,
1002  DispatchHandler *handler) {
1004  Request::Parameters::Debug params(command);
1005  CommBufPtr cbuf(new CommBuf(header, params.encoded_length(),
1006  serialized_parameters));
1007  params.encode(cbuf->get_data_ptr_address());
1008 
1009  try { send_message(cbuf, handler); }
1010  catch (Exception &e) {
1011  HT_THROW2F(e.code(), e, "Error sending debug command %d request", command);
1012  }
1013 }
1014 
1015 
1016 void
1017 Client::debug(int32_t command, StaticBuffer &serialized_parameters) {
1018  DispatchHandlerSynchronizer sync_handler;
1019  EventPtr event;
1021  Request::Parameters::Debug params(command);
1022  CommBufPtr cbuf(new CommBuf(header, params.encoded_length(),
1023  serialized_parameters));
1024  params.encode(cbuf->get_data_ptr_address());
1025 
1026  try {
1027  send_message(cbuf, &sync_handler);
1028 
1029  if (!sync_handler.wait_for_reply(event))
1030  HT_THROW(Protocol::response_code(event.get()),
1031  Protocol::string_format_message(event).c_str());
1032  }
1033  catch (Exception &e) {
1034  HT_THROW2F(e.code(), e, "Error sending debug command %d request", command);
1035  }
1036 }
1037 
1038 
1039 
1040 void
1042  uint32_t deadline = timer ? timer->remaining() : m_timeout_ms;
1043  int error = m_comm->send_request(m_addr, deadline, cbuf, handler);
1044 
1045  if (error != Error::OK)
1046  HT_THROWF(error, "FS send_request to %s failed", m_addr.format().c_str());
1047 }
void decode_response_open(EventPtr &event, int32_t *fd) override
Decodes the response from an open request.
Definition: Client.cc:195
static Comm * instance()
Creates/returns singleton instance of the Comm class.
Definition: Comm.h:72
#define HT_THROW2F(_code_, _ex_, _fmt_,...)
Definition: Error.h:494
A memory buffer of static size.
Definition: StaticBuffer.h:45
Request parameters for append requests.
Definition: Append.h:46
Response parameters for append requests.
Definition: Append.h:47
Response parameters for read requests.
Definition: Read.h:46
uint64_t get_offset()
Gets read data offset.
Definition: Read.h:62
Request parameters for debug requests.
Definition: Debug.h:46
void sync(int32_t fd) override
Definition: Client.cc:795
Response parameters for open requests.
Definition: Status.h:49
int64_t decode_response_length(EventPtr &event) override
Decodes the response from a length request.
Definition: Client.cc:653
Holds Nagios-style program status information.
Definition: Status.h:42
int open_buffered(const String &name, uint32_t flags, uint32_t buf_size, uint32_t outstanding, uint64_t start_offset=0, uint64_t end_offset=0) override
Opens a file in buffered (readahead) mode.
Definition: Client.cc:170
void remove(const String &name, DispatchHandler *handler) override
Removes a file asynchronously.
Definition: Client.cc:528
static int32_t response_code(const Event *event)
Returns the response code from an event event generated in response to a request message.
Definition: Protocol.cc:39
Abstract base class for a filesystem.
Declarations for Append response parameters.
std::string String
A String is simply a typedef to std::string.
Definition: String.h:44
Request parameters for rmdir requests.
Definition: Rmdir.h:46
#define HT_IO_ALIGNED(size)
Definition: Filesystem.h:51
std::unordered_map< uint32_t, ClientBufferedReaderHandler * > m_buffered_reader_map
Definition: Client.h:229
Request parameters for read requests.
Definition: Read.h:46
static String string_format_message(const Event *event)
Returns error message decoded standard error MESSAGE generated in response to a request message...
Definition: Protocol.cc:51
Declarations for Rename request parameters.
String format(const char *fmt,...)
Returns a String using printf like format facilities Vanilla snprintf is about 1.5x faster than this...
Definition: String.cc:37
void mkdirs(const String &name, DispatchHandler *handler) override
Creates a directory asynchronously.
Definition: Client.cc:720
Abstract base class for application dispatch handlers registered with AsyncComm.
virtual size_t encoded_length() const
Returns serialized object length.
Definition: Serializable.cc:37
bool decode_response_exists(EventPtr &event) override
Decodes the response from an exists request.
Definition: Client.cc:947
Flags
Enumeration type for append flags.
Definition: Filesystem.h:76
long long unsigned int Llu
Shortcut for printf formats.
Definition: String.h:50
uint64_t get_offset()
Gets append offset.
Definition: Append.h:64
Request parameters for shutdown requests.
Definition: Shutdown.h:46
File system broker definitions.
Definition: CephBroker.h:38
std::shared_ptr< Event > EventPtr
Smart pointer to Event.
Definition: Event.h:228
STL namespace.
void flush(int32_t fd, DispatchHandler *handler) override
Definition: Client.cc:757
uint8_t ** get_data_ptr_address()
Returns address of the primary buffer internal data pointer.
Definition: CommBuf.h:160
Request parameters for open requests.
Definition: Open.h:46
void append(int32_t fd, StaticBuffer &buffer, Flags flags, DispatchHandler *handler) override
Definition: Client.cc:404
Request parameters for rename requests.
Definition: Rename.h:46
Declarations for Read response parameters.
uint32_t remaining()
Returns the remaining time till expiry.
Definition: Timer.h:101
uint16_t alignment
Align payload to this byte offset.
Definition: CommHeader.h:138
void decode_response_pread(EventPtr &event, const void **buffer, uint64_t *offset, uint32_t *length) override
Decodes the response from a pread request.
Definition: Client.cc:715
ConnectionManagerPtr m_conn_mgr
Definition: Client.h:226
Client(const std::string &install_dir, const std::string &config_file, uint32_t default_timeout_ms=0)
Constructs the object using the specified config file.
Definition: Client.cc:60
int32_t get_fd()
Gets file descriptor.
Definition: Open.h:60
void status(Status &status, Timer *timer=0) override
Check status of filesystem.
Definition: Client.cc:579
bool wait_for_reply(EventPtr &event)
This method is used by a client to synchronize.
void set(Code code, const std::string &text)
Sets status code and text.
Definition: Status.h:101
void send_message(CommBufPtr &cbuf, DispatchHandler *handler, Timer *timer=0)
Sends a message to the FS broker.
Definition: Client.cc:1041
Declarations for Remove request parameters.
Response parameters for exists requests.
Definition: Exists.h:46
#define HT_ASSERT(_e_)
Definition: Logger.h:396
void length(const String &name, bool accurate, DispatchHandler *handler) override
Gets the length of a file asynchronously.
Definition: Client.cc:615
void decode_response_status(EventPtr &event, Status &status) override
Decodes the response from an status request.
Definition: Client.cc:600
uint32_t gid
Group ID (see ApplicationQueue)
Definition: CommHeader.h:142
Request parameters for remove requests.
Definition: Remove.h:46
void create(const String &name, uint32_t flags, int32_t bufsz, int32_t replication, int64_t blksz, DispatchHandler *handler) override
Creates a file asynchronously.
Definition: Client.cc:210
void readdir(const String &name, DispatchHandler *handler) override
Obtains a listing of all files in a directory asynchronously.
Definition: Client.cc:856
Request parameters for flush requests.
Definition: Flush.h:46
Request parameters for sync requests.
Definition: Sync.h:46
virtual void encode(uint8_t **bufp) const
Writes serialized representation of object to a buffer.
Definition: Serializable.cc:64
void close(int32_t fd, DispatchHandler *handler) override
Definition: Client.cc:260
Declarations for Exists response parameters.
const char * get_text(int error)
Returns a descriptive error message.
Definition: Error.cc:330
Declarations for Rmdir request parameters.
bool status(ContextPtr &context, Timer &timer, Status &status)
Runs a status check on the master.
Definition: Utility.cc:408
Declarations for Length response parameters.
void decode_response_create(EventPtr &event, int32_t *fd) override
Decodes the response from a create request.
Definition: Client.cc:254
std::shared_ptr< CommBuf > CommBufPtr
Smart pointer to CommBuf.
Definition: CommBuf.h:305
void pread(int32_t fd, size_t len, uint64_t offset, bool verify_checksum, DispatchHandler *handler) override
Definition: Client.cc:668
std::shared_ptr< Properties > PropertiesPtr
Definition: Properties.h:447
Logging routines and macros.
Response parameters for readdir requests.
Definition: Readdir.h:48
Compatibility Macros for C/C++.
Declarations for Create request parameters.
Declarations for Pread request parameters.
Declarations for Open response parameters.
uint32_t get_amount()
Gets amount of data read.
Definition: Read.h:66
void * advance_data_ptr(size_t len)
Advance the primary buffer internal data pointer by len bytes.
Definition: CommBuf.h:167
void decode_response_readdir(EventPtr &event, std::vector< Dirent > &listing) override
Decodes the response from a readdir request.
Definition: Client.cc:894
Functions to serialize/deserialize primitives to/from a memory buffer.
Declarations for Flush request parameters.
Response parameters for length requests.
Definition: Length.h:46
void debug(int32_t command, StaticBuffer &serialized_parameters) override
Invokes debug request asynchronously.
Definition: Client.cc:1017
virtual void decode(const uint8_t **bufp, size_t *remainp)
Reads serialized representation of object from a buffer.
Definition: Serializable.cc:70
void read(int32_t fd, size_t amount, DispatchHandler *handler) override
Definition: Client.cc:321
Declarations for Length request parameters.
String format(int sep= ':') const
Returns a string with a dotted notation ("127.0.0.1:8080") including the port.
Definition: InetAddr.h:132
void rename(const String &src, const String &dst, DispatchHandler *handler) override
Rename a path asynchronously.
Definition: Client.cc:963
Hypertable definitions
void shutdown(uint16_t flags, DispatchHandler *handler)
Shuts down the FS broker.
Definition: Client.cc:566
Declarations for Exists request parameters.
DispatchHandler class used to synchronize with response messages.
Request parameters for close requests.
Definition: Close.h:46
static bool initialize(sockaddr_in *addr, const char *host, uint16_t port)
Initialize a sockaddr_in structure from host:port.
Definition: InetAddr.cc:68
Entry point to AsyncComm service.
Definition: Comm.h:61
Header for messages transmitted via AsyncComm.
Definition: CommHeader.h:40
Client(ConnectionManagerPtr &conn_manager_ptr, const sockaddr_in &addr, uint32_t timeout_ms)
Constructor with explicit values.
Definition: Client.cc:74
Declarations for CommBuf.
uint32_t get_amount()
Gets amount of data appended.
Definition: Append.h:68
Declarations for Comm.
void open(const String &name, uint32_t flags, DispatchHandler *handler) override
Opens a file asynchronously.
Definition: Client.cc:128
Declarations for Protocol.
#define HT_THROWF(_code_, _fmt_,...)
Definition: Error.h:490
Request parameters for exists requests.
Definition: Exists.h:46
Declarations for Close request parameters.
void rmdir(const String &name, DispatchHandler *handler) override
Recursively removes a directory asynchronously.
Definition: Client.cc:817
A timer class to keep timeout states across AsyncComm related calls.
Definition: Timer.h:44
Response parameters for open requests.
Definition: Open.h:46
This is a generic exception class for Hypertable.
Definition: Error.h:314
Declarations for Readdir request parameters.
Message buffer for holding data to be transmitted over a network.
Definition: CommBuf.h:79
Declarations for Sync request parameters.
void seek(int32_t fd, uint64_t offset, DispatchHandler *handler) override
Definition: Client.cc:488
long unsigned int Lu
Shortcut for printf formats.
Definition: String.h:47
Request parameters for seek requests.
Definition: Seek.h:46
File system broker framework and client library.
Definition: Broker.h:44
const Hypertable::Status & status() const
Gets status information.
Definition: Status.h:64
std::shared_ptr< ConnectionManager > ConnectionManagerPtr
Smart pointer to ConnectionManager.
Declarations for CommHeader.
Declarations for Mkdirs request parameters.
Request parameters for mkdirs requests.
Definition: Mkdirs.h:46
void * get_data_ptr()
Returns the primary buffer internal data pointer.
Definition: CommBuf.h:156
Request parameters for pread requests.
Definition: Pread.h:46
Request parameters for length requests.
Definition: Length.h:46
Declarations for Debug request parameters.
Error codes, Exception handling, error logging.
#define HT_THROW(_code_, _msg_)
Definition: Error.h:478
int send_request(const CommAddress &addr, uint32_t timeout_ms, CommBufPtr &cbuf, DispatchHandler *response_handler)
Sends a request message over a connection, expecting a response.
Definition: Comm.cc:300
Declarations for FsBroker request handler Factory.
Declarations for Open request parameters.
Request parameters for readdir requests.
Definition: Readdir.h:46
void decode_response_append(EventPtr &event, uint64_t *offset, uint32_t *length) override
Decodes the response from an append request.
Definition: Client.cc:471
Request parameters for create requests.
Definition: Create.h:46
Declarations for Client.
Declarations for Seek request parameters.
void decode_response_read(EventPtr &event, const void **buffer, uint64_t *offset, uint32_t *length) override
Decodes the response from a read request.
Definition: Client.cc:378
void exists(const String &name, DispatchHandler *handler) override
Determines if a file exists asynchronously.
Definition: Client.cc:909
Declarations for Readdir response parameters.
void get_listing(std::vector< Filesystem::Dirent > &listing)
Gets directory listing.
Definition: Readdir.h:63
Declarations for Shutdown request parameters.
#define HT_DIRECT_IO_ALIGNMENT
Definition: Filesystem.h:49
Declarations for Append request parameters.
int code() const
Returns the error code.
Definition: Error.h:391
Declarations for Read request parameters.