0.9.8.10
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
Session.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2007-2015 Hypertable, Inc.
3  *
4  * This file is part of Hypertable.
5  *
6  * Hypertable is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; either version 3
9  * of the License, or any later version.
10  *
11  * Hypertable is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19  * 02110-1301, USA.
20  */
21 
22 #include <Common/Compat.h>
23 
24 #include "Session.h"
25 
27 #include <Hyperspace/Master.h>
28 #include <Hyperspace/Protocol.h>
29 
31 #include <AsyncComm/Comm.h>
33 
34 #include <Common/Error.h>
35 #include <Common/InetAddr.h>
36 #include <Common/Logger.h>
37 #include <Common/Properties.h>
38 #include <Common/Serialization.h>
40 #include <Common/Time.h>
41 
42 #include <boost/algorithm/string.hpp>
43 #include <boost/tokenizer.hpp>
44 
45 #include <cassert>
46 #include <chrono>
47 
48 using namespace std;
49 using namespace Hypertable;
50 using namespace Hyperspace;
51 using namespace Serialization;
52 
53 
54 Session::Session(Comm *comm, PropertiesPtr &cfg)
55  : m_comm(comm), m_cfg(cfg), m_verbose(false), m_silent(false),
56  m_state(STATE_JEOPARDY), m_last_callback_id(0) {
57 
58  HT_TRY("getting config values",
59  m_verbose = cfg->get_bool("Hypertable.Verbose");
60  m_silent = cfg->get_bool("Hypertable.Silent");
61  m_grace_period = cfg->get_i32("Hyperspace.GracePeriod");
62  m_lease_interval = cfg->get_i32("Hyperspace.Lease.Interval");
63  m_hyperspace_port = cfg->get_i16("Hyperspace.Replica.Port");
64  m_reconnect = cfg->get_bool("Hyperspace.Session.Reconnect"));
65 
66  if (m_reconnect)
67  HT_INFO("Hyperspace session setup to reconnect");
68 
69  for (const auto &replica : cfg->get_strs("Hyperspace.Replica.Host")) {
70  m_hyperspace_replicas.push_back(replica);
71  }
72 
74 
75  m_expire_time = chrono::steady_clock::now() +
76  chrono::milliseconds(m_grace_period);
77 
78  m_keepalive_handler_ptr = std::make_shared<ClientKeepaliveHandler>(m_comm, m_cfg, this);
79  m_keepalive_handler_ptr->start();
80 
81  function<void()> sleep_callback = [this]() -> void {this->handle_sleep();};
82  function<void()> wakeup_callback = [this]() -> void {this->handle_wakeup();};
83  m_sleep_wake_notifier = new SleepWakeNotifier(sleep_callback, wakeup_callback);
84 }
85 
88  m_keepalive_handler_ptr->destroy_session();
89  delete m_sleep_wake_notifier;
90 }
91 
93 {
94  lock_guard<mutex> lock(m_mutex);
97  m_hyperspace_master = host;
98 }
99 
101  lock_guard<mutex> lock(m_mutex);
102  m_expire_time = chrono::steady_clock::now() +
103  chrono::milliseconds(m_grace_period);
104 }
105 
107  {
108  lock_guard<mutex> lock(m_mutex);
109  m_expire_time = chrono::steady_clock::now() +
110  chrono::milliseconds(m_grace_period);
111  }
114 }
115 
116 
117 void Session::shutdown(Timer *timer) {
118  DispatchHandlerSynchronizer sync_handler;
119  Hypertable::EventPtr event_ptr;
120  CommBufPtr cbuf_ptr(Protocol::create_shutdown_request());
121 
122  try_again:
123  if (!wait_for_safe())
125 
126  int error = send_message(cbuf_ptr, &sync_handler, timer);
127  if (error == Error::OK) {
128  if (!sync_handler.wait_for_reply(event_ptr))
129  HT_THROW((int)Protocol::response_code(event_ptr.get()),
130  "Hyperspace 'shutdown' error");
131  }
132  else {
134  goto try_again;
135  }
136 
137  m_keepalive_handler_ptr->wait_for_destroy_session();
139 }
140 
142 {
143  lock_guard<mutex> lock(m_callback_mutex);
147 }
148 
150 {
151  lock_guard<mutex> lock(m_callback_mutex);
152  return m_callbacks.erase(cb->get_id());
153 }
154 
155 uint64_t
157  Timer *timer) {
158  DispatchHandlerSynchronizer sync_handler;
159  Hypertable::EventPtr event_ptr;
160 
161  handle_state->handle = 0;
162  handle_state->sequencer = 0;
163  handle_state->lock_status = 0;
164  uint32_t open_flags = handle_state->open_flags;
165 
166  if ((open_flags & OPEN_FLAG_LOCK_SHARED) == OPEN_FLAG_LOCK_SHARED)
167  handle_state->lock_mode = LOCK_MODE_SHARED;
168  else if ((open_flags & OPEN_FLAG_LOCK_EXCLUSIVE) == OPEN_FLAG_LOCK_EXCLUSIVE)
169  handle_state->lock_mode = LOCK_MODE_EXCLUSIVE;
170  else
171  handle_state->lock_mode = 0;
172 
173  try_again:
174  if (!wait_for_safe())
176 
177  int error = send_message(cbuf_ptr, &sync_handler, timer);
178  if (error == Error::OK) {
179  if (!sync_handler.wait_for_reply(event_ptr)) {
180  error = (int)Protocol::response_code(event_ptr.get());
181  HT_THROWF(error, "Hyperspace 'open' error, name=%s flags=0x%x events=0x"
182  "%x", handle_state->normal_name.c_str(), open_flags,
183  handle_state->event_mask);
184  }
185  else {
186  const uint8_t *decode_ptr = event_ptr->payload + 4;
187  size_t decode_remain = event_ptr->payload_len - 4;
188  handle_state->handle = decode_i64(&decode_ptr, &decode_remain);
189  decode_byte(&decode_ptr, &decode_remain);
190  handle_state->lock_generation = decode_i64(&decode_ptr, &decode_remain);
192  m_keepalive_handler_ptr->register_handle(handle_state);
193  HT_DEBUG_OUT << "Open succeeded session="
194  << m_keepalive_handler_ptr->get_session_id()
195  << ", name=" << handle_state->normal_name
196  << ", handle=" << handle_state->handle << ", flags=" << open_flags
197  << ", event_mask=" << handle_state->event_mask << HT_END;
198 
199  return handle_state->handle;
200  }
201  }
202 
204  goto try_again;
205 
206 }
207 
208 
209 uint64_t
210 Session::open(const std::string &name, uint32_t flags,
211  HandleCallbackPtr &callback, Timer *timer) {
212  ClientHandleStatePtr handle_state(new ClientHandleState());
213  std::vector<Attribute> empty_attrs;
214 
215  handle_state->open_flags = flags;
216  handle_state->event_mask = (callback) ? callback->get_event_mask() : 0;
217  handle_state->callback = callback;
218 
219  normalize_name(name, handle_state->normal_name);
220 
221  CommBufPtr cbuf_ptr(Protocol::create_open_request(handle_state->normal_name,
222  flags, callback, empty_attrs));
223 
224  return open(handle_state, cbuf_ptr, timer);
225 }
226 
227 uint64_t
228 Session::open(const std::string &name, uint32_t flags, Timer *timer) {
229  HandleCallbackPtr null_handle_callback;
230  return open(name, flags, null_handle_callback, timer);
231 }
232 
233 
234 uint64_t
235 Session::create(const std::string &name, uint32_t flags,
236  HandleCallbackPtr &callback,
237  const std::vector<Attribute> &init_attrs, Timer *timer) {
238  ClientHandleStatePtr handle_state(new ClientHandleState());
239 
240  handle_state->open_flags = flags | OPEN_FLAG_CREATE | OPEN_FLAG_EXCL;
241  handle_state->event_mask = (callback) ? callback->get_event_mask() : 0;
242  handle_state->callback = callback;
243  normalize_name(name, handle_state->normal_name);
244 
245  CommBufPtr cbuf_ptr(Protocol::create_open_request(handle_state->normal_name,
246  handle_state->open_flags, callback, init_attrs));
247 
248  return open(handle_state, cbuf_ptr, timer);
249 }
250 
251 
252 /*
253  *
254  */
255 void Session::close(uint64_t handle, Timer *timer) {
256  DispatchHandlerSynchronizer sync_handler;
257  Hypertable::EventPtr event_ptr;
258  CommBufPtr cbuf_ptr(Protocol::create_close_request(handle));
259 
260  try_again:
261  if (!wait_for_safe())
263 
264  int error = send_message(cbuf_ptr, &sync_handler, timer);
265  if (error == Error::OK) {
266  if (!sync_handler.wait_for_reply(event_ptr))
267  HT_THROW((int)Protocol::response_code(event_ptr.get()),
268  "Hyperspace 'close' error");
269  m_keepalive_handler_ptr->unregister_handle(handle);
270  }
271  else {
273  goto try_again;
274  }
275 }
276 
277 void Session::close_nowait(uint64_t handle) {
278  {
279  lock_guard<mutex> lock(m_mutex);
280  if (m_state != STATE_SAFE)
281  return;
282  }
283  CommBufPtr cbuf_ptr(Protocol::create_close_request(handle));
284  send_message(cbuf_ptr, 0, 0);
285 }
286 
287 
288 /*
289  *
290  */
291 void Session::mkdir(const std::string &name, const std::vector<Attribute> &init_attrs, Timer *timer) {
292  mkdir(name, false, &init_attrs, timer);
293 }
294 
295 void Session::mkdir(const std::string &name, Timer *timer) {
296  mkdir(name, false, 0, timer);
297 }
298 
299 void Session::mkdirs(const std::string &name, Timer *timer) {
300  mkdir(name, true, 0, timer);
301 }
302 
303 void Session::mkdirs(const std::string &name, const std::vector<Attribute> &init_attrs, Timer *timer) {
304  mkdir(name, true, &init_attrs, timer);
305 }
306 
307 
308 void Session::unlink(const std::string &name, Timer *timer) {
309  DispatchHandlerSynchronizer sync_handler;
310  Hypertable::EventPtr event_ptr;
311  String normal_name;
312 
313  normalize_name(name, normal_name);
314 
315  CommBufPtr cbuf_ptr(Protocol::create_delete_request(normal_name));
316 
317  try_again:
318  if (!wait_for_safe())
320 
321  int error = send_message(cbuf_ptr, &sync_handler, timer);
322  if (error == Error::OK) {
323  if (!sync_handler.wait_for_reply(event_ptr))
324  HT_THROWF((int)Protocol::response_code(event_ptr.get()),
325  "Hyperspace 'unlink' error, name=%s", normal_name.c_str());
326  }
327  else {
329  goto try_again;
330  }
331 
332 }
333 
334 
335 bool Session::exists(const std::string &name, Timer *timer) {
336  DispatchHandlerSynchronizer sync_handler;
337  Hypertable::EventPtr event_ptr;
338  String normal_name;
339 
340  normalize_name(name, normal_name);
341 
342  CommBufPtr cbuf_ptr(Protocol::create_exists_request(normal_name));
343 
344  try_again:
345  if (!wait_for_safe())
347 
348  int error = send_message(cbuf_ptr, &sync_handler, timer);
349  if (error == Error::OK) {
350  if (!sync_handler.wait_for_reply(event_ptr)) {
351  HT_THROWF((int)Protocol::response_code(event_ptr.get()),
352  "Hyperspace 'exists' error, name=%s", normal_name.c_str());
353  }
354  else {
355  const uint8_t *decode_ptr = event_ptr->payload + 4;
356  size_t decode_remain = event_ptr->payload_len - 4;
357  uint8_t bval = decode_byte(&decode_ptr, &decode_remain);
358  return (bval == 0) ? false : true;
359  }
360  }
361 
363  goto try_again;
364 }
365 
366 
367 /*
368  */
369 void Session::attr_set(uint64_t handle, const std::string &attr,
370  const void *value, size_t value_len, Timer *timer) {
371  DispatchHandlerSynchronizer sync_handler;
372  Hypertable::EventPtr event_ptr;
373  CommBufPtr cbuf_ptr(Protocol::create_attr_set_request(handle, 0, 0, attr, value,
374  value_len));
375 
376  try_again:
377  if (!wait_for_safe())
379 
380  int error = send_message(cbuf_ptr, &sync_handler, timer);
381  if (error == Error::OK) {
382  if (!sync_handler.wait_for_reply(event_ptr)) {
383  ClientHandleStatePtr handle_state;
384  String fname = "UNKNOWN";
385  if (m_keepalive_handler_ptr->get_handle_state(handle, handle_state))
386  fname = handle_state->normal_name.c_str();
387  HT_THROWF((int)Protocol::response_code(event_ptr.get()),
388  "Problem setting attribute '%s' of hyperspace file '%s'",
389  attr.c_str(), fname.c_str());
390  }
391  return;
392  }
393 
395  goto try_again;
396 }
397 
398 /*
399  */
400 void Session::attr_set(uint64_t handle, const std::vector<Attribute> &attrs,
401  Timer *timer) {
402  DispatchHandlerSynchronizer sync_handler;
403  Hypertable::EventPtr event_ptr;
404  CommBufPtr cbuf_ptr(Protocol::create_attr_set_request(handle, 0, 0, attrs));
405 
406  try_again:
407  if (!wait_for_safe())
409 
410  int error = send_message(cbuf_ptr, &sync_handler, timer);
411  if (error == Error::OK) {
412  if (!sync_handler.wait_for_reply(event_ptr)) {
413  ClientHandleStatePtr handle_state;
414  String fname = "UNKNOWN";
415  if (m_keepalive_handler_ptr->get_handle_state(handle, handle_state))
416  fname = handle_state->normal_name.c_str();
417  HT_THROWF((int)Protocol::response_code(event_ptr.get()),
418  "Problem setting attributes of hyperspace file '%s'", fname.c_str());
419  }
420  return;
421  }
422 
424  goto try_again;
425 }
426 
427 /*
428  */
429 void Session::attr_set(const std::string &name, const std::string &attr,
430  const void *value, size_t value_len, Timer *timer) {
431  attr_set(name, 0, attr, value, value_len, timer);
432 }
433 
434 void Session::attr_set(const std::string &name, uint32_t oflags, const std::string &attr,
435  const void *value, size_t value_len, Timer *timer) {
436  DispatchHandlerSynchronizer sync_handler;
437  Hypertable::EventPtr event_ptr;
438  CommBufPtr cbuf_ptr(Protocol::create_attr_set_request(0, &name, oflags, attr, value,
439  value_len));
440 
441  try_again:
442  if (!wait_for_safe())
444 
445  int error = send_message(cbuf_ptr, &sync_handler, timer);
446  if (error == Error::OK) {
447  if (!sync_handler.wait_for_reply(event_ptr)) {
448  HT_THROWF((int)Protocol::response_code(event_ptr.get()),
449  "Problem setting attribute '%s' of hyperspace file '%s'",
450  attr.c_str(), name.c_str());
451  }
452  return;
453  }
454 
456  goto try_again;
457 }
458 
459 void Session::attr_set(const std::string &name, uint32_t oflags,
460  const std::vector<Attribute> &attrs, Timer *timer) {
461 
462  DispatchHandlerSynchronizer sync_handler;
463  Hypertable::EventPtr event_ptr;
464  CommBufPtr cbuf_ptr(Protocol::create_attr_set_request(0, &name, oflags, attrs));
465 
466  try_again:
467  if (!wait_for_safe())
469 
470  int error = send_message(cbuf_ptr, &sync_handler, timer);
471  if (error == Error::OK) {
472  if (!sync_handler.wait_for_reply(event_ptr)) {
473  HT_THROWF((int)Protocol::response_code(event_ptr.get()),
474  "Problem setting attributes of hyperspace file '%s'", name.c_str());
475  }
476  return;
477  }
478 
480  goto try_again;
481 }
482 
483 /*
484  */
485 uint64_t Session::attr_incr(uint64_t handle, const std::string &attr, Timer *timer) {
486  DispatchHandlerSynchronizer sync_handler;
487  Hypertable::EventPtr event_ptr;
488  CommBufPtr cbuf_ptr(Protocol::create_attr_incr_request(handle, 0, attr));
489 
490  try_again:
491  if (!wait_for_safe())
493 
494  int error = send_message(cbuf_ptr, &sync_handler, timer);
495  if (error == Error::OK) {
496  if (!sync_handler.wait_for_reply(event_ptr)) {
497  ClientHandleStatePtr handle_state;
498  String fname = "UNKNOWN";
499  if (m_keepalive_handler_ptr->get_handle_state(handle, handle_state))
500  fname = handle_state->normal_name.c_str();
501  HT_THROWF((int)Protocol::response_code(event_ptr.get()),
502  "Problem incrementing attribute '%s' of hyperspace file '%s'",
503  attr.c_str(), fname.c_str());
504  }
505  else {
506  const uint8_t *decode_ptr = event_ptr->payload + 4;
507  size_t decode_remain = event_ptr->payload_len - 4;
508  uint64_t attr_val = decode_i64(&decode_ptr, &decode_remain);
509 
510  return attr_val;
511  }
512  }
513  else {
515  goto try_again;
516  }
517 
518 }
519 
520 /*
521  */
522 uint64_t Session::attr_incr(const std::string &name, const std::string &attr, Timer *timer) {
523  DispatchHandlerSynchronizer sync_handler;
524  Hypertable::EventPtr event_ptr;
525  CommBufPtr cbuf_ptr(Protocol::create_attr_incr_request(0, &name, attr));
526 
527  try_again:
528  if (!wait_for_safe())
530 
531  int error = send_message(cbuf_ptr, &sync_handler, timer);
532  if (error == Error::OK) {
533  if (!sync_handler.wait_for_reply(event_ptr)) {
534  HT_THROWF((int)Protocol::response_code(event_ptr.get()),
535  "Problem incrementing attribute '%s' of hyperspace file '%s'",
536  attr.c_str(), name.c_str());
537  }
538  else {
539  const uint8_t *decode_ptr = event_ptr->payload + 4;
540  size_t decode_remain = event_ptr->payload_len - 4;
541  uint64_t attr_val = decode_i64(&decode_ptr, &decode_remain);
542 
543  return attr_val;
544  }
545  }
546  else {
548  goto try_again;
549  }
550 
551 }
552 
553 void
554 Session::attr_get(uint64_t handle, const std::string &attr,
555  DynamicBuffer &value, Timer *timer) {
556  DispatchHandlerSynchronizer sync_handler;
557  Hypertable::EventPtr event_ptr;
558  CommBufPtr cbuf_ptr(Protocol::create_attr_get_request(handle, 0, attr));
559 
560  try_again:
561  if (!wait_for_safe())
563 
564  int error = send_message(cbuf_ptr, &sync_handler, timer);
565  if (error == Error::OK) {
566  if (!sync_handler.wait_for_reply(event_ptr)) {
567  ClientHandleStatePtr handle_state;
568  String fname = "UNKNOWN";
569  if (m_keepalive_handler_ptr->get_handle_state(handle, handle_state))
570  fname = handle_state->normal_name.c_str();
571  HT_THROWF((int)Protocol::response_code(event_ptr.get()),
572  "Problem getting attribute '%s' of hyperspace file '%s'",
573  attr.c_str(), fname.c_str());
574  }
575  else
576  decode_value(event_ptr, value);
577  }
578  else {
580  goto try_again;
581  }
582 }
583 
584 void
585 Session::attr_get(const std::string &name, const std::string &attr,
586  DynamicBuffer &value, Timer *timer) {
587  DispatchHandlerSynchronizer sync_handler;
588  Hypertable::EventPtr event_ptr;
589  CommBufPtr cbuf_ptr(Protocol::create_attr_get_request(0, &name, attr));
590 
591  try_again:
592  if (!wait_for_safe())
594 
595  int error = send_message(cbuf_ptr, &sync_handler, timer);
596  if (error == Error::OK) {
597  if (!sync_handler.wait_for_reply(event_ptr)) {
598  HT_THROWF((int)Protocol::response_code(event_ptr.get()),
599  "Problem getting attribute '%s' of hyperspace file '%s'",
600  attr.c_str(), name.c_str());
601  }
602  else
603  decode_value(event_ptr, value);
604  }
605  else {
607  goto try_again;
608  }
609 }
610 
611 void
612 Session::attr_get(const std::string &name, const std::string &attr,
613  bool& attr_exists, DynamicBuffer &value, Timer *timer)
614 {
615  attr_exists = false;
616  try {
617  attr_get(name, attr, value, timer);
618  attr_exists = true;
619  }
620  catch (Exception &e) {
622  throw;
623  }
624 }
625 
626 void
627 Session::attrs_get(uint64_t handle, const std::vector<std::string> &attrs,
628  std::vector<DynamicBufferPtr> &values, Timer *timer) {
629  DispatchHandlerSynchronizer sync_handler;
630  Hypertable::EventPtr event_ptr;
631  CommBufPtr cbuf_ptr(Protocol::create_attrs_get_request(handle, 0, attrs));
632 
633  try_again:
634  if (!wait_for_safe())
636 
637  int error = send_message(cbuf_ptr, &sync_handler, timer);
638  if (error == Error::OK) {
639  if (!sync_handler.wait_for_reply(event_ptr)) {
640  ClientHandleStatePtr handle_state;
641  String fname = "UNKNOWN";
642  if (m_keepalive_handler_ptr->get_handle_state(handle, handle_state))
643  fname = handle_state->normal_name.c_str();
644  HT_THROWF((int)Protocol::response_code(event_ptr.get()),
645  "Problem getting attributes of hyperspace file '%s'",
646  fname.c_str());
647  }
648  else
649  decode_values(event_ptr, values);
650  }
651  else {
653  goto try_again;
654  }
655 }
656 
657 void
658 Session::attrs_get(const std::string &name, const std::vector<std::string> &attrs,
659  std::vector<DynamicBufferPtr> &values, Timer *timer) {
660  DispatchHandlerSynchronizer sync_handler;
661  Hypertable::EventPtr event_ptr;
662  CommBufPtr cbuf_ptr(Protocol::create_attrs_get_request(0, &name, attrs));
663 
664  try_again:
665  if (!wait_for_safe())
667 
668  int error = send_message(cbuf_ptr, &sync_handler, timer);
669  if (error == Error::OK) {
670  if (!sync_handler.wait_for_reply(event_ptr)) {
671  HT_THROWF((int)Protocol::response_code(event_ptr.get()),
672  "Problem getting attributes of hyperspace file '%s'",
673  name.c_str());
674  }
675  else
676  decode_values(event_ptr, values);
677  }
678  else {
680  goto try_again;
681  }
682 }
683 
684 bool
685 Session::attr_exists(uint64_t handle, const std::string& attr, Timer *timer)
686 {
687  DispatchHandlerSynchronizer sync_handler;
688  Hypertable::EventPtr event_ptr;
689 
690  CommBufPtr cbuf_ptr(Protocol::create_attr_exists_request(handle, attr));
691 
692  try_again:
693  if (!wait_for_safe())
695 
696  int error = send_message(cbuf_ptr, &sync_handler, timer);
697  if (error == Error::OK) {
698  if (!sync_handler.wait_for_reply(event_ptr)) {
699  HT_THROWF((int)Protocol::response_code(event_ptr.get()),
700  "Hyperspace 'attr_exists' error, name=%s", attr.c_str());
701  }
702  else {
703  const uint8_t *decode_ptr = event_ptr->payload + 4;
704  size_t decode_remain = event_ptr->payload_len - 4;
705  uint8_t bval = decode_byte(&decode_ptr, &decode_remain);
706  return (bval == 0) ? false : true;
707  }
708  }
709 
711  goto try_again;
712 }
713 
714 bool
715 Session::attr_exists(const std::string& name, const std::string& attr, Timer *timer)
716 {
717  DispatchHandlerSynchronizer sync_handler;
718  Hypertable::EventPtr event_ptr;
719 
720  CommBufPtr cbuf_ptr(Protocol::create_attr_exists_request(name, attr));
721 
722  try_again:
723  if (!wait_for_safe())
725 
726  int error = send_message(cbuf_ptr, &sync_handler, timer);
727  if (error == Error::OK) {
728  if (!sync_handler.wait_for_reply(event_ptr)) {
729  HT_THROWF((int)Protocol::response_code(event_ptr.get()),
730  "Hyperspace 'attr_exists' error, name=%s", attr.c_str());
731  }
732  else {
733  const uint8_t *decode_ptr = event_ptr->payload + 4;
734  size_t decode_remain = event_ptr->payload_len - 4;
735  uint8_t bval = decode_byte(&decode_ptr, &decode_remain);
736  return (bval == 0) ? false : true;
737  }
738  }
739 
741  goto try_again;
742 }
743 
744 /*
745  *
746  */
747 void Session::attr_del(uint64_t handle, const std::string &name, Timer *timer) {
748  DispatchHandlerSynchronizer sync_handler;
749  Hypertable::EventPtr event_ptr;
750  CommBufPtr cbuf_ptr(Protocol::create_attr_del_request(handle, name));
751 
752  try_again:
753  if (!wait_for_safe())
755 
756  int error = send_message(cbuf_ptr, &sync_handler, timer);
757  if (error == Error::OK) {
758  if (!sync_handler.wait_for_reply(event_ptr)) {
759  ClientHandleStatePtr handle_state;
760  String fname = "UNKNOWN";
761  if (m_keepalive_handler_ptr->get_handle_state(handle, handle_state))
762  fname = handle_state->normal_name.c_str();
763  HT_THROWF((int)Protocol::response_code(event_ptr.get()),
764  "Problem deleting attribute '%s' of hyperspace file '%s'",
765  name.c_str(), fname.c_str());
766  }
767  }
768  else {
770  goto try_again;
771  }
772 
773 }
774 
775 void Session::attr_list(uint64_t handle, vector<String> &anames, Timer *timer) {
776  DispatchHandlerSynchronizer sync_handler;
777  Hypertable::EventPtr event_ptr;
778  CommBufPtr cbuf_ptr(Protocol::create_attr_list_request(handle));
779 
780  try_again:
781  if (!wait_for_safe())
783 
784  int error = send_message(cbuf_ptr, &sync_handler, timer);
785  if (error == Error::OK) {
786  if (!sync_handler.wait_for_reply(event_ptr)) {
787  ClientHandleStatePtr handle_state;
788  String fname = "UNKNOWN";
789  if (m_keepalive_handler_ptr->get_handle_state(handle, handle_state))
790  fname = handle_state->normal_name.c_str();
791  HT_THROWF((int)Protocol::response_code(event_ptr.get()),
792  "Problem getting list of attributes of hyperspace file '%s'",
793  fname.c_str());
794  }
795  else {
796  const uint8_t *decode_ptr = event_ptr->payload + 4;
797  size_t decode_remain = event_ptr->payload_len - 4;
798 
799  uint32_t num_attributes = decode_i32(&decode_ptr, &decode_remain);
800 
801  for (uint32_t k = 0; k < num_attributes; k++) {
802  String attrname;
803  attrname = decode_vstr(&decode_ptr, &decode_remain);
804  anames.push_back(attrname);
805  }
806  }
807  }
808  else {
810  goto try_again;
811  }
812 
813 }
814 
815 
816 void
817 Session::readdir(uint64_t handle, std::vector<DirEntry> &listing,
818  Timer *timer) {
819  DispatchHandlerSynchronizer sync_handler;
820  Hypertable::EventPtr event_ptr;
821  CommBufPtr cbuf_ptr(Protocol::create_readdir_request(handle));
822 
823  try_again:
824  if (!wait_for_safe())
826 
827  int error = send_message(cbuf_ptr, &sync_handler, timer);
828  if (error == Error::OK) {
829  if (!sync_handler.wait_for_reply(event_ptr)) {
830  HT_THROW((int)Protocol::response_code(event_ptr.get()),
831  "Hyperspace 'readdir' error");
832  }
833  else {
834  const uint8_t *decode_ptr = event_ptr->payload + 4;
835  size_t decode_remain = event_ptr->payload_len - 4;
836  uint32_t entry_cnt;
837  DirEntry dentry;
838  try {
839  entry_cnt = decode_i32(&decode_ptr, &decode_remain);
840  }
841  catch (Exception &e) {
843  }
844  listing.clear();
845  for (uint32_t i=0; i<entry_cnt; i++) {
846  try {
847  decode_dir_entry(&decode_ptr, &decode_remain, dentry);
848  }
849  catch (Exception &e) {
851  "Problem decoding entry %d of READDIR return packet", i);
852  }
853  listing.push_back(dentry);
854  }
855  }
856  }
857  else {
859  goto try_again;
860  }
861 }
862 
863 void
864 Session::readdir_attr(uint64_t handle, const std::string &attr, bool include_sub_entries,
865  std::vector<DirEntryAttr> &listing, Timer *timer) {
866  DispatchHandlerSynchronizer sync_handler;
867  Hypertable::EventPtr event_ptr;
868  CommBufPtr cbuf_ptr(Protocol::create_readdir_attr_request(handle, 0, attr, include_sub_entries));
869 
870  try_again:
871  if (!wait_for_safe())
873 
874  int error = send_message(cbuf_ptr, &sync_handler, timer);
875  if (error == Error::OK) {
876  if (!sync_handler.wait_for_reply(event_ptr)) {
877  HT_THROW((int)Protocol::response_code(event_ptr.get()),
878  "Hyperspace 'readdir_attr' error");
879  }
880  else
881  decode_listing(event_ptr, listing);
882  }
883  else {
885  goto try_again;
886  }
887 }
888 
889 void
890 Session::readdir_attr(const std::string &name, const std::string &attr, bool include_sub_entries,
891  std::vector<DirEntryAttr> &listing, Timer *timer) {
892  DispatchHandlerSynchronizer sync_handler;
893  Hypertable::EventPtr event_ptr;
894  CommBufPtr cbuf_ptr(Protocol::create_readdir_attr_request(0, &name, attr, include_sub_entries));
895 
896  try_again:
897  if (!wait_for_safe())
899 
900  int error = send_message(cbuf_ptr, &sync_handler, timer);
901  if (error == Error::OK) {
902  if (!sync_handler.wait_for_reply(event_ptr)) {
903  HT_THROW((int)Protocol::response_code(event_ptr.get()),
904  "Hyperspace 'readdir_attr' error");
905  }
906  else
907  decode_listing(event_ptr, listing);
908  }
909  else {
911  goto try_again;
912  }
913 }
914 
915 void
916 Session::readpath_attr(uint64_t handle, const std::string &attr,
917  std::vector<DirEntryAttr> &listing, Timer *timer) {
918  DispatchHandlerSynchronizer sync_handler;
919  Hypertable::EventPtr event_ptr;
920  CommBufPtr cbuf_ptr(Protocol::create_readpath_attr_request(handle, 0, attr));
921 
922  try_again:
923  if (!wait_for_safe())
925 
926  int error = send_message(cbuf_ptr, &sync_handler, timer);
927  if (error == Error::OK) {
928  if (!sync_handler.wait_for_reply(event_ptr)) {
929  HT_THROW((int)Protocol::response_code(event_ptr.get()),
930  "Hyperspace 'readpath_attr' error");
931  }
932  else
933  decode_listing(event_ptr, listing);
934  }
935  else {
937  goto try_again;
938  }
939 }
940 
941 void
942 Session::readpath_attr(const std::string &name, const std::string &attr,
943  std::vector<DirEntryAttr> &listing, Timer *timer) {
944  DispatchHandlerSynchronizer sync_handler;
945  Hypertable::EventPtr event_ptr;
946  CommBufPtr cbuf_ptr(Protocol::create_readpath_attr_request(0, &name, attr));
947 
948  try_again:
949  if (!wait_for_safe())
951 
952  int error = send_message(cbuf_ptr, &sync_handler, timer);
953  if (error == Error::OK) {
954  if (!sync_handler.wait_for_reply(event_ptr)) {
955  HT_THROW((int)Protocol::response_code(event_ptr.get()),
956  "Hyperspace 'readpath_attr' error");
957  }
958  else
959  decode_listing(event_ptr, listing);
960  }
961  else {
963  goto try_again;
964  }
965 }
966 
967 void
968 Session::lock(uint64_t handle, LockMode mode, LockSequencer *sequencerp,
969  Timer *timer) {
970  DispatchHandlerSynchronizer sync_handler;
971  Hypertable::EventPtr event_ptr;
972  CommBufPtr cbuf_ptr(Protocol::create_lock_request(handle, mode, false));
973  ClientHandleStatePtr handle_state;
974  uint32_t status = 0;
975 
976  if (!m_keepalive_handler_ptr->get_handle_state(handle, handle_state))
978 
979  if (handle_state->lock_status != 0)
981 
982  try_again:
983  if (!wait_for_safe())
985 
986  {
987  lock_guard<mutex> lock(handle_state->mutex);
988  sequencerp->mode = mode;
989  sequencerp->name = handle_state->normal_name;
990  handle_state->sequencer = sequencerp;
991  }
992 
993  int error = send_message(cbuf_ptr, &sync_handler, timer);
994  if (error == Error::OK) {
995  if (!sync_handler.wait_for_reply(event_ptr))
996  HT_THROWF((int)Protocol::response_code(event_ptr.get()),
997  "Hyperspace 'lock' error, name='%s'",
998  handle_state->normal_name.c_str());
999  else {
1000  unique_lock<mutex> lock(handle_state->mutex);
1001  const uint8_t *decode_ptr = event_ptr->payload + 4;
1002  size_t decode_remain = event_ptr->payload_len - 4;
1003  handle_state->lock_mode = mode;
1004  try {
1005  status = decode_i32(&decode_ptr, &decode_remain);
1006 
1007  if (status == LOCK_STATUS_GRANTED) {
1008  sequencerp->generation = decode_i64(&decode_ptr, &decode_remain);
1009  handle_state->lock_generation = sequencerp->generation;
1010  handle_state->lock_status = LOCK_STATUS_GRANTED;
1011  }
1012  else {
1013  assert(status == LOCK_STATUS_PENDING);
1014  handle_state->lock_status = LOCK_STATUS_PENDING;
1015  handle_state->cond.wait(lock, [&handle_state](){ return handle_state->lock_status != LOCK_STATUS_PENDING; });
1016  if (handle_state->lock_status == LOCK_STATUS_CANCELLED)
1018  assert(handle_state->lock_status == LOCK_STATUS_GRANTED);
1019  }
1020  }
1021  catch (Exception &e) {
1022  HT_THROW2(e.code(), e, "lock response decode error");
1023  }
1024  }
1025  }
1026  else {
1028  goto try_again;
1029  }
1030 
1031 }
1032 
1033 
1034 void
1035 Session::try_lock(uint64_t handle, LockMode mode, LockStatus *statusp,
1036  LockSequencer *sequencerp, Timer *timer) {
1037  DispatchHandlerSynchronizer sync_handler;
1038  Hypertable::EventPtr event_ptr;
1039  CommBufPtr cbuf_ptr(Protocol::create_lock_request(handle, mode, true));
1040  ClientHandleStatePtr handle_state;
1041 
1042  if (!m_keepalive_handler_ptr->get_handle_state(handle, handle_state))
1044 
1045  if (handle_state->lock_status != 0)
1047 
1048  try_again:
1049  if (!wait_for_safe())
1051 
1052  int error = send_message(cbuf_ptr, &sync_handler, timer);
1053  if (error == Error::OK) {
1054  if (!sync_handler.wait_for_reply(event_ptr))
1055  HT_THROWF((int)Protocol::response_code(event_ptr.get()),
1056  "Hyperspace 'try_lock' error, name='%s'",
1057  handle_state->normal_name.c_str());
1058  else {
1059  lock_guard<mutex> lock(handle_state->mutex);
1060  const uint8_t *decode_ptr = event_ptr->payload + 4;
1061  size_t decode_remain = event_ptr->payload_len - 4;
1062  try {
1063  *statusp = (LockStatus)decode_i32(&decode_ptr, &decode_remain);
1064 
1065  if (*statusp == LOCK_STATUS_GRANTED) {
1066  sequencerp->generation = decode_i64(&decode_ptr, &decode_remain);
1067  sequencerp->mode = mode;
1068  sequencerp->name = handle_state->normal_name;
1069  handle_state->lock_mode = mode;
1070  handle_state->lock_status = LOCK_STATUS_GRANTED;
1071  handle_state->lock_generation = sequencerp->generation;
1072  handle_state->sequencer = 0;
1073  }
1074  }
1075  catch (Exception &e) {
1076  HT_THROW2(e.code(), e, "try_lock response decode error");
1077  }
1078  }
1079  }
1080  else {
1082  goto try_again;
1083  }
1084 
1085 }
1086 
1087 
1088 void Session::release(uint64_t handle, Timer *timer) {
1089  DispatchHandlerSynchronizer sync_handler;
1090  Hypertable::EventPtr event_ptr;
1091  CommBufPtr cbuf_ptr(Protocol::create_release_request(handle));
1092  ClientHandleStatePtr handle_state;
1093 
1094  if (!m_keepalive_handler_ptr->get_handle_state(handle, handle_state))
1096 
1097  try_again:
1098  if (!wait_for_safe())
1100 
1101  int error = send_message(cbuf_ptr, &sync_handler, timer);
1102  if (error == Error::OK) {
1103  lock_guard<mutex> lock(handle_state->mutex);
1104  if (!sync_handler.wait_for_reply(event_ptr))
1105  HT_THROW((int)Protocol::response_code(event_ptr.get()),
1106  "Hyperspace 'release' error");
1107  handle_state->lock_status = 0;
1108  handle_state->cond.notify_all();
1109  }
1110  else {
1112  goto try_again;
1113  }
1114 
1115 }
1116 
1117 
1118 
1119 void
1120 Session::get_sequencer(uint64_t handle, LockSequencer *sequencerp,
1121  Timer *timer) {
1122  ClientHandleStatePtr handle_state;
1123 
1124  if (!m_keepalive_handler_ptr->get_handle_state(handle, handle_state))
1126 
1127  if (handle_state->lock_generation == 0)
1129 
1130  sequencerp->name = handle_state->normal_name;
1131  sequencerp->mode = handle_state->lock_mode;
1132  sequencerp->generation = handle_state->lock_generation;
1133 
1134 }
1135 
1136 
1137 /*
1138  */
1140  HT_WARN("CheckSequencer not implemented.");
1141 }
1142 
1143 /*
1144  */
1146  String location;
1147 
1148  switch(type) {
1149  case LOCATE_MASTER:
1150  location = m_hyperspace_master + "\n";
1151  break;
1152  case LOCATE_REPLICAS:
1153  for (const auto &replica : m_hyperspace_replicas)
1154  location += replica + "\n";
1155  break;
1156  }
1157  return location;
1158 }
1159 
1160 /*
1161  */
1163  DispatchHandlerSynchronizer sync_handler;
1164  EventPtr event_ptr;
1165  CommBufPtr cbuf_ptr(Protocol::create_status_request());
1166  int error = send_message(cbuf_ptr, &sync_handler, timer);
1167  if (error == Error::OK) {
1168  if (!sync_handler.wait_for_reply(event_ptr))
1169  error = (int)Protocol::response_code(event_ptr);
1170  else {
1171  const uint8_t *ptr = event_ptr->payload + 4;
1172  size_t remain = event_ptr->payload_len - 4;
1173  status.decode(&ptr, &remain);
1174  error = Error::OK;
1175  }
1176  }
1177  return error;
1178 }
1179 
1180 
1182  lock_guard<mutex> lock(m_mutex);
1183  int old_state = m_state;
1184  m_state = state;
1185  if (m_state == STATE_SAFE) {
1186  m_cond.notify_all();
1187  if (old_state == STATE_JEOPARDY) {
1188  for(CallbackMap::iterator it = m_callbacks.begin(); it != m_callbacks.end(); it++)
1189  (it->second)->safe();
1190  }
1191  else if (old_state == STATE_DISCONNECTED)
1192  for(CallbackMap::iterator it = m_callbacks.begin(); it != m_callbacks.end(); it++)
1193  (it->second)->reconnected();
1194  }
1195  else if (m_state == STATE_JEOPARDY) {
1196  if (old_state == STATE_SAFE) {
1197  for(CallbackMap::iterator it = m_callbacks.begin(); it != m_callbacks.end(); it++)
1198  (it->second)->jeopardy();
1199  m_expire_time = chrono::steady_clock::now() +
1200  chrono::milliseconds(m_grace_period);
1201  }
1202  }
1203  else if (m_state == STATE_DISCONNECTED) {
1204  if (m_reconnect) {
1205  if (old_state != STATE_DISCONNECTED)
1206  for(CallbackMap::iterator it = m_callbacks.begin(); it != m_callbacks.end(); it++)
1207  (it->second)->disconnected();
1208  m_expire_time = chrono::steady_clock::now() +
1209  chrono::milliseconds(m_grace_period);
1210  }
1211  }
1212  else if (m_state == STATE_EXPIRED) {
1213  if (old_state != STATE_EXPIRED) {
1214  for(CallbackMap::iterator it = m_callbacks.begin(); it != m_callbacks.end(); it++)
1215  (it->second)->expired();
1216  }
1217  m_cond.notify_all();
1218  }
1219  return old_state;
1220 }
1221 
1222 
1224  lock_guard<mutex> lock(m_mutex);
1225  return m_state;
1226 }
1227 
1228 
1230  lock_guard<mutex> lock(m_mutex);
1231  return m_expire_time < chrono::steady_clock::now();
1232 }
1233 
1234 
1235 bool Session::wait_for_connection(uint32_t max_wait_ms) {
1236  unique_lock<mutex> lock(m_mutex);
1237  auto drop_time = chrono::steady_clock::now() + chrono::milliseconds(max_wait_ms);
1238  return m_cond.wait_until(lock, drop_time,
1239  [this]{ return m_state == STATE_SAFE; });
1240 }
1241 
1242 
1244  unique_lock<mutex> lock(m_mutex);
1245  auto drop_time = chrono::steady_clock::now() + chrono::milliseconds(timer.remaining());
1246  return m_cond.wait_until(lock, drop_time,
1247  [this]{ return m_state == STATE_SAFE; });
1248 }
1249 
1250 
1251 void Session::mkdir(const std::string &name, bool create_intermediate, const std::vector<Attribute> *init_attrs, Timer *timer) {
1252  DispatchHandlerSynchronizer sync_handler;
1253  Hypertable::EventPtr event_ptr;
1254  String normal_name;
1255 
1256  normalize_name(name, normal_name);
1257 
1258  CommBufPtr cbuf_ptr(Protocol::create_mkdir_request(normal_name, create_intermediate, init_attrs));
1259 
1260  try_again:
1261  if (!wait_for_safe())
1263 
1264  int error = send_message(cbuf_ptr, &sync_handler, timer);
1265  if (error == Error::OK) {
1266  if (!sync_handler.wait_for_reply(event_ptr))
1267  HT_THROWF((int)Protocol::response_code(event_ptr.get()),
1268  "Hyperspace 'mkdir' error, name=%s", normal_name.c_str());
1269  }
1270  else {
1272  goto try_again;
1273  }
1274 }
1275 
1277  uint32_t attr_val_len = 0;
1278  const uint8_t *decode_ptr = event_ptr->payload + 8;
1279  size_t decode_remain = event_ptr->payload_len - 8;
1280  try {
1281  void *attr_val = decode_bytes32(&decode_ptr, &decode_remain,
1282  &attr_val_len);
1283  value.clear();
1284  value.ensure(attr_val_len+1);
1285  value.add_unchecked(attr_val, attr_val_len);
1286  // nul-terminate to make caller's lives easier
1287  *value.ptr = 0;
1288  }
1289  catch (Exception &e) {
1291  }
1292 }
1293 
1294 void Session::decode_values(Hypertable::EventPtr& event_ptr, std::vector<DynamicBufferPtr> &values) {
1295  values.clear();
1296  uint32_t attr_val_len = 0;
1297  const uint8_t *decode_ptr = event_ptr->payload + 4;
1298  size_t decode_remain = event_ptr->payload_len - 4;
1299  try {
1300  uint32_t attr_val_cnt = decode_i32(&decode_ptr, &decode_remain);
1301  values.reserve(attr_val_cnt);
1302  while (attr_val_cnt-- > 0) {
1303  DynamicBufferPtr value;
1304  void *attr_val = decode_bytes32(&decode_ptr, &decode_remain,
1305  &attr_val_len);
1306  if (attr_val_len) {
1307  value = make_shared<DynamicBuffer>(attr_val_len+1);
1308  value->add_unchecked(attr_val, attr_val_len);
1309  // nul-terminate to make caller's lives easier
1310  *value->ptr = 0;
1311  }
1312  values.push_back(value);
1313  }
1314  }
1315  catch (Exception &e) {
1317  }
1318 }
1319 
1320 void Session::decode_listing(Hypertable::EventPtr& event_ptr, std::vector<DirEntryAttr> &listing) {
1321  const uint8_t *decode_ptr = event_ptr->payload + 4;
1322  size_t decode_remain = event_ptr->payload_len - 4;
1323  uint32_t entry_cnt;
1324  DirEntryAttr dentry;
1325  try {
1326  entry_cnt = decode_i32(&decode_ptr, &decode_remain);
1327  }
1328  catch (Exception &e) {
1330  }
1331  listing.clear();
1332  listing.reserve(entry_cnt);
1333  for (uint32_t ii=0; ii<entry_cnt; ii++) {
1334  try {
1335  decode_dir_entry_attr(&decode_ptr, &decode_remain, dentry);
1336  }
1337  catch (Exception &e) {
1339  "Problem decoding entry %d of READDIR_ATTR return packet", ii);
1340  }
1341  listing.push_back(dentry);
1342  }
1343 }
1344 
1346  unique_lock<mutex> lock(m_mutex);
1347  while (m_state != STATE_SAFE) {
1348  if (m_state == STATE_EXPIRED)
1349  return false;
1350  m_cond.wait(lock);
1351  }
1352  return true;
1353 }
1354 
1355 int
1357  Timer *timer) {
1358  lock_guard<mutex> lock(m_mutex);
1359  int error;
1360  uint32_t timeout_ms = timer ? (time_t)timer->remaining() : m_timeout_ms;
1361 
1362  if ((error = m_comm->send_request(m_master_addr, timeout_ms, cbuf_ptr,
1363  handler)) != Error::OK) {
1364  std::string str;
1365  if (!m_silent)
1366  HT_WARNF("Comm::send_request to htHyperspace at %s failed - %s",
1367  m_master_addr.format().c_str(), Error::get_text(error));
1368  }
1369  return error;
1370 }
1371 
1372 
1373 void Session::normalize_name(const String &name, String &normal) {
1374 
1375  if (name == "/") {
1376  normal = name;
1377  return;
1378  }
1379 
1380  normal = "";
1381  if (name[0] != '/')
1382  normal += "/";
1383 
1384  if (name.find('/', name.length()-1) == string::npos)
1385  normal += name;
1386  else
1387  normal += name.substr(0, name.length()-1);
1388 }
1389 
1391  return make_shared<HsCommandInterpreter>(this);
1392 }
1393 
1394 
1395 void Hyperspace::close_handle(SessionPtr hyperspace, uint64_t handle) {
1396  if (handle)
1397  hyperspace->close(handle);
1398 }
1399 
1400 void Hyperspace::close_handle_ptr(SessionPtr hyperspace, uint64_t *handlep) {
1401  if (*handlep)
1402  hyperspace->close(*handlep);
1403 }
#define HT_THROW2F(_code_, _ex_, _fmt_,...)
Definition: Error.h:494
uint16_t m_hyperspace_port
Definition: Session.h:716
char * decode_vstr(const uint8_t **bufp, size_t *remainp)
Decode a vstr (vint64, data, null).
Lock successfully granted.
Definition: LockSequencer.h:58
std::shared_ptr< HsCommandInterpreter > HsCommandInterpreterPtr
void attr_get(uint64_t handle, const std::string &attr, DynamicBuffer &value, Timer *timer=0)
Gets an extended attribute of a file.
Definition: Session.cc:554
void readdir(uint64_t handle, std::vector< DirEntry > &listing, Timer *timer=0)
Gets a directory listing.
Definition: Session.cc:817
#define HT_WARNF(msg,...)
Definition: Logger.h:290
uint64_t open(const std::string &name, uint32_t flags, HandleCallbackPtr &callback, Timer *timer=0)
Opens a file.
Definition: Session.cc:210
int get_state()
Returns current state (internal method)
Definition: Session.cc:1223
void attr_list(uint64_t handle, vector< String > &anames, Timer *timer=0)
Lists all extended attributes of a file.
Definition: Session.cc:775
A callback object derived from this class gets passed into the constructor of Hyperspace.
Definition: Session.h:94
void shutdown(Timer *timer=0)
Attempts to shutdown the Hyperspace server and destroys this session.
Definition: Session.cc:117
Holds Nagios-style program status information.
Definition: Status.h:42
Delivers sleep and wakeup notifications.
static int32_t response_code(const Event *event)
Returns the response code from an event event generated in response to a request message.
Definition: Protocol.cc:39
void close(uint64_t handle, Timer *timer=0)
Closes a file handle.
Definition: Session.cc:255
std::string String
A String is simply a typedef to std::string.
Definition: String.h:44
void lock(uint64_t handle, LockMode mode, LockSequencer *sequencerp, Timer *timer=0)
Locks a file.
Definition: Session.cc:968
virtual void handle(EventPtr &event)
Event Dispatch method.
DirEntry & decode_dir_entry(const uint8_t **bufp, size_t *remainp, DirEntry &dir_entry)
Decodes (unserializes) a directory entry from a buffer.
Definition: DirEntry.cc:43
Declarations for Protocol.
Abstract base class for application dispatch handlers registered with AsyncComm.
Error if create and file exists.
Definition: Session.h:79
HsCommandInterpreterPtr create_hs_interpreter()
Creates a new Hyperspace command interpreter.
Definition: Session.cc:1390
Po::typed_value< String > * str(String *v=0)
Definition: Properties.h:166
Program options handling.
void readpath_attr(uint64_t handle, const std::string &attr, std::vector< DirEntryAttr > &listing, Timer *timer=0)
Gets a listing of the value of a specified attribute for each path component of the file/dir name...
Definition: Session.cc:916
SleepWakeNotifier * m_sleep_wake_notifier
Delivers suspend/resume notifications (e.g. laptop close/open).
Definition: Session.h:731
attempting to reconnect session
Definition: Session.h:163
Lock in shared mode.
Definition: LockSequencer.h:49
std::shared_ptr< Event > EventPtr
Smart pointer to Event.
Definition: Event.h:228
#define HT_INFO(msg)
Definition: Logger.h:271
LockStatus
Lock status.
Definition: LockSequencer.h:56
STL namespace.
std::mutex m_mutex
Definition: Session.h:709
std::shared_ptr< DynamicBuffer > DynamicBufferPtr
uint32_t remaining()
Returns the remaining time till expiry.
Definition: Timer.h:101
session has expired
Definition: Session.h:157
CallbackMap m_callbacks
Definition: Session.h:724
Atomically open and lock file shared, fail if can't.
Definition: Session.h:83
InetAddr m_master_addr
Definition: Session.h:722
int status(Status &status, Timer *timer=0)
Check the status of the Hyperspace master server.
Definition: Session.cc:1162
uint8_t * ptr
Pointer to the end of the used part of the buffer.
bool wait_for_reply(EventPtr &event)
This method is used by a client to synchronize.
void close_nowait(uint64_t handle)
Attempts to close a file handle, but doesn't block.
Definition: Session.cc:277
A dynamic, resizable and reference counted memory buffer.
Definition: DynamicBuffer.h:42
void handle_wakeup()
Handle wakeup event (e.g.
Definition: Session.cc:106
uint32_t decode_i32(const uint8_t **bufp, size_t *remainp)
Decode a 32-bit integer in little-endian order.
uint64_t m_last_callback_id
Definition: Session.h:725
void normalize_name(const std::string &name, std::string &normal)
Definition: Session.cc:1373
void decode_values(Hypertable::EventPtr &event_ptr, std::vector< DynamicBufferPtr > &values)
Definition: Session.cc:1294
void handle_sleep()
Handle sleep event (e.g.
Definition: Session.cc:100
Hyperspace definitions
#define HT_EXPECT(_e_, _code_)
Definition: Logger.h:388
uint32_t m_lease_interval
Definition: Session.h:719
uint64_t create(const std::string &name, uint32_t flags, HandleCallbackPtr &callback, const std::vector< Attribute > &init_attrs, Timer *timer=0)
Creates a file.
Definition: Session.cc:235
Definition: DirEntryAttr.h:40
void attrs_get(uint64_t handle, const std::vector< std::string > &attrs, std::vector< DynamicBufferPtr > &values, Timer *timer=0)
Gets extended attributes of a file.
Definition: Session.cc:627
vector< String > m_hyperspace_replicas
Definition: Session.h:727
uint64_t decode_i64(const uint8_t **bufp, size_t *remainp)
Decode a 64-bit integer in little-endian order.
void add_callback(SessionCallback *cb)
Register a new session callback.
Definition: Session.cc:141
void readdir_attr(uint64_t handle, const std::string &attr, bool include_sub_entries, std::vector< DirEntryAttr > &listing, Timer *timer=0)
Gets a listing of all entries in a directory which have a certain attribute.
Definition: Session.cc:864
Lock attempt was cancelled.
Definition: LockSequencer.h:64
std::shared_ptr< Session > SessionPtr
Definition: Session.h:734
void set_id(uint32_t id)
Definition: Session.h:103
const char * get_text(int error)
Returns a descriptive error message.
Definition: Error.cc:330
bool status(ContextPtr &context, Timer &timer, Status &status)
Runs a status check on the master.
Definition: Utility.cc:408
bool exists(const std::string &name, Timer *timer=0)
Checks for the existence of a file.
Definition: Session.cc:335
std::shared_ptr< CommBuf > CommBufPtr
Smart pointer to CommBuf.
Definition: CommBuf.h:305
void release(uint64_t handle, Timer *timer=0)
Releases any file handle locks.
Definition: Session.cc:1088
uint32_t m_timeout_ms
Definition: Session.h:720
void attr_set(uint64_t handle, const std::string &attr, const void *value, size_t value_len, Timer *timer=0)
Sets an extended attribute of a file.
Definition: Session.cc:369
std::shared_ptr< Properties > PropertiesPtr
Definition: Properties.h:447
String locate(int type)
Returns location of Hyperspace Master/Replicas.
Definition: Session.cc:1145
Logging routines and macros.
bool attr_exists(uint64_t handle, const std::string &attr, Timer *timer=0)
Definition: Session.cc:685
void close_handle_ptr(SessionPtr hyperspace, uint64_t *handlep)
Definition: Session.cc:1400
bool wait_for_connection(uint32_t max_wait_ms)
Waits for session state to change to STATE_SAFE.
Definition: Session.cc:1235
Compatibility Macros for C/C++.
std::chrono::steady_clock::time_point m_expire_time
Definition: Session.h:721
void attr_del(uint64_t handle, const std::string &name, Timer *timer=0)
Deletes an extended attribute of a file.
Definition: Session.cc:747
void get_sequencer(uint64_t handle, LockSequencer *sequencerp, Timer *timer=0)
Gets the lock sequencer of a locked file or directory handle.
Definition: Session.cc:1120
PropertiesPtr m_cfg
Definition: Session.h:712
uint32_t mode
lock mode (LOCK_MODE_SHARED or LOCK_MODE_EXCLUSIVE)
Definition: LockSequencer.h:40
#define HT_END
Definition: Logger.h:220
Functions to serialize/deserialize primitives to/from a memory buffer.
uint64_t attr_incr(uint64_t handle, const std::string &attr, Timer *timer=0)
Atomically increments the attribute and returns pre-incremented value Attribute is assumed to be a ui...
Definition: Session.cc:485
void close_handle(SessionPtr hyperspace, uint64_t handle)
Definition: Session.cc:1395
Time related declarations.
virtual void decode(const uint8_t **bufp, size_t *remainp)
Reads serialized representation of object from a buffer.
Definition: Serializable.cc:70
Definition: DirEntry.h:34
ClientKeepaliveHandlerPtr m_keepalive_handler_ptr
Definition: Session.h:723
String format(int sep= ':') const
Returns a string with a dotted notation ("127.0.0.1:8080") including the port.
Definition: InetAddr.h:132
std::shared_ptr< HandleCallback > HandleCallbackPtr
std::mutex m_callback_mutex
Definition: Session.h:726
Hypertable definitions
Lock exclusive mode.
Definition: LockSequencer.h:51
void try_lock(uint64_t handle, LockMode mode, LockStatus *statusp, LockSequencer *sequencerp, Timer *timer=0)
Attempts to lock a file.
Definition: Session.cc:1035
void unlink(const std::string &name, Timer *timer=0)
Removes a file or directory.
Definition: Session.cc:308
bool expired()
Checks for session expiration (internal method)
Definition: Session.cc:1229
DispatchHandler class used to synchronize with response messages.
static bool initialize(sockaddr_in *addr, const char *host, uint16_t port)
Initialize a sockaddr_in structure from host:port.
Definition: InetAddr.cc:68
Entry point to AsyncComm service.
Definition: Comm.h:61
LockMode
Lock mode.
Definition: LockSequencer.h:47
String m_hyperspace_master
Definition: Session.h:728
void clear()
Clears the buffer.
Declarations for ConnectionManager.
std::string name
Pathname of file that is locked.
Definition: LockSequencer.h:38
int state_transition(int state)
Transitions state (internal method)
Definition: Session.cc:1181
Declarations for Comm.
Lock attempt pending (internal use only)
Definition: LockSequencer.h:62
void update_master_addr(const String &host)
Definition: Session.cc:92
atomically open and lock file exclusive, fail if can't
Definition: Session.h:85
#define HT_THROWF(_code_, _fmt_,...)
Definition: Error.h:490
DirEntryAttr & decode_dir_entry_attr(const uint8_t **bufp, size_t *remainp, DirEntryAttr &entry)
Decodes (unserializes) a directory entry from a buffer.
Definition: DirEntryAttr.cc:53
virtual ~Session()
Definition: Session.cc:86
Declaration for SleepWakeNotifier.
Internet address wrapper classes and utility functions.
uint8_t * decode_bytes32(const uint8_t **bufp, size_t *remainp, uint32_t *lenp)
Decodes a variable sized byte array from the given buffer.
A timer class to keep timeout states across AsyncComm related calls.
Definition: Timer.h:44
This is a generic exception class for Hypertable.
Definition: Error.h:314
void mkdir(const std::string &name, Timer *timer=0)
Creates a directory.
Definition: Session.cc:295
void decode_listing(Hypertable::EventPtr &event_ptr, std::vector< DirEntryAttr > &listing)
Definition: Session.cc:1320
bool remove_callback(SessionCallback *cb)
De-register session callback.
Definition: Session.cc:149
session is in jeopardy
Definition: Session.h:159
std::shared_ptr< ClientHandleState > ClientHandleStatePtr
Create file if it does not exist.
Definition: Session.h:77
int send_message(CommBufPtr &, DispatchHandler *, Timer *timer)
Definition: Session.cc:1356
void decode_value(Hypertable::EventPtr &event_ptr, DynamicBuffer &value)
Definition: Session.cc:1276
#define HT_TRY(_s_, _code_)
Definition: Error.h:517
#define HT_WARN(msg)
Definition: Logger.h:289
uint8_t decode_byte(const uint8_t **bufp, size_t *remainp)
Decodes a single byte from the given buffer.
Definition: Serialization.h:73
void mkdirs(const std::string &name, Timer *timer=0)
Creates a directory, including all intermediate paths.
Definition: Session.cc:299
Error codes, Exception handling, error logging.
#define HT_THROW(_code_, _msg_)
Definition: Error.h:478
int send_request(const CommAddress &addr, uint32_t timeout_ms, CommBufPtr &cbuf, DispatchHandler *response_handler)
Sends a request message over a connection, expecting a response.
Definition: Comm.cc:300
void ensure(size_t len)
Ensure space for additional data Will grow the space to 1.5 of the needed space with existing data un...
Definition: DynamicBuffer.h:82
uint8_t * add_unchecked(const void *data, size_t len)
Adds additional data without boundary checks.
std::condition_variable m_cond
Definition: Session.h:710
uint32_t m_grace_period
Definition: Session.h:718
#define HT_DEBUG_OUT
Definition: Logger.h:261
Declarations for DispatchHandlerSynchronizer.
int code() const
Returns the error code.
Definition: Error.h:391
#define HT_THROW2(_code_, _ex_, _msg_)
Definition: Error.h:484
void check_sequencer(LockSequencer &sequencer, Timer *timer=0)
Checks to see if a lock sequencer is valid.
Definition: Session.cc:1139