0.9.8.10
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
Client.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2007-2015 Hypertable, Inc.
3  *
4  * This file is part of Hypertable.
5  *
6  * Hypertable is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; version 3 of the
9  * License, or any later version.
10  *
11  * Hypertable is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19  * 02110-1301, USA.
20  */
21 
28 #include <Common/Compat.h>
29 
30 #include "Client.h"
32 #include "HyperspaceCallback.h"
33 #include "NamespaceFlag.h"
34 #include "Protocol.h"
37 #include "Request/Parameters/Compact.h"
41 #include "Request/Parameters/DropTable.h"
51 #include "Request/Parameters/SetState.h"
53 #include "Response/Parameters/Status.h"
55 
56 #include <Hyperspace/Session.h>
57 
60 #include <AsyncComm/Protocol.h>
61 
62 #include <Common/Config.h>
63 #include <Common/Error.h>
64 #include <Common/InetAddr.h>
65 #include <Common/Random.h>
66 #include <Common/Serialization.h>
67 #include <Common/Time.h>
68 #include <Common/Timer.h>
69 
70 #include <chrono>
71 #include <thread>
72 
73 using namespace Hypertable;
74 using namespace Hypertable::Lib;
75 using namespace Serialization;
76 using namespace std;
77 
79  Hyperspace::SessionPtr &hyperspace,
80  const String &toplevel_dir, uint32_t timeout_ms,
83  : m_conn_manager(conn_mgr), m_hyperspace(hyperspace), m_app_queue(app_queue),
84  m_dispatcher_handler(dhp), m_connection_initializer(init),
85  m_timeout_ms(timeout_ms), m_toplevel_dir(toplevel_dir) {
86 
87  m_comm = m_conn_manager->get_comm();
88  memset(&m_master_addr, 0, sizeof(m_master_addr));
89 
90  m_retry_interval = Config::properties->get_i32("Hypertable.Connection.Retry.Interval");
91  m_verbose = Config::get_bool("verbose");
92 
96  m_master_file_callback = make_shared<HyperspaceCallback>(this, m_app_queue);
97 
98  // register hyperspace session callback
101 
102  m_hyperspace_connected = true;
104  reload_master();
105 }
106 
108  uint32_t timeout_ms)
109  : m_conn_manager(conn_mgr), m_master_addr(addr), m_timeout_ms(timeout_ms) {
110  m_comm = m_conn_manager->get_comm();
111  m_retry_interval = Config::properties->get_i32("Hypertable.Connection.Retry.Interval");
112  m_verbose = Config::get_bool("verbose");
113  m_conn_manager->add_with_initializer(m_master_addr, m_retry_interval, "Master",
116 }
117 
118 
119 Master::Client::Client(Comm *comm, InetAddr &addr, uint32_t timeout_ms)
120  : m_comm(comm), m_master_addr(addr), m_timeout_ms(timeout_ms) {
121  m_retry_interval = Config::properties->get_i32("Hypertable.Connection.Retry.Interval");
122  m_verbose = Config::get_bool("verbose");
123 }
124 
125 
127  if (m_hyperspace) {
128  m_hyperspace->remove_callback(&m_hyperspace_session_callback);
129  if (m_master_file_handle != 0)
130  m_hyperspace->close_nowait(m_master_file_handle);
131  }
132 }
133 
134 
135 void Master::Client::hyperspace_disconnected()
136 {
137  lock_guard<mutex> lock(m_hyperspace_mutex);
138  HT_DEBUG_OUT << "Hyperspace disconnected" << HT_END;
139  m_hyperspace_init = false;
140  m_hyperspace_connected = false;
141 }
142 
143 void Master::Client::hyperspace_reconnected()
144 {
145  {
146  lock_guard<mutex> lock(m_hyperspace_mutex);
147  HT_DEBUG_OUT << "Hyperspace reconnected" << HT_END;
148  HT_ASSERT(!m_hyperspace_init);
149  m_hyperspace_connected = true;
150  }
151 
152  EventPtr nullevent;
153  m_app_queue->add(new EventHandlerMasterChange(this, nullevent));
154 }
155 
159 void Master::Client::initialize_hyperspace() {
160 
161  if (m_hyperspace_init)
162  return;
163  HT_ASSERT(m_hyperspace_connected);
164 
165  Timer timer(m_timeout_ms, true);
166  while (timer.remaining()) {
167  try {
168  m_master_file_handle =
169  m_hyperspace->open(m_toplevel_dir + "/master",
171  m_master_file_callback, &timer);
172  break;
173  }
174  catch (Exception &e) {
176  throw;
177  this_thread::sleep_for(chrono::milliseconds(3000));
178  }
179  }
180  if (m_master_file_handle == 0)
181  HT_THROW(Error::REQUEST_TIMEOUT, "Opening hyperspace master file");
182  m_hyperspace_init = true;
183 }
184 
185 void Master::Client::initialize(Timer *&timer, Timer &tmp_timer) {
186  if (timer == 0)
187  timer = &tmp_timer;
188  timer->start();
189 }
190 
191 
192 void
193 Master::Client::create_namespace(const String &name, int32_t flags, Timer *timer) {
194  Timer tmp_timer(m_timeout_ms);
195  CommBufPtr cbp;
196  EventPtr event;
197  int64_t id = 0;
198  String label = format("create_namespace('%s', flags=%s)", name.c_str(),
199  NamespaceFlag::to_str(flags).c_str());
200 
201  initialize(timer, tmp_timer);
202 
203  while (!timer->expired()) {
204 
205  {
207  Request::Parameters::CreateNamespace params(name, flags);
208  CommBufPtr cbuf( new CommBuf(header, params.encoded_length()) );
209  params.encode(cbuf->get_data_ptr_address());
210  if (!send_message(cbuf, timer, event, label))
211  continue;
212  }
213 
214  const uint8_t *ptr = event->payload + 4;
215  size_t remain = event->payload_len - 4;
216  id = decode_i64(&ptr, &remain);
217 
218  fetch_result(id, timer, event, label);
219  return;
220  }
221 
222  {
223  lock_guard<mutex> lock(m_mutex);
225  "Client operation %s to master %s failed", label.c_str(),
226  m_master_addr.format().c_str());
227  }
228 
229 }
230 
231 
232 void
233 Master::Client::drop_namespace(const String &name, int32_t flags, Timer *timer) {
234  Timer tmp_timer(m_timeout_ms);
235  EventPtr event;
236  int64_t id = 0;
237  String label = format("drop_namespace('%s', flags=%s)",
238  name.c_str(), NamespaceFlag::to_str(flags).c_str());
239 
240  initialize(timer, tmp_timer);
241 
242  while (!timer->expired()) {
243 
244  {
246  Request::Parameters::DropNamespace params(name, flags);
247  CommBufPtr cbuf( new CommBuf(header, params.encoded_length()) );
248  params.encode(cbuf->get_data_ptr_address());
249  if (!send_message(cbuf, timer, event, label))
250  continue;
251  }
252 
253  const uint8_t *ptr = event->payload + 4;
254  size_t remain = event->payload_len - 4;
255  id = decode_i64(&ptr, &remain);
256  fetch_result(id, timer, event, label);
257  return;
258  }
259 
260  {
261  lock_guard<mutex> lock(m_mutex);
263  "Client operation %s to master %s failed", label.c_str(),
264  m_master_addr.format().c_str());
265  }
266 
267 }
268 
269 
270 void Master::Client::compact(const String &tablename, const String &row,
271  int32_t range_types, Timer *timer) {
272  Timer tmp_timer(m_timeout_ms);
273  EventPtr event;
274  String label = format("compact('%s')", tablename.c_str());
275 
276  initialize(timer, tmp_timer);
277 
278  while (!timer->expired()) {
279 
280  {
282  Request::Parameters::Compact params(tablename, row, range_types);
283  CommBufPtr cbuf( new CommBuf(header, params.encoded_length()) );
284  params.encode(cbuf->get_data_ptr_address());
285  if (!send_message(cbuf, timer, event, label))
286  continue;
287  }
288 
289  const uint8_t *ptr = event->payload + 4;
290  size_t remain = event->payload_len - 4;
291  int64_t id = decode_i64(&ptr, &remain);
292  fetch_result(id, timer, event, label);
293  return;
294  }
295 
296  {
297  lock_guard<mutex> lock(m_mutex);
299  "Client operation %s to master %s failed", label.c_str(),
300  m_master_addr.format().c_str());
301  }
302 }
303 
304 
305 void
306 Master::Client::create_table(const String &name, const String &schema,
307  Timer *timer) {
308  Timer tmp_timer(m_timeout_ms);
309  EventPtr event;
310  int64_t id = 0;
311  String label = format("create_table('%s')", name.c_str());
312 
313  initialize(timer, tmp_timer);
314 
315  while (!timer->expired()) {
316 
317  {
319  Request::Parameters::CreateTable params(name, schema);
320  CommBufPtr cbuf( new CommBuf(header, params.encoded_length()) );
321  params.encode(cbuf->get_data_ptr_address());
322  if (!send_message(cbuf, timer, event, label))
323  continue;
324  }
325 
326  const uint8_t *ptr = event->payload + 4;
327  size_t remain = event->payload_len - 4;
328  id = decode_i64(&ptr, &remain);
329  fetch_result(id, timer, event, label);
330  return;
331  }
332 
333  {
334  lock_guard<mutex> lock(m_mutex);
336  "Client operation %s to master %s failed", label.c_str(),
337  m_master_addr.format().c_str());
338  }
339 }
340 
341 
342 void
343 Master::Client::alter_table(const String &name, const String &schema,
344  bool force, Timer *timer) {
345  Timer tmp_timer(m_timeout_ms);
346  EventPtr event;
347  int64_t id = 0;
348  String label = format("alter_table('%s')", name.c_str());
349 
350  initialize(timer, tmp_timer);
351 
352  while (!timer->expired()) {
353 
354  {
356  Request::Parameters::AlterTable params(name, schema, force);
357  CommBufPtr cbuf( new CommBuf(header, params.encoded_length()) );
358  params.encode(cbuf->get_data_ptr_address());
359  if (!send_message(cbuf, timer, event, label))
360  continue;
361  }
362  const uint8_t *ptr = event->payload + 4;
363  size_t remain = event->payload_len - 4;
364  id = decode_i64(&ptr, &remain);
365  fetch_result(id, timer, event, label);
366  return;
367  }
368 
369  {
370  lock_guard<mutex> lock(m_mutex);
372  "Client operation %s to master %s failed", label.c_str(),
373  m_master_addr.format().c_str());
374  }
375 }
376 
378  Timer tmp_timer(m_timeout_ms);
379  EventPtr event;
380 
381  initialize(timer, tmp_timer);
382 
383  while (!timer->expired()) {
384 
386  CommBufPtr cbuf( new CommBuf(header, 0) );
387  if (!send_message(cbuf, timer, event, "status"))
388  continue;
389 
390  const uint8_t *ptr = event->payload + 4;
391  size_t remain = event->payload_len - 4;
393  params.decode(&ptr, &remain);
394  status = params.status();
395  return;
396  }
397 
398  lock_guard<mutex> lock(m_mutex);
400  "Client operation 'status' to master %s failed",
401  m_master_addr.format().c_str());
402 }
403 
404 
405 void
406 Master::Client::move_range(const String &source, int64_t range_id,
407  TableIdentifier &table,
408  RangeSpec &range, const String &transfer_log,
409  uint64_t soft_limit, bool split, Timer *timer) {
410  Timer tmp_timer(m_timeout_ms);
411  EventPtr event;
412  String label =
413  format("move_range(%s[%s..%s] (%lld), transfer_log='%s', soft_limit=%llu)",
414  table.id, range.start_row, range.end_row, (Lld)range_id,
415  transfer_log.c_str(), (Llu)soft_limit);
416 
417  initialize(timer, tmp_timer);
418 
419  try {
420  while (!timer->expired()) {
421 
422  {
423  CommHeader header(Protocol::COMMAND_MOVE_RANGE);
424  Request::Parameters::MoveRange params(source, range_id, table, range,
425  transfer_log, soft_limit, split);
426  CommBufPtr cbuf( new CommBuf(header, params.encoded_length()) );
427  params.encode(cbuf->get_data_ptr_address());
428  if (!send_message(cbuf, timer, event, label))
429  continue;
430  }
431  return;
432  }
433  {
434  lock_guard<mutex> lock(m_mutex);
436  "Client operation %s to master %s failed", label.c_str(),
437  m_master_addr.format().c_str());
438  }
439  }
440  catch (Exception &e) {
442  return;
443  HT_THROW2(e.code(), e, label);
444  }
445 }
446 
447 
448 void
449 Master::Client::relinquish_acknowledge(const String &source, int64_t range_id,
450  TableIdentifier &table,
451  RangeSpec &range, Timer *timer) {
452  Timer tmp_timer(m_timeout_ms);
453  EventPtr event;
454  int64_t id = 0;
455  String label = format("relinquish_acknowledge(%s[%s..%s] id=%lld)",
456  table.id, range.start_row, range.end_row,(Lld)range_id);
457 
458  initialize(timer, tmp_timer);
459 
460  while (!timer->expired()) {
461 
462  {
463  CommHeader header(Protocol::COMMAND_RELINQUISH_ACKNOWLEDGE);
464  Request::Parameters::RelinquishAcknowledge params(source, range_id, table, range);
465  CommBufPtr cbuf( new CommBuf(header, params.encoded_length()) );
466  params.encode(cbuf->get_data_ptr_address());
467  if (!send_message(cbuf, timer, event, label))
468  continue;
469  }
470 
471  const uint8_t *ptr = event->payload + 4;
472  size_t remain = event->payload_len - 4;
473  id = decode_i64(&ptr, &remain);
474  fetch_result(id, timer, event, label);
475  return;
476  }
477 
478  {
479  lock_guard<mutex> lock(m_mutex);
481  "Client operation %s to master %s failed", label.c_str(),
482  m_master_addr.format().c_str());
483  }
484 
485 }
486 
487 
488 
489 void
490 Master::Client::rename_table(const String &from, const String &to, Timer *timer) {
491  Timer tmp_timer(m_timeout_ms);
492  EventPtr event;
493  int64_t id = 0;
494  String label = format("rename_table(old='%s', new='%s')", from.c_str(), to.c_str());
495 
496  initialize(timer, tmp_timer);
497 
498  while (!timer->expired()) {
499 
500  {
502  Request::Parameters::RenameTable params(from, to);
503  CommBufPtr cbuf( new CommBuf(header, params.encoded_length()) );
504  params.encode(cbuf->get_data_ptr_address());
505  if (!send_message(cbuf, timer, event, label))
506  continue;
507  }
508 
509  const uint8_t *ptr = event->payload + 4;
510  size_t remain = event->payload_len - 4;
511  id = decode_i64(&ptr, &remain);
512  fetch_result(id, timer, event, label);
513  return;
514  }
515 
516  {
517  lock_guard<mutex> lock(m_mutex);
519  "Client operation %s to master %s failed", label.c_str(),
520  m_master_addr.format().c_str());
521  }
522 }
523 
524 
525 void
526 Master::Client::drop_table(const String &name, bool if_exists, Timer *timer) {
527  Timer tmp_timer(m_timeout_ms);
528  EventPtr event;
529  String label = format("drop_table('%s', if_exists=%s)",
530  name.c_str(), if_exists ? "true" : "false");
531 
532  initialize(timer, tmp_timer);
533 
534  while (!timer->expired()) {
535 
536  {
538  Request::Parameters::DropTable params(name, if_exists);
539  CommBufPtr cbuf( new CommBuf(header, params.encoded_length()) );
540  params.encode(cbuf->get_data_ptr_address());
541  if (!send_message(cbuf, timer, event, label))
542  continue;
543  }
544  const uint8_t *ptr = event->payload + 4;
545  size_t remain = event->payload_len - 4;
546  int64_t id = decode_i64(&ptr, &remain);
547  fetch_result(id, timer, event, label);
548  return;
549  }
550 
551  {
552  lock_guard<mutex> lock(m_mutex);
554  "Client operation %s to master %s failed", label.c_str(),
555  m_master_addr.format().c_str());
556  }
557 
558 }
559 
560 void Master::Client::recreate_index_tables(const std::string &name,
561  TableParts parts, Timer *timer) {
562  Timer tmp_timer(m_timeout_ms);
563  EventPtr event;
564  int64_t id = 0;
565  String label = format("recreate_index_tables('%s', part=%s)",
566  name.c_str(), parts.to_string().c_str());
567 
568  initialize(timer, tmp_timer);
569 
570  while (!timer->expired()) {
571 
572  {
573  CommHeader header(Protocol::COMMAND_RECREATE_INDEX_TABLES);
574  Request::Parameters::RecreateIndexTables params(name, parts);
575  CommBufPtr cbuf( new CommBuf(header, params.encoded_length()) );
576  params.encode(cbuf->get_data_ptr_address());
577  if (!send_message(cbuf, timer, event, label))
578  continue;
579  }
580 
581  const uint8_t *ptr = event->payload + 4;
582  size_t remain = event->payload_len - 4;
583  id = decode_i64(&ptr, &remain);
584  fetch_result(id, timer, event, label);
585  return;
586  }
587 
588  {
589  lock_guard<mutex> lock(m_mutex);
591  "Client operation %s to master %s failed", label.c_str(),
592  m_master_addr.format().c_str());
593  }
594 }
595 
596 
597 
599  Timer tmp_timer(m_timeout_ms);
600  DispatchHandlerSynchronizer sync_handler;
601  EventPtr event;
602 
603  initialize(timer, tmp_timer);
604 
606  CommBufPtr cbuf( new CommBuf(header, 0) );
607  send_message_async(cbuf, &sync_handler, timer, "shutdown");
608 
609  if (!sync_handler.wait_for_reply(event)) {
610  int32_t error = Hypertable::Protocol::response_code(event);
611  if (error != Error::COMM_BROKEN_CONNECTION)
612  HT_THROW(error, "Master 'shutdown' error");
613  }
614 }
615 
616 
617 void Master::Client::balance(BalancePlan &plan, Timer *timer) {
618  Timer tmp_timer(m_timeout_ms);
619  EventPtr event;
620  int64_t id = 0;
621  String label = "balance";
622 
623  initialize(timer, tmp_timer);
624 
625  try {
626 
627  while (!timer->expired()) {
628 
629  {
631  Request::Parameters::Balance params(plan);
632  CommBufPtr cbuf( new CommBuf(header, params.encoded_length()) );
633  params.encode(cbuf->get_data_ptr_address());
634  if (!send_message(cbuf, timer, event, label))
635  continue;
636  }
637  const uint8_t *ptr = event->payload + 4;
638  size_t remain = event->payload_len - 4;
639  id = decode_i64(&ptr, &remain);
640  break;
641  }
642 
643  if (timer->expired()) {
644  lock_guard<mutex> lock(m_mutex);
646  "Client operation %s to master %s failed", label.c_str(),
647  m_master_addr.format().c_str());
648  }
649 
650  }
651  catch (Exception &e) {
653  HT_THROW2(e.code(), e, label);
654  }
655 
656  fetch_result(id, timer, event, label);
657 
658 }
659 
660 
661 void Master::Client::set_state(const std::vector<SystemVariable::Spec> &specs,
662  Timer *timer) {
663  Timer tmp_timer(m_timeout_ms);
664  EventPtr event;
665  int64_t id = 0;
666  String label = "set";
667 
668  initialize(timer, tmp_timer);
669 
670  while (!timer->expired()) {
671  {
672  CommHeader header(Protocol::COMMAND_SET_STATE);
673  Request::Parameters::SetState params(specs);
674  CommBufPtr cbuf( new CommBuf(header, params.encoded_length()) );
675  params.encode(cbuf->get_data_ptr_address());
676  if (!send_message(cbuf, timer, event, label))
677  continue;
678  }
679  const uint8_t *ptr = event->payload + 4;
680  size_t remain = event->payload_len - 4;
681  id = decode_i64(&ptr, &remain);
682  fetch_result(id, timer, event, label);
683  return;
684  }
685 
686  {
687  lock_guard<mutex> lock(m_mutex);
689  "Client operation %s to master %s failed", label.c_str(),
690  m_master_addr.format().c_str());
691  }
692 }
693 
694 
695 void Master::Client::stop(const String &rsname, Timer *timer) {
696  Timer tmp_timer(m_timeout_ms);
697  EventPtr event;
698  int64_t id {};
699  String label = "stop";
700 
701  initialize(timer, tmp_timer);
702 
703  try {
704  while (!timer->expired()) {
706  Request::Parameters::Stop params(rsname);
707  CommBufPtr cbuf( new CommBuf(header, params.encoded_length()) );
708  params.encode(cbuf->get_data_ptr_address());
709  if (!send_message(cbuf, timer, event, label))
710  continue;
711  const uint8_t *ptr = event->payload + 4;
712  size_t remain = event->payload_len - 4;
713  id = decode_i64(&ptr, &remain);
714  break;
715  }
716 
717  if (timer->expired()) {
718  lock_guard<mutex> lock(m_mutex);
720  "Client operation %s to master %s failed", label.c_str(),
721  m_master_addr.format().c_str());
722  }
723  }
724  catch (Exception &e) {
726  HT_THROW2(e.code(), e, label);
727  }
728 
729  fetch_result(id, timer, event, label);
730 }
731 
732 
733 void Master::Client::system_status(Status &status, Timer *timer) {
734  Timer tmp_timer(m_timeout_ms);
735  EventPtr event;
736 
737  initialize(timer, tmp_timer);
738 
739  while (!timer->expired()) {
740 
741  CommHeader header(Protocol::COMMAND_SYSTEM_STATUS);
742  CommBufPtr cbuf( new CommBuf(header, 0) );
743  if (!send_message(cbuf, timer, event, "system status"))
744  continue;
745 
746  const uint8_t *ptr = event->payload + 4;
747  size_t remain = event->payload_len - 4;
749  params.decode(&ptr, &remain);
750  status = params.status();
751  return;
752  }
753 
754  lock_guard<mutex> lock(m_mutex);
756  "Client operation 'system status' to master %s failed",
757  m_master_addr.format().c_str());
758 }
759 
760 
761 void
762 Master::Client::send_message_async(CommBufPtr &cbp, DispatchHandler *handler,
763  Timer *timer, const String &label) {
764  unique_lock<mutex> lock(m_mutex);
765  DispatchHandlerSynchronizer *sync_handler
766  = dynamic_cast<DispatchHandlerSynchronizer *>(handler);
767  int error;
768 
769  while ((error = m_comm->send_request(m_master_addr, timer->remaining(), cbp,
770  handler)) != Error::OK) {
771  // COMM_BROKEN_CONNECTION implies handler will be called back, if handler
772  // is a DispatchHandlerSynchronizer, wait for callback and try again
773  if (error == Error::COMM_BROKEN_CONNECTION) {
774  if (sync_handler) {
775  EventPtr event;
776  sync_handler->wait_for_reply(event);
777  }
778  else
779  return;
780  }
781 
782  auto expire_time = chrono::system_clock::now() +
783  chrono::milliseconds(std::min(timer->remaining(),
784  Random::number32(m_retry_interval)));
785 
786  if (m_cond.wait_until(lock, expire_time) == cv_status::timeout) {
787  if (timer->expired())
789  "Client operation %s to master %s failed", label.c_str(),
790  m_master_addr.format().c_str());
791  }
792  }
793 }
794 
795 bool
796 Master::Client::send_message(CommBufPtr &cbp, Timer *timer, EventPtr &event, const String &label) {
797  DispatchHandlerSynchronizer sync_handler;
798 
799  send_message_async(cbp, &sync_handler, timer, label);
800 
801  if (!sync_handler.wait_for_reply(event)) {
802  int32_t error = Hypertable::Protocol::response_code(event);
803  if (error == Error::COMM_NOT_CONNECTED ||
805  error == Error::COMM_CONNECT_ERROR ||
806  error == Error::SERVER_NOT_READY) {
807  this_thread::sleep_for(chrono::milliseconds(std::min(timer->remaining(), Random::number32(3000))));
808  return false;
809  }
810  HT_THROWF(error, "Client operation %s failed", label.c_str());
811  }
812  return true;
813 }
814 
815 void Master::Client::fetch_result(int64_t id, Timer *timer, EventPtr &event, const String &label) {
816 
817  while (!timer->expired()) {
818 
819  CommHeader header(Protocol::COMMAND_FETCH_RESULT);
821  CommBufPtr cbuf( new CommBuf(header, params.encoded_length()) );
822  params.encode(cbuf->get_data_ptr_address());
823  if (!send_message(cbuf, timer, event, label))
824  continue;
825  return;
826  }
827 
828  {
829  lock_guard<mutex> lock(m_mutex);
831  "Failed to fetch ID %lld for Client operation %s to master %s",
832  (Lld)id, label.c_str(), m_master_addr.format().c_str());
833  }
834 }
835 
836 void
837 Master::Client::replay_status(int64_t op_id, const String &location,
838  int32_t plan_generation) {
839  Timer timer(m_timeout_ms, true);
840  EventPtr event;
841  String label = format("replay_status op_id=%llu location=%s "
842  "plan_generation=%d", (Llu)op_id,
843  location.c_str(), plan_generation);
844 
845  while (!timer.expired()) {
846 
847  {
848  CommHeader header(Protocol::COMMAND_REPLAY_STATUS);
849  Request::Parameters::ReplayStatus params(op_id, location, plan_generation);
850  CommBufPtr cbuf( new CommBuf(header, params.encoded_length()) );
851  params.encode(cbuf->get_data_ptr_address());
852  if (!send_message(cbuf, &timer, event, label)) {
853  this_thread::sleep_for(chrono::milliseconds(std::min(timer.remaining(), Random::number32(3000))));
854  continue;
855  }
856  }
857  return;
858  }
859 
860  {
861  lock_guard<mutex> lock(m_mutex);
863  "Client operation %s to master %s failed", label.c_str(),
864  m_master_addr.format().c_str());
865  }
866 }
867 
868 
869 void
870 Master::Client::replay_complete(int64_t op_id, const String &location,
871  int32_t plan_generation, int32_t error,
872  const String message) {
873  Timer timer(m_timeout_ms, true);
874  EventPtr event;
875  String label = format("replay_complete op_id=%llu location=%s "
876  "plan_generation=%d error=%s", (Llu)op_id,
877  location.c_str(), plan_generation,
878  Error::get_text(error));
879 
880  while (!timer.expired()) {
881 
882  {
883  CommHeader header(Protocol::COMMAND_REPLAY_COMPLETE);
884  Request::Parameters::ReplayComplete params(op_id, location,
885  plan_generation,
886  error, message);
887  CommBufPtr cbuf( new CommBuf(header, params.encoded_length()) );
888  params.encode(cbuf->get_data_ptr_address());
889  if (!send_message(cbuf, &timer, event, label)) {
890  this_thread::sleep_for(chrono::milliseconds(std::min(timer.remaining(), Random::number32(3000))));
891  continue;
892  }
893  }
894  return;
895  }
896 
897  {
898  lock_guard<mutex> lock(m_mutex);
900  "Client operation %s to master %s failed", label.c_str(),
901  m_master_addr.format().c_str());
902  }
903 }
904 
905 void
906 Master::Client::phantom_prepare_complete(int64_t op_id, const String &location,
907  int plan_generation, int32_t error,
908  const String message) {
909  Timer timer(m_timeout_ms, true);
910  EventPtr event;
911  String label = format("phantom_prepare_complete op_id=%llu location=%s "
912  "plan_generation=%d error=%s", (Llu)op_id,
913  location.c_str(), plan_generation,
914  Error::get_text(error));
915 
916  while (!timer.expired()) {
917  CommHeader header(Protocol::COMMAND_PHANTOM_PREPARE_COMPLETE);
918  Request::Parameters::PhantomPrepareComplete params(op_id, location,
919  plan_generation, error,
920  message);
921  CommBufPtr cbuf( new CommBuf(header, params.encoded_length()) );
922  params.encode(cbuf->get_data_ptr_address());
923  if (!send_message(cbuf, &timer, event, label)) {
924  this_thread::sleep_for(chrono::milliseconds(std::min(timer.remaining(), Random::number32(3000))));
925  continue;
926  }
927  return;
928  }
929 
930  {
931  lock_guard<mutex> lock(m_mutex);
933  "Client operation %s to master %s failed", label.c_str(),
934  m_master_addr.format().c_str());
935  }
936 }
937 
938 void
939 Master::Client::phantom_commit_complete(int64_t op_id, const String &location,
940  int plan_generation, int32_t error,
941  const String message) {
942  Timer timer(m_timeout_ms, true);
943  EventPtr event;
944  String label = format("phantom_commit_complete op_id=%llu location=%s "
945  "error=%s", (Llu)op_id, location.c_str(), Error::get_text(error));
946 
947  while (!timer.expired()) {
948  CommHeader header(Protocol::COMMAND_PHANTOM_COMMIT_COMPLETE);
949  Request::Parameters::PhantomCommitComplete params(op_id, location,
950  plan_generation, error,
951  message);
952  CommBufPtr cbuf( new CommBuf(header, params.encoded_length()) );
953  params.encode(cbuf->get_data_ptr_address());
954  if (!send_message(cbuf, &timer, event, label)) {
955  this_thread::sleep_for(chrono::milliseconds(std::min(timer.remaining(), Random::number32(3000))));
956  continue;
957  }
958  return;
959  }
960 
961  {
962  lock_guard<mutex> lock(m_mutex);
964  "Client operation %s to master %s failed", label.c_str(),
965  m_master_addr.format().c_str());
966  }
967 }
968 
969 void Master::Client::reload_master() {
970  InetAddr master_addr;
971 
972  {
973  unique_lock<mutex> lock(m_mutex);
974  int error;
975  DynamicBuffer value(0);
976  String addr_str;
977 
978  {
979  lock_guard<mutex> lock(m_hyperspace_mutex);
980  if (m_hyperspace_init) {
981  try {
982  m_hyperspace->attr_get(m_master_file_handle, "address", value);
983  }
984  catch (Exception &e) {
985  HT_WARN("Unable to determine master address from Hyperspace");
986  return;
987  }
988  }
989  else if (m_hyperspace_connected) {
990  initialize_hyperspace();
991  try {
992  m_hyperspace->attr_get(m_master_file_handle, "address", value);
993  }
994  catch (Exception &e) {
995  HT_WARN("Unable to determine master address from Hyperspace");
996  return;
997  }
998  }
999  else
1000  HT_THROW(Error::CONNECT_ERROR_HYPERSPACE, "Client not connected to Hyperspace");
1001  }
1002  addr_str = (const char *)value.base;
1003 
1004  if (addr_str != m_master_addr_string) {
1005 
1006  if (m_master_addr.sin_port != 0) {
1007  if ((error = m_conn_manager->remove(m_master_addr)) != Error::OK) {
1008  if (m_verbose)
1009  HT_WARNF("Problem removing connection to Master - %s",
1010  Error::get_text(error));
1011  }
1012  if (m_verbose)
1013  HT_INFOF("Connecting to new Master (old=%s, new=%s)",
1014  m_master_addr_string.c_str(), addr_str.c_str());
1015  }
1016 
1017  m_master_addr_string = addr_str;
1018 
1019  InetAddr::initialize(&m_master_addr, m_master_addr_string.c_str());
1020 
1021  // If the new master is not yet fully initialized, the connect() can
1022  // fail. In this case the clients can run into a timeout before the
1023  // master attempts to re-connect. To avoid that, the retry interval is
1024  // cut in half.
1025  m_conn_manager->add_with_initializer(m_master_addr, m_retry_interval / 2,
1026  "Master", m_dispatcher_handler, m_connection_initializer);
1027  }
1028  master_addr = m_master_addr;
1029  }
1030 
1031  {
1032  Timer timer(m_retry_interval, true);
1033  if (m_conn_manager->wait_for_connection(master_addr, timer))
1034  m_conn_manager->get_comm()->wait_for_proxy_load(timer);
1035  m_cond.notify_all();
1036  }
1037 
1038 }
1039 
1040 
1041 bool Master::Client::wait_for_connection(uint32_t max_wait_ms) {
1042  Timer timer(max_wait_ms, true);
1043  m_conn_manager->wait_for_connection(m_master_addr, timer);
1044  return m_conn_manager->get_comm()->wait_for_proxy_load(timer);
1045 }
1046 
1047 bool Master::Client::wait_for_connection(Timer &timer) {
1048  m_conn_manager->wait_for_connection(m_master_addr, timer);
1049  return m_conn_manager->get_comm()->wait_for_proxy_load(timer);
1050 }
1051 
1053  m_client->hyperspace_disconnected();
1054 }
1055 
1057  m_client->hyperspace_reconnected();
1058 }
Declarations for FetchResult request parameters.
Hyperspace::HandleCallbackPtr m_master_file_callback
Definition: Client.h:186
Declarations for SystemStatus response parameters.
void initialize(const String &name)
Public initialization function - creates a singleton instance of LogWriter.
Definition: Logger.cc:45
void drop_namespace(const std::string &name, Namespace *base=NULL, bool if_exists=false)
Removes a namespace.
Definition: Client.cc:150
#define HT_WARNF(msg,...)
Definition: Logger.h:290
DispatchHandlerPtr m_dispatcher_handler
Definition: Client.h:189
Request parameters for fetch result request.
Definition: FetchResult.h:46
Range specification.
Definition: RangeSpec.h:40
Request parameters for set state operation.
Definition: SetState.h:49
Holds Nagios-style program status information.
Definition: Status.h:42
Balance plan.
Definition: BalancePlan.h:38
static int32_t response_code(const Event *event)
Returns the response code from an event event generated in response to a request message.
Definition: Protocol.cc:39
PropertiesPtr properties
This singleton map stores all options.
Definition: Config.cc:47
const Hypertable::Status & status() const
Gets status object.
Definition: Status.h:70
Request parameters for alter table operation.
Definition: AlterTable.h:46
std::string String
A String is simply a typedef to std::string.
Definition: String.h:44
Declarations for DropNamespace request parameters.
String format(const char *fmt,...)
Returns a String using printf like format facilities Vanilla snprintf is about 1.5x faster than this...
Definition: String.cc:37
Request parameters for create table operation.
Definition: CreateTable.h:46
static std::string to_str(int flags)
Converts flags to human readable string.
Abstract base class for application dispatch handlers registered with AsyncComm.
virtual size_t encoded_length() const
Returns serialized object length.
Definition: Serializable.cc:37
void init(int argc, char *argv[], const Desc *desc=NULL)
Initialize with default policy.
Definition: Init.h:95
long long unsigned int Llu
Shortcut for printf formats.
Definition: String.h:50
Declarations for AlterTable request parameters.
std::shared_ptr< Event > EventPtr
Smart pointer to Event.
Definition: Event.h:228
STL namespace.
Request parameters for compact operation.
Definition: Compact.h:46
uint32_t remaining()
Returns the remaining time till expiry.
Definition: Timer.h:101
static uint32_t number32(uint32_t maximum=0)
Returns a random 32-bit unsigned integer.
Definition: Random.cc:55
bool expired()
Returns true if the timer is expired.
Definition: Timer.h:112
ApplicationQueueInterfacePtr m_app_queue
Definition: Client.h:143
Client(const std::string &install_dir, const std::string &config_file, uint32_t default_timeout_ms=0)
Constructs the object using the specified config file.
Definition: Client.cc:60
Request parameters for stop function.
Definition: Stop.h:46
bool wait_for_reply(EventPtr &event)
This method is used by a client to synchronize.
A dynamic, resizable and reference counted memory buffer.
Definition: DynamicBuffer.h:42
Represents a set of table parts (sub-tables).
Definition: TableParts.h:47
Request parameters for create namespace operation.
#define HT_ASSERT(_e_)
Definition: Logger.h:396
Request parameters for move range operation.
Definition: MoveRange.h:49
Response parameters for status operation.
Definition: Status.h:50
Declarations for Stop request parameters.
void create_table(String &ns, String &tablename, String &rs_metrics_file)
Request parameters for drop table operation.
Definition: DropTable.h:46
Request parameters for replay status operation.
Definition: ReplayStatus.h:46
Declarations for ReplayStatus request parameters.
Hyperspace::SessionPtr m_hyperspace
Definition: Client.h:144
uint64_t decode_i64(const uint8_t **bufp, size_t *remainp)
Decode a 64-bit integer in little-endian order.
Request parameters for replay complete operation.
Request parameters for recreate index tables operation.
Declarations for CreateTable request parameters.
const char * end_row
Definition: RangeSpec.h:60
virtual void encode(uint8_t **bufp) const
Writes serialized representation of object to a buffer.
Definition: Serializable.cc:64
std::shared_ptr< Session > SessionPtr
Definition: Session.h:734
const char * get_text(int error)
Returns a descriptive error message.
Definition: Error.cc:330
bool status(ContextPtr &context, Timer &timer, Status &status)
Runs a status check on the master.
Definition: Utility.cc:408
Encapsulate an internet address.
Definition: InetAddr.h:66
Request parameters for phantom commit complete operation.
std::shared_ptr< CommBuf > CommBufPtr
Smart pointer to CommBuf.
Definition: CommBuf.h:305
A timer class to keep timeout states across AsyncComm related calls.
void initialize()
Definition: Client.cc:198
Declarations for PhantomPrepareComplete request parameters.
Compatibility Macros for C/C++.
Declarations for NamespaceFlag.
Declarations for PhantomCommitComplete request parameters.
ConnectionInitializerPtr m_connection_initializer
Definition: Client.h:190
#define HT_END
Definition: Logger.h:220
std::shared_ptr< ApplicationQueueInterface > ApplicationQueueInterfacePtr
Smart pointer to ApplicationQueueInterface.
Functions to serialize/deserialize primitives to/from a memory buffer.
Request parameters for rename table operation.
Definition: RenameTable.h:46
Response parameters for status operation.
Definition: SystemStatus.h:50
Declarations for CreateNamespace request parameters.
Time related declarations.
virtual void decode(const uint8_t **bufp, size_t *remainp)
Reads serialized representation of object from a buffer.
Definition: Serializable.cc:70
void initialize_hyperspace()
Assumes access is serialized via m_hyperspace_mutex.
Definition: Client.cc:159
Declarations for Balance request parameters.
Hypertable library.
Definition: CellInterval.h:30
Hypertable definitions
Declarations for RecreateIndexTables request parameters.
Declarations for MoveRange request parameters.
long long int Lld
Shortcut for printf formats.
Definition: String.h:53
DispatchHandler class used to synchronize with response messages.
static bool initialize(sockaddr_in *addr, const char *host, uint16_t port)
Initialize a sockaddr_in structure from host:port.
Definition: InetAddr.cc:68
Request parameters for drop namespace operation.
Definition: DropNamespace.h:46
Entry point to AsyncComm service.
Definition: Comm.h:61
Header for messages transmitted via AsyncComm.
Definition: CommHeader.h:40
const char * start_row
Definition: RangeSpec.h:59
void start()
Starts the timer.
Definition: Timer.h:64
Request parameters for phantom prepare complete operation.
Declarations for RenameTable request parameters.
Declarations for Protocol.
#define HT_INFOF(msg,...)
Definition: Logger.h:272
#define HT_THROWF(_code_, _fmt_,...)
Definition: Error.h:490
void create_namespace(const std::string &name, Namespace *base=NULL, bool create_intermediate=false, bool if_not_exists=false)
Creates a namespace.
Definition: Client.cc:86
Random number generator for int32, int64, double and ascii arrays.
uint8_t * base
Pointer to the allocated memory buffer.
Internet address wrapper classes and utility functions.
Request parameters for relinquish acknowledge operation.
bool split(int flags)
Tests the SPLIT bit of flags
A timer class to keep timeout states across AsyncComm related calls.
Definition: Timer.h:44
This is a generic exception class for Hypertable.
Definition: Error.h:314
std::shared_ptr< DispatchHandler > DispatchHandlerPtr
Smart pointer to DispatchHandler.
Declarations for ReplayComplete request parameters.
Declarations for MasterClient This file contains declarations for MasterClient, a client interface cl...
Message buffer for holding data to be transmitted over a network.
Definition: CommBuf.h:79
const std::string to_string() const
Returns human readable string describing table parts.
Definition: TableParts.cc:63
Create file if it does not exist.
Definition: Session.h:77
Declarations for ApplicationQueueInterface.
Configuration settings.
ClientHyperspaceSessionCallback m_hyperspace_session_callback
Definition: Client.h:195
Open file for reading.
Definition: Session.h:71
std::shared_ptr< ConnectionManager > ConnectionManagerPtr
Smart pointer to ConnectionManager.
#define HT_WARN(msg)
Definition: Logger.h:289
Request parameters for balance operation.
Definition: Balance.h:48
std::shared_ptr< ConnectionInitializer > ConnectionInitializerPtr
Smart pointer to ConnectionInitializer.
Error codes, Exception handling, error logging.
#define HT_THROW(_code_, _msg_)
Definition: Error.h:478
const Hypertable::Status & status() const
Gets status object.
Definition: SystemStatus.h:70
Declarations for RelinquishAcknowledge request parameters.
#define HT_DEBUG_OUT
Definition: Logger.h:261
ConnectionManagerPtr m_conn_manager
Definition: Client.h:142
Declarations for DispatchHandlerSynchronizer.
int code() const
Returns the error code.
Definition: Error.h:391
#define HT_THROW2(_code_, _ex_, _msg_)
Definition: Error.h:484