0.9.8.10
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
ThriftBroker.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2007-2015 Hypertable, Inc.
3  *
4  * This file is part of Hypertable.
5  *
6  * Hypertable is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; either version 3
9  * of the License, or any later version.
10  *
11  * Hypertable is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with Hypertable. If not, see <http://www.gnu.org/licenses/>
18  */
19 
25 
26 #include <Common/Compat.h>
27 
28 #include <ThriftBroker/Config.h>
33 
34 #include <HyperAppHelper/Unique.h>
35 #include <HyperAppHelper/Error.h>
36 
37 #include <Hypertable/Lib/Client.h>
38 #include <Hypertable/Lib/Future.h>
40 #include <Hypertable/Lib/Key.h>
43 
44 #include <Common/Cronolog.h>
45 #include <Common/fast_clock.h>
46 #include <Common/Init.h>
47 #include <Common/Logger.h>
48 #include <Common/Random.h>
49 #include <Common/System.h>
50 #include <Common/Time.h>
51 
52 #include <concurrency/ThreadManager.h>
53 #include <protocol/TBinaryProtocol.h>
54 #include <server/TThreadedServer.h>
55 #include <transport/TBufferTransports.h>
56 #include <transport/TServerSocket.h>
57 #include <transport/TSocket.h>
58 #include <transport/TTransportUtils.h>
59 
60 #include <boost/shared_ptr.hpp>
61 
62 #include <iostream>
63 #include <iomanip>
64 #include <map>
65 #include <memory>
66 #include <mutex>
67 #include <sstream>
68 #include <unordered_map>
69 
70 extern "C" {
71 #include <signal.h>
72 #include <sys/types.h>
73 #include <unistd.h>
74 }
75 
80 
81 namespace {
83  MetricsHandlerPtr g_metrics_handler;
85  bool g_log_slow_queries {};
87  int32_t g_slow_query_latency_threshold {};
89  Cronolog *g_slow_query_log {};
90 }
91 
92 #define THROW_TE(_code_, _str_) do { ThriftGen::ClientException te; \
93  te.code = _code_; \
94  te.message.append(Error::get_text(_code_)); \
95  te.message.append(" - "); \
96  te.message.append(_str_); \
97  te.__isset.code = te.__isset.message = true; \
98  throw te; \
99 } while (0)
100 
101 #define RETHROW(_expr_) catch (Hypertable::Exception &e) { \
102  std::ostringstream oss; oss << HT_FUNC << " " << _expr_ << ": "<< e; \
103  HT_ERROR_OUT << oss.str() << HT_END; \
104  oss.str(""); \
105  oss << _expr_; \
106  g_metrics_handler->error_increment(); \
107  THROW_TE(e.code(), oss.str()); \
108 }
109 
110 #define LOG_API_START(_expr_) \
111  std::chrono::fast_clock::time_point start_time, end_time; \
112  std::ostringstream logging_stream;\
113  ScannerInfoPtr scanner_info;\
114  g_metrics_handler->request_increment(); \
115  if (m_context.log_api || g_log_slow_queries) {\
116  start_time = std::chrono::fast_clock::now(); \
117  if (m_context.log_api)\
118  logging_stream << "API " << __func__ << ": " << _expr_;\
119  }
120 
121 #define LOG_API_FINISH \
122  if (m_context.log_api || (g_log_slow_queries && scanner_info)) { \
123  end_time = std::chrono::fast_clock::now(); \
124  if (m_context.log_api) \
125 std::cout << std::chrono::duration_cast<std::chrono::seconds>(start_time.time_since_epoch()).count() <<'.'<< std::setw(9) << std::setfill('0') << (std::chrono::duration_cast<std::chrono::nanoseconds>(start_time.time_since_epoch()).count() % 1000000000LL) <<" API "<< __func__ <<": "<< logging_stream.str() << " latency=" << std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time).count() << std::endl; \
126  if (scanner_info) \
127  scanner_info->latency += std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time).count();\
128  }
129 
130 #define LOG_API_FINISH_E(_expr_) \
131  if (m_context.log_api || (g_log_slow_queries && scanner_info)) { \
132  end_time = std::chrono::fast_clock::now(); \
133  if (m_context.log_api) \
134  std::cout << std::chrono::duration_cast<std::chrono::seconds>(start_time.time_since_epoch()).count() <<'.'<< std::setw(9) << std::setfill('0') << (std::chrono::duration_cast<std::chrono::nanoseconds>(start_time.time_since_epoch()).count() % 1000000000LL) <<" API "<< __func__ <<": "<< logging_stream.str() << _expr_ << " latency=" << std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time).count() << std::endl; \
135  if (scanner_info)\
136  scanner_info->latency += std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time).count();\
137  }
138 
139 
140 #define LOG_API(_expr_) do { \
141  if (m_context.log_api) \
142  std::cout << hires_ts <<" API "<< __func__ <<": "<< _expr_ << std::endl; \
143 } while (0)
144 
145 #define LOG_HQL_RESULT(_res_) do { \
146  if (m_context.log_api) \
147  cout << hires_ts <<" API "<< __func__ <<": result: "; \
148  if (Logger::logger->isDebugEnabled()) \
149  cout << _res_; \
150  else if (m_context.log_api) { \
151  if (_res_.__isset.results) \
152  cout <<"results.size=" << _res_.results.size(); \
153  if (_res_.__isset.cells) \
154  cout <<"cells.size=" << _res_.cells.size(); \
155  if (_res_.__isset.scanner) \
156  cout <<"scanner="<< _res_.scanner; \
157  if (_res_.__isset.mutator) \
158  cout <<"mutator="<< _res_.mutator; \
159  } \
160  cout << std::endl; \
161 } while(0)
162 
163 #define LOG_SLOW_QUERY(_pd_, _ns_, _hql_) do { \
164  if (g_log_slow_queries) { \
165  end_time = std::chrono::fast_clock::now(); \
166  int64_t latency_ms = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time).count(); \
167  if (latency_ms >= g_slow_query_latency_threshold) \
168  log_slow_query(__func__, start_time, end_time, latency_ms, _pd_, _ns_, _hql_); \
169  } \
170 } while(0)
171 
172 #define LOG_SLOW_QUERY_SCANNER(_scanner_, _ns_, _table_, _ss_) do { \
173  if (g_log_slow_queries) { \
174  ProfileDataScanner pd; \
175  _scanner_->get_profile_data(pd); \
176  end_time = std::chrono::fast_clock::now(); \
177  int64_t latency_ms = std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time).count(); \
178  if (latency_ms >= g_slow_query_latency_threshold) \
179  log_slow_query_scanspec(__func__, start_time, end_time, latency_ms, pd, _ns_, _table_, _ss_); \
180  } \
181 } while(0)
182 
183 namespace Hypertable { namespace ThriftBroker {
184 
185 using namespace apache::thrift;
186 using namespace apache::thrift::protocol;
187 using namespace apache::thrift::transport;
188 using namespace apache::thrift::server;
189 using namespace apache::thrift::concurrency;
190 
191 using namespace Config;
192 using namespace ThriftGen;
193 using namespace boost;
194 using namespace std;
195 
196 
198 public:
200  const ThriftGen::MutateSpec &mutate_spec)
201  : m_namespace(ns), m_tablename(tablename), m_mutate_spec(mutate_spec) {}
202 
203  int compare(const SharedMutatorMapKey &skey) const {
204  int64_t cmp;
205 
206  cmp = (int64_t)m_namespace - (int64_t)skey.m_namespace;
207  if (cmp != 0)
208  return cmp;
209  cmp = m_tablename.compare(skey.m_tablename);
210  if (cmp != 0)
211  return cmp;
212  cmp = m_mutate_spec.appname.compare(skey.m_mutate_spec.appname);
213  if (cmp != 0)
214  return cmp;
215  cmp = m_mutate_spec.flush_interval - skey.m_mutate_spec.flush_interval;
216  if (cmp != 0)
217  return cmp;
218  cmp = m_mutate_spec.flags - skey.m_mutate_spec.flags;
219  return cmp;
220  }
221 
224  ThriftGen::MutateSpec m_mutate_spec;
225 };
226 
227 inline bool operator < (const SharedMutatorMapKey &skey1,
228  const SharedMutatorMapKey &skey2) {
229  return skey1.compare(skey2) < 0;
230 }
231 
232 typedef Meta::list<ThriftBrokerPolicy, DefaultCommPolicy> Policies;
233 
234 typedef std::map<SharedMutatorMapKey, TableMutator * > SharedMutatorMap;
235 typedef std::unordered_map< ::int64_t, ClientObjectPtr> ObjectMap;
236 typedef std::vector<ThriftGen::Cell> ThriftCells;
237 typedef std::vector<CellAsArray> ThriftCellsAsArrays;
238 
239 class Context {
240 public:
242  client = new Hypertable::Client();
243  log_api = Config::get_bool("ThriftBroker.API.Logging");
244  next_threshold = Config::get_i32("ThriftBroker.NextThreshold");
245  future_capacity = Config::get_i32("ThriftBroker.Future.Capacity");
246  }
249  SharedMutatorMap shared_mutator_map;
250  bool log_api;
251  ::uint32_t next_threshold;
252  ::uint32_t future_capacity;
253 };
254 
255 int64_t
256 cell_str_to_num(const std::string &from, const char *label,
257  int64_t min_num = INT64_MIN, int64_t max_num = INT64_MAX) {
258  char *endp;
259 
260  int64_t value = strtoll(from.data(), &endp, 0);
261 
262  if (endp - from.data() != (int)from.size()
263  || value < min_num || value > max_num)
264  HT_THROWF(Error::BAD_KEY, "Error converting %s to %s", from.c_str(), label);
265 
266  return value;
267 }
268 
269 void
270 convert_scan_spec(const ThriftGen::ScanSpec &tss, Hypertable::ScanSpec &hss) {
271  if (tss.__isset.row_limit)
272  hss.row_limit = tss.row_limit;
273 
274  if (tss.__isset.cell_limit)
275  hss.cell_limit = tss.cell_limit;
276 
277  if (tss.__isset.cell_limit_per_family)
278  hss.cell_limit_per_family = tss.cell_limit_per_family;
279 
280  if (tss.__isset.versions)
281  hss.max_versions = tss.versions;
282 
283  if (tss.__isset.start_time)
284  hss.time_interval.first = tss.start_time;
285 
286  if (tss.__isset.end_time)
287  hss.time_interval.second = tss.end_time;
288 
289  if (tss.__isset.return_deletes)
290  hss.return_deletes = tss.return_deletes;
291 
292  if (tss.__isset.keys_only)
293  hss.keys_only = tss.keys_only;
294 
295  if (tss.__isset.row_regexp)
296  hss.row_regexp = tss.row_regexp.c_str();
297 
298  if (tss.__isset.value_regexp)
299  hss.value_regexp = tss.value_regexp.c_str();
300 
301  if (tss.__isset.scan_and_filter_rows)
302  hss.scan_and_filter_rows = tss.scan_and_filter_rows;
303 
304  if (tss.__isset.do_not_cache)
305  hss.do_not_cache = tss.do_not_cache;
306 
307  if (tss.__isset.row_offset)
308  hss.row_offset = tss.row_offset;
309 
310  if (tss.__isset.cell_offset)
311  hss.cell_offset = tss.cell_offset;
312 
313  if (tss.__isset.and_column_predicates)
314  hss.and_column_predicates = tss.and_column_predicates;
315 
316  // shallow copy
317  const char *start_row;
318  const char *end_row;
319  for (const auto &ri : tss.row_intervals) {
320  start_row = ri.__isset.start_row ?
321  ri.start_row.c_str() :
322  ri.__isset.start_row_binary ?
323  ri.start_row_binary.c_str() :
324  "";
325  end_row = ri.__isset.end_row ?
326  ri.end_row.c_str() :
327  ri.__isset.end_row_binary ?
328  ri.end_row_binary.c_str() :
331  start_row, ri.__isset.start_inclusive && ri.start_inclusive,
332  end_row, ri.__isset.end_inclusive && ri.end_inclusive));
333  }
334 
335  for (const auto &ci : tss.cell_intervals)
337  ci.__isset.start_row ? ci.start_row.c_str() : "",
338  ci.start_column.c_str(),
339  ci.__isset.start_inclusive && ci.start_inclusive,
340  ci.__isset.end_row ? ci.end_row.c_str() : Hypertable::Key::END_ROW_MARKER,
341  ci.end_column.c_str(),
342  ci.__isset.end_inclusive && ci.end_inclusive));
343 
344  for (const auto &col : tss.columns)
345  hss.columns.push_back(col.c_str());
346 
347  for (const auto &cp : tss.column_predicates) {
348  HT_INFOF("%s:%s %s", cp.column_family.c_str(), cp.column_qualifier.c_str(),
349  cp.__isset.value ? cp.value.c_str() : "");
350  hss.column_predicates.push_back(
352  cp.__isset.column_family ? cp.column_family.c_str() : 0,
353  cp.__isset.column_qualifier ? cp.column_qualifier.c_str() : 0,
354  cp.operation,
355  cp.__isset.value ? cp.value.c_str() : 0,
356  cp.__isset.value ? cp.value.size() : 0));
357  }
358 }
359 
360 
361 void
362 convert_scan_spec(const ThriftGen::ScanSpec &tss, Hypertable::ScanSpecBuilder &ssb) {
363  if (tss.__isset.row_limit)
364  ssb.set_row_limit(tss.row_limit);
365 
366  if (tss.__isset.cell_limit)
367  ssb.set_cell_limit(tss.cell_limit);
368 
369  if (tss.__isset.cell_limit_per_family)
370  ssb.set_cell_limit_per_family(tss.cell_limit_per_family);
371 
372  if (tss.__isset.versions)
373  ssb.set_max_versions(tss.versions);
374 
375  if (tss.__isset.start_time)
376  ssb.set_start_time(tss.start_time);
377 
378  if (tss.__isset.end_time)
379  ssb.set_end_time(tss.end_time);
380 
381  if (tss.__isset.return_deletes)
382  ssb.set_return_deletes(tss.return_deletes);
383 
384  if (tss.__isset.keys_only)
385  ssb.set_keys_only(tss.keys_only);
386 
387  if (tss.__isset.row_regexp)
388  ssb.set_row_regexp(tss.row_regexp.c_str());
389 
390  if (tss.__isset.value_regexp)
391  ssb.set_value_regexp(tss.value_regexp.c_str());
392 
393  if (tss.__isset.scan_and_filter_rows)
394  ssb.set_scan_and_filter_rows(tss.scan_and_filter_rows);
395 
396  if (tss.__isset.do_not_cache)
397  ssb.set_do_not_cache(tss.do_not_cache);
398 
399  if (tss.__isset.row_offset)
400  ssb.set_row_offset(tss.row_offset);
401 
402  if (tss.__isset.cell_offset)
403  ssb.set_cell_offset(tss.cell_offset);
404 
405  if (tss.__isset.and_column_predicates)
406  ssb.set_and_column_predicates(tss.and_column_predicates);
407 
408  // columns
409  ssb.reserve_columns(tss.columns.size());
410  for (auto & col : tss.columns)
411  ssb.add_column(col);
412 
413  // row intervals
414  const char *start_row;
415  const char *end_row;
416  ssb.reserve_rows(tss.row_intervals.size());
417  for (auto & ri : tss.row_intervals) {
418  start_row = ri.__isset.start_row ?
419  ri.start_row.c_str() :
420  ri.__isset.start_row_binary ?
421  ri.start_row_binary.c_str() : "";
422  end_row = ri.__isset.end_row ?
423  ri.end_row.c_str() :
424  ri.__isset.end_row_binary ?
425  ri.end_row_binary.c_str() : Hypertable::Key::END_ROW_MARKER;
426  ssb.add_row_interval(
427  start_row, ri.__isset.start_inclusive && ri.start_inclusive,
428  end_row, ri.__isset.end_inclusive && ri.end_inclusive);
429  }
430 
431  // cell intervals
432  ssb.reserve_cells(tss.cell_intervals.size());
433  for (auto & ci : tss.cell_intervals)
434  ssb.add_cell_interval(
435  ci.__isset.start_row ? ci.start_row.c_str() : "",
436  ci.start_column.c_str(),
437  ci.__isset.start_inclusive && ci.start_inclusive,
438  ci.__isset.end_row ? ci.end_row.c_str() : Hypertable::Key::END_ROW_MARKER,
439  ci.end_column.c_str(),
440  ci.__isset.end_inclusive && ci.end_inclusive);
441 
442  // column predicates
443  ssb.reserve_column_predicates(tss.column_predicates.size());
444  for (auto & cp : tss.column_predicates)
446  cp.__isset.column_family ? cp.column_family.c_str() : 0,
447  cp.__isset.column_qualifier ? cp.column_qualifier.c_str() : 0,
448  cp.operation,
449  cp.__isset.value ? cp.value.c_str() : 0,
450  cp.__isset.value ? cp.value.size() : 0);
451 }
452 
453 void
454 convert_cell(const ThriftGen::Cell &tcell, Hypertable::Cell &hcell) {
455  // shallow copy
456  if (tcell.key.__isset.row)
457  hcell.row_key = tcell.key.row.c_str();
458 
459  if (tcell.key.__isset.column_family)
460  hcell.column_family = tcell.key.column_family.c_str();
461 
462  if (tcell.key.__isset.column_qualifier)
463  hcell.column_qualifier = tcell.key.column_qualifier.c_str();
464 
465  if (tcell.key.__isset.timestamp)
466  hcell.timestamp = tcell.key.timestamp;
467 
468  if (tcell.key.__isset.revision)
469  hcell.revision = tcell.key.revision;
470 
471  if (tcell.__isset.value) {
472  hcell.value = (::uint8_t *)tcell.value.c_str();
473  hcell.value_len = tcell.value.length();
474  }
475 
476  if (tcell.key.__isset.flag)
477  hcell.flag = tcell.key.flag;
478 }
479 
480 void
481 convert_key(const ThriftGen::Key &tkey, Hypertable::KeySpec &hkey) {
482  // shallow copy
483  if (tkey.__isset.row) {
484  hkey.row = tkey.row.c_str();
485  hkey.row_len = tkey.row.size();
486  }
487 
488  if (tkey.__isset.column_family)
489  hkey.column_family = tkey.column_family.c_str();
490 
491  if (tkey.__isset.column_qualifier)
492  hkey.column_qualifier = tkey.column_qualifier.c_str();
493 
494  if (tkey.__isset.timestamp)
495  hkey.timestamp = tkey.timestamp;
496 
497  if (tkey.__isset.revision)
498  hkey.revision = tkey.revision;
499 }
500 
501 int32_t
502 convert_cell(const Hypertable::Cell &hcell, ThriftGen::Cell &tcell) {
503  int32_t amount = sizeof(ThriftGen::Cell);
504 
505  tcell.key.row = hcell.row_key;
506  amount += tcell.key.row.length();
507  tcell.key.column_family = hcell.column_family;
508  amount += tcell.key.column_family.length();
509 
510  if (hcell.column_qualifier && *hcell.column_qualifier) {
511  tcell.key.column_qualifier = hcell.column_qualifier;
512  tcell.key.__isset.column_qualifier = true;
513  amount += tcell.key.column_qualifier.length();
514  }
515  else {
516  tcell.key.column_qualifier = "";
517  tcell.key.__isset.column_qualifier = false;
518  }
519 
520  tcell.key.timestamp = hcell.timestamp;
521  tcell.key.revision = hcell.revision;
522 
523  if (hcell.value && hcell.value_len) {
524  tcell.value = std::string((char *)hcell.value, hcell.value_len);
525  tcell.__isset.value = true;
526  amount += hcell.value_len;
527  }
528  else {
529  tcell.value = "";
530  tcell.__isset.value = false;
531  }
532 
533  tcell.key.flag = (KeyFlag::type)hcell.flag;
534  tcell.key.__isset.row = tcell.key.__isset.column_family
535  = tcell.key.__isset.timestamp = tcell.key.__isset.revision
536  = tcell.key.__isset.flag = true;
537  return amount;
538 }
539 
540 void convert_cell(const CellAsArray &tcell, Hypertable::Cell &hcell) {
541  int len = tcell.size();
542 
543  switch (len) {
544  case 7: hcell.flag = cell_str_to_num(tcell[6], "cell flag", 0, 255);
545  case 6: hcell.revision = cell_str_to_num(tcell[5], "revision");
546  case 5: hcell.timestamp = cell_str_to_num(tcell[4], "timestamp");
547  case 4: hcell.value = (::uint8_t *)tcell[3].c_str();
548  hcell.value_len = tcell[3].length();
549  case 3: hcell.column_qualifier = tcell[2].c_str();
550  case 2: hcell.column_family = tcell[1].c_str();
551  case 1: hcell.row_key = tcell[0].c_str();
552  break;
553  default:
554  HT_THROWF(Error::BAD_KEY, "CellAsArray: bad size: %d", len);
555  }
556 }
557 
558 int32_t convert_cell(const Hypertable::Cell &hcell, CellAsArray &tcell) {
559  int32_t amount = 5*sizeof(std::string);
560  tcell.resize(5);
561  tcell[0] = hcell.row_key;
562  amount += tcell[0].length();
563  tcell[1] = hcell.column_family;
564  amount += tcell[1].length();
565  tcell[2] = hcell.column_qualifier ? hcell.column_qualifier : "";
566  amount += tcell[2].length();
567  tcell[3] = std::string((char *)hcell.value, hcell.value_len);
568  amount += tcell[3].length();
569  tcell[4] = format("%llu", (Llu)hcell.timestamp);
570  amount += tcell[4].length();
571  return amount;
572 }
573 
574 int32_t convert_cells(const Hypertable::Cells &hcells, ThriftCells &tcells) {
575  // deep copy
576  int32_t amount = sizeof(ThriftCells);
577  tcells.resize(hcells.size());
578  for(size_t ii=0; ii<hcells.size(); ++ii)
579  amount += convert_cell(hcells[ii], tcells[ii]);
580 
581  return amount;
582 }
583 
584 void convert_cells(const ThriftCells &tcells, Hypertable::Cells &hcells) {
585  // shallow copy
586  for (const auto &tcell : tcells) {
587  Hypertable::Cell hcell;
588  convert_cell(tcell, hcell);
589  hcells.push_back(hcell);
590  }
591 }
592 
593 int32_t convert_cells(const Hypertable::Cells &hcells, ThriftCellsAsArrays &tcells) {
594  // deep copy
595  int32_t amount = sizeof(CellAsArray);
596  tcells.resize(hcells.size());
597  for(size_t ii=0; ii<hcells.size(); ++ii) {
598  amount += convert_cell(hcells[ii], tcells[ii]);
599  }
600  return amount;
601 }
602 
603 int32_t convert_cells(Hypertable::Cells &hcells, CellsSerialized &tcells) {
604  // deep copy
605  int32_t amount = 0 ;
606  for(size_t ii=0; ii<hcells.size(); ++ii) {
607  amount += strlen(hcells[ii].row_key) + strlen(hcells[ii].column_family)
608  + strlen(hcells[ii].column_qualifier) + 8 + 8 + 4 + 1
609  + hcells[ii].value_len + 4;
610  }
611  SerializedCellsWriter writer(amount, true);
612  for (size_t ii = 0; ii < hcells.size(); ++ii) {
613  writer.add(hcells[ii]);
614  }
616  tcells = String((char *)writer.get_buffer(), writer.get_buffer_length());
617  amount = tcells.size();
618  return amount;
619 }
620 
621 void
622 convert_cells(const ThriftCellsAsArrays &tcells, Hypertable::Cells &hcells) {
623  // shallow copy
624  for (const auto &tcell : tcells) {
625  Hypertable::Cell hcell;
626  convert_cell(tcell, hcell);
627  hcells.push_back(hcell);
628  }
629 }
630 
632  ThriftGen::TableSplit &tsplit) {
634  if (hsplit.start_row) {
635  tsplit.start_row = hsplit.start_row;
636  tsplit.__isset.start_row = true;
637  }
638  else {
639  tsplit.start_row = "";
640  tsplit.__isset.start_row = false;
641  }
642 
644  if (hsplit.end_row &&
645  !(hsplit.end_row[0] == (char)0xff && hsplit.end_row[1] == (char)0xff)) {
646  tsplit.end_row = hsplit.end_row;
647  tsplit.__isset.end_row = true;
648  }
649  else {
650  tsplit.end_row = Hypertable::Key::END_ROW_MARKER;
651  tsplit.__isset.end_row = false;
652  }
653 
655  if (hsplit.location) {
656  tsplit.location = hsplit.location;
657  tsplit.__isset.location = true;
658  }
659  else {
660  tsplit.location = "";
661  tsplit.__isset.location = false;
662  }
663 
665  if (hsplit.ip_address) {
666  tsplit.ip_address = hsplit.ip_address;
667  tsplit.__isset.ip_address = true;
668  }
669  else {
670  tsplit.ip_address = "";
671  tsplit.__isset.ip_address = false;
672  }
673 
675  if (hsplit.hostname) {
676  tsplit.hostname = hsplit.hostname;
677  tsplit.__isset.hostname = true;
678  }
679  else {
680  tsplit.hostname = "";
681  tsplit.__isset.hostname = false;
682  }
683 
684 }
685 
687  ThriftGen::ColumnFamilyOptions &toptions) {
688  bool ret = false;
689  if (hoptions.is_set_max_versions()) {
690  toptions.__set_max_versions(hoptions.get_max_versions());
691  ret = true;
692  }
693  if (hoptions.is_set_ttl()) {
694  toptions.__set_ttl(hoptions.get_ttl());
695  ret = true;
696  }
697  if (hoptions.is_set_time_order_desc()) {
698  toptions.__set_time_order_desc(hoptions.get_time_order_desc());
699  ret = true;
700  }
701  if (hoptions.is_set_counter()) {
702  toptions.__set_counter(hoptions.get_counter());
703  ret = true;
704  }
705  return ret;
706 }
707 
708 void convert_column_family_options(const ThriftGen::ColumnFamilyOptions &toptions,
710  if (toptions.__isset.max_versions)
711  hoptions.set_max_versions(toptions.max_versions);
712  if (toptions.__isset.ttl)
713  hoptions.set_ttl(toptions.ttl);
714  if (toptions.__isset.time_order_desc)
715  hoptions.set_time_order_desc(toptions.time_order_desc);
716  if (toptions.__isset.counter)
717  hoptions.set_counter(toptions.counter);
718 }
719 
721  ThriftGen::AccessGroupOptions &toptions) {
722  bool ret = false;
723  if (hoptions.is_set_in_memory()) {
724  toptions.__set_in_memory(hoptions.get_in_memory());
725  ret = true;
726  }
727  if (hoptions.is_set_replication()) {
728  toptions.__set_replication(hoptions.get_replication());
729  ret = true;
730  }
731  if (hoptions.is_set_blocksize()) {
732  toptions.__set_blocksize(hoptions.get_blocksize());
733  ret = true;
734  }
735  if (hoptions.is_set_compressor()) {
736  toptions.__set_compressor(hoptions.get_compressor());
737  ret = true;
738  }
739  if (hoptions.is_set_bloom_filter()) {
740  toptions.__set_bloom_filter(hoptions.get_bloom_filter());
741  ret = true;
742  }
743  return ret;
744 }
745 
746 void convert_access_group_options(const ThriftGen::AccessGroupOptions &toptions,
747  Hypertable::AccessGroupOptions &hoptions) {
748  if (toptions.__isset.in_memory)
749  hoptions.set_in_memory(toptions.in_memory);
750  if (toptions.__isset.replication)
751  hoptions.set_replication(toptions.replication);
752  if (toptions.__isset.blocksize)
753  hoptions.set_blocksize(toptions.blocksize);
754  if (toptions.__isset.compressor)
755  hoptions.set_compressor(toptions.compressor);
756  if (toptions.__isset.bloom_filter)
757  hoptions.set_bloom_filter(toptions.bloom_filter);
758 }
759 
760 
762  ThriftGen::Schema &tschema) {
763 
764  if (hschema->get_generation())
765  tschema.__set_generation(hschema->get_generation());
766 
767  tschema.__set_version(hschema->get_version());
768 
769  if (hschema->get_group_commit_interval())
770  tschema.__set_group_commit_interval(hschema->get_group_commit_interval());
771 
772  for (auto ag_spec : hschema->get_access_groups()) {
773  ThriftGen::AccessGroupSpec tag;
774  tag.name = ag_spec->get_name();
775  tag.__set_generation(ag_spec->get_generation());
776  if (convert_access_group_options(ag_spec->options(), tag.options))
777  tag.__isset.options = true;
778  if (convert_column_family_options(ag_spec->defaults(), tag.defaults))
779  tag.__isset.defaults = true;
780  tschema.access_groups[ag_spec->get_name()] = tag;
781  tschema.__isset.access_groups = true;
782  }
783 
784  for (auto cf_spec : hschema->get_column_families()) {
785  ThriftGen::ColumnFamilySpec tcf;
786  tcf.name = cf_spec->get_name();
787  tcf.access_group = cf_spec->get_access_group();
788  tcf.deleted = cf_spec->get_deleted();
789  if (cf_spec->get_generation())
790  tcf.__set_generation(cf_spec->get_generation());
791  if (cf_spec->get_id())
792  tcf.__set_id(cf_spec->get_id());
793  tcf.value_index = cf_spec->get_value_index();
794  tcf.qualifier_index = cf_spec->get_qualifier_index();
795  if (convert_column_family_options(cf_spec->options(), tcf.options))
796  tcf.__isset.options = true;
797  tschema.column_families[cf_spec->get_name()] = tcf;
798  tschema.__isset.column_families = true;
799  }
800 
801  if (convert_access_group_options(hschema->access_group_defaults(),
802  tschema.access_group_defaults))
803  tschema.__isset.access_group_defaults = true;
804 
805  if (convert_column_family_options(hschema->column_family_defaults(),
806  tschema.column_family_defaults))
807  tschema.__isset.column_family_defaults = true;
808 
809 }
810 
811 void convert_schema(const ThriftGen::Schema &tschema,
812  Hypertable::SchemaPtr &hschema) {
813 
814  if (tschema.__isset.generation)
815  hschema->set_generation(tschema.generation);
816 
817  hschema->set_version(tschema.version);
818 
819  hschema->set_group_commit_interval(tschema.group_commit_interval);
820 
821  convert_access_group_options(tschema.access_group_defaults,
822  hschema->access_group_defaults());
823 
824  convert_column_family_options(tschema.column_family_defaults,
825  hschema->column_family_defaults());
826 
827  bool need_default = true;
828  unordered_map<string, Hypertable::AccessGroupSpec *> ag_map;
829  for (auto & entry : tschema.access_groups) {
830  if (entry.second.name == "default")
831  need_default = false;
832  Hypertable::AccessGroupSpec *ag = new Hypertable::AccessGroupSpec(entry.second.name);
833  if (entry.second.__isset.generation)
834  ag->set_generation(entry.second.generation);
835  ag_map[entry.second.name] = ag;
837  convert_access_group_options(entry.second.options, ag_options);
838  ag->merge_options(ag_options);
840  convert_column_family_options(entry.second.defaults, cf_defaults);
841  ag->merge_defaults(cf_defaults);
842  }
843  if (need_default)
844  ag_map["default"] = new Hypertable::AccessGroupSpec("default");
845 
846  for (auto & entry : tschema.column_families) {
848  cf->set_name(entry.second.name);
849  if (entry.second.access_group.empty())
850  cf->set_access_group("default");
851  else
852  cf->set_access_group(entry.second.access_group);
853  cf->set_deleted(entry.second.deleted);
854  if (entry.second.__isset.generation)
855  cf->set_generation(entry.second.generation);
856  if (entry.second.__isset.id)
857  cf->set_id(entry.second.id);
858  cf->set_value_index(entry.second.value_index);
859  cf->set_qualifier_index(entry.second.qualifier_index);
861  convert_column_family_options(entry.second.options, cf_options);
862  cf->merge_options(cf_options);
863  // Add column family to corresponding schema
864  auto iter = ag_map.find(cf->get_access_group());
865  if (iter == ag_map.end())
867  "Undefined access group '%s' referenced by column '%s'",
868  cf->get_access_group().c_str(), cf->get_name().c_str());
869  iter->second->add_column(cf);
870  }
871 
872  // Add access groups to schema
873  for (auto & entry : ag_map)
874  hschema->add_access_group(entry.second);
875 
876 }
877 
878 class ScannerInfo {
879 public:
880  ScannerInfo(int64_t ns) : ns(ns), scan_spec_builder(128) { }
881  ScannerInfo(int64_t ns, const string &t) :
882  ns(ns), table(t), scan_spec_builder(128) { }
883  int64_t ns;
884  string hql;
885  const string table;
887  int64_t latency {};
888 };
889 typedef std::shared_ptr<ScannerInfo> ScannerInfoPtr;
890 
891 class ServerHandler;
892 
893 template <class ResultT, class CellT>
896 
897  ResultT &result;
900  int64_t ns {};
901  string hql;
902  bool flush, buffered;
903  bool is_scan {};
904 
905  HqlCallback(ResultT &r, ServerHandler *handler, const ThriftGen::Namespace ns,
906  const String &hql, bool flush, bool buffered)
907  : result(r), handler(*handler), ns(ns), hql(hql), flush(flush),
908  buffered(buffered) { }
909 
910  void on_return(const std::string &) override;
911  void on_scan(TableScannerPtr &) override;
912  void on_finish(TableMutatorPtr &) override;
913 
914 };
915 
916 
917 class ServerHandler : public HqlServiceIf {
918  struct Statistics {
920  : scanners(0), async_scanners(0), mutators(0), async_mutators(0),
921  shared_mutators(0), namespaces(0), futures(0) {
922  }
923 
924  bool operator==(const Statistics &other) {
925  return scanners == other.scanners
926  && async_scanners == other.async_scanners
927  && mutators == other.mutators
928  && async_mutators == other.async_mutators
929  && shared_mutators == other.shared_mutators
930  && namespaces == other.namespaces
931  && futures == other.futures;
932  }
933 
934  void operator=(const Statistics &other) {
935  scanners = other.scanners;
936  async_scanners = other.async_scanners;
937  mutators = other.mutators;
938  async_mutators = other.async_mutators;
939  shared_mutators = other.shared_mutators;
940  namespaces = other.namespaces;
941  futures = other.futures;
942  }
943 
944  int scanners;
946  int mutators;
950  int futures;
951  };
952 
953 public:
954 
955  ServerHandler(const String& remote_peer, Context &c)
956  : m_remote_peer(remote_peer), m_context(c) {
957  }
958 
959  virtual ~ServerHandler() {
960  std::lock_guard<std::mutex> lock(m_mutex);
961  if (!m_object_map.empty())
962  HT_WARNF("Destroying ServerHandler for remote peer %s with %d objects in map",
963  m_remote_peer.c_str(),
964  (int)m_object_map.size());
965  // Clear reference map. Force each object to be destroyed individually and
966  // catch and log exceptions.
967  for (auto entry : m_reference_map) {
968  try {
969  entry.second = nullptr;
970  }
971  catch (Exception &e) {
972  HT_ERROR_OUT << e << HT_END;
973  }
974  }
975  try {
976  m_reference_map.clear();
977  }
978  catch (Exception &e) {
979  }
980  // Clear object map. Force each object to be destroyed individually and
981  // catch and log exceptions.
982  for (auto entry : m_object_map) {
983  try {
984  entry.second = nullptr;
985  }
986  catch (Exception &e) {
987  HT_ERROR_OUT << e << HT_END;
988  }
989  }
990  try {
991  m_object_map.clear();
992  }
993  catch (Exception &e) {
994  }
995  // Clear cached object map. Force each object to be destroyed individually
996  // and catch and log exceptions.
997  for (auto entry : m_cached_object_map) {
998  try {
999  entry.second = nullptr;
1000  }
1001  catch (Exception &e) {
1002  HT_ERROR_OUT << e << HT_END;
1003  }
1004  }
1005  try {
1006  m_cached_object_map.clear();
1007  }
1008  catch (Exception &e) {
1009  }
1010  }
1011 
1012  const String& remote_peer() const {
1013  return m_remote_peer;
1014  }
1015 
1016  void log_slow_query(const char *func_name,
1019  int64_t latency_ms, ProfileDataScanner &profile_data,
1020  Hypertable::Namespace *ns, const string &hql) {
1021 
1022  // Build servers string
1023  string servers;
1024  bool first = true;
1025  servers.reserve(profile_data.servers.size()*6);
1026  for (auto & server : profile_data.servers) {
1027  if (first)
1028  first = false;
1029  else
1030  servers.append(",");
1031  servers.append(server);
1032  }
1033 
1034  string ns_str(ns->get_name());
1035  if (ns_str.empty())
1036  ns_str.append("/");
1037  else if (ns_str[0] != '/')
1038  ns_str = string("/") + ns_str;
1039 
1040  // Strip HQL string of newlines
1041  const char *hql_ptr = hql.c_str();
1042  string hql_cleaned;
1043  if (hql.find_first_of('\n') != string::npos) {
1044  hql_cleaned.reserve(hql.length());
1045  const char *base = hql.c_str();
1046  const char *ptr = base;
1047  while (*ptr) {
1048  while (*ptr && *ptr != '\n')
1049  ptr++;
1050  hql_cleaned.append(base, ptr-base);
1051  if (*ptr) {
1052  hql_cleaned.append(" ", 1);
1053  base = ++ptr;
1054  }
1055  }
1056  hql_ptr = hql_cleaned.c_str();
1057  }
1058 
1059  string line = format("%lld %s %s %lld %d %d %lld %lld %lld %s %s %s",
1061  func_name, m_remote_peer.c_str(),
1062  (Lld)latency_ms, profile_data.subscanners,
1063  profile_data.scanblocks,
1064  (Lld)profile_data.bytes_returned,
1065  (Lld)profile_data.bytes_scanned,
1066  (Lld)profile_data.disk_read, servers.c_str(),
1067  ns_str.c_str(), hql_ptr);
1068  g_slow_query_log->write(std::chrono::fast_clock::to_time_t(end_time), line);
1069  }
1070 
1071  void log_slow_query_scanspec(const char *func_name,
1074  int64_t latency_ms, ProfileDataScanner &profile_data,
1075  Hypertable::Namespace *ns, const string &table,
1076  Hypertable::ScanSpec &ss) {
1077 
1078  // Build servers string
1079  string servers;
1080  bool first = true;
1081  servers.reserve(profile_data.servers.size()*6);
1082  for (auto & server : profile_data.servers) {
1083  if (first)
1084  first = false;
1085  else
1086  servers.append(",");
1087  servers.append(server);
1088  }
1089 
1090  string ns_str(ns->get_name());
1091  if (ns_str.empty())
1092  ns_str.append("/");
1093  else if (ns_str[0] != '/')
1094  ns_str = string("/") + ns_str;
1095 
1096  string line = format("%lld %s %s %lld %d %d %lld %lld %lld %s %s %s",
1098  func_name, m_remote_peer.c_str(),
1099  (Lld)latency_ms, profile_data.subscanners,
1100  profile_data.scanblocks,
1101  (Lld)profile_data.bytes_returned,
1102  (Lld)profile_data.bytes_scanned,
1103  (Lld)profile_data.disk_read, servers.c_str(),
1104  ns_str.c_str(), ss.render_hql(table).c_str());
1105  g_slow_query_log->write(std::chrono::fast_clock::to_time_t(end_time), line);
1106  }
1107 
1108 
1109  void
1110  hql_exec(HqlResult& result, const ThriftGen::Namespace ns,
1111  const String &hql, bool noflush, bool unbuffered) override {
1112  LOG_API_START("namespace=" << ns << " hql=" << hql << " noflush=" << noflush
1113  << " unbuffered="<< unbuffered);
1114 
1116  cb(result, this, ns, hql, !noflush, !unbuffered);
1117 
1118  try {
1119  run_hql_interp(ns, hql, cb);
1120  //LOG_HQL_RESULT(result);
1121  } RETHROW("namespace=" << ns << " hql="<< hql <<" noflush="<< noflush
1122  << " unbuffered="<< unbuffered)
1123 
1124  if (!unbuffered && cb.is_scan)
1125  LOG_SLOW_QUERY(cb.profile_data, get_namespace(ns), hql);
1126  else {
1127  cb.hql = hql;
1128  cb.ns = ns;
1129  }
1130 
1132  }
1133 
1134  void
1135  hql_query(HqlResult& result, const ThriftGen::Namespace ns,
1136  const String &hql) override {
1137  hql_exec(result, ns, hql, false, false);
1138  }
1139 
1140  void
1141  hql_exec2(HqlResult2 &result, const ThriftGen::Namespace ns,
1142  const String &hql, bool noflush, bool unbuffered) override {
1143  LOG_API_START("namespace=" << ns << " hql="<< hql <<" noflush="<< noflush <<
1144  " unbuffered="<< unbuffered);
1145 
1147  cb(result, this, ns, hql, !noflush, !unbuffered);
1148 
1149  try {
1150  run_hql_interp(ns, hql, cb);
1151  //LOG_HQL_RESULT(result);
1152  } RETHROW("namespace=" << ns << " hql="<< hql <<" noflush="<< noflush <<
1153  " unbuffered="<< unbuffered)
1154 
1155  if (!unbuffered && cb.is_scan)
1156  LOG_SLOW_QUERY(cb.profile_data, get_namespace(ns), hql);
1157 
1159  }
1160 
1161  void
1162  hql_exec_as_arrays(HqlResultAsArrays& result, const ThriftGen::Namespace ns,
1163  const String &hql, bool noflush, bool unbuffered) override {
1164  LOG_API_START("namespace=" << ns << " hql="<< hql <<" noflush="<< noflush <<
1165  " unbuffered="<< unbuffered);
1166 
1168  cb(result, this, ns, hql, !noflush, !unbuffered);
1169 
1170  try {
1171  run_hql_interp(ns, hql, cb);
1172  //LOG_HQL_RESULT(result);
1173  } RETHROW("namespace=" << ns << " hql="<< hql <<" noflush="<< noflush <<
1174  " unbuffered="<< unbuffered)
1175 
1176  if (!unbuffered && cb.is_scan)
1177  LOG_SLOW_QUERY(cb.profile_data, get_namespace(ns), hql);
1178 
1180  }
1181 
1182  void
1183  hql_query2(HqlResult2& result, const ThriftGen::Namespace ns,
1184  const String &hql) override {
1185  hql_exec2(result, ns, hql, false, false);
1186  }
1187 
1188  void
1189  hql_query_as_arrays(HqlResultAsArrays& result, const ThriftGen::Namespace ns,
1190  const String &hql) override {
1191  hql_exec_as_arrays(result, ns, hql, false, false);
1192  }
1193 
1194  void namespace_create(const String &ns) override {
1195  LOG_API_START("namespace=" << ns);
1196  try {
1197  m_context.client->create_namespace(ns, NULL);
1198  } RETHROW("namespace=" << ns)
1200  }
1201 
1202  void create_namespace(const String &ns) override {
1203  namespace_create(ns);
1204  }
1205 
1206  void table_create(const ThriftGen::Namespace ns, const String &table,
1207  const ThriftGen::Schema &schema) override {
1208  LOG_API_START("namespace=" << ns << " table=" << table);
1209 
1210  try {
1211  Hypertable::Namespace *namespace_ptr = get_namespace(ns);
1212  Hypertable::SchemaPtr hschema = make_shared<Hypertable::Schema>();
1213  convert_schema(schema, hschema);
1214  namespace_ptr->create_table(table, hschema);
1215  } RETHROW("namespace=" << ns << " table="<< table)
1216 
1218  }
1219 
1220  void table_alter(const ThriftGen::Namespace ns, const String &table,
1221  const ThriftGen::Schema &schema) override {
1222  LOG_API_START("namespace=" << ns << " table=" << table);
1223 
1224  try {
1225  Hypertable::Namespace *namespace_ptr = get_namespace(ns);
1226  Hypertable::SchemaPtr hschema = make_shared<Hypertable::Schema>();
1227  convert_schema(schema, hschema);
1228  namespace_ptr->alter_table(table, hschema, false);
1229  } RETHROW("namespace=" << ns << " table="<< table)
1230 
1232  }
1233 
1234  Scanner scanner_open(const ThriftGen::Namespace ns,
1235  const String &table, const ThriftGen::ScanSpec &ss) override {
1236  Scanner id;
1237  LOG_API_START("namespace=" << ns << " table="<< table <<" scan_spec="<< ss);
1238  try {
1239  ScannerInfoPtr si = make_shared<ScannerInfo>(ns, table);
1240  convert_scan_spec(ss, si->scan_spec_builder);
1241  id = get_scanner_id(_open_scanner(ns, table, si->scan_spec_builder.get()), si);
1242  } RETHROW("namespace=" << ns << " table="<< table <<" scan_spec="<< ss)
1243  LOG_API_FINISH_E(" scanner="<<id);
1244  return id;
1245  }
1246 
1247  Scanner open_scanner(const ThriftGen::Namespace ns,
1248  const String &table, const ThriftGen::ScanSpec &ss) override {
1249  return scanner_open(ns, table, ss);
1250  }
1251 
1252  ScannerAsync async_scanner_open(const ThriftGen::Namespace ns,
1253  const String &table, const ThriftGen::Future ff,
1254  const ThriftGen::ScanSpec &ss) override {
1255  ScannerAsync id;
1256  LOG_API_START("namespace=" << ns << " table=" << table << " future="
1257  << ff << " scan_spec=" << ss);
1258  try {
1259  id = get_object_id(_open_scanner_async(ns, table, ff, ss));
1260  add_reference(id, ff);
1261  } RETHROW("namespace=" << ns << " table=" << table << " future="
1262  << ff << " scan_spec="<< ss)
1263 
1264  LOG_API_FINISH_E(" scanner=" << id);
1265  return id;
1266  }
1267 
1268  ScannerAsync open_scanner_async(const ThriftGen::Namespace ns,
1269  const String &table, const ThriftGen::Future ff,
1270  const ThriftGen::ScanSpec &ss) override {
1271  return async_scanner_open(ns, table, ff, ss);
1272  }
1273 
1274  void namespace_close(const ThriftGen::Namespace ns) override {
1275  LOG_API_START("namespace="<< ns);
1276  try {
1277  remove_namespace(ns);
1278  } RETHROW("namespace="<< ns)
1280  }
1281 
1282  void close_namespace(const ThriftGen::Namespace ns) override {
1283  namespace_close(ns);
1284  }
1285 
1286  void refresh_table(const ThriftGen::Namespace ns,
1287  const String &table_name) override {
1288  LOG_API_START("namespace=" << ns << " table=" << table_name);
1289  try {
1290  Hypertable::Namespace *namespace_ptr = get_namespace(ns);
1291  namespace_ptr->refresh_table(table_name);
1292  } RETHROW("namespace=" << ns << " table=" << table_name);
1294  }
1295 
1296  void scanner_close(const Scanner id) override {
1297  LOG_API_START("scanner="<< id);
1298  try {
1299  ClientObjectPtr cobj;
1300  remove_scanner(id, cobj, scanner_info);
1301  TableScanner *scanner = dynamic_cast<TableScanner *>(cobj.get());
1302  if (g_log_slow_queries) {
1303  ProfileDataScanner pd;
1304  scanner->get_profile_data(pd);
1305  end_time = std::chrono::fast_clock::now();
1306  if (scanner_info->latency >= g_slow_query_latency_threshold) {
1307  if (scanner_info->hql.empty())
1308  log_slow_query_scanspec(__func__, start_time, end_time,
1309  scanner_info->latency, pd,
1310  get_namespace(scanner_info->ns),
1311  scanner_info->table,
1312  scanner_info->scan_spec_builder.get());
1313  else
1314  log_slow_query(__func__, start_time, end_time,
1315  scanner_info->latency, pd,
1316  get_namespace(scanner_info->ns),
1317  scanner_info->hql);
1318 
1319  }
1320  }
1321  } RETHROW("scanner="<< id)
1323  }
1324 
1325  void close_scanner(const Scanner scanner) override {
1326  scanner_close(scanner);
1327  }
1328 
1329  void async_scanner_cancel(const ScannerAsync scanner) override {
1330  LOG_API_START("scanner="<< scanner);
1331 
1332  try {
1333  get_scanner_async(scanner)->cancel();
1334  } RETHROW("scanner=" << scanner)
1335 
1336  LOG_API_FINISH_E(" cancelled");
1337  }
1338 
1339  void cancel_scanner_async(const ScannerAsync scanner) override {
1340  async_scanner_cancel(scanner);
1341  }
1342 
1343  void async_scanner_close(const ScannerAsync scanner_async) override {
1344  LOG_API_START("scanner_async="<< scanner_async);
1345  try {
1346  remove_scanner(scanner_async);
1347  remove_references(scanner_async);
1348  } RETHROW("scanner_async="<< scanner_async)
1350  }
1351 
1352  void close_scanner_async(const ScannerAsync scanner_async) override {
1353  async_scanner_close(scanner_async);
1354  }
1355 
1356  void scanner_get_cells(ThriftCells &result,
1357  const Scanner scanner_id) override {
1358  LOG_API_START("scanner="<< scanner_id);
1359  try {
1360  TableScanner *scanner = get_scanner(scanner_id, scanner_info);
1361  _next(result, scanner, m_context.next_threshold);
1362  } RETHROW("scanner="<< scanner_id)
1363  LOG_API_FINISH_E(" result.size=" << result.size());
1364  }
1365 
1366  void next_cells(ThriftCells &result, const Scanner scanner_id) override {
1367  scanner_get_cells(result, scanner_id);
1368  }
1369 
1370  void scanner_get_cells_as_arrays(ThriftCellsAsArrays &result,
1371  const Scanner scanner_id) override {
1372  LOG_API_START("scanner="<< scanner_id);
1373  try {
1374  TableScanner *scanner = get_scanner(scanner_id, scanner_info);
1375  _next(result, scanner, m_context.next_threshold);
1376  } RETHROW("scanner="<< scanner_id <<" result.size="<< result.size())
1377  LOG_API_FINISH_E("result.size="<< result.size());
1378  }
1379 
1380  void next_cells_as_arrays(ThriftCellsAsArrays &result,
1381  const Scanner scanner_id) override {
1382  scanner_get_cells_as_arrays(result, scanner_id);
1383  }
1384 
1385  void scanner_get_cells_serialized(CellsSerialized &result,
1386  const Scanner scanner_id) override {
1387  LOG_API_START("scanner="<< scanner_id);
1388 
1389  try {
1390  SerializedCellsWriter writer(m_context.next_threshold);
1391  Hypertable::Cell cell;
1392 
1393  TableScanner *scanner = get_scanner(scanner_id, scanner_info);
1394 
1395  while (1) {
1396  if (scanner->next(cell)) {
1397  if (!writer.add(cell)) {
1399  scanner->unget(cell);
1400  break;
1401  }
1402  }
1403  else {
1405  break;
1406  }
1407  }
1408 
1409  result = String((char *)writer.get_buffer(), writer.get_buffer_length());
1410  } RETHROW("scanner="<< scanner_id);
1411  LOG_API_FINISH_E("result.size="<< result.size());
1412  }
1413 
1414  void next_cells_serialized(CellsSerialized &result,
1415  const Scanner scanner_id) override {
1416  scanner_get_cells_serialized(result, scanner_id);
1417  }
1418 
1419  void scanner_get_row(ThriftCells &result, const Scanner scanner_id) override {
1420  LOG_API_START("scanner="<< scanner_id <<" result.size="<< result.size());
1421  try {
1422  TableScanner *scanner = get_scanner(scanner_id, scanner_info);
1423  _next_row(result, scanner);
1424  } RETHROW("scanner=" << scanner_id)
1425 
1426  LOG_API_FINISH_E(" result.size="<< result.size());
1427  }
1428 
1429  void next_row(ThriftCells &result, const Scanner scanner_id) override {
1430  scanner_get_row(result, scanner_id);
1431  }
1432 
1433  void scanner_get_row_as_arrays(ThriftCellsAsArrays &result,
1434  const Scanner scanner_id) override {
1435  LOG_API_START("scanner="<< scanner_id);
1436  try {
1437  TableScanner *scanner = get_scanner(scanner_id, scanner_info);
1438  _next_row(result, scanner);
1439  } RETHROW("result.size=" << result.size())
1440  LOG_API_FINISH_E(" result.size="<< result.size());
1441  }
1442 
1443  void next_row_as_arrays(ThriftCellsAsArrays &result,
1444  const Scanner scanner_id) override {
1445  scanner_get_row_as_arrays(result, scanner_id);
1446  }
1447 
1448  void scanner_get_row_serialized(CellsSerialized &result,
1449  const Scanner scanner_id) override {
1450  LOG_API_START("scanner="<< scanner_id);
1451 
1452  try {
1453  SerializedCellsWriter writer(0, true);
1454  Hypertable::Cell cell;
1455  std::string prev_row;
1456 
1457  TableScanner *scanner = get_scanner(scanner_id, scanner_info);
1458 
1459  while (1) {
1460  if (scanner->next(cell)) {
1461  // keep scanning
1462  if (prev_row.empty() || prev_row == cell.row_key) {
1463  // add cells from this row
1464  writer.add(cell);
1465  if (prev_row.empty())
1466  prev_row = cell.row_key;
1467  }
1468  else {
1469  // done with this row
1471  scanner->unget(cell);
1472  break;
1473  }
1474  }
1475  else {
1476  // done with this scan
1478  break;
1479  }
1480  }
1481 
1482  result = String((char *)writer.get_buffer(), writer.get_buffer_length());
1483  } RETHROW("scanner="<< scanner_id)
1484  LOG_API_FINISH_E(" result.size="<< result.size());
1485  }
1486 
1487  void next_row_serialized(CellsSerialized& result,
1488  const Scanner scanner_id) override {
1489  scanner_get_row_serialized(result, scanner_id);
1490  }
1491 
1492  void get_row(ThriftCells &result, const ThriftGen::Namespace ns,
1493  const String &table, const String &row) override {
1494  LOG_API_START("namespace=" << ns << " table="<< table <<" row="<< row);
1495  try {
1496  Hypertable::Namespace *namespace_ptr = get_namespace(ns);
1497  TablePtr t = namespace_ptr->open_table(table);
1499  ss.row_intervals.push_back(Hypertable::RowInterval(row.c_str(), true,
1500  row.c_str(), true));
1501  ss.max_versions = 1;
1502  TableScannerPtr scanner(t->create_scanner(ss));
1503  _next(result, scanner.get(), INT32_MAX);
1504  LOG_SLOW_QUERY_SCANNER(scanner, get_namespace(ns), table, ss);
1505  } RETHROW("namespace=" << ns << " table="<< table <<" row="<< row)
1506  LOG_API_FINISH_E(" result.size="<< result.size());
1507  }
1508 
1509  void get_row_as_arrays(ThriftCellsAsArrays &result,
1510  const ThriftGen::Namespace ns, const String &table,
1511  const String &row) override {
1512  LOG_API_START("namespace=" << ns << " table="<< table <<" row="<< row);
1513 
1514  try {
1515  Hypertable::Namespace *namespace_ptr = get_namespace(ns);
1516  TablePtr t = namespace_ptr->open_table(table);
1518  ss.row_intervals.push_back(Hypertable::RowInterval(row.c_str(), true,
1519  row.c_str(), true));
1520  ss.max_versions = 1;
1521  TableScannerPtr scanner(t->create_scanner(ss));
1522  _next(result, scanner.get(), INT32_MAX);
1523  LOG_SLOW_QUERY_SCANNER(scanner, get_namespace(ns), table, ss);
1524  } RETHROW("namespace=" << ns << " table="<< table <<" row="<< row)
1525  LOG_API_FINISH_E("result.size="<< result.size());
1526  }
1527 
1528  void get_row_serialized(CellsSerialized &result,
1529  const ThriftGen::Namespace ns, const std::string& table,
1530  const std::string& row) override {
1531  LOG_API_START("namespace=" << ns << " table="<< table <<" row"<< row);
1532 
1533  try {
1534  SerializedCellsWriter writer(0, true);
1535  Hypertable::Namespace *namespace_ptr = get_namespace(ns);
1536  TablePtr t = namespace_ptr->open_table(table);
1538  ss.row_intervals.push_back(Hypertable::RowInterval(row.c_str(), true,
1539  row.c_str(), true));
1540  ss.max_versions = 1;
1541  TableScannerPtr scanner(t->create_scanner(ss));
1542  Hypertable::Cell cell;
1543 
1544  while (scanner->next(cell))
1545  writer.add(cell);
1547 
1548  result = String((char *)writer.get_buffer(), writer.get_buffer_length());
1549  LOG_SLOW_QUERY_SCANNER(scanner, get_namespace(ns), table, ss);
1550  } RETHROW("namespace=" << ns << " table="<< table <<" row"<< row)
1551  LOG_API_FINISH_E(" result.size="<< result.size());
1552  }
1553 
1554  void get_cell(Value &result, const ThriftGen::Namespace ns,
1555  const String &table, const String &row, const String &column) override {
1556  LOG_API_START("namespace=" << ns << " table=" << table << " row="
1557  << row << " column=" << column);
1558 
1559  if (row.empty())
1560  HT_THROW(Error::BAD_KEY, "Empty row key");
1561 
1562  try {
1564  Hypertable::Namespace *namespace_ptr = get_namespace(ns);
1565  TablePtr t = namespace_ptr->open_table(table);
1566 
1567  ss.cell_intervals.push_back(Hypertable::CellInterval(row.c_str(),
1568  column.c_str(), true, row.c_str(), column.c_str(), true));
1569  ss.max_versions = 1;
1570 
1571  Hypertable::Cell cell;
1572  TableScannerPtr scanner(t->create_scanner(ss, 0, true));
1573 
1574  if (scanner->next(cell))
1575  result = String((char *)cell.value, cell.value_len);
1576 
1577  LOG_SLOW_QUERY_SCANNER(scanner, get_namespace(ns), table, ss);
1578 
1579  } RETHROW("namespace=" << ns << " table=" << table << " row="
1580  << row << " column=" << column)
1581 
1582  LOG_API_FINISH_E(" result=" << result);
1583  }
1584 
1585  void get_cells(ThriftCells &result, const ThriftGen::Namespace ns,
1586  const String &table, const ThriftGen::ScanSpec &ss) override {
1587  LOG_API_START("namespace=" << ns << " table=" << table << " scan_spec="
1588  << ss << " result.size=" << result.size());
1589 
1590  try {
1592  convert_scan_spec(ss, hss);
1593  TableScannerPtr scanner(_open_scanner(ns, table, hss));
1594  _next(result, scanner.get(), INT32_MAX);
1595  LOG_SLOW_QUERY_SCANNER(scanner, get_namespace(ns), table, hss);
1596  } RETHROW("namespace=" << ns << " table="<< table <<" scan_spec="<< ss)
1597  LOG_API_FINISH_E(" result.size="<< result.size());
1598  }
1599 
1600  void get_cells_as_arrays(ThriftCellsAsArrays &result,
1601  const ThriftGen::Namespace ns, const String &table,
1602  const ThriftGen::ScanSpec &ss) override {
1603  LOG_API_START("namespace=" << ns << " table="<< table <<" scan_spec="<< ss);
1604 
1605  try {
1607  convert_scan_spec(ss, hss);
1608  TableScannerPtr scanner(_open_scanner(ns, table, hss));
1609  _next(result, scanner.get(), INT32_MAX);
1610  LOG_SLOW_QUERY_SCANNER(scanner, get_namespace(ns), table, hss);
1611  } RETHROW("namespace=" << ns << " table="<< table <<" scan_spec="<< ss)
1612  LOG_API_FINISH_E(" result.size="<< result.size());
1613  }
1614 
1615  void get_cells_serialized(CellsSerialized &result,
1616  const ThriftGen::Namespace ns, const String& table,
1617  const ThriftGen::ScanSpec& ss) override {
1618  LOG_API_START("namespace=" << ns << " table="<< table <<" scan_spec="<< ss);
1619 
1620  try {
1622  convert_scan_spec(ss, hss);
1623  SerializedCellsWriter writer(0, true);
1624  TableScannerPtr scanner(_open_scanner(ns, table, hss));
1625  Hypertable::Cell cell;
1626 
1627  while (scanner->next(cell))
1628  writer.add(cell);
1630 
1631  result = String((char *)writer.get_buffer(), writer.get_buffer_length());
1632  LOG_SLOW_QUERY_SCANNER(scanner, get_namespace(ns), table, hss);
1633  } RETHROW("namespace=" << ns << " table="<< table <<" scan_spec="<< ss)
1634  LOG_API_FINISH_E(" result.size="<< result.size());
1635  }
1636 
1637  void shared_mutator_set_cells(const ThriftGen::Namespace ns,
1638  const String &table, const ThriftGen::MutateSpec &mutate_spec,
1639  const ThriftCells &cells) override {
1640  LOG_API_START("namespace=" << ns << " table=" << table <<
1641  " mutate_spec.appname=" << mutate_spec.appname);
1642 
1643  try {
1644  _offer_cells(ns, table, mutate_spec, cells);
1645  } RETHROW("namespace=" << ns << " table=" << table
1646  << " mutate_spec.appname="<< mutate_spec.appname)
1647  LOG_API_FINISH_E(" cells.size=" << cells.size());
1648  }
1649 
1650  void offer_cells(const ThriftGen::Namespace ns, const String &table,
1651  const ThriftGen::MutateSpec &mutate_spec, const ThriftCells &cells) override {
1652  shared_mutator_set_cells(ns, table, mutate_spec, cells);
1653  }
1654 
1655  void shared_mutator_set_cell(const ThriftGen::Namespace ns,
1656  const String &table, const ThriftGen::MutateSpec &mutate_spec,
1657  const ThriftGen::Cell &cell) override {
1658  LOG_API_START(" namespace=" << ns << " table=" << table
1659  << " mutate_spec.appname="<< mutate_spec.appname);
1660 
1661  try {
1662  _offer_cell(ns, table, mutate_spec, cell);
1663  } RETHROW("namespace=" << ns << " table=" << table
1664  << " mutate_spec.appname="<< mutate_spec.appname)
1665  LOG_API_FINISH_E(" cell="<< cell);
1666  }
1667 
1668  void offer_cell(const ThriftGen::Namespace ns, const String &table,
1669  const ThriftGen::MutateSpec &mutate_spec,
1670  const ThriftGen::Cell &cell) override {
1671  shared_mutator_set_cell(ns, table, mutate_spec, cell);
1672  }
1673 
1674  void shared_mutator_set_cells_as_arrays(const ThriftGen::Namespace ns,
1675  const String &table, const ThriftGen::MutateSpec &mutate_spec,
1676  const ThriftCellsAsArrays &cells) override {
1677  LOG_API_START(" namespace=" << ns << " table=" << table
1678  << " mutate_spec.appname=" << mutate_spec.appname);
1679  try {
1680  _offer_cells(ns, table, mutate_spec, cells);
1681  LOG_API("mutate_spec.appname=" << mutate_spec.appname << " done");
1682  } RETHROW("namespace=" << ns << " table=" << table
1683  << " mutate_spec.appname=" << mutate_spec.appname)
1684  LOG_API_FINISH_E(" cells.size=" << cells.size());
1685  }
1686 
1687  void offer_cells_as_arrays(const ThriftGen::Namespace ns,
1688  const String &table, const ThriftGen::MutateSpec &mutate_spec,
1689  const ThriftCellsAsArrays &cells) override {
1690  shared_mutator_set_cells_as_arrays(ns, table, mutate_spec, cells);
1691  }
1692 
1693  void shared_mutator_set_cell_as_array(const ThriftGen::Namespace ns,
1694  const String &table, const ThriftGen::MutateSpec &mutate_spec,
1695  const CellAsArray &cell) override {
1696  // gcc 4.0.1 cannot seems to handle << cell here (see ThriftHelper.h)
1697  LOG_API_START("namespace=" << ns << " table=" << table
1698  << " mutate_spec.appname=" << mutate_spec.appname);
1699 
1700  try {
1701  _offer_cell(ns, table, mutate_spec, cell);
1702  LOG_API("mutate_spec.appname=" << mutate_spec.appname << " done");
1703  } RETHROW("namespace=" << ns << " table=" << table
1704  << " mutate_spec.appname=" << mutate_spec.appname)
1705  LOG_API_FINISH_E(" cell.size=" << cell.size());
1706  }
1707 
1708  void offer_cell_as_array(const ThriftGen::Namespace ns,
1709  const String &table, const ThriftGen::MutateSpec &mutate_spec,
1710  const CellAsArray &cell) override {
1711  shared_mutator_set_cell_as_array(ns, table, mutate_spec, cell);
1712  }
1713 
1714  ThriftGen::Future future_open(int capacity) override {
1715  ThriftGen::Future id;
1716  LOG_API_START("capacity=" << capacity);
1717  try {
1718  capacity = (capacity <= 0) ? m_context.future_capacity : capacity;
1719  id = get_object_id( new Hypertable::Future(capacity) );
1720  } RETHROW("capacity=" << capacity)
1721  LOG_API_FINISH_E(" future=" << id);
1722  return id;
1723  }
1724 
1725  ThriftGen::Future open_future(int capacity) override {
1726  return future_open(capacity);
1727  }
1728 
1729  void future_get_result(ThriftGen::Result &tresult,
1730  ThriftGen::Future ff, int timeout_millis) override {
1731  LOG_API_START("future=" << ff);
1732 
1733  try {
1734  Hypertable::Future *future = get_future(ff);
1735  ResultPtr hresult;
1736  bool timed_out = false;
1737  bool done = !(future->get(hresult, (uint32_t)timeout_millis,
1738  timed_out));
1739  if (timed_out)
1740  THROW_TE(Error::REQUEST_TIMEOUT, "Failed to fetch Future result");
1741  if (done) {
1742  tresult.is_empty = true;
1743  tresult.id = 0;
1744  LOG_API_FINISH_E(" is_empty="<< tresult.is_empty);
1745  }
1746  else {
1747  tresult.is_empty = false;
1748  _convert_result(hresult, tresult);
1749  LOG_API_FINISH_E(" is_empty=" << tresult.is_empty << " id="
1750  << tresult.id << " is_scan=" << tresult.is_scan
1751  << " is_error=" << tresult.is_error);
1752  }
1753  } RETHROW("future=" << ff)
1754  }
1755 
1756  void get_future_result(ThriftGen::Result &tresult,
1757  ThriftGen::Future ff, int timeout_millis) override {
1758  future_get_result(tresult, ff, timeout_millis);
1759  }
1760 
1761  void future_get_result_as_arrays(ThriftGen::ResultAsArrays &tresult,
1762  ThriftGen::Future ff, int timeout_millis) override {
1763  LOG_API_START("future=" << ff);
1764  try {
1765  Hypertable::Future *future = get_future(ff);
1766  ResultPtr hresult;
1767  bool timed_out = false;
1768  bool done = !(future->get(hresult, (uint32_t)timeout_millis,
1769  timed_out));
1770  if (timed_out)
1771  THROW_TE(Error::REQUEST_TIMEOUT, "Failed to fetch Future result");
1772  if (done) {
1773  tresult.is_empty = true;
1774  tresult.id = 0;
1775  LOG_API_FINISH_E(" done="<< done );
1776  }
1777  else {
1778  tresult.is_empty = false;
1779  _convert_result_as_arrays(hresult, tresult);
1780  LOG_API_FINISH_E(" done=" << done << " id=" << tresult.id
1781  << " is_scan=" << tresult.is_scan << "is_error="
1782  << tresult.is_error);
1783  }
1784  } RETHROW("future=" << ff)
1785  }
1786 
1787  void get_future_result_as_arrays(ThriftGen::ResultAsArrays &tresult,
1788  ThriftGen::Future ff, int timeout_millis) override {
1789  future_get_result_as_arrays(tresult, ff, timeout_millis);
1790  }
1791 
1792  void future_get_result_serialized(ThriftGen::ResultSerialized &tresult,
1793  ThriftGen::Future ff, int timeout_millis) override {
1794  LOG_API_START("future=" << ff);
1795 
1796  try {
1797  Hypertable::Future *future = get_future(ff);
1798  ResultPtr hresult;
1799  bool timed_out = false;
1800  bool done = !(future->get(hresult, (uint32_t)timeout_millis,
1801  timed_out));
1802  if (timed_out)
1803  THROW_TE(Error::REQUEST_TIMEOUT, "Failed to fetch Future result");
1804  if (done) {
1805  tresult.is_empty = true;
1806  tresult.id = 0;
1807  LOG_API_FINISH_E(" done="<< done );
1808  }
1809  else {
1810  tresult.is_empty = false;
1811  _convert_result_serialized(hresult, tresult);
1812  LOG_API_FINISH_E(" done=" << done << " id=" << tresult.id
1813  << " is_scan=" << tresult.is_scan << "is_error="
1814  << tresult.is_error);
1815  }
1816  } RETHROW("future=" << ff)
1817  }
1818 
1819  void get_future_result_serialized(ThriftGen::ResultSerialized &tresult,
1820  ThriftGen::Future ff, int timeout_millis) override {
1821  future_get_result_serialized(tresult, ff, timeout_millis);
1822  }
1823 
1824  void future_cancel(ThriftGen::Future ff) override {
1825  LOG_API_START("future=" << ff);
1826 
1827  try {
1828  Hypertable::Future *future = get_future(ff);
1829  future->cancel();
1830  } RETHROW("future=" << ff)
1832  }
1833 
1834  void cancel_future(ThriftGen::Future ff) override {
1835  future_cancel(ff);
1836  }
1837 
1838  bool future_is_empty(ThriftGen::Future ff) override {
1839  LOG_API_START("future=" << ff);
1840  bool is_empty;
1841  try {
1842  Hypertable::Future *future = get_future(ff);
1843  is_empty = future->is_empty();
1844  } RETHROW("future=" << ff)
1845  LOG_API_FINISH_E(" is_empty=" << is_empty);
1846  return is_empty;
1847  }
1848 
1849  bool future_is_full(ThriftGen::Future ff) override {
1850  LOG_API_START("future=" << ff);
1851  bool full;
1852  try {
1853  Hypertable::Future *future = get_future(ff);
1854  full = future->is_full();
1855  } RETHROW("future=" << ff)
1856  LOG_API_FINISH_E(" full=" << full);
1857  return full;
1858  }
1859 
1860  bool future_is_cancelled(ThriftGen::Future ff) override {
1861  LOG_API_START("future=" << ff);
1862  bool cancelled;
1863  try {
1864  Hypertable::Future *future = get_future(ff);
1865  cancelled = future->is_cancelled();
1866  } RETHROW("future=" << ff)
1867  LOG_API_FINISH_E(" cancelled=" << cancelled);
1868  return cancelled;
1869  }
1870 
1871  bool future_has_outstanding(ThriftGen::Future ff) override {
1872  bool has_outstanding;
1873  LOG_API_START("future=" << ff);
1874  try {
1875  Hypertable::Future *future = get_future(ff);
1876  has_outstanding = future->has_outstanding();
1877  } RETHROW("future=" << ff)
1878  LOG_API_FINISH_E(" has_outstanding=" << has_outstanding);
1879  return has_outstanding;
1880  }
1881 
1882  void future_close(const ThriftGen::Future ff) override {
1883  LOG_API_START("future="<< ff);
1884  try {
1885  remove_future(ff);
1886  } RETHROW("future=" << ff)
1888  }
1889 
1890  void close_future(const ThriftGen::Future ff) override {
1891  future_close(ff);
1892  }
1893 
1894  ThriftGen::Namespace namespace_open(const String &ns) override {
1895  ThriftGen::Namespace id;
1896  LOG_API_START("namespace =" << ns);
1897  try {
1898  id = get_cached_object_id( dynamic_pointer_cast<ClientObject>(m_context.client->open_namespace(ns)) );
1899  } RETHROW("namespace " << ns)
1900  LOG_API_FINISH_E(" id=" << id);
1901  return id;
1902  }
1903 
1904  ThriftGen::Namespace open_namespace(const String &ns) override {
1905  return namespace_open(ns);
1906  }
1907 
1908  MutatorAsync async_mutator_open(const ThriftGen::Namespace ns,
1909  const String &table, const ThriftGen::Future ff, ::int32_t flags) override {
1910  LOG_API_START("namespace=" << ns << " table=" << table << " future="
1911  << ff << " flags=" << flags);
1912  MutatorAsync id;
1913  try {
1914  id = get_object_id(_open_mutator_async(ns, table, ff, flags));
1915  add_reference(id, ff);
1916  } RETHROW("namespace=" << ns << " table=" << table << " future="
1917  << ff << " flags=" << flags)
1918  LOG_API_FINISH_E(" mutator=" << id);
1919  return id;
1920  }
1921 
1922  MutatorAsync open_mutator_async(const ThriftGen::Namespace ns,
1923  const String &table, const ThriftGen::Future ff, ::int32_t flags) override {
1924  return async_mutator_open(ns, table, ff, flags);
1925  }
1926 
1927  Mutator mutator_open(const ThriftGen::Namespace ns,
1928  const String &table, int32_t flags, int32_t flush_interval) override {
1929  LOG_API_START("namespace=" << ns << "table=" << table << " flags="
1930  << flags << " flush_interval=" << flush_interval);
1931  Mutator id;
1932  try {
1933  Hypertable::Namespace *namespace_ptr = get_namespace(ns);
1934  TablePtr t = namespace_ptr->open_table(table);
1935  id = get_object_id(t->create_mutator(0, flags, flush_interval));
1936  } RETHROW("namespace=" << ns << "table=" << table << " flags="
1937  << flags << " flush_interval=" << flush_interval)
1938  LOG_API_FINISH_E(" async_mutator=" << id);
1939  return id;
1940  }
1941 
1942  Mutator open_mutator(const ThriftGen::Namespace ns,
1943  const String &table, int32_t flags, int32_t flush_interval) override {
1944  return mutator_open(ns, table, flags, flush_interval);
1945  }
1946 
1947  void mutator_flush(const Mutator mutator) override {
1948  LOG_API_START("mutator="<< mutator);
1949  try {
1950  get_mutator(mutator)->flush();
1951  } RETHROW("mutator=" << mutator)
1952  LOG_API_FINISH_E(" done");
1953  }
1954 
1955  void flush_mutator(const Mutator mutator) override {
1956  mutator_flush(mutator);
1957  }
1958 
1959  void async_mutator_flush(const MutatorAsync mutator) override {
1960  LOG_API_START("mutator="<< mutator);
1961  try {
1962  get_mutator_async(mutator)->flush();
1963  } RETHROW("mutator=" << mutator)
1964  LOG_API_FINISH_E(" done");
1965  }
1966 
1967  void flush_mutator_async(const MutatorAsync mutator) override {
1968  async_mutator_flush(mutator);
1969  }
1970 
1971  void mutator_close(const Mutator mutator) override {
1972  LOG_API_START("mutator="<< mutator);
1973  try {
1974  flush_mutator(mutator);
1975  remove_mutator(mutator);
1976  } RETHROW("mutator=" << mutator)
1978  }
1979 
1980  void close_mutator(const Mutator mutator) override {
1981  mutator_close(mutator);
1982  }
1983 
1984  void async_mutator_cancel(const MutatorAsync mutator) override {
1985  LOG_API_START("mutator="<< mutator);
1986 
1987  try {
1988  get_mutator_async(mutator)->cancel();
1989  } RETHROW("mutator="<< mutator)
1990 
1991  LOG_API_FINISH_E(" cancelled");
1992  }
1993 
1994  void cancel_mutator_async(const MutatorAsync mutator) override {
1995  async_mutator_cancel(mutator);
1996  }
1997 
1998  void async_mutator_close(const MutatorAsync mutator) override {
1999  LOG_API_START("mutator="<< mutator);
2000  try {
2001  flush_mutator_async(mutator);
2002  remove_mutator(mutator);
2003  remove_references(mutator);
2004  } RETHROW("mutator" << mutator)
2006  }
2007 
2008  void close_mutator_async(const MutatorAsync mutator) override {
2009  async_mutator_close(mutator);
2010  }
2011 
2012  void mutator_set_cells(const Mutator mutator,
2013  const ThriftCells &cells) override {
2014  LOG_API_START("mutator=" << mutator << " cell.size=" << cells.size());
2015  try {
2016  _set_cells(mutator, cells);
2017  } RETHROW("mutator=" << mutator << " cell.size=" << cells.size())
2019  }
2020 
2021  void mutator_set_cell(const Mutator mutator,
2022  const ThriftGen::Cell &cell) override {
2023  LOG_API_START("mutator=" << mutator << " cell=" << cell);
2024  try {
2025  _set_cell(mutator, cell);
2026  } RETHROW("mutator=" << mutator << " cell=" << cell)
2028  }
2029 
2030  void mutator_set_cells_as_arrays(const Mutator mutator,
2031  const ThriftCellsAsArrays &cells) override {
2032  LOG_API_START("mutator=" << mutator << " cell.size=" << cells.size());
2033  try {
2034  _set_cells(mutator, cells);
2035  } RETHROW("mutator=" << mutator << " cell.size=" << cells.size())
2037  }
2038 
2039  void mutator_set_cell_as_array(const Mutator mutator,
2040  const CellAsArray &cell) override {
2041  // gcc 4.0.1 cannot seems to handle << cell here (see ThriftHelper.h)
2042  LOG_API_START("mutator=" << mutator << " cell_as_array.size="
2043  << cell.size());
2044  try {
2045  _set_cell(mutator, cell);
2046  } RETHROW("mutator="<< mutator <<" cell_as_array.size="<< cell.size());
2048  }
2049 
2050  void mutator_set_cells_serialized(const Mutator mutator,
2051  const CellsSerialized &cells, const bool flush) override {
2052  LOG_API_START("mutator=" << mutator << " cell.size=" << cells.size());
2053  try {
2054  CellsBuilder cb;
2055  Hypertable::Cell hcell;
2056  SerializedCellsReader reader((void *)cells.c_str(),
2057  (uint32_t)cells.length());
2058  while (reader.next()) {
2059  reader.get(hcell);
2060  cb.add(hcell, false);
2061  }
2062  get_mutator(mutator)->set_cells(cb.get());
2063  if (flush || reader.flush())
2064  get_mutator(mutator)->flush();
2065  } RETHROW("mutator="<< mutator <<" cell.size="<< cells.size())
2066 
2068  }
2069 
2070  void set_cell(const ThriftGen::Namespace ns, const String& table,
2071  const ThriftGen::Cell &cell) override {
2072  LOG_API_START("ns=" << ns << " table=" << table << " cell=" << cell);
2073  try {
2074  TableMutatorPtr mutator(_open_mutator(ns, table));
2075  CellsBuilder cb;
2076  Hypertable::Cell hcell;
2077  convert_cell(cell, hcell);
2078  cb.add(hcell, false);
2079  mutator->set_cells(cb.get());
2080  mutator->flush();
2081  } RETHROW("ns=" << ns << " table=" << table << " cell=" << cell);
2083  }
2084 
2085  void set_cells(const ThriftGen::Namespace ns, const String& table,
2086  const ThriftCells &cells) override {
2087  LOG_API_START("ns=" << ns << " table=" << table << " cell.size="
2088  << cells.size());
2089  try {
2090  TableMutatorPtr mutator(_open_mutator(ns, table));
2091  Hypertable::Cells hcells;
2092  convert_cells(cells, hcells);
2093  mutator->set_cells(hcells);
2094  mutator->flush();
2095  } RETHROW("ns=" << ns << " table=" << table <<" cell.size="
2096  << cells.size());
2098  }
2099 
2100  void set_cells_as_arrays(const ThriftGen::Namespace ns,
2101  const String& table, const ThriftCellsAsArrays &cells) override {
2102  LOG_API_START("ns=" << ns << " table=" << table << " cell.size="
2103  << cells.size());
2104 
2105  try {
2106  TableMutatorPtr mutator(_open_mutator(ns, table));
2107  Hypertable::Cells hcells;
2108  convert_cells(cells, hcells);
2109  mutator->set_cells(hcells);
2110  mutator->flush();
2111  } RETHROW("ns="<< ns <<" table=" << table<<" cell.size="<< cells.size());
2113  }
2114 
2115  void set_cell_as_array(const ThriftGen::Namespace ns,
2116  const String& table, const CellAsArray &cell) override {
2117  // gcc 4.0.1 cannot seems to handle << cell here (see ThriftHelper.h)
2118 
2119  LOG_API_START("ns=" << ns << " table=" << table << " cell_as_array.size="
2120  << cell.size());
2121  try {
2122  TableMutatorPtr mutator(_open_mutator(ns, table));
2123  CellsBuilder cb;
2124  Hypertable::Cell hcell;
2125  convert_cell(cell, hcell);
2126  cb.add(hcell, false);
2127  mutator->set_cells(cb.get());
2128  mutator->flush();
2129  } RETHROW("ns=" << ns << " table=" << table << " cell_as_array.size="
2130  << cell.size());
2132  }
2133 
2134  void set_cells_serialized(const ThriftGen::Namespace ns,
2135  const String& table, const CellsSerialized &cells) override {
2136  LOG_API_START("ns=" << ns << " table=" << table <<
2137  " cell_serialized.size=" << cells.size() << " flush=" << flush);
2138  try {
2139  TableMutatorPtr mutator(_open_mutator(ns, table));
2140  CellsBuilder cb;
2141  Hypertable::Cell hcell;
2142  SerializedCellsReader reader((void *)cells.c_str(),
2143  (uint32_t)cells.length());
2144  while (reader.next()) {
2145  reader.get(hcell);
2146  cb.add(hcell, false);
2147  }
2148  mutator->set_cells(cb.get());
2149  mutator->flush();
2150  } RETHROW("ns=" << ns << " table=" << table << " cell_serialized.size="
2151  << cells.size() << " flush=" << flush);
2152 
2154  }
2155 
2156  void async_mutator_set_cells(const MutatorAsync mutator,
2157  const ThriftCells &cells) override {
2158  LOG_API_START("mutator=" << mutator << " cells.size=" << cells.size());
2159  try {
2160  _set_cells_async(mutator, cells);
2161  } RETHROW("mutator=" << mutator << " cells.size=" << cells.size())
2163  }
2164 
2165  void set_cells_async(const MutatorAsync mutator,
2166  const ThriftCells &cells) override {
2167  async_mutator_set_cells(mutator, cells);
2168  }
2169 
2170  void async_mutator_set_cell(const MutatorAsync mutator,
2171  const ThriftGen::Cell &cell) override {
2172  LOG_API_START("mutator=" << mutator <<" cell=" << cell);
2173  try {
2174  _set_cell_async(mutator, cell);
2175  } RETHROW("mutator=" << mutator << " cell=" << cell);
2177  }
2178 
2179  void set_cell_async(const MutatorAsync mutator,
2180  const ThriftGen::Cell &cell) override {
2181  async_mutator_set_cell(mutator, cell);
2182  }
2183 
2184  void async_mutator_set_cells_as_arrays(const MutatorAsync mutator,
2185  const ThriftCellsAsArrays &cells) override {
2186  LOG_API_START("mutator=" << mutator << " cells.size=" << cells.size());
2187  try {
2188  _set_cells_async(mutator, cells);
2189  } RETHROW("mutator=" << mutator << " cells.size=" << cells.size())
2191  }
2192 
2193  void set_cells_as_arrays_async(const MutatorAsync mutator,
2194  const ThriftCellsAsArrays &cells) override {
2195  async_mutator_set_cells_as_arrays(mutator, cells);
2196  }
2197 
2198  void async_mutator_set_cell_as_array(const MutatorAsync mutator,
2199  const CellAsArray &cell) override {
2200  // gcc 4.0.1 cannot seems to handle << cell here (see ThriftHelper.h)
2201  LOG_API_START("mutator=" << mutator << " cell_as_array.size="
2202  << cell.size());
2203  try {
2204  _set_cell_async(mutator, cell);
2205  } RETHROW("mutator=" << mutator << " cell_as_array.size=" << cell.size())
2207  }
2208 
2209  void set_cell_as_array_async(const MutatorAsync mutator,
2210  const CellAsArray &cell) override {
2211  async_mutator_set_cell_as_array(mutator, cell);
2212  }
2213 
2214  void async_mutator_set_cells_serialized(const MutatorAsync mutator,
2215  const CellsSerialized &cells,
2216  const bool flush) override {
2217  LOG_API_START("mutator=" << mutator << " cells.size=" << cells.size());
2218  try {
2219  CellsBuilder cb;
2220  Hypertable::Cell hcell;
2221  SerializedCellsReader reader((void *)cells.c_str(),
2222  (uint32_t)cells.length());
2223  while (reader.next()) {
2224  reader.get(hcell);
2225  cb.add(hcell, false);
2226  }
2227  TableMutatorAsync *mutator_ptr = get_mutator_async(mutator);
2228  mutator_ptr->set_cells(cb.get());
2229  if (flush || reader.flush() || mutator_ptr->needs_flush())
2230  mutator_ptr->flush();
2231 
2232  } RETHROW("mutator=" << mutator << " cells.size=" << cells.size());
2234  }
2235 
2236  void set_cells_serialized_async(const MutatorAsync mutator,
2237  const CellsSerialized &cells,
2238  const bool flush) override {
2239  async_mutator_set_cells_serialized(mutator, cells, flush);
2240  }
2241 
2242  bool namespace_exists(const String &ns) override {
2243  bool exists;
2244  LOG_API_START("namespace=" << ns);
2245  try {
2246  exists = m_context.client->exists_namespace(ns);
2247  } RETHROW("namespace=" << ns)
2248  LOG_API_FINISH_E(" exists=" << exists);
2249  return exists;
2250  }
2251 
2252  bool exists_namespace(const String &ns) override {
2253  return namespace_exists(ns);
2254  }
2255 
2256  bool table_exists(const ThriftGen::Namespace ns,
2257  const String &table) override {
2258  LOG_API_START("namespace=" << ns << " table=" << table);
2259  bool exists;
2260  try {
2261  Hypertable::Namespace *namespace_ptr = get_namespace(ns);
2262  exists = namespace_ptr->exists_table(table);
2263  } RETHROW("namespace=" << ns << " table="<< table)
2264  LOG_API_FINISH_E(" exists=" << exists);
2265  return exists;
2266  }
2267 
2268  bool exists_table(const ThriftGen::Namespace ns,
2269  const String &table) override {
2270  return table_exists(ns, table);
2271  }
2272 
2273  void table_get_id(String &result, const ThriftGen::Namespace ns,
2274  const String &table) override {
2275  LOG_API_START("namespace=" << ns << " table=" << table);
2276  try {
2277  Hypertable::Namespace *namespace_ptr = get_namespace(ns);
2278  result = namespace_ptr->get_table_id(table);
2279  } RETHROW("namespace=" << ns << " table="<< table)
2280  LOG_API_FINISH_E(" id=" << result);
2281  }
2282 
2283  void get_table_id(String &result, const ThriftGen::Namespace ns,
2284  const String &table) override {
2285  table_get_id(result, ns, table);
2286  }
2287 
2289  const ThriftGen::Namespace ns, const String &table) override {
2290  LOG_API_START("namespace=" << ns << " table=" << table);
2291  try {
2292  Hypertable::Namespace *namespace_ptr = get_namespace(ns);
2293  result = namespace_ptr->get_schema_str(table);
2294  } RETHROW("namespace=" << ns << " table=" << table)
2295  LOG_API_FINISH_E(" schema=" << result);
2296  }
2297 
2298  void get_schema_str(String &result, const ThriftGen::Namespace ns,
2299  const String &table) override {
2300  table_get_schema_str(result, ns, table);
2301  }
2302 
2304  const ThriftGen::Namespace ns, const String &table) override {
2305  LOG_API_START("namespace=" << ns << " table=" << table);
2306  try {
2307  Hypertable::Namespace *namespace_ptr = get_namespace(ns);
2308  result = namespace_ptr->get_schema_str(table, true);
2309  } RETHROW("namespace=" << ns << " table=" << table)
2310  LOG_API_FINISH_E(" schema=" << result);
2311  }
2312 
2314  const ThriftGen::Namespace ns, const String &table) override {
2315  table_get_schema_str_with_ids(result, ns, table);
2316  }
2317 
2318  void table_get_schema(ThriftGen::Schema &result,
2319  const ThriftGen::Namespace ns, const String &table) override {
2320  LOG_API_START("namespace=" << ns << " table=" << table);
2321  try {
2322  Hypertable::Namespace *namespace_ptr = get_namespace(ns);
2323  Hypertable::SchemaPtr schema = namespace_ptr->get_schema(table);
2324  if (schema)
2325  convert_schema(schema, result);
2326  } RETHROW("namespace=" << ns << " table="<< table)
2328  }
2329 
2330  void get_schema(ThriftGen::Schema &result,
2331  const ThriftGen::Namespace ns, const String &table) override {
2332  table_get_schema(result, ns, table);
2333  }
2334 
2335  void get_tables(std::vector<String> &tables,
2336  const ThriftGen::Namespace ns) override {
2337  LOG_API_START("namespace=" << ns);
2338  try {
2339  Hypertable::Namespace *namespace_ptr = get_namespace(ns);
2340  std::vector<Hypertable::NamespaceListing> listing;
2341  namespace_ptr->get_listing(false, listing);
2342 
2343  for(size_t ii=0; ii < listing.size(); ++ii)
2344  if (!listing[ii].is_namespace)
2345  tables.push_back(listing[ii].name);
2346 
2347  }
2348  RETHROW("namespace=" << ns)
2349  LOG_API_FINISH_E(" tables.size=" << tables.size());
2350  }
2351 
2352  void namespace_get_listing(std::vector<ThriftGen::NamespaceListing>& _return,
2353  const ThriftGen::Namespace ns) override {
2354  LOG_API_START("namespace=" << ns);
2355  try {
2356  Hypertable::Namespace *namespace_ptr = get_namespace(ns);
2357  std::vector<Hypertable::NamespaceListing> listing;
2358  namespace_ptr->get_listing(false, listing);
2359  ThriftGen::NamespaceListing entry;
2360 
2361  for(size_t ii=0; ii < listing.size(); ++ii) {
2362  entry.name = listing[ii].name;
2363  entry.is_namespace = listing[ii].is_namespace;
2364  _return.push_back(entry);
2365  }
2366  }
2367  RETHROW("namespace=" << ns)
2368 
2369  LOG_API_FINISH_E(" listing.size=" << _return.size());
2370  }
2371 
2372  void get_listing(std::vector<ThriftGen::NamespaceListing>& _return,
2373  const ThriftGen::Namespace ns) override {
2374  namespace_get_listing(_return, ns);
2375  }
2376 
2377  void table_get_splits(std::vector<ThriftGen::TableSplit> & _return,
2378  const ThriftGen::Namespace ns, const String &table) override {
2379  TableSplitsContainer splits;
2380  LOG_API_START("namespace=" << ns << " table=" << table
2381  << " splits.size=" << _return.size());
2382  try {
2383  ThriftGen::TableSplit tsplit;
2384  Hypertable::Namespace *namespace_ptr = get_namespace(ns);
2385  namespace_ptr->get_table_splits(table, splits);
2386  for (TableSplitsContainer::iterator iter = splits.begin();
2387  iter != splits.end(); ++iter) {
2388  convert_table_split(*iter, tsplit);
2389  _return.push_back(tsplit);
2390  }
2391  }
2392  RETHROW("namespace=" << ns << " table=" << table)
2393 
2394  LOG_API_FINISH_E(" splits.size=" << splits.size());
2395  }
2396 
2397  void get_table_splits(std::vector<ThriftGen::TableSplit> & _return,
2398  const ThriftGen::Namespace ns, const String &table) override {
2399  table_get_splits(_return, ns, table);
2400  }
2401 
2402  void namespace_drop(const String &ns, const bool if_exists) override {
2403  LOG_API_START("namespace=" << ns << " if_exists=" << if_exists);
2404  try {
2405  m_context.client->drop_namespace(ns, NULL, if_exists);
2406  }
2407  RETHROW("namespace=" << ns << " if_exists=" << if_exists)
2409  }
2410 
2411  void drop_namespace(const String &ns, const bool if_exists) override {
2412  namespace_drop(ns, if_exists);
2413  }
2414 
2415  void table_rename(const ThriftGen::Namespace ns, const String &table,
2416  const String &new_table_name) override {
2417  LOG_API_START("namespace=" << ns << " table=" << table
2418  << " new_table_name=" << new_table_name << " done");
2419  try {
2420  Hypertable::Namespace *namespace_ptr = get_namespace(ns);
2421  namespace_ptr->rename_table(table, new_table_name);
2422  }
2423  RETHROW("namespace=" << ns << " table=" << table << " new_table_name="
2424  << new_table_name << " done")
2426  }
2427 
2428  void rename_table(const ThriftGen::Namespace ns, const String &table,
2429  const String &new_table_name) override {
2430  table_rename(ns, table, new_table_name);
2431  }
2432 
2433  void table_drop(const ThriftGen::Namespace ns, const String &table,
2434  const bool if_exists) override {
2435  LOG_API_START("namespace=" << ns << " table=" << table << " if_exists="
2436  << if_exists);
2437  try {
2438  Hypertable::Namespace *namespace_ptr = get_namespace(ns);
2439  namespace_ptr->drop_table(table, if_exists);
2440  }
2441  RETHROW("namespace=" << ns << " table=" << table << " if_exists="
2442  << if_exists)
2444  }
2445 
2446  void drop_table(const ThriftGen::Namespace ns, const String &table,
2447  const bool if_exists) override {
2448  table_drop(ns, table, if_exists);
2449  }
2450 
2451  void generate_guid(std::string& _return) override {
2452  LOG_API_START("");
2453  try {
2454  _return = HyperAppHelper::generate_guid();
2455  }
2456  RETHROW("")
2458  }
2459 
2460  void create_cell_unique(std::string &_return,
2461  const ThriftGen::Namespace ns, const std::string& table_name,
2462  const ThriftGen::Key& tkey, const std::string& value) override {
2463  LOG_API_START("namespace=" << ns << " table=" << table_name
2464  << tkey << " value=" << value);
2465  std::string guid;
2466  try {
2467  Hypertable::Namespace *namespace_ptr = get_namespace(ns);
2468  Hypertable::KeySpec hkey;
2469  convert_key(tkey, hkey);
2470  TablePtr t = namespace_ptr->open_table(table_name);
2472  value.empty() ? guid : (std::string &)value);
2473  }
2474  RETHROW("namespace=" << ns << " table=" << table_name
2475  << tkey << " value=" << value);
2477  _return = value.empty() ? guid : value;
2478  }
2479 
2480  void status(ThriftGen::Status& _return) override {
2481  try {
2482  _return.__set_code(0);
2483  _return.__set_text("");
2484  }
2485  RETHROW("");
2486  }
2487 
2488  void shutdown() override {
2489  kill(getpid(), SIGKILL);
2490  }
2491 
2492  void error_get_text(std::string &_return, int error_code) override {
2493  LOG_API_START("error_code=" << error_code);
2494  _return = HyperAppHelper::error_get_text(error_code);
2496  }
2497 
2498  // helper methods
2500  ThriftGen::Result &tresult) {
2501  Hypertable::Cells hcells;
2502 
2503  if (hresult->is_scan()) {
2504  tresult.is_scan = true;
2505  tresult.id = try_get_object_id(hresult->get_scanner());
2506  if (hresult->is_error()) {
2507  tresult.is_error = true;
2508  hresult->get_error(tresult.error, tresult.error_msg);
2509  tresult.__isset.error = true;
2510  tresult.__isset.error_msg = true;
2511  }
2512  else {
2513  tresult.is_error = false;
2514  tresult.__isset.cells = true;
2515  hresult->get_cells(hcells);
2516  convert_cells(hcells, tresult.cells);
2517  }
2518  }
2519  else {
2520  tresult.is_scan = false;
2521  tresult.id = try_get_object_id(hresult->get_mutator());
2522  if (hresult->is_error()) {
2523  tresult.is_error = true;
2524  hresult->get_error(tresult.error, tresult.error_msg);
2525  hresult->get_failed_cells(hcells);
2526  convert_cells(hcells, tresult.cells);
2527  tresult.__isset.error = true;
2528  tresult.__isset.error_msg = true;
2529  }
2530  }
2531  }
2532 
2534  ThriftGen::ResultAsArrays &tresult) {
2535  Hypertable::Cells hcells;
2536 
2537  if (hresult->is_scan()) {
2538  tresult.is_scan = true;
2539  tresult.id = try_get_object_id(hresult->get_scanner());
2540  if (hresult->is_error()) {
2541  tresult.is_error = true;
2542  hresult->get_error(tresult.error, tresult.error_msg);
2543  tresult.__isset.error = true;
2544  tresult.__isset.error_msg = true;
2545  }
2546  else {
2547  tresult.is_error = false;
2548  tresult.__isset.cells = true;
2549  hresult->get_cells(hcells);
2550  convert_cells(hcells, tresult.cells);
2551  }
2552  }
2553  else {
2554  HT_THROW(Error::NOT_IMPLEMENTED, "Support for asynchronous mutators "
2555  "not yet implemented");
2556  }
2557  }
2558 
2560  ThriftGen::ResultSerialized &tresult) {
2561  Hypertable::Cells hcells;
2562 
2563  if (hresult->is_scan()) {
2564  tresult.is_scan = true;
2565  tresult.id = try_get_object_id(hresult->get_scanner());
2566  if (hresult->is_error()) {
2567  tresult.is_error = true;
2568  hresult->get_error(tresult.error, tresult.error_msg);
2569  tresult.__isset.error = true;
2570  tresult.__isset.error_msg = true;
2571  }
2572  else {
2573  tresult.is_error = false;
2574  tresult.__isset.cells = true;
2575  hresult->get_cells(hcells);
2576  convert_cells(hcells, tresult.cells);
2577  }
2578  }
2579  else {
2580  HT_THROW(Error::NOT_IMPLEMENTED, "Support for asynchronous mutators "
2581  "not yet implemented");
2582  }
2583  }
2584 
2585  TableMutatorAsync *_open_mutator_async(const ThriftGen::Namespace ns,
2586  const String &table, const ThriftGen::Future ff, ::int32_t flags) {
2587  Hypertable::Namespace *namespace_ptr = get_namespace(ns);
2588  TablePtr t = namespace_ptr->open_table(table);
2589  Hypertable::Future *future = get_future(ff);
2590 
2591  return t->create_mutator_async(future, 0, flags);
2592  }
2593 
2594  TableScannerAsync *_open_scanner_async(const ThriftGen::Namespace ns,
2595  const String &table, const ThriftGen::Future ff,
2596  const ThriftGen::ScanSpec &ss) {
2597  Hypertable::Namespace *namespace_ptr = get_namespace(ns);
2598  TablePtr t = namespace_ptr->open_table(table);
2599  Hypertable::Future *future = get_future(ff);
2600 
2602  convert_scan_spec(ss, hss);
2603  return t->create_scanner_async(future, hss, 0);
2604  }
2605 
2606  TableScanner *_open_scanner(const ThriftGen::Namespace ns,
2607  const String &table, const Hypertable::ScanSpec &ss) {
2608  Hypertable::Namespace *namespace_ptr = get_namespace(ns);
2609  TablePtr t = namespace_ptr->open_table(table);
2610  return t->create_scanner(ss, 0);
2611  }
2612 
2613  TableMutator *_open_mutator(const ThriftGen::Namespace ns,
2614  const String &table) {
2615  Hypertable::Namespace *namespace_ptr = get_namespace(ns);
2616  TablePtr t = namespace_ptr->open_table(table);
2617  return t->create_mutator();
2618  }
2619 
2620  template <class CellT>
2621  void _next(vector<CellT> &result, TableScanner *scanner, int limit) {
2622  Hypertable::Cell cell;
2623  int32_t amount_read = 0;
2624 
2625  while (amount_read < limit) {
2626  if (scanner->next(cell)) {
2627  CellT tcell;
2628  amount_read += convert_cell(cell, tcell);
2629  result.push_back(tcell);
2630  }
2631  else
2632  break;
2633  }
2634  }
2635 
2636  template <class CellT>
2637  void _next_row(vector<CellT> &result, TableScanner *scanner) {
2638  Hypertable::Cell cell;
2639  std::string prev_row;
2640 
2641  while (scanner->next(cell)) {
2642  if (prev_row.empty() || prev_row == cell.row_key) {
2643  CellT tcell;
2644  convert_cell(cell, tcell);
2645  result.push_back(tcell);
2646  if (prev_row.empty())
2647  prev_row = cell.row_key;
2648  }
2649  else {
2650  scanner->unget(cell);
2651  break;
2652  }
2653  }
2654  }
2655 
2656  void run_hql_interp(const ThriftGen::Namespace ns, const String &hql,
2658  Hypertable::Namespace *namespace_ptr = get_namespace(ns);
2659  HqlInterpreterPtr interp(m_context.client->create_hql_interpreter(true));
2660  interp->set_namespace(namespace_ptr->get_name());
2661  interp->execute(hql, cb);
2662  }
2663 
2664  template <class CellT>
2665  void _offer_cells(const ThriftGen::Namespace ns, const String &table,
2666  const ThriftGen::MutateSpec &mutate_spec,
2667  const vector<CellT> &cells) {
2668  Hypertable::Cells hcells;
2669  convert_cells(cells, hcells);
2670  get_shared_mutator(ns, table, mutate_spec)->set_cells(hcells);
2671  }
2672 
2673  template <class CellT>
2674  void _offer_cell(const ThriftGen::Namespace ns, const String &table,
2675  const ThriftGen::MutateSpec &mutate_spec, const CellT &cell) {
2676  CellsBuilder cb;
2677  Hypertable::Cell hcell;
2678  convert_cell(cell, hcell);
2679  cb.add(hcell, false);
2680  get_shared_mutator(ns, table, mutate_spec)->set_cells(cb.get());
2681  }
2682 
2683  template <class CellT>
2684  void _set_cells(const Mutator mutator, const vector<CellT> &cells) {
2685  Hypertable::Cells hcells;
2686  convert_cells(cells, hcells);
2687  get_mutator(mutator)->set_cells(hcells);
2688  }
2689 
2690  template <class CellT>
2691  void _set_cell(const Mutator mutator, const CellT &cell) {
2692  CellsBuilder cb;
2693  Hypertable::Cell hcell;
2694  convert_cell(cell, hcell);
2695  cb.add(hcell, false);
2696  get_mutator(mutator)->set_cells(cb.get());
2697  }
2698 
2699  template <class CellT>
2700  void _set_cells_async(const MutatorAsync mutator, const vector<CellT> &cells) {
2701  Hypertable::Cells hcells;
2702  convert_cells(cells, hcells);
2703  TableMutatorAsync *mutator_ptr = get_mutator_async(mutator);
2704  mutator_ptr->set_cells(hcells);
2705  if (mutator_ptr->needs_flush())
2706  mutator_ptr->flush();
2707  }
2708 
2709  template <class CellT>
2710  void _set_cell_async(const MutatorAsync mutator, const CellT &cell) {
2711  CellsBuilder cb;
2712  Hypertable::Cell hcell;
2713  convert_cell(cell, hcell);
2714  cb.add(hcell, false);
2715  TableMutatorAsync *mutator_ptr = get_mutator_async(mutator);
2716  mutator_ptr->set_cells(cb.get());
2717  if (mutator_ptr->needs_flush())
2718  mutator_ptr->flush();
2719  }
2720 
2721  ClientObject *get_object(int64_t id) {
2722  std::lock_guard<std::mutex> lock(m_mutex);
2723  ObjectMap::iterator it = m_object_map.find(id);
2724  return (it != m_object_map.end()) ? it->second.get() : 0;
2725  }
2726 
2728  std::lock_guard<std::mutex> lock(m_mutex);
2729  ObjectMap::iterator it = m_cached_object_map.find(id);
2730  return (it != m_cached_object_map.end()) ? it->second.get() : 0;
2731  }
2732 
2734  Hypertable::Future *future = dynamic_cast<Hypertable::Future *>(get_object(id));
2735  if (future == 0) {
2736  HT_ERROR_OUT << "Bad future id - " << id << HT_END;
2738  format("Invalid future id: %lld", (Lld)id));
2739  }
2740  return future;
2741  }
2742 
2743 
2745  Hypertable::Namespace *ns = dynamic_cast<Hypertable::Namespace *>(get_cached_object(id));
2746  if (ns == 0) {
2747  HT_ERROR_OUT << "Bad namespace id - " << id << HT_END;
2749  format("Invalid namespace id: %lld", (Lld)id));
2750  }
2751  return ns;
2752  }
2753 
2755  int64_t id;
2756  std::lock_guard<std::mutex> lock(m_mutex);
2757  while (!m_cached_object_map.insert(make_pair(id = Random::number32(), co)).second || id == 0); // no overwrite
2758  return id;
2759  }
2760 
2762  std::lock_guard<std::mutex> lock(m_mutex);
2763  int64_t id = reinterpret_cast<int64_t>(co);
2764  m_object_map.insert(make_pair(id, ClientObjectPtr(co))); // no overwrite
2765  return id;
2766  }
2767 
2768  int64_t get_object_id(TableMutatorPtr &mutator) {
2769  std::lock_guard<std::mutex> lock(m_mutex);
2770  int64_t id = reinterpret_cast<int64_t>(mutator.get());
2771  m_object_map.insert(make_pair(id, static_pointer_cast<ClientObject>(mutator))); // no overwrite
2772  return id;
2773  }
2774 
2776  std::lock_guard<std::mutex> lock(m_mutex);
2777  int64_t id = reinterpret_cast<int64_t>(co);
2778  return m_object_map.find(id) != m_object_map.end() ? id : 0;
2779  }
2780 
2781  int64_t get_scanner_id(TableScanner *scanner, ScannerInfoPtr &info) {
2782  std::lock_guard<std::mutex> lock(m_mutex);
2783  int64_t id = reinterpret_cast<int64_t>(scanner);
2784  m_object_map.insert(make_pair(id, ClientObjectPtr(scanner)));
2785  m_scanner_info_map.insert(make_pair(id, info));
2786  return id;
2787  }
2788 
2789  int64_t get_scanner_id(TableScannerPtr &scanner, ScannerInfoPtr &info) {
2790  std::lock_guard<std::mutex> lock(m_mutex);
2791  int64_t id = reinterpret_cast<int64_t>(scanner.get());
2792  m_object_map.insert(make_pair(id, static_pointer_cast<ClientObject>(scanner)));
2793  m_scanner_info_map.insert(make_pair(id, info));
2794  return id;
2795  }
2796 
2797  void add_reference(int64_t from, int64_t to) {
2798  std::lock_guard<std::mutex> lock(m_mutex);
2799  ObjectMap::iterator it = m_object_map.find(to);
2800  if (it != m_object_map.end())
2801  m_reference_map.insert(make_pair(from, it->second));
2802  }
2803 
2804  void remove_references(int64_t id) {
2805  std::lock_guard<std::mutex> lock(m_mutex);
2806  m_reference_map.erase(id);
2807  }
2808 
2810  TableScannerAsync *scanner =
2811  dynamic_cast<TableScannerAsync *>(get_object(id));
2812  if (scanner == 0) {
2813  HT_ERROR_OUT << "Bad scanner id - " << id << HT_END;
2815  format("Invalid scanner id: %lld", (Lld)id));
2816  }
2817  return scanner;
2818  }
2819 
2820  TableScanner *get_scanner(int64_t id, ScannerInfoPtr &info) {
2821  std::lock_guard<std::mutex> lock(m_mutex);
2822  TableScanner *scanner {};
2823  auto it = m_object_map.find(id);
2824  if (it == m_object_map.end() ||
2825  (scanner = dynamic_cast<TableScanner *>(it->second.get())) == nullptr) {
2826  HT_ERROR_OUT << "Bad scanner id - " << id << HT_END;
2828  format("Invalid scanner id: %lld", (Lld)id));
2829  }
2830  auto sit = m_scanner_info_map.find(id);
2831  HT_ASSERT(sit != m_scanner_info_map.end());
2832  info = sit->second;
2833  return scanner;
2834  }
2835 
2836  bool remove_object(int64_t id) {
2837  // destroy client object unlocked
2838  bool removed = false;
2839  ClientObjectPtr item;
2840  {
2841  std::lock_guard<std::mutex> lock(m_mutex);
2842  ObjectMap::iterator it = m_object_map.find(id);
2843  if (it != m_object_map.end()) {
2844  item = (*it).second;
2845  m_object_map.erase(it);
2846  removed = true;
2847  }
2848  }
2849  return removed;
2850  }
2851 
2852  bool remove_cached_object(int64_t id) {
2853  // destroy client object unlocked
2854  bool removed = false;
2855  ClientObjectPtr item;
2856  {
2857  std::lock_guard<std::mutex> lock(m_mutex);
2858  ObjectMap::iterator it = m_cached_object_map.find(id);
2859  if (it != m_cached_object_map.end()) {
2860  item = (*it).second;
2861  m_cached_object_map.erase(it);
2862  removed = true;
2863  }
2864  }
2865  return removed;
2866  }
2867 
2868  void remove_scanner(int64_t id) {
2869  // destroy client object unlocked
2870  ClientObjectPtr item;
2871  {
2872  std::lock_guard<std::mutex> lock(m_mutex);
2873  m_scanner_info_map.erase(id);
2874  ObjectMap::iterator it = m_object_map.find(id);
2875  if (it != m_object_map.end()) {
2876  item = (*it).second;
2877  m_object_map.erase(it);
2878  }
2879  else {
2880  HT_ERROR_OUT << "Bad scanner id - " << id << HT_END;
2882  format("Invalid scanner id: %lld", (Lld)id));
2883  }
2884  }
2885  }
2886 
2887  void remove_scanner(int64_t id, ClientObjectPtr &scanner, ScannerInfoPtr &info) {
2888  std::lock_guard<std::mutex> lock(m_mutex);
2889  ObjectMap::iterator it = m_object_map.find(id);
2890  if (it != m_object_map.end()) {
2891  scanner = (*it).second;
2892  m_object_map.erase(it);
2893  }
2894  else {
2895  HT_ERROR_OUT << "Bad scanner id - " << id << HT_END;
2897  format("Invalid scanner id: %lld", (Lld)id));
2898  }
2899  info = m_scanner_info_map[id];
2900  m_scanner_info_map.erase(id);
2901  }
2902 
2903  void shared_mutator_refresh(const ThriftGen::Namespace ns,
2904  const String &table, const ThriftGen::MutateSpec &mutate_spec) override {
2905  std::lock_guard<std::mutex> lock(m_context.shared_mutator_mutex);
2906  SharedMutatorMapKey skey(get_namespace(ns), table, mutate_spec);
2907 
2908  SharedMutatorMap::iterator it = m_context.shared_mutator_map.find(skey);
2909 
2910  // if mutator exists then delete it
2911  if (it != m_context.shared_mutator_map.end()) {
2912  LOG_API("deleting shared mutator on namespace=" << ns << " table="
2913  << table << " with appname=" << mutate_spec.appname);
2914  m_context.shared_mutator_map.erase(it);
2915  }
2916 
2917  //re-create the shared mutator
2918  // else create it and insert it in the map
2919  LOG_API("creating shared mutator on namespace=" << ns << " table=" << table
2920  <<" with appname=" << mutate_spec.appname);
2921  Hypertable::Namespace *namespace_ptr = get_namespace(ns);
2922  TablePtr t = namespace_ptr->open_table(table);
2923  TableMutator *mutator = t->create_mutator(0, mutate_spec.flags,
2924  mutate_spec.flush_interval);
2925  m_context.shared_mutator_map[skey] = mutator;
2926  return;
2927  }
2928 
2929  void refresh_shared_mutator(const ThriftGen::Namespace ns,
2930  const String &table, const ThriftGen::MutateSpec &mutate_spec) override {
2931  shared_mutator_refresh(ns, table, mutate_spec);
2932  }
2933 
2934  TableMutator *get_shared_mutator(const ThriftGen::Namespace ns,
2935  const String &table, const ThriftGen::MutateSpec &mutate_spec) {
2936  std::lock_guard<std::mutex> lock(m_context.shared_mutator_mutex);
2937  SharedMutatorMapKey skey(get_namespace(ns), table, mutate_spec);
2938 
2939  SharedMutatorMap::iterator it = m_context.shared_mutator_map.find(skey);
2940 
2941  // if mutator exists then return it
2942  if (it != m_context.shared_mutator_map.end())
2943  return it->second;
2944  else {
2945  // else create it and insert it in the map
2946  LOG_API("creating shared mutator on namespace=" << ns << " table="
2947  << table << " with appname=" << mutate_spec.appname);
2948  Hypertable::Namespace *namespace_ptr = get_namespace(ns);
2949  TablePtr t = namespace_ptr->open_table(table);
2950  TableMutator *mutator = t->create_mutator(0, mutate_spec.flags,
2951  mutate_spec.flush_interval);
2952  m_context.shared_mutator_map[skey] = mutator;
2953  return mutator;
2954  }
2955  }
2956 
2957  TableMutator *get_mutator(int64_t id) {
2958  TableMutator *mutator = dynamic_cast<TableMutator *>(get_object(id));
2959  if (mutator == 0) {
2960  HT_ERROR_OUT << "Bad mutator id - " << id << HT_END;
2962  format("Invalid mutator id: %lld", (Lld)id));
2963  }
2964  return mutator;
2965  }
2966 
2968  TableMutatorAsync *mutator =
2969  dynamic_cast<TableMutatorAsync *>(get_object(id));
2970  if (mutator == 0) {
2971  HT_ERROR_OUT << "Bad mutator id - " << id << HT_END;
2973  format("Invalid mutator id: %lld", (Lld)id));
2974  }
2975  return mutator;
2976  }
2977 
2978  void remove_future(int64_t id) {
2979  if (!remove_object(id)) {
2980  HT_ERROR_OUT << "Bad future id - " << id << HT_END;
2982  format("Invalid future id: %lld", (Lld)id));
2983  }
2984  }
2985 
2986  void remove_namespace(int64_t id) {
2987  if (!remove_cached_object(id)) {
2988  HT_ERROR_OUT << "Bad namespace id - " << id << HT_END;
2990  format("Invalid namespace id: %lld", (Lld)id));
2991  }
2992  }
2993 
2994  void remove_mutator(int64_t id) {
2995  if (!remove_object(id)) {
2996  HT_ERROR_OUT << "Bad mutator id - " << id << HT_END;
2998  format("Invalid mutator id: %lld", (Lld)id));
2999  }
3000  }
3001 
3002 private:
3006  multimap<::int64_t, ClientObjectPtr> m_reference_map;
3007  ObjectMap m_object_map;
3009  std::unordered_map< ::int64_t, ScannerInfoPtr> m_scanner_info_map;
3010 };
3011 
3012 template <class ResultT, class CellT>
3013 void HqlCallback<ResultT, CellT>::on_return(const std::string &ret) {
3014  result.results.push_back(ret);
3015  result.__isset.results = true;
3016 }
3017 
3018 template <class ResultT, class CellT>
3020  if (buffered) {
3021  Hypertable::Cell hcell;
3022  CellT tcell;
3023 
3024  while (s->next(hcell)) {
3025  convert_cell(hcell, tcell);
3026  result.cells.push_back(tcell);
3027  }
3028  result.__isset.cells = true;
3029 
3030  if (g_log_slow_queries)
3031  s->get_profile_data(profile_data);
3032 
3033  }
3034  else {
3035  ScannerInfoPtr si = make_shared<ScannerInfo>(ns);
3036  si->hql = hql;
3037  result.scanner = handler.get_scanner_id(s, si);
3038  result.__isset.scanner = true;
3039  }
3040  is_scan = true;
3041 }
3042 
3043 template <class ResultT, class CellT>
3045  if (flush) {
3046  Parent::on_finish(m);
3047  }
3048  else if (m) {
3049  result.mutator = handler.get_object_id(m);
3050  result.__isset.mutator = true;
3051  }
3052 }
3053 
3054 namespace {
3055  Context *g_context = 0;
3056 }
3057 
3059 public:
3060 
3061  static ServerHandler* getHandler(const String& remotePeer) {
3062  return instance.get_handler(remotePeer);
3063  }
3064 
3065  static void releaseHandler(ServerHandler* serverHandler) {
3066  try {
3067  instance.release_handler(serverHandler);
3068  }
3069  catch (Hypertable::Exception &e) {
3070  HT_ERRORF("%s - %s", Error::get_text(e.code()), e.what());
3071  }
3072  }
3073 
3074 private:
3075 
3077 
3078  ServerHandler* get_handler(const String& remotePeer) {
3079  std::lock_guard<std::mutex> lock(m_mutex);
3080  ServerHandlerMap::iterator it = m_server_handler_map.find(remotePeer);
3081  if (it != m_server_handler_map.end()) {
3082  ++it->second.first;
3083  return it->second.second;
3084  }
3085 
3086  ServerHandler* serverHandler = new ServerHandler(remotePeer, *g_context);
3087  m_server_handler_map.insert(
3088  std::make_pair(remotePeer,
3089  std::make_pair(1, serverHandler)));
3090 
3091  return serverHandler;
3092  }
3093 
3094  void release_handler(ServerHandler* serverHandler) {
3095  {
3096  std::lock_guard<std::mutex> lock(m_mutex);
3097  ServerHandlerMap::iterator it =
3098  m_server_handler_map.find(serverHandler->remote_peer());
3099  if (it != m_server_handler_map.end()) {
3100  if (--it->second.first > 0) {
3101  return;
3102  }
3103  }
3104  m_server_handler_map.erase(it);
3105  }
3106  delete serverHandler;
3107  }
3108 
3110  typedef std::map<String, std::pair<int, ServerHandler*> > ServerHandlerMap;
3111  ServerHandlerMap m_server_handler_map;
3113 };
3114 
3116 
3117 class ThriftBrokerIfFactory : public HqlServiceIfFactory {
3118 public:
3120 
3121  HqlServiceIf* getHandler(const ::apache::thrift::TConnectionInfo& connInfo) override {
3122  typedef ::apache::thrift::transport::TSocket TTransport;
3123  String remotePeer =
3124  dynamic_cast<TTransport*>(connInfo.transport.get())->getPeerAddress();
3125  g_metrics_handler->connection_increment();
3126  return ServerHandlerFactory::getHandler(remotePeer);
3127  }
3128 
3129  void releaseHandler( ::Hypertable::ThriftGen::ClientServiceIf *service) override {
3130  ServerHandler* serverHandler = dynamic_cast<ServerHandler*>(service);
3131  g_metrics_handler->connection_decrement();
3132  return ServerHandlerFactory::releaseHandler(serverHandler);
3133  }
3134 };
3135 
3136 
3137 }} // namespace Hypertable::ThriftBroker
3138 
3139 
3140 int main(int argc, char **argv) {
3141  using namespace Hypertable;
3142  using namespace ThriftBroker;
3143  Random::seed(time(NULL));
3144 
3145  try {
3146  init_with_policies<Policies>(argc, argv);
3147 
3148  if (get_bool("ThriftBroker.Hyperspace.Session.Reconnect"))
3149  properties->set("Hyperspace.Session.Reconnect", true);
3150 
3151  if (get_bool("ThriftBroker.SlowQueryLog.Enable")) {
3152  g_log_slow_queries = true;
3153  g_slow_query_latency_threshold = get_i32("ThriftBroker.SlowQueryLog.LatencyThreshold");
3154  g_slow_query_log = new Cronolog("SlowQuery.log",
3155  System::install_dir + "/log",
3156  System::install_dir + "/log/archive");
3157  }
3158 
3159  g_metrics_handler = std::make_shared<MetricsHandler>(properties, g_slow_query_log);
3160  g_metrics_handler->start_collecting();
3161 
3162  boost::shared_ptr<ThriftBroker::Context> context(new ThriftBroker::Context());
3163 
3164  g_context = context.get();
3165 
3166  ::uint16_t port = get_i16("port");
3167  boost::shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());
3168  boost::shared_ptr<HqlServiceIfFactory> hql_service_factory(new ThriftBrokerIfFactory());
3169  boost::shared_ptr<TProcessorFactory> hql_service_processor_factory(new HqlServiceProcessorFactory(hql_service_factory));
3170 
3171  boost::shared_ptr<TServerTransport> serverTransport;
3172 
3173  if (has("thrift-timeout")) {
3174  int timeout_ms = get_i32("thrift-timeout");
3175  serverTransport.reset( new TServerSocket(port, timeout_ms, timeout_ms) );
3176  }
3177  else
3178  serverTransport.reset( new TServerSocket(port) );
3179 
3180  boost::shared_ptr<TTransportFactory> transportFactory(new TFramedTransportFactory());
3181 
3182  TThreadedServer server(hql_service_processor_factory, serverTransport,
3183  transportFactory, protocolFactory);
3184 
3185  HT_INFO("Starting the server...");
3186 
3187  server.serve();
3188 
3189  g_metrics_handler->start_collecting();
3190  g_metrics_handler.reset();
3191 
3192  HT_INFO("Exiting.\n");
3193  }
3194  catch (Hypertable::Exception &e) {
3195  HT_ERROR_OUT << e << HT_END;
3196  }
3197  return 0;
3198 }
3199 
int64_t timestamp
Definition: KeySpec.h:130
void set_cells(const ThriftGen::Namespace ns, const String &table, const ThriftCells &cells) override
bool get_counter() const
Gets the counter option.
void set_id(int32_t id)
Sets column ID.
bool is_set_max_versions() const
Checks if max versions option is set.
Retrieves system information (hardware, installation directory, etc)
bool get_time_order_desc() const
Gets time order desc option.
Meta::list< ThriftBrokerPolicy, DefaultCommPolicy > Policies
void shared_mutator_set_cell(const ThriftGen::Namespace ns, const String &table, const ThriftGen::MutateSpec &mutate_spec, const ThriftGen::Cell &cell) override
void next_row(ThriftCells &result, const Scanner scanner_id) override
void _convert_result_serialized(Hypertable::ResultPtr &hresult, ThriftGen::ResultSerialized &tresult)
static std::mutex mutex
Definition: Logger.cc:43
void async_mutator_flush(const MutatorAsync mutator) override
void get_schema_str(String &result, const ThriftGen::Namespace ns, const String &table) override
bool is_set_ttl() const
Checks if ttl option is set.
const char * row_regexp
Definition: ScanSpec.h:279
void table_create(const ThriftGen::Namespace ns, const String &table, const ThriftGen::Schema &schema) override
int32_t get_blocksize() const
Gets blocksize option.
void get_table_splits(const std::string &name, TableSplitsContainer &splits)
Returns a list of existing table names.
Definition: Namespace.cc:316
void set_row_offset(int32_t n)
Sets the number of rows to be skipped at the beginning of the query.
Definition: ScanSpec.h:363
void async_mutator_cancel(const MutatorAsync mutator) override
bool is_set_bloom_filter() const
Checks if bloom filter option is set.
std::shared_ptr< MetricsHandler > MetricsHandlerPtr
Smart pointer to MetricsHandler.
std::vector< Cell, CellAlloc > Cells
Definition: Cells.h:37
void create_namespace(const String &ns) override
TablePtr open_table(const std::string &name, int32_t flags=0)
Opens a table.
Definition: Namespace.cc:155
#define HT_WARNF(msg,...)
Definition: Logger.h:290
TableMutator * get_shared_mutator(const ThriftGen::Namespace ns, const String &table, const ThriftGen::MutateSpec &mutate_spec)
void set_generation(int64_t generation)
Sets generation.
TableScannerAsync * _open_scanner_async(const ThriftGen::Namespace ns, const String &table, const ThriftGen::Future ff, const ThriftGen::ScanSpec &ss)
int64_t get_scanner_id(TableScannerPtr &scanner, ScannerInfoPtr &info)
int32_t scanblocks
Number of scan blocks returned from RangeServers.
void convert_schema(const Hypertable::SchemaPtr &hschema, ThriftGen::Schema &tschema)
void offer_cells(const ThriftGen::Namespace ns, const String &table, const ThriftGen::MutateSpec &mutate_spec, const ThriftCells &cells) override
Boost library.
Definition: Properties.cc:39
PropertiesPtr properties
This singleton map stores all options.
Definition: Config.cc:47
ServerHandler(const String &remote_peer, Context &c)
void set_bloom_filter(const std::string &bloomfilter)
Sets bloom filter option.
std::string String
A String is simply a typedef to std::string.
Definition: String.h:44
void get_table_id(String &result, const ThriftGen::Namespace ns, const String &table) override
bool set_max_versions(int32_t max_versions)
Sets max versions option.
ColumnPredicates column_predicates
Definition: ScanSpec.h:277
void hql_exec_as_arrays(HqlResultAsArrays &result, const ThriftGen::Namespace ns, const String &hql, bool noflush, bool unbuffered) override
void _set_cell_async(const MutatorAsync mutator, const CellT &cell)
int32_t subscanners
Number of RangeServer::create_scanner() calls.
void set_deleted(bool value)
Sets deleted flag.
void cancel_mutator_async(const MutatorAsync mutator) override
void error_get_text(std::string &_return, int error_code) override
void mutator_set_cells_serialized(const Mutator mutator, const CellsSerialized &cells, const bool flush) override
int64_t bytes_scanned
Number of bytes scanned while executing scan.
bool is_cancelled()
Checks whether the Future object has been cancelled.
Definition: Future.h:97
String format(const char *fmt,...)
Returns a String using printf like format facilities Vanilla snprintf is about 1.5x faster than this...
Definition: String.cc:37
void scanner_get_row_as_arrays(ThriftCellsAsArrays &result, const Scanner scanner_id) override
void mutator_close(const Mutator mutator) override
chrono::time_point< fast_clock > time_point
Definition: fast_clock.h:42
void scanner_get_row(ThriftCells &result, const Scanner scanner_id) override
void _convert_result_as_arrays(const Hypertable::ResultPtr &hresult, ThriftGen::ResultAsArrays &tresult)
Asynchronous table scanner.
void convert_table_split(const Hypertable::TableSplit &hsplit, ThriftGen::TableSplit &tsplit)
Represents a table split.
Definition: TableSplit.h:34
void shared_mutator_set_cells(const ThriftGen::Namespace ns, const String &table, const ThriftGen::MutateSpec &mutate_spec, const ThriftCells &cells) override
void _next_row(vector< CellT > &result, TableScanner *scanner)
ThriftGen::Future future_open(int capacity) override
void convert_scan_spec(const ThriftGen::ScanSpec &tss, Hypertable::ScanSpec &hss)
void async_mutator_set_cells_as_arrays(const MutatorAsync mutator, const ThriftCellsAsArrays &cells) override
void log_slow_query_scanspec(const char *func_name, std::chrono::fast_clock::time_point start_time, std::chrono::fast_clock::time_point end_time, int64_t latency_ms, ProfileDataScanner &profile_data, Hypertable::Namespace *ns, const string &table, Hypertable::ScanSpec &ss)
long long unsigned int Llu
Shortcut for printf formats.
Definition: String.h:50
pair< int64_t, int64_t > time_interval
Definition: ScanSpec.h:278
const char * value_regexp
Definition: ScanSpec.h:280
const char * column_qualifier
Definition: KeySpec.h:128
void hql_query_as_arrays(HqlResultAsArrays &result, const ThriftGen::Namespace ns, const String &hql) override
void set_replication(int16_t replication)
Sets replication option.
#define THROW_TE(_code_, _str_)
Definition: ThriftBroker.cc:92
int compare(const SharedMutatorMapKey &skey) const
Declarations for fast_clock.
Column family specification.
void convert_key(const ThriftGen::Key &tkey, Hypertable::KeySpec &hkey)
void flush_mutator_async(const MutatorAsync mutator) override
void scanner_get_cells_serialized(CellsSerialized &result, const Scanner scanner_id) override
void set_qualifier_index(bool value)
Sets qualifier index flag.
void set_name(const std::string &name)
Sets column family name.
const char * column_qualifier
Definition: Cell.h:68
bool set_ttl(time_t ttl)
Sets ttl option.
bool is_set_in_memory() const
Checks if in memory option is set.
void _set_cells_async(const MutatorAsync mutator, const vector< CellT > &cells)
void _convert_result(const Hypertable::ResultPtr &hresult, ThriftGen::Result &tresult)
int64_t revision
Definition: KeySpec.h:131
Specification for column family options.
void mutator_set_cells_as_arrays(const Mutator mutator, const ThriftCellsAsArrays &cells) override
#define HT_INFO(msg)
Definition: Logger.h:271
void namespace_create(const String &ns) override
Hypertable::Future * get_future(int64_t id)
STL namespace.
std::shared_ptr< Result > ResultPtr
Smart pointer to Result.
Definition: Result.h:72
void set_start_time(int64_t start)
Definition: ScanSpec.h:503
ScannerAsync open_scanner_async(const ThriftGen::Namespace ns, const String &table, const ThriftGen::Future ff, const ThriftGen::ScanSpec &ss) override
void get_cells(ThriftCells &result, const ThriftGen::Namespace ns, const String &table, const ThriftGen::ScanSpec &ss) override
std::string get_table_id(const std::string &name)
Returns the table identifier for a table.
Definition: Namespace.cc:217
void get_row_as_arrays(ThriftCellsAsArrays &result, const ThriftGen::Namespace ns, const String &table, const String &row) override
void flush(bool sync=true)
Flushes the current buffer accumulated mutations to their respective range servers.
void async_mutator_close(const MutatorAsync mutator) override
void set_generation(int64_t generation)
Sets generation.
void on_scan(TableScannerPtr &) override
Called when interpreter is ready to scan.
Declarations for MetricsHandler.
void mutator_flush(const Mutator mutator) override
static uint32_t number32(uint32_t maximum=0)
Returns a random 32-bit unsigned integer.
Definition: Random.cc:55
std::shared_ptr< ClientObject > ClientObjectPtr
Smart pointer to ClientObject.
Definition: ClientObject.h:51
const char * hostname
Definition: TableSplit.h:49
ClientObject * get_cached_object(int64_t id)
void namespace_get_listing(std::vector< ThriftGen::NamespaceListing > &_return, const ThriftGen::Namespace ns) override
bool future_is_full(ThriftGen::Future ff) override
const void * row
Definition: KeySpec.h:125
#define LOG_API_START(_expr_)
void next_cells_as_arrays(ThriftCellsAsArrays &result, const Scanner scanner_id) override
static ServerHandler * getHandler(const String &remotePeer)
std::shared_ptr< TableScanner > TableScannerPtr
Smart pointer to TableScanner.
Definition: TableScanner.h:124
Specification for access group options.
bool convert_column_family_options(const Hypertable::ColumnFamilyOptions &hoptions, ThriftGen::ColumnFamilyOptions &toptions)
void set_value_regexp(const char *regexp)
Sets the regexp to filter cell values by.
Definition: ScanSpec.h:401
bool is_set_replication() const
Checks if replication option is set.
void unget(const Cell &cell)
Ungets one cell.
Definition: TableScanner.cc:87
void _offer_cell(const ThriftGen::Namespace ns, const String &table, const ThriftGen::MutateSpec &mutate_spec, const CellT &cell)
void get_row_serialized(CellsSerialized &result, const ThriftGen::Namespace ns, const std::string &table, const std::string &row) override
String generate_guid()
Generates a new GUID.
Definition: Unique.h:44
void next_cells_serialized(CellsSerialized &result, const Scanner scanner_id) override
void get(Cells &cells)
Definition: Cells.h:97
void run_hql_interp(const ThriftGen::Namespace ns, const String &hql, HqlInterpreter::Callback &cb)
void set_cells_serialized_async(const MutatorAsync mutator, const CellsSerialized &cells, const bool flush) override
bool has(const String &name)
Check existence of a configuration value.
Definition: Config.h:57
Represents a row interval.
Definition: RowInterval.h:38
const string render_hql(const string &table) const
Renders scan spec as an HQL SELECT statement.
Definition: ScanSpec.cc:166
bool is_set_counter() const
Checks if counter option is set.
#define LOG_API_FINISH_E(_expr_)
static void releaseHandler(ServerHandler *serverHandler)
Declarations for Cronolog.
#define HT_ASSERT(_e_)
Definition: Logger.h:396
void scanner_get_cells_as_arrays(ThriftCellsAsArrays &result, const Scanner scanner_id) override
void async_mutator_set_cells_serialized(const MutatorAsync mutator, const CellsSerialized &cells, const bool flush) override
void create_cell_unique(std::string &_return, const ThriftGen::Namespace ns, const std::string &table_name, const ThriftGen::Key &tkey, const std::string &value) override
Provides the ability to mutate a table in the form of adding and deleting rows and cells...
void set_cell_as_array_async(const MutatorAsync mutator, const CellAsArray &cell) override
bool future_is_empty(ThriftGen::Future ff) override
void set_value_index(bool value)
Sets value index flag.
void cancel_scanner_async(const ScannerAsync scanner) override
void close_namespace(const ThriftGen::Namespace ns) override
Scan predicate and control specification.
Definition: ScanSpec.h:56
void get_future_result(ThriftGen::Result &tresult, ThriftGen::Future ff, int timeout_millis) override
void table_rename(const ThriftGen::Namespace ns, const String &table, const String &new_table_name) override
void mutator_set_cell(const Mutator mutator, const ThriftGen::Cell &cell) override
MutatorAsync async_mutator_open(const ThriftGen::Namespace ns, const String &table, const ThriftGen::Future ff,::int32_t flags) override
std::shared_ptr< TableMutator > TableMutatorPtr
Smart pointer to TableMutator.
Definition: TableMutator.h:257
Provides the ability to mutate a table in the form of adding and deleting rows and cells...
Definition: TableMutator.h:55
void set_cell_limit(int32_t n)
Sets the maximum number of cells to return.
Definition: ScanSpec.h:349
multimap<::int64_t, ClientObjectPtr > m_reference_map
void create_cell_unique(const TablePtr &table, const KeySpec &key, String &guid)
Inserts a unique value into a table.
Definition: Unique.cc:33
bool future_has_outstanding(ThriftGen::Future ff) override
Mutator open_mutator(const ThriftGen::Namespace ns, const String &table, int32_t flags, int32_t flush_interval) override
void set_end_time(int64_t end)
Definition: ScanSpec.h:507
TableMutatorAsync * _open_mutator_async(const ThriftGen::Namespace ns, const String &table, const ThriftGen::Future ff,::int32_t flags)
Declarations for ProfileDataScanner.
std::vector< ThriftGen::Cell > ThriftCells
void close_mutator_async(const MutatorAsync mutator) override
void offer_cell(const ThriftGen::Namespace ns, const String &table, const ThriftGen::MutateSpec &mutate_spec, const ThriftGen::Cell &cell) override
void set_cells(const Cells &cells)
Insert a bunch of cells into the table (atomically if cells are in the same range/row) ...
void async_scanner_close(const ScannerAsync scanner_async) override
void cancel_future(ThriftGen::Future ff) override
static time_point now() noexcept
Definition: fast_clock.cc:37
void merge_options(const ColumnFamilyOptions &other)
Merges options from another ColumnFamilyOptions object.
void cancel()
Cancels outstanding scanners/mutators.
Definition: Future.cc:166
void _set_cell(const Mutator mutator, const CellT &cell)
void mutator_set_cells(const Mutator mutator, const ThriftCells &cells) override
void get_listing(bool include_sub_entries, std::vector< NamespaceListing > &listing)
Returns a list of existing tables & namesspaces.
Definition: Namespace.cc:311
std::set< std::string > servers
Set of server proxy names participating in scan.
const char * get_text(int error)
Returns a descriptive error message.
Definition: Error.cc:330
void set_row_regexp(const char *regexp)
Sets the regexp to filter rows by.
Definition: ScanSpec.h:394
TableMutator * get_mutator(int64_t id)
void convert_cell(const ThriftGen::Cell &tcell, Hypertable::Cell &hcell)
void create_table(const std::string &name, const std::string &schema_str)
Creates a table.
Definition: Namespace.cc:111
void set_keys_only(bool val)
Return only keys (no values)
Definition: ScanSpec.h:514
int64_t cell_str_to_num(const std::string &from, const char *label, int64_t min_num=INT64_MIN, int64_t max_num=INT64_MAX)
const char * location
Definition: TableSplit.h:47
void async_mutator_set_cell(const MutatorAsync mutator, const ThriftGen::Cell &cell) override
const char * ip_address
Definition: TableSplit.h:48
uint64_t revision
Definition: Cell.h:70
SchemaPtr get_schema(const std::string &name)
Returns a smart ptr to a schema object for a table.
Definition: Namespace.cc:238
int64_t get_cached_object_id(ClientObjectPtr co)
Scanner open_scanner(const ThriftGen::Namespace ns, const String &table, const ThriftGen::ScanSpec &ss) override
bool get_in_memory() const
Gets in memory option.
SharedMutatorMapKey(Hypertable::Namespace *ns, const String &tablename, const ThriftGen::MutateSpec &mutate_spec)
int64_t disk_read
Number of bytes read from disk while executing scan.
void table_get_schema(ThriftGen::Schema &result, const ThriftGen::Namespace ns, const String &table) override
void get_cell(Value &result, const ThriftGen::Namespace ns, const String &table, const String &row, const String &column) override
void reserve_column_predicates(size_t s)
Definition: ScanSpec.h:433
Logging routines and macros.
void table_get_id(String &result, const ThriftGen::Namespace ns, const String &table) override
void merge_options(const AccessGroupOptions &options)
Merges options with those from another AccessGroupOptions object.
void table_get_splits(std::vector< ThriftGen::TableSplit > &_return, const ThriftGen::Namespace ns, const String &table) override
void future_get_result_serialized(ThriftGen::ResultSerialized &tresult, ThriftGen::Future ff, int timeout_millis) override
void release_handler(ServerHandler *serverHandler)
bool next(Cell &cell)
Gets the next cell.
Definition: TableScanner.cc:49
void next_cells(ThriftCells &result, const Scanner scanner_id) override
MutatorAsync open_mutator_async(const ThriftGen::Namespace ns, const String &table, const ThriftGen::Future ff,::int32_t flags) override
Compatibility Macros for C/C++.
Mutator mutator_open(const ThriftGen::Namespace ns, const String &table, int32_t flags, int32_t flush_interval) override
void future_get_result(ThriftGen::Result &tresult, ThriftGen::Future ff, int timeout_millis) override
void hql_exec(HqlResult &result, const ThriftGen::Namespace ns, const String &hql, bool noflush, bool unbuffered) override
int64_t try_get_object_id(ClientObject *co)
std::unordered_map< ::int64_t, ClientObjectPtr > ObjectMap
bool get(ResultPtr &result)
This call blocks till there is a result available unless async ops have completed.
Definition: Future.cc:35
void get_cells_as_arrays(ThriftCellsAsArrays &result, const ThriftGen::Namespace ns, const String &table, const ThriftGen::ScanSpec &ss) override
void get_schema(ThriftGen::Schema &result, const ThriftGen::Namespace ns, const String &table) override
TableMutator * _open_mutator(const ThriftGen::Namespace ns, const String &table)
const std::string & get_access_group() const
Gets access group name.
bool operator<(const SharedMutatorMapKey &skey1, const SharedMutatorMapKey &skey2)
void set_return_deletes(bool val)
Internal use only.
Definition: ScanSpec.h:521
void shared_mutator_set_cell_as_array(const ThriftGen::Namespace ns, const String &table, const ThriftGen::MutateSpec &mutate_spec, const CellAsArray &cell) override
bool future_is_cancelled(ThriftGen::Future ff) override
const char * row_key
Definition: Cell.h:66
Initialization helper for applications.
void get_table_splits(std::vector< ThriftGen::TableSplit > &_return, const ThriftGen::Namespace ns, const String &table) override
#define HT_END
Definition: Logger.h:220
Helper class for building a ScanSpec.
Definition: ScanSpec.h:318
bool set_time_order_desc(bool value)
Sets time order desc option.
bool is_empty()
Checks whether the Future result queue is empty.
Definition: Future.h:89
void set_cell_limit_per_family(int32_t n)
Sets the maximum number of cells to return per column family.
Definition: ScanSpec.h:356
void set_do_not_cache(bool val)
Don't cache.
Definition: ScanSpec.h:535
TableScanner * _open_scanner(const ThriftGen::Namespace ns, const String &table, const Hypertable::ScanSpec &ss)
#define LOG_SLOW_QUERY(_pd_, _ns_, _hql_)
Represents a column predicate (e.g.
HqlInterpreter::Callback Parent
int32_t get_max_versions() const
Gets max versions option.
Synchronous table scanner.
Definition: TableScanner.h:39
#define HT_ERROR_OUT
Definition: Logger.h:301
void future_cancel(ThriftGen::Future ff) override
bool exists_namespace(const String &ns) override
bool set_counter(bool value)
Sets counter option.
void refresh_shared_mutator(const ThriftGen::Namespace ns, const String &table, const ThriftGen::MutateSpec &mutate_spec) override
Time related declarations.
bool is_set_compressor() const
Checks if compressor option is set.
Time based rotating log.
Definition: Cronolog.h:59
void status(ThriftGen::Status &_return) override
void async_scanner_cancel(const ScannerAsync scanner) override
void set_cells_async(const MutatorAsync mutator, const ThriftCells &cells) override
bool is_set_time_order_desc() const
Checks if time_order_desc option is set.
Base class for Hypertable client objects.
Definition: ClientObject.h:44
time_t get_ttl() const
Gets ttl option.
ScannerAsync async_scanner_open(const ThriftGen::Namespace ns, const String &table, const ThriftGen::Future ff, const ThriftGen::ScanSpec &ss) override
Access group specification.
bool table_exists(const ThriftGen::Namespace ns, const String &table) override
int32_t convert_cells(const Hypertable::Cells &hcells, ThriftCells &tcells)
int16_t get_replication() const
Gets replication option.
void remove_scanner(int64_t id, ClientObjectPtr &scanner, ScannerInfoPtr &info)
ScannerInfo(int64_t ns, const string &t)
void mutator_set_cell_as_array(const Mutator mutator, const CellAsArray &cell) override
void hql_query2(HqlResult2 &result, const ThriftGen::Namespace ns, const String &hql) override
Hypertable definitions
void async_mutator_set_cell_as_array(const MutatorAsync mutator, const CellAsArray &cell) override
std::shared_ptr< HqlInterpreter > HqlInterpreterPtr
Smart pointer to HqlInterpreter.
void refresh_table(const ThriftGen::Namespace ns, const String &table_name) override
const std::string & get_name() const
Gets column family name.
long long int Lld
Shortcut for printf formats.
Definition: String.h:53
bool table_exists(ContextPtr &context, const String &name, String &id)
Checks if table exists and returns table ID.
Definition: Utility.cc:100
void merge_defaults(const ColumnFamilyOptions &options)
Merges column family defaults with those from another AccessGroupOptions object.
Hypertable::Namespace * get_namespace(int64_t id)
Callback interface/base class for execute.
TableScannerAsync * get_scanner_async(int64_t id)
void add_column(const string &str)
Adds a column family to be returned by the scan.
Definition: ScanSpec.h:408
void add_row_interval(const string &start, bool start_inclusive, const string &end, bool end_inclusive)
Adds a row interval to be returned in the scan.
Definition: ScanSpec.h:455
bool convert_access_group_options(const Hypertable::AccessGroupOptions &hoptions, ThriftGen::AccessGroupOptions &toptions)
HqlServiceIf * getHandler(const ::apache::thrift::TConnectionInfo &connInfo) override
TableScanner * get_scanner(int64_t id, ScannerInfoPtr &info)
int64_t get_object_id(ClientObject *co)
ThriftGen::Namespace open_namespace(const String &ns) override
void on_return(const std::string &) override
Called when interpreter returns a string result Maybe called multiple times for a list of string resu...
void drop_namespace(const String &ns, const bool if_exists) override
void shared_mutator_set_cells_as_arrays(const ThriftGen::Namespace ns, const String &table, const ThriftGen::MutateSpec &mutate_spec, const ThriftCellsAsArrays &cells) override
void alter_table(const std::string &table_name, SchemaPtr &schema, bool force)
Alter table schema.
Definition: Namespace.cc:135
#define LOG_API(_expr_)
bool exists_table(const std::string &name)
Checks if the table exists.
Definition: Namespace.cc:189
void log_slow_query(const char *func_name, std::chrono::fast_clock::time_point start_time, std::chrono::fast_clock::time_point end_time, int64_t latency_ms, ProfileDataScanner &profile_data, Hypertable::Namespace *ns, const string &hql)
void drop_table(const ThriftGen::Namespace ns, const String &table, const bool if_exists) override
void set_access_group(const std::string &ag)
Sets access group.
const char * column_family
Definition: Cell.h:67
#define HT_INFOF(msg,...)
Definition: Logger.h:272
void set_blocksize(int32_t blocksize)
Sets blocksize option.
#define HT_THROWF(_code_, _fmt_,...)
Definition: Error.h:490
void close_scanner(const Scanner scanner) override
void table_alter(const ThriftGen::Namespace ns, const String &table, const ThriftGen::Schema &schema) override
Random number generator for int32, int64, double and ascii arrays.
void close_mutator(const Mutator mutator) override
void get_profile_data(ProfileDataScanner &profile_data)
Gets profile data.
Definition: TableScanner.h:91
void offer_cell_as_array(const ThriftGen::Namespace ns, const String &table, const ThriftGen::MutateSpec &mutate_spec, const CellAsArray &cell) override
void table_get_schema_str_with_ids(String &result, const ThriftGen::Namespace ns, const String &table) override
bool exists_table(const ThriftGen::Namespace ns, const String &table) override
#define LOG_SLOW_QUERY_SCANNER(_scanner_, _ns_, _table_, _ss_)
RowIntervals row_intervals
Definition: ScanSpec.h:275
bool namespace_exists(const String &ns) override
std::string get_schema_str(const std::string &name, bool with_ids=false)
Returns the schema for a table.
Definition: Namespace.cc:228
bool has_outstanding()
Checks whether there are any outstanding operations.
Definition: Future.h:105
std::map< SharedMutatorMapKey, TableMutator * > SharedMutatorMap
void table_drop(const ThriftGen::Namespace ns, const String &table, const bool if_exists) override
void set_cell_as_array(const ThriftGen::Namespace ns, const String &table, const CellAsArray &cell) override
static String install_dir
The installation directory.
Definition: System.h:114
This is a generic exception class for Hypertable.
Definition: Error.h:314
void _set_cells(const Mutator mutator, const vector< CellT > &cells)
void scanner_get_row_serialized(CellsSerialized &result, const Scanner scanner_id) override
int main(int argc, char **argv)
void rename_table(const std::string &old_name, const std::string &new_name)
Renames a table.
Definition: Namespace.cc:248
void flush_mutator(const Mutator mutator) override
void set_cells_serialized(const ThriftGen::Namespace ns, const String &table, const CellsSerialized &cells) override
void add_reference(int64_t from, int64_t to)
int64_t get_object_id(TableMutatorPtr &mutator)
void set_in_memory(bool value)
Sets in memory option.
ServerHandler * get_handler(const String &remotePeer)
void future_close(const ThriftGen::Future ff) override
void get_tables(std::vector< String > &tables, const ThriftGen::Namespace ns) override
bool is_set_blocksize() const
Checks if blocksize option is set.
uint32_t value_len
Definition: Cell.h:72
void set_cells_as_arrays_async(const MutatorAsync mutator, const ThriftCellsAsArrays &cells) override
void get_listing(std::vector< ThriftGen::NamespaceListing > &_return, const ThriftGen::Namespace ns) override
TableMutatorAsync * get_mutator_async(int64_t id)
#define HT_ERRORF(msg,...)
Definition: Logger.h:300
void hql_query(HqlResult &result, const ThriftGen::Namespace ns, const String &hql) override
void generate_guid(std::string &_return) override
void get_future_result_serialized(ThriftGen::ResultSerialized &tresult, ThriftGen::Future ff, int timeout_millis) override
const std::string & get_name() const
Definition: Namespace.h:78
void namespace_drop(const String &ns, const bool if_exists) override
void set_and_column_predicates(bool val)
AND together the column predicates.
Definition: ScanSpec.h:548
std::shared_ptr< Schema > SchemaPtr
Smart pointer to Schema.
Definition: Schema.h:465
const char * start_row
Definition: TableSplit.h:45
void releaseHandler(::Hypertable::ThriftGen::ClientServiceIf *service) override
Represents a cell interval.
Definition: CellInterval.h:38
void add_cell_interval(const string &start_row, const string &start_column, bool start_inclusive, const string &end_row, const string &end_column, bool end_inclusive)
Adds a cell interval to be returned in the scan.
Definition: ScanSpec.h:485
void scanner_get_cells(ThriftCells &result, const Scanner scanner_id) override
void scanner_close(const Scanner id) override
Scanner scanner_open(const ThriftGen::Namespace ns, const String &table, const ThriftGen::ScanSpec &ss) override
uint8_t flag
Definition: Cell.h:73
void set_cell_async(const MutatorAsync mutator, const ThriftGen::Cell &cell) override
void add_column_predicate(const string &column_family, const char *column_qualifier, uint32_t operation, const char *value, uint32_t value_len=0)
Adds a column predicate to the scan.
Definition: ScanSpec.h:426
ThriftGen::Future open_future(int capacity) override
void _offer_cells(const ThriftGen::Namespace ns, const String &table, const ThriftGen::MutateSpec &mutate_spec, const vector< CellT > &cells)
void set_compressor(const std::string &compressor)
Sets compressor option.
CellIntervals cell_intervals
Definition: ScanSpec.h:276
void get_schema_str_with_ids(String &result, const ThriftGen::Namespace ns, const String &table) override
void async_mutator_set_cells(const MutatorAsync mutator, const ThriftCells &cells) override
void add(const Cell &cell, bool own=true)
Definition: Cells.h:69
HqlCallback(ResultT &r, ServerHandler *handler, const ThriftGen::Namespace ns, const String &hql, bool flush, bool buffered)
std::vector< CellAsArray > ThriftCellsAsArrays
void set_row_limit(int32_t n)
Sets the maximum number of rows to return in the scan.
Definition: ScanSpec.h:342
void hql_exec2(HqlResult2 &result, const ThriftGen::Namespace ns, const String &hql, bool noflush, bool unbuffered) override
#define LOG_API_FINISH
Encapsulates decomposed key and value.
Definition: Cell.h:32
void set_max_versions(uint32_t n)
Sets the maximum number of revisions of each cell to return in the scan.
Definition: ScanSpec.h:387
void reserve_columns(size_t s)
Definition: ScanSpec.h:412
static time_t to_time_t(const time_point &__t) noexcept
Definition: fast_clock.cc:43
void shared_mutator_refresh(const ThriftGen::Namespace ns, const String &table, const ThriftGen::MutateSpec &mutate_spec) override
ThriftGen::Namespace namespace_open(const String &ns) override
void drop_table(const std::string &name, bool if_exists)
Removes a table.
Definition: Namespace.cc:269
std::shared_ptr< ScannerInfo > ScannerInfoPtr
const char * column_family
Definition: KeySpec.h:127
const char * end_row
Definition: TableSplit.h:46
#define HT_THROW(_code_, _msg_)
Definition: Error.h:478
static const char * END_ROW_MARKER
Definition: Key.h:49
void future_get_result_as_arrays(ThriftGen::ResultAsArrays &tresult, ThriftGen::Future ff, int timeout_millis) override
void set_scan_and_filter_rows(bool val)
Scan and filter rows.
Definition: ScanSpec.h:528
bool is_full()
Checks whether the Future result queue is full.
Definition: Future.h:81
std::unordered_map< ::int64_t, ScannerInfoPtr > m_scanner_info_map
void close_future(const ThriftGen::Future ff) override
void next_row_serialized(CellsSerialized &result, const Scanner scanner_id) override
void close_scanner_async(const ScannerAsync scanner_async) override
void table_get_schema_str(String &result, const ThriftGen::Namespace ns, const String &table) override
void get_row(ThriftCells &result, const ThriftGen::Namespace ns, const String &table, const String &row) override
const char * error_get_text(int error_code)
Retrieves a descriptive error string of an error code.
Definition: Error.h:38
static void seed(unsigned int s)
Sets the seed of the random number generator.
Definition: Random.cc:50
std::map< String, std::pair< int, ServerHandler * > > ServerHandlerMap
int64_t bytes_returned
Number of bytes returned while executing scan.
void rename_table(const ThriftGen::Namespace ns, const String &table, const String &new_table_name) override
#define RETHROW(_expr_)
const std::string & get_compressor() const
Gets compressor option.
void set_cell_offset(int32_t n)
Sets the number of cells to be skipped at the beginning of the query.
Definition: ScanSpec.h:375
void get_cells_serialized(CellsSerialized &result, const ThriftGen::Namespace ns, const String &table, const ThriftGen::ScanSpec &ss) override
std::shared_ptr< Table > TablePtr
Definition: Table.h:53
void next_row_as_arrays(ThriftCellsAsArrays &result, const Scanner scanner_id) override
void set_cell(const ThriftGen::Namespace ns, const String &table, const ThriftGen::Cell &cell) override
void offer_cells_as_arrays(const ThriftGen::Namespace ns, const String &table, const ThriftGen::MutateSpec &mutate_spec, const ThriftCellsAsArrays &cells) override
int64_t get_scanner_id(TableScanner *scanner, ScannerInfoPtr &info)
void get_future_result_as_arrays(ThriftGen::ResultAsArrays &tresult, ThriftGen::Future ff, int timeout_millis) override
const uint8_t * value
Definition: Cell.h:71
ClientObject * get_object(int64_t id)
int code() const
Returns the error code.
Definition: Error.h:391
void _next(vector< CellT > &result, TableScanner *scanner, int limit)
void refresh_table(const std::string &name)
Refreshes the cached table entry.
Definition: Namespace.cc:185
void namespace_close(const ThriftGen::Namespace ns) override
const std::string & get_bloom_filter() const
Gets bloom filter option.
int64_t timestamp
Definition: Cell.h:69
void set_cells_as_arrays(const ThriftGen::Namespace ns, const String &table, const ThriftCellsAsArrays &cells) override