0.9.8.10
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
TableScannerAsync.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2007-2015 Hypertable, Inc.
3  *
4  * This file is part of Hypertable.
5  *
6  * Hypertable is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; version 3 of the
9  * License, or any later version.
10  *
11  * Hypertable is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19  * 02110-1301, USA.
20  */
21 
22 #include <Common/Compat.h>
23 #include "TableScannerAsync.h"
24 
25 #include <Hypertable/Lib/Table.h>
28 
29 #include <Common/Error.h>
30 #include <Common/Regex.h>
31 #include <Common/String.h>
32 
33 #include <chrono>
34 #include <cctype>
35 
36 using namespace Hypertable;
37 using namespace std;
38 
43  ApplicationQueueInterfacePtr &app_queue, Table *table,
44  RangeLocatorPtr &range_locator, const ScanSpec &scan_spec,
45  uint32_t timeout_ms, ResultCallback *cb, int flags)
46  : m_bytes_scanned(0), m_current_scanner(0), m_outstanding(0),
47  m_error(Error::OK), m_cancelled(false), m_use_index(false)
48 {
49  unique_lock<mutex> lock(m_mutex);
50  ScanSpecBuilder primary_spec(scan_spec);
51  ScanSpecBuilder index_spec;
52  const ScanSpec *first_pass_spec;
53  bool use_qualifier = false;
54  bool row_intervals_applied = false;
55  std::vector<CellPredicate> cell_predicates;
56 
57  HT_ASSERT(timeout_ms);
58 
59  // can we optimize this query with an index?
61  && use_index(table, scan_spec, index_spec,
62  cell_predicates,
63  &use_qualifier,
64  &row_intervals_applied)) {
65 
66  first_pass_spec = &index_spec.get();
67 
68  m_use_index = true;
69 
70  // create a ResultCallback object which will load the keys from
71  // the index, then sort and verify them
72  cb = new IndexScannerCallback(this, table, primary_spec.get(), cell_predicates,
73  cb, timeout_ms, use_qualifier, row_intervals_applied);
74 
75  // get the index table
76  if (use_qualifier)
77  table = table->get_qualifier_index_table().get();
78  else
79  table = table->get_index_table().get();
80 
81  // fall through and retrieve values from the primary table. the
82  // IndexScannerCallback object will then transform the results and forward
83  // them to the original callback
84  }
85  else {
86  transform_primary_scan_spec(primary_spec);
87  first_pass_spec = &primary_spec.get();
88  }
89 
90  m_cb = cb;
91  m_table = table;
92 
93  init(comm, app_queue, table, range_locator, *first_pass_spec, timeout_ms, cb);
94 }
95 
96 
97 bool TableScannerAsync::use_index(Table *table, const ScanSpec &primary_spec,
98  ScanSpecBuilder &index_spec,
99  std::vector<CellPredicate> &cell_predicates,
100  bool *use_qualifier,
101  bool *row_intervals_applied)
102 {
103  if (!table->has_index_table() && !table->has_qualifier_index_table())
104  return false;
105 
106  index_spec.set_keys_only(true);
107  index_spec.set_start_time(primary_spec.time_interval.first);
108  index_spec.set_end_time(primary_spec.time_interval.second);
109  index_spec.add_column("v1");
110 
111  cell_predicates.resize(256);
112 
113  string family;
114  DynamicBuffer regex_prefix_buf;
115  const char *prefix;
116  size_t prefix_len;
117  ColumnFamilySpec *cf_spec = 0;
118  string index_row_prefix;
119  DynamicBuffer index_row_buf;
120  LoadDataEscape lde;
121  bool qualifier_match_only = false;
122  bool value_match = false;
123  size_t qualifier_index_count = 0;
124  size_t value_index_count = 0;
125  size_t row_intervals_applied_count = 0;
126 
127  // for value prefix queries we require normal indicies for ALL scanned columns
128  if (!primary_spec.column_predicates.empty()) {
129  size_t id = 0;
130 
131  for (const auto &cp : primary_spec.column_predicates) {
132 
133  if ((cf_spec = table->schema()->get_column_family(cp.column_family)) == 0)
134  return false;
135 
136  if (cf_spec->get_value_index())
137  value_index_count++;
138  if (cf_spec->get_qualifier_index())
139  qualifier_index_count++;
140 
141  // Make sure that all prediates match against the value index, or all
142  // predicates match against the qualifier index
143  if (cp.operation & ColumnPredicate::VALUE_MATCH) {
144  value_match = true;
145  if (qualifier_match_only)
146  return false;
147  }
148  else if (cp.operation & ColumnPredicate::QUALIFIER_MATCH) {
149  qualifier_match_only = true;
150  if (value_match)
151  return false;
152  }
153 
154  UInt8Formatter cfid(cf_spec->get_id());
155 
156  if (cp.operation & ColumnPredicate::REGEX_MATCH) {
157  if (!Regex::extract_prefix((const char *)cp.value, (size_t)cp.value_len,
158  &prefix, &prefix_len, regex_prefix_buf))
159  return false;
160  const char *escaped_prefix;
161  size_t escaped_prefix_len;
162  lde.escape(prefix, prefix_len,
163  &escaped_prefix, &escaped_prefix_len);
164  if (index_row_buf.size < escaped_prefix_len+5)
165  index_row_buf.grow(escaped_prefix_len+5);
166  index_row_buf.ptr = (uint8_t*)cfid.append_to((char*)index_row_buf.base);
167  *index_row_buf.ptr++ = ',';
168  index_row_buf.add_unchecked(escaped_prefix, escaped_prefix_len);
169  *index_row_buf.ptr = 0;
170  HT_ASSERT(index_row_buf.fill() < index_row_buf.size);
171  add_index_row(index_spec, (const char *)index_row_buf.base);
172  cell_predicates[cf_spec->get_id()].add_column_predicate(cp, id++);
173  }
174  else if (cp.operation & (ColumnPredicate::EXACT_MATCH|
176 
177  // every \t in the original value gets escaped
178  const char *value;
179  size_t value_len;
180  lde.escape(cp.value, cp.value_len, &value, &value_len);
181 
182  const char *escaped_qualifier;
183  size_t escaped_qualifier_len = 0;
184  if ((cp.operation & ColumnPredicate::EXACT_MATCH) &&
187 
188  lde.escape(cp.column_qualifier, cp.column_qualifier_len,
189  &escaped_qualifier, &escaped_qualifier_len);
190  }
191 
192  // exact match: create row interval ["%d,value\t", ..)
193  // or create row interval ["%d,value\tqualifier", ..)
194  // or create row interval ["%d,value\tqualifier\t", ..)
195  // or create row interval ["%d,value\tqualifier\trow", ..)
196  // prefix match: create row interval ["%d,value", ..)
197  if (index_row_buf.size < value_len+escaped_qualifier_len+6)
198  index_row_buf.grow(value_len+escaped_qualifier_len+6);
199  // %d,
200  index_row_buf.ptr = (uint8_t*)cfid.append_to((char*)index_row_buf.base);
201  *index_row_buf.ptr++ = ',';
202  // value
203  bool has_row_interval = false;
204  index_row_buf.add_unchecked(value, value_len);
205  if (cp.operation & ColumnPredicate::EXACT_MATCH) {
206  *index_row_buf.ptr++ = '\t';
207  // qualifier
208  if (cp.operation & (ColumnPredicate::QUALIFIER_EXACT_MATCH|
210 
211  index_row_buf.add_unchecked(escaped_qualifier, escaped_qualifier_len);
212  if (cp.operation & ColumnPredicate::QUALIFIER_EXACT_MATCH) {
213  *index_row_buf.ptr++ = '\t';
214  *index_row_buf.ptr = 0;
215  // primary row intervals
216  if (primary_spec.row_intervals.size()) {
217  has_row_interval = true;
218  ++row_intervals_applied_count;
219  index_row_prefix = (const char *)index_row_buf.base;
220  for (const auto &primary_ri : primary_spec.row_intervals) {
221  index_spec.add_row_interval(
222  index_row_prefix + primary_ri.start, primary_ri.start_inclusive,
223  index_row_prefix + primary_ri.end, primary_ri.end_inclusive);
224  }
225  }
226  // primary cell intervals
227  else if (primary_spec.cell_intervals.size()) {
228  has_row_interval = true;
229  index_row_prefix = (const char *)index_row_buf.base;
230  for (const auto &primary_ci : primary_spec.cell_intervals) {
231  index_spec.add_row_interval(
232  index_row_prefix + primary_ci.start_row, true,
233  index_row_prefix + primary_ci.end_row, true);
234  }
235  }
236  }
237  else
238  *index_row_buf.ptr = 0;
239  }
240  else if (cp.operation & ColumnPredicate::QUALIFIER_REGEX_MATCH) {
241  if (Regex::extract_prefix(cp.column_qualifier, cp.column_qualifier_len,
242  &prefix, &prefix_len, regex_prefix_buf)) {
243  const char *escaped_prefix;
244  size_t escaped_prefix_len;
245  lde.escape(prefix, prefix_len,
246  &escaped_prefix, &escaped_prefix_len);
247  index_row_buf.add(escaped_prefix, escaped_prefix_len);
248  }
249  *index_row_buf.ptr = 0;
250  }
251  else
252  *index_row_buf.ptr = 0;
253  }
254  if (!has_row_interval) {
255  *index_row_buf.ptr = 0;
256  HT_ASSERT(index_row_buf.fill() < index_row_buf.size);
257  add_index_row(index_spec, (const char *)index_row_buf.base);
258  }
259  cell_predicates[cf_spec->get_id()].add_column_predicate(cp, id++);
260  }
261  else if (cp.operation & ColumnPredicate::QUALIFIER_REGEX_MATCH) {
262  if (!Regex::extract_prefix(cp.column_qualifier, cp.column_qualifier_len,
263  &prefix, &prefix_len, regex_prefix_buf))
264  return false;
265  const char *escaped_prefix;
266  size_t escaped_prefix_len;
267  lde.escape(prefix, prefix_len,
268  &escaped_prefix, &escaped_prefix_len);
269  index_row_prefix.clear();
270  index_row_prefix.reserve(5+escaped_prefix_len);
271  index_row_prefix = cfid.c_str();
272  index_row_prefix.append(",", 1);
273  index_row_prefix.append(escaped_prefix, escaped_prefix_len);
274  add_index_row(index_spec, index_row_prefix.c_str());
275  cell_predicates[cf_spec->get_id()].add_column_predicate(cp, id++);
276  }
277  else if (cp.operation & (ColumnPredicate::QUALIFIER_EXACT_MATCH|
279 
280  const char *escaped_qualifier;
281  size_t escaped_qualifier_len;
282  lde.escape(cp.column_qualifier, cp.column_qualifier_len,
283  &escaped_qualifier, &escaped_qualifier_len);
284  index_row_prefix.clear();
285  index_row_prefix.reserve(6+escaped_qualifier_len);
286  index_row_prefix = cfid.c_str();
287  index_row_prefix.append(",", 1);
288  index_row_prefix.append(escaped_qualifier, escaped_qualifier_len);
289  if (cp.operation & ColumnPredicate::QUALIFIER_EXACT_MATCH) {
290  index_row_prefix.append("\t", 1);
291  // primary row intervals
292  if (primary_spec.row_intervals.size()) {
293  ++row_intervals_applied_count;
294  for (const auto &primary_ri : primary_spec.row_intervals) {
295  index_spec.add_row_interval(
296  index_row_prefix + primary_ri.start, primary_ri.start_inclusive,
297  index_row_prefix + primary_ri.end, primary_ri.end_inclusive);
298  }
299  }
300  // primary cell intervals
301  else if (primary_spec.cell_intervals.size()) {
302  for (const auto &primary_ci : primary_spec.cell_intervals) {
303  index_spec.add_row_interval(
304  index_row_prefix + primary_ci.start_row, true,
305  index_row_prefix + primary_ci.end_row, true);
306  }
307  }
308  else
309  add_index_row(index_spec, index_row_prefix.c_str());
310  }
311  else
312  add_index_row(index_spec, index_row_prefix.c_str());
313  cell_predicates[cf_spec->get_id()].add_column_predicate(cp, id++);
314  }
315  else
316  return false;
317  }
318 
319  if (row_intervals_applied)
320  *row_intervals_applied = row_intervals_applied_count == primary_spec.column_predicates.size();
321 
322  if (qualifier_match_only) {
323  if (qualifier_index_count < primary_spec.column_predicates.size())
324  return false;
325  *use_qualifier = qualifier_match_only;
326  }
327  else if (value_index_count < primary_spec.column_predicates.size())
328  return false;
329 
330  return true;
331  }
332 
333  return false;
334 }
335 
337 
338  if (m_use_index)
339  return;
340 
341  if (!primary_spec.get().column_predicates.empty()) {
342 
343  // Load predicate_columns set
344  CstrSet predicate_columns;
345  for (const auto &predicate : primary_spec.get().column_predicates)
346  predicate_columns.insert(predicate.column_family);
347 
348  // If selected columns empty, add all columns referenced in predicates
349  if (primary_spec.get().columns.empty()) {
350  for (auto column : predicate_columns)
351  primary_spec.add_column(column);
352  }
353  else {
354  string family;
355  const char *colon;
356  StringSet selected_columns;
357  // If columns selected that are not referenced in predicate, throw error
358  for (auto column : primary_spec.get().columns) {
359  family.clear();
360  if ((colon = strchr(column, ':')) != 0)
361  family.append(column, colon-column);
362  else
363  family.append(column);
364  if (predicate_columns.count(family.c_str()) == 0)
366  "Selected column %s must be referenced in Column predicate",
367  column);
368  selected_columns.insert(family);
369  }
370 
371  // Add predicate columns that are missing from selection set
372  for (const auto &predicate : primary_spec.get().column_predicates)
373  if (selected_columns.count(predicate.column_family) == 0)
374  primary_spec.add_column(predicate.column_family);
375  }
376  }
377 
378 }
379 
381 {
382  // this code is identical to the RELOP_SW handler in HqlParser.h
383  string tmp;
384  RowInterval ri;
385  ri.start = row;
386  ri.start_inclusive = true;
387  const char *str = row;
388  const char *end = str + strlen(row);
389  const char *ptr;
390  for (ptr = end - 1; ptr > str; --ptr) {
391  if (::uint8_t(*ptr) < 0xffu) {
392  tmp = String(str, ptr - str);
393  tmp.append(1, (*ptr)+1);
394  ri.end = tmp.c_str();
395  ri.end_inclusive = false;
396  break;
397  }
398  }
399  if (ptr == str) {
400  tmp = row;
401  tmp.append(4, (char)0xff);
402  ri.end = tmp.c_str();
403  ri.end_inclusive = false;
404  }
406  ri.end, ri.end_inclusive);
407 }
408 
410  Table *table, RangeLocatorPtr &range_locator,
411  const ScanSpec &scan_spec, uint32_t timeout_ms, ResultCallback *cb)
412 {
413  int scanner_id = 0;
414  IntervalScannerAsyncPtr ri_scanner;
415  ScanSpec interval_scan_spec;
416  Timer timer(timeout_ms);
417  bool current_set = false;
418 
420  m_cb->register_scanner(this);
421 
422  try {
423  if (scan_spec.row_intervals.empty()) {
424  if (scan_spec.cell_intervals.empty()) {
425  ri_scanner =
426  make_shared<IntervalScannerAsync>(comm, app_queue, table, range_locator,
427  scan_spec, timeout_ms, !current_set,
428  this, scanner_id++);
429 
430  current_set = true;
431  m_interval_scanners.push_back(ri_scanner);
432  m_outstanding++;
433  }
434  else {
435  m_interval_scanners.reserve(scan_spec.cell_intervals.size());
436  for (size_t i=0; i<scan_spec.cell_intervals.size(); i++) {
437  scan_spec.base_copy(interval_scan_spec);
438  interval_scan_spec.cell_intervals.push_back(
439  scan_spec.cell_intervals[i]);
440  ri_scanner =
441  make_shared<IntervalScannerAsync>(comm, app_queue, table, range_locator,
442  interval_scan_spec, timeout_ms,
443  !current_set, this, scanner_id++);
444  current_set = true;
445  m_interval_scanners.push_back(ri_scanner);
446  m_outstanding++;
447  }
448  }
449  }
450  else if (scan_spec.scan_and_filter_rows) {
451  ScanSpec rowset_scan_spec;
452  scan_spec.base_copy(rowset_scan_spec);
453  rowset_scan_spec.row_intervals.reserve(scan_spec.row_intervals.size());
454  for (const auto &ri : scan_spec.row_intervals) {
455  if (ri.start != ri.end && strcmp(ri.start, ri.end) != 0) {
456  scan_spec.base_copy(interval_scan_spec);
457  interval_scan_spec.scan_and_filter_rows = false;
458  interval_scan_spec.row_intervals.push_back(ri);
459  ri_scanner =
460  make_shared<IntervalScannerAsync>(comm, app_queue, table, range_locator,
461  interval_scan_spec, timeout_ms,
462  !current_set, this, scanner_id++);
463  current_set = true;
464  m_interval_scanners.push_back(ri_scanner);
465  m_outstanding++;
466  }
467  else
468  rowset_scan_spec.row_intervals.push_back(ri);
469  }
470  if (rowset_scan_spec.row_intervals.size()) {
471  ri_scanner =
472  make_shared<IntervalScannerAsync>(comm, app_queue, table, range_locator,
473  rowset_scan_spec, timeout_ms,
474  !current_set, this, scanner_id++);
475  current_set = true;
476  m_interval_scanners.push_back(ri_scanner);
477  m_outstanding++;
478  }
479  }
480  else {
481  m_interval_scanners.reserve(scan_spec.row_intervals.size());
482  for (size_t i=0; i<scan_spec.row_intervals.size(); i++) {
483  scan_spec.base_copy(interval_scan_spec);
484  interval_scan_spec.row_intervals.push_back(scan_spec.row_intervals[i]);
485  ri_scanner =
486  make_shared<IntervalScannerAsync>(comm, app_queue, table, range_locator,
487  interval_scan_spec, timeout_ms,
488  !current_set, this, scanner_id++);
489  current_set = true;
490  m_interval_scanners.push_back(ri_scanner);
491  m_outstanding++;
492  }
493  }
494  }
495  catch (Exception &e) {
496  m_error = e.code();
497  m_error_msg = e.what();
498  if (ri_scanner && ri_scanner->has_outstanding_requests()) {
499  m_interval_scanners.push_back(ri_scanner);
500  m_outstanding++;
501  }
502  if (m_outstanding == 0)
503  maybe_callback_error(0, false);
504  }
505 }
506 
508  try {
509  cancel();
511  }
512  catch (Exception &e) {
513  HT_ERROR_OUT << e << HT_END;
514  }
515  if (m_use_index) {
516  ((IndexScannerCallback *)m_cb)->shutdown();
517  delete m_cb;
518  m_cb = 0;
519  }
520 }
521 
523  unique_lock<mutex> lock(m_cancel_mutex);
524  m_cancelled = true;
525 }
526 
528  unique_lock<mutex> lock(m_cancel_mutex);
529  return m_cancelled;
530 }
531 
532 void TableScannerAsync::handle_error(int scanner_id, int error, const string &error_msg,
533  bool is_create) {
534  bool cancelled = is_cancelled();
535  unique_lock<mutex> lock(m_mutex);
536  bool abort = false;
537  bool next = false;
538 
539  // if we've already seen an error or the scanner has been cacncelled
540  if (m_error != Error::OK || cancelled) {
541  abort=true;
542  next = m_interval_scanners[scanner_id]->abort(is_create);
543  }
544  else {
545  switch(error) {
546  case (Error::TABLE_NOT_FOUND):
547  if (m_table->auto_refresh() && is_create)
548  abort = !(m_interval_scanners[scanner_id]->retry_or_abort(true, true,
549  is_create, &next, error));
550  else {
551  next = m_interval_scanners[scanner_id]->abort(is_create);
552  abort = true;
553  }
554  break;
556  if (m_table->auto_refresh() && is_create)
557  abort = !(m_interval_scanners[scanner_id]->retry_or_abort(true, false,
558  is_create, &next, error));
559  else {
560  next = m_interval_scanners[scanner_id]->abort(is_create);
561  abort = true;
562  }
563  break;
565  abort = !(m_interval_scanners[scanner_id]->is_destroyed_scanner(is_create));
566  next = !m_interval_scanners[scanner_id]->has_outstanding_requests();
567  break;
571  abort = !(m_interval_scanners[scanner_id]->retry_or_abort(false, true,
572  is_create, &next, error));
573  break;
574 
575  default:
576  Exception e(error, error_msg);
577  HT_ERROR_OUT << "Received error: is_create=" << is_create << " - "<< e << HT_END;
578  next = m_interval_scanners[scanner_id]->abort(is_create);
579  abort = true;
580  }
581  }
582 
583  // if we've seen an error before then don't bother with callback
584  if (m_error != Error::OK || cancelled) {
585  maybe_callback_error(scanner_id, next);
586  if (next && scanner_id == m_current_scanner)
587  move_to_next_interval_scanner(scanner_id);
588  return;
589  }
590  else if (abort) {
591  Exception e(error, error_msg);
592  m_error = error;
593  m_error_msg = error_msg;
594  HT_ERROR_OUT << e << HT_END;
595  maybe_callback_error(scanner_id, next);
596  if (next && scanner_id == m_current_scanner)
597  move_to_next_interval_scanner(scanner_id);
598  }
599  else if (next && scanner_id == m_current_scanner) {
600  move_to_next_interval_scanner(scanner_id);
601  }
602 }
603 
604 void TableScannerAsync::handle_timeout(int scanner_id, const string &error_msg, bool is_create) {
605  bool cancelled = is_cancelled();
606  unique_lock<mutex> lock(m_mutex);
607  bool next;
608 
609  next = m_interval_scanners[scanner_id]->abort(is_create);
610  // if we've seen an error before or scanner has been cancelled then don't bother with callback
611  if (m_error != Error::OK || cancelled) {
612  maybe_callback_error(scanner_id, next);
613  return;
614  }
615 
616  HT_ERROR_OUT << "Unable to complete scan request within " << m_timeout_ms
617  << " - " << error_msg << HT_END;
619  maybe_callback_error(scanner_id, next);
620  if (next && scanner_id == m_current_scanner)
621  move_to_next_interval_scanner(scanner_id);
622 
623 }
624 
625 void TableScannerAsync::handle_result(int scanner_id, EventPtr &event, bool is_create) {
626 
627  bool cancelled = is_cancelled();
628  unique_lock<mutex> lock(m_mutex);
629  ScanCellsPtr cells;
630 
631  // abort interval scanners if we've seen an error previously or scanned has been cancelled
632  bool abort = (m_error != Error::OK || cancelled);
633 
634  bool next;
635  bool do_callback = false;
636  int current_scanner = scanner_id;
637 
638  try {
639  // If the scan already encountered an error/cancelled
640  // don't bother calling into callback anymore
641  if (abort) {
642  next = m_interval_scanners[scanner_id]->abort(is_create);
643  if (cancelled && m_error == Error::OK) {
644  // scanner was cancelled and is over
645  if (next && m_outstanding==1) {
646  do_callback = true;
647  cells = make_shared<ScanCells>();
648  }
649  else
650  do_callback = false;
651  maybe_callback_ok(scanner_id, next, do_callback, cells);
652  }
653  else
654  maybe_callback_error(scanner_id, next);
655  }
656  else {
657  // send results to interval scanner
658  next = m_interval_scanners[scanner_id]->handle_result(&do_callback, cells, event, is_create);
659  maybe_callback_ok(scanner_id, next, do_callback, cells);
660  }
661 
662  if (next)
663  move_to_next_interval_scanner(current_scanner);
664  }
665  catch (Exception &e) {
666  HT_ERROR_OUT << e << HT_END;
667  m_error = e.code();
668  m_error_msg = e.what();
669  next = !m_interval_scanners[current_scanner]->has_outstanding_requests();
670  maybe_callback_error(current_scanner, next);
671  throw;
672  }
673 }
674 
675 void TableScannerAsync::maybe_callback_error(int scanner_id, bool next) {
676  bool eos = false;
677  // ok to update m_outstanding since caller has locked mutex
678  if (next) {
679  HT_ASSERT(m_outstanding>0 && m_interval_scanners[scanner_id] != 0);
680  m_outstanding--;
681  // Aggregate profile data
682  m_profile_data += m_interval_scanners[scanner_id]->profile_data();
683  m_interval_scanners[scanner_id] = 0;
684  }
685 
686  if (m_outstanding == 0) {
687  eos = true;
688  }
689 
690  m_cb->scan_error(this, m_error, m_error_msg, eos);
691 
692  if (eos) {
693  m_cb->deregister_scanner(this);
695  m_cond.notify_all();
696  }
697 }
698 
699 void TableScannerAsync::maybe_callback_ok(int scanner_id, bool next, bool do_callback, ScanCellsPtr &cells) {
700  bool eos = false;
701  // ok to update m_outstanding since caller has locked mutex
702  if (next) {
703  HT_ASSERT(m_outstanding>0 && m_interval_scanners[scanner_id] != 0);
704  m_outstanding--;
705  // Aggregate profile data
706  m_profile_data += m_interval_scanners[scanner_id]->profile_data();
707  m_interval_scanners[scanner_id] = 0;
708  }
709 
710  if (m_outstanding == 0) {
711  eos = true;
712  }
713 
714  if (do_callback) {
715  if (eos)
716  cells->set_eos();
717  HT_ASSERT(cells != 0);
718  m_cb->scan_ok(this, cells);
719  }
720 
721  if (m_outstanding==0) {
722  m_cb->deregister_scanner(this);
724  m_cond.notify_all();
725  }
726 }
727 
729  unique_lock<mutex> lock(m_mutex);
730  m_cond.wait(lock, [this](){ return m_outstanding == 0; });
731 }
732 
734  return m_table->get_name();
735 }
736 
738  bool next = true;
739  bool cancelled = is_cancelled();
740  bool do_callback;
741  ScanCellsPtr cells;
742  bool abort = cancelled || (m_error != Error::OK);
743 
744  while (next && m_outstanding && current_scanner < ((int)m_interval_scanners.size())-1) {
745  current_scanner++;
746  // unless the scan has been aborted we should be going through scanners in order
747  if (current_scanner != m_current_scanner+1) {
748  HT_ASSERT(abort);
749  break;
750  }
751  m_current_scanner = current_scanner;
752 
753  if (m_interval_scanners[current_scanner] !=0) {
754  next = m_interval_scanners[current_scanner]->set_current(&do_callback, cells, abort);
755  HT_ASSERT(do_callback || !next || abort);
756 
757  // this is the last outstanding scanner and the scan was cancelled
758  // or failed with an error
759  if (next
760  && m_outstanding==1
761  && ((cancelled && m_error == Error::OK)
762  || m_error != Error::OK)) {
763  do_callback = true;
764  cells = make_shared<ScanCells>();
765  }
766  maybe_callback_ok(m_current_scanner, next, do_callback, cells);
767  }
768  }
769 
770  // if we skipped ALL outstanding scanners then make sure the "eos" marker
771  // is sent to the caller, and m_outstanding is decremented
772  if (next
773  && m_outstanding == 1
774  && current_scanner == ((int)m_interval_scanners.size() - 1)
775  && !cells) {
776  cells = make_shared<ScanCells>();
777  maybe_callback_ok(m_current_scanner, true, true, cells);
778  m_current_scanner = (int)m_interval_scanners.size() - 1;
779  }
780 }
std::set< String > StringSet
STL Set managing Strings.
Definition: StringExt.h:42
ScanSpec & get()
Returns the built ScanSpec object.
Definition: ScanSpec.h:566
bool has_qualifier_index_table()
returns true if this table has a qualifier index
Definition: Table.h:215
std::string get_table_name() const
Returns the name of the table as it was when the scanner was created.
bool auto_refresh()
Definition: Table.h:178
std::string String
A String is simply a typedef to std::string.
Definition: String.h:44
void handle_error(int scanner_id, int error, const std::string &error_msg, bool is_create)
Deal with errors.
ColumnPredicates column_predicates
Definition: ScanSpec.h:277
std::shared_ptr< RangeLocator > RangeLocatorPtr
Smart pointer to RangeLocator.
Definition: RangeLocator.h:198
void init(Comm *comm, ApplicationQueueInterfacePtr &app_queue, Table *table, RangeLocatorPtr &range_locator, const ScanSpec &scan_spec, uint32_t timeout_ms, ResultCallback *cb)
void move_to_next_interval_scanner(int current_scanner)
Po::typed_value< String > * str(String *v=0)
Definition: Properties.h:166
pair< int64_t, int64_t > time_interval
Definition: ScanSpec.h:278
void handle_timeout(int scanner_id, const std::string &error_msg, bool is_create)
Deal with timeouts.
bool get_value_index() const
Gets value index flag.
Column family specification.
std::condition_variable m_cond
std::shared_ptr< Event > EventPtr
Smart pointer to Event.
Definition: Event.h:228
std::shared_ptr< IntervalScannerAsync > IntervalScannerAsyncPtr
Smart pointer to IntervalScannerAsync.
STL namespace.
void set_start_time(int64_t start)
Definition: ScanSpec.h:503
bool escape(const char *in_buf, size_t in_len, const char **out_bufp, size_t *out_lenp)
uint8_t * ptr
Pointer to the end of the used part of the buffer.
A dynamic, resizable and reference counted memory buffer.
Definition: DynamicBuffer.h:42
Represents a row interval.
Definition: RowInterval.h:38
virtual void scan_error(TableScannerAsync *scanner, int error, const std::string &error_msg, bool eos)=0
Callback method for scan errors.
#define HT_ASSERT(_e_)
Definition: Logger.h:396
Scan predicate and control specification.
Definition: ScanSpec.h:56
Represents an open table.
Definition: Table.h:58
std::shared_ptr< ScanCells > ScanCellsPtr
Smart pointer to ScanCells.
Definition: ScanCells.h:143
std::set< const char *, LtCstr > CstrSet
STL Set managing c-style strings.
Definition: StringExt.h:52
void set_end_time(int64_t end)
Definition: ScanSpec.h:507
void set_keys_only(bool val)
Return only keys (no values)
Definition: ScanSpec.h:514
void grow(size_t new_size, bool nocopy=false)
Grows the buffer and copies the data unless nocopy is true.
bool get_qualifier_index() const
Gets qualifier index flag.
void base_copy(ScanSpec &other) const
Initialize another ScanSpec object with this copy sans the intervals.
Definition: ScanSpec.h:100
uint8_t * add(const void *data, size_t len)
Adds more data WITH boundary checks; if required the buffer is resized and existing data is preserved...
uint32_t size
The size of the allocated memory buffer (base)
Compatibility Macros for C/C++.
bool use_index(Table *table, const ScanSpec &primary_spec, ScanSpecBuilder &index_spec, std::vector< CellPredicate > &cell_predicates, bool *use_qualifier, bool *row_intervals_applied)
void maybe_callback_error(int scanner_id, bool next)
virtual void register_scanner(TableScannerAsync *scanner)
Hook for derived classes which want to keep track of scanners/mutators.
#define HT_END
Definition: Logger.h:220
virtual void scan_ok(TableScannerAsync *scanner, ScanCellsPtr &cells)=0
Callback method for successful scan.
std::shared_ptr< ApplicationQueueInterface > ApplicationQueueInterfacePtr
Smart pointer to ApplicationQueueInterface.
Helper class for building a ScanSpec.
Definition: ScanSpec.h:318
int32_t get_id() const
Gets column ID.
#define HT_ERROR_OUT
Definition: Logger.h:301
Declarations for Regex.
bool has_index_table()
returns true if this table has an index
Definition: Table.h:209
Hypertable definitions
SchemaPtr schema()
Definition: Table.h:144
void add_column(const string &str)
Adds a column family to be returned by the scan.
Definition: ScanSpec.h:408
void add_row_interval(const string &start, bool start_inclusive, const string &end, bool end_inclusive)
Adds a row interval to be returned in the scan.
Definition: ScanSpec.h:455
Entry point to AsyncComm service.
Definition: Comm.h:61
TablePtr get_index_table()
Definition: Table.h:235
TableScannerAsync(Comm *comm, ApplicationQueueInterfacePtr &app_queue, Table *table, RangeLocatorPtr &range_locator, const ScanSpec &scan_spec, uint32_t timeout_ms, ResultCallback *cb, int flags=0)
Constructs a TableScannerAsync object.
#define HT_THROWF(_code_, _fmt_,...)
Definition: Error.h:490
uint8_t * base
Pointer to the allocated memory buffer.
size_t fill() const
Returns the size of the used portion.
Definition: DynamicBuffer.h:70
void cancel()
Cancels the scanner.
void handle_result(int scanner_id, EventPtr &event, bool is_create)
Deal with results of a scanner.
RowIntervals row_intervals
Definition: ScanSpec.h:275
A timer class to keep timeout states across AsyncComm related calls.
Definition: Timer.h:44
This is a generic exception class for Hypertable.
Definition: Error.h:314
void maybe_callback_ok(int scanner_id, bool next, bool do_callback, ScanCellsPtr &cells)
A String class based on std::string.
ResultCallback for secondary indices; used by TableScannerAsync.
const std::string & get_name()
Definition: Table.h:139
static bool extract_prefix(const char *regex, size_t regex_len, const char **output, size_t *output_len, DynamicBuffer &buf)
Extracts a fixed prefix from regular expression.
Definition: Regex.cc:33
CellIntervals cell_intervals
Definition: ScanSpec.h:276
std::vector< IntervalScannerAsyncPtr > m_interval_scanners
Represents an open table.
Error codes, Exception handling, error logging.
virtual void deregister_scanner(TableScannerAsync *scanner)
Hook for derived classes which want to keep track of scanners/mutators.
uint8_t * add_unchecked(const void *data, size_t len)
Adds additional data without boundary checks.
void transform_primary_scan_spec(ScanSpecBuilder &primary_spec)
int code() const
Returns the error code.
Definition: Error.h:391
void add_index_row(ScanSpecBuilder &ssb, const char *row)
TablePtr get_qualifier_index_table()
Definition: Table.h:239