0.9.8.10
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
IndexScannerCallback.h
Go to the documentation of this file.
1 /* -*- c++ -*-
2  * Copyright (C) 2007-2015 Hypertable, Inc.
3  *
4  * This file is part of Hypertable.
5  *
6  * Hypertable is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; version 3 of the
9  * License, or any later version.
10  *
11  * Hypertable is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19  * 02110-1301, USA.
20  */
21 
22 #ifndef Hypertable_Lib_IndexScannerCallback_h
23 #define Hypertable_Lib_IndexScannerCallback_h
24 
25 #include <Hypertable/Lib/Client.h>
31 
32 #include <HyperAppHelper/Unique.h>
33 
34 #include <Common/Filesystem.h>
35 #include <Common/FlyweightString.h>
36 
37 #include <algorithm>
38 #include <atomic>
39 #include <condition_variable>
40 #include <deque>
41 #include <map>
42 #include <memory>
43 #include <mutex>
44 #include <vector>
45 
46 // this macro enables the "ScanSpecBuilder queue" test code; it fills the queue
47 // till it exceeds the limit, and makes sure that the queue is blocking
48 // till it gets empty again
49 #undef TEST_SSB_QUEUE
50 
51 namespace {
52  struct QualifierFilterMatch :
53  std::unary_function<std::pair<String, String>, bool> {
54  QualifierFilterMatch(const char *row) : row(row) { }
55  bool operator()(const std::pair<String, String> &filter) const {
56  if (!strncmp(filter.first.c_str(), row, filter.first.length())) {
57  if (filter.second.empty() ||
58  strstr(row+filter.first.length(), filter.second.c_str()))
59  return true;
60  }
61  return false;
62  }
63  const char *row;
64  };
65 }
66 
67 namespace Hypertable {
68 
// printf-style template for the schema of the temporary table. The single
// "%s" is replaced with the concatenated <ColumnFamily> fragments that are
// built from tmp_schema_inner.
69  static const char *tmp_schema_outer=
70  "<Schema>"
71  "<AccessGroup name=\"default\">"
72  "%s"
73  "</AccessGroup>"
74  "</Schema>";
75 
// printf-style template for a single column family of the temporary table.
// The "%s" is replaced with the name of the column family.
76  static const char *tmp_schema_inner=
77  "<ColumnFamily>"
78  "<Name>%s</Name>"
79  "<Counter>false</Counter>"
80  "<deleted>false</deleted>"
81  "</ColumnFamily>";
82 
86 
// Limit for the queue of ScanSpecBuilder objects (m_sspecs): verify_results()
// waits while the queue holds more than SSB_QUEUE_LIMIT entries (unless the
// scan limits were reached). The TEST_SSB_QUEUE build uses a very small limit
// so that the blocking code path is exercised.
87 #if defined (TEST_SSB_QUEUE)
88  static const size_t SSB_QUEUE_LIMIT = 4;
89 #else
90  static const size_t SSB_QUEUE_LIMIT = 40;
91 #endif
92 
// If more than TMP_CUTOFF bytes are accumulated from the index (tracked in
// m_tmp_cutoff) then all index results are stored in a temporary table
// instead of being buffered in memory. The TEST_SSB_QUEUE build lowers the
// threshold to a single byte.
95 #if defined (TEST_SSB_QUEUE)
96  static const size_t TMP_CUTOFF = 1;
97 #else
98  static const size_t TMP_CUTOFF = 16*1024*1024; // FIXME: Config property?
99 #endif
100 
101  public:
102 
103  IndexScannerCallback(TableScannerAsync* primary_scanner, Table *primary_table,
104  const ScanSpec &primary_spec,
105  std::vector<CellPredicate> &cell_predicates,
106  ResultCallback *original_cb, uint32_t timeout_ms,
107  bool qualifier_scan,
108  bool row_intervals_applied)
109  : ResultCallback(), m_primary_scanner(primary_scanner),
110  m_primary_table(primary_table), m_primary_spec(primary_spec),
111  m_original_cb(original_cb), m_timeout_ms(timeout_ms),
112  m_qualifier_scan(qualifier_scan),
113  m_row_intervals_applied(row_intervals_applied) {
115 
116  m_cell_predicates.swap(cell_predicates);
117 
118  if (primary_spec.row_limit != 0 ||
119  primary_spec.cell_limit != 0 ||
120  primary_spec.row_offset != 0 ||
121  primary_spec.cell_offset != 0 ||
122  primary_spec.cell_limit_per_family != 0) {
123  // keep track of offset and limit
124  m_track_limits = true;
125  m_row_limit = primary_spec.row_limit;
126  m_cell_limit = primary_spec.cell_limit;
127  m_row_offset = primary_spec.row_offset;
128  m_cell_offset = primary_spec.cell_offset;
130  }
131  else
132  m_track_limits = false;
133 
134  // Setup bit pattern for all matching predicates
135  for (size_t i=0; i<primary_spec.column_predicates.size(); i++) {
136  m_all_matching <<= 1;
137  m_all_matching |= 1;
138  }
139 
140  for (auto cf : primary_table->schema()->get_column_families()) {
141  if (!cf->get_value_index() && !cf->get_qualifier_index())
142  continue;
143  m_column_map[cf->get_id()] = cf->get_name();
144  }
145  }
146 
148  std::lock_guard<std::mutex> lock1(m_scanner_mutex);
149  std::lock_guard<std::mutex> lock2(m_mutex);
150  m_scanners.clear();
151  sspecs_clear();
152  if (m_tmp_table) {
154  NamespacePtr nstmp = client->open_namespace("/tmp");
155  nstmp->drop_table(Filesystem::basename(m_tmp_table->get_name()), true);
156  }
157  }
158 
159  void shutdown() {
160  {
161  std::lock_guard<std::mutex> lock(m_mutex);
162  m_shutdown = true;
163  }
164 
165  {
166  std::lock_guard<std::mutex> lock(m_scanner_mutex);
167  for (auto s : m_scanners)
168  delete s;
169  }
170 
172 
173  }
174 
175  void sspecs_clear() {
176  for (auto ssb : m_sspecs)
177  delete ssb;
178  m_sspecs.clear();
179  m_sspecs_cond.notify_one();
180  }
181 
188  virtual void scan_ok(TableScannerAsync *scanner, ScanCellsPtr &scancells) {
189  bool is_eos = scancells->get_eos();
190  std::string table_name = scanner->get_table_name();
191 
192  std::unique_lock<std::mutex> lock(m_mutex);
193 
194  // ignore empty packets
195  if (scancells->get_eos() == false && scancells->empty())
196  return;
197 
198  // reached end of this scanner?
199  if (is_eos) {
200  HT_ASSERT(m_outstanding_scanners.load() > 0);
202  }
203 
204  // we've reached eos (i.e. because CELL_LIMITs/ROW_LIMITs were reached)
205  // just collect the outstanding scanners and ignore the cells
206  if (m_eos == true) {
207  if (m_outstanding_scanners.load() == 0)
208  final_decrement(is_eos);
209  return;
210  }
211 
212  // If the cells are from the index table then collect and store them
213  // in memory (or in a temporary table)
214  if (Filesystem::basename(table_name)[0] == '^')
215  collect_indices(scanner, scancells);
216  // If the cells are from the temporary table then they need to be
217  // verified against the primary table
218  else if (table_name != m_primary_table->get_name())
219  verify_results(lock, scanner, scancells);
220  // Otherwise cells are returned from the primary table: check
221  // LIMIT/OFFSET and send them to the original callback
222  else {
223  scancells->set_eos(false);
224 
225  if (m_track_limits)
226  track_predicates(scancells);
227  else
229 
230  // fetch data from the next scanner when we have reached the end of
231  // the current one
232  if (!m_limits_reached && is_eos)
233  readahead();
234  }
235 
236  final_decrement(is_eos);
237  }
238 
239  virtual void register_scanner(TableScannerAsync *scanner) {
241  }
242 
251  virtual void scan_error(TableScannerAsync *scanner, int error,
252  const std::string &error_msg, bool eos) {
253  m_original_cb->scan_error(scanner, error, error_msg, eos);
254  if (eos)
256  }
257 
// Callback method for successful update; intentionally does nothing.
258  virtual void update_ok(TableMutatorAsync *mutator) {
259  }
260 
// Callback method for update errors; forwards the mutator, the error code and
// the failed mutations unchanged to the original callback.
261  virtual void update_error(TableMutatorAsync *mutator, int error,
262  FailedMutations &failures) {
263  m_original_cb->update_error(mutator, error, failures);
264  }
265 
266  private:
267  void final_decrement(bool is_eos) {
268  // If the last outstanding scanner just finished; send an "eos"
269  // packet to the original callback and decrement the outstanding scanners
270  // once more (this is the equivalent operation to the increment in
271  // the constructor)
272  if ((is_eos || m_eos) && m_outstanding_scanners.load() == 0) {
273  m_eos = true;
274  HT_ASSERT(m_final_decrement == false);
275  if (!m_final_decrement) {
276  // send empty eos package to caller
277  ScanCellsPtr empty = std::make_shared<ScanCells>();
278  empty->set_eos();
281  m_final_decrement = true;
282  }
283  }
284  }
285 
286  void collect_indices(TableScannerAsync *scanner, ScanCellsPtr &scancells) {
287  const ScanSpec &primary_spec = m_primary_spec.get();
288  // split the index row into column id, cell value and cell row key
289  Cells cells;
290  const char *unescaped_row;
291  const char *unescaped_qualifier;
292  const char *unescaped_value;
293  size_t unescaped_row_len;
294  size_t unescaped_qualifier_len;
295  size_t unescaped_value_len;
296  LoadDataEscape escaper_row;
297  LoadDataEscape escaper_qualifier;
298  LoadDataEscape escaper_value;
299  scancells->get(cells);
300  for (auto &cell : cells) {
301  char *qv = (char *)cell.row_key;
302 
303  // get unescaped row
304  char *row;
305  if ((row = strrchr(qv, '\t')) == 0) {
306  HT_WARNF("Invalid index entry '%s' in index table '^%s'",
307  qv, m_primary_table->get_name().c_str());
308  continue;
309  }
310  *row++ = 0;
311  escaper_row.unescape(row, strlen(row), &unescaped_row, &unescaped_row_len);
312 
313  // if the original query specified row intervals then these have
314  // to be filtered in the client
315  if (!m_row_intervals_applied && primary_spec.row_intervals.size()) {
316  if (!row_intervals_match(primary_spec.row_intervals, unescaped_row))
317  continue;
318  }
319 
320  // cut off the "%d," part at the beginning to get the column id
321  // The max. column id is 255, therefore there must be a ',' after 3
322  // positions
323  char *id = qv;
324  while (*qv != ',' && (qv - id <= 4))
325  qv++;
326  if (*qv != ',') {
327  HT_WARNF("Invalid index entry '%s' in index table '^%s'",
328  id, m_primary_table->get_name().c_str());
329  continue;
330  }
331  *qv++ = 0;
332  uint32_t cfid = (uint32_t)atoi(id);
333  if (!cfid || m_column_map.find(cfid) == m_column_map.end()) {
334  HT_WARNF("Invalid index entry '%s' in index table '^%s'",
335  qv, m_primary_table->get_name().c_str());
336  continue;
337  }
338 
339  // same about cell intervals
340  if (primary_spec.cell_intervals.size()) {
341  if (!cell_intervals_match(primary_spec.cell_intervals, unescaped_row,
342  m_column_map[cfid].c_str()))
343  continue;
344  }
345 
346  uint32_t matching = 0;
347 
348  if (m_qualifier_scan) {
349  escaper_qualifier.unescape(qv, strlen(qv),
350  &unescaped_qualifier, &unescaped_qualifier_len);
351 
352  if (primary_spec.and_column_predicates) {
353  std::bitset<32> bits;
354  m_cell_predicates[cfid].all_matches(unescaped_qualifier,
355  unescaped_qualifier_len,
356  "", 0, bits);
357  if ((matching = (uint32_t)bits.to_ulong()) == 0L)
358  continue;
359 
360  }
361  else if (!m_cell_predicates[cfid].matches(unescaped_qualifier,
362  unescaped_qualifier_len, "", 0))
363  continue;
364  }
365  else {
366  char *value = qv;
367  if ((qv = strchr(value, '\t')) == 0) {
368  HT_WARNF("Invalid index entry '%s' in index table '^%s'",
369  value, m_primary_table->get_name().c_str());
370  continue;
371  }
372  size_t value_len = qv-value;
373  *qv++ = 0;
374  escaper_qualifier.unescape(qv, strlen(qv),
375  &unescaped_qualifier, &unescaped_qualifier_len);
376  escaper_value.unescape(value, value_len,
377  &unescaped_value, &unescaped_value_len);
378  if (primary_spec.and_column_predicates) {
379  std::bitset<32> bits;
380  m_cell_predicates[cfid].all_matches(unescaped_qualifier,
381  unescaped_qualifier_len,
382  unescaped_value,
383  unescaped_value_len,
384  bits);
385  if ((matching = (uint32_t)bits.to_ulong()) == 0L)
386  continue;
387 
388  }
389  else if (!m_cell_predicates[cfid].matches(unescaped_qualifier,
390  unescaped_qualifier_len,
391  unescaped_value,
392  unescaped_value_len))
393  continue;
394  }
395 
396  // if a temporary table was already created then store it in the
397  // temporary table. otherwise buffer it in memory but make sure
398  // that no duplicate rows are inserted
399  KeySpec key;
400  key.row = m_strings.get(unescaped_row);
401  key.row_len = unescaped_row_len;
402  key.column_family = m_column_map[cfid].c_str();
403  key.timestamp = cell.timestamp;
404  if (m_qualifier_scan) {
405  key.column_qualifier = m_strings.get(unescaped_qualifier);
406  key.column_qualifier_len = unescaped_qualifier_len;
407  }
408  if (m_mutator)
409  m_mutator->set(key, &matching, sizeof(matching));
410  else {
411  CkeyMap::iterator it = m_tmp_keys.find(key);
412  if (it == m_tmp_keys.end())
413  m_tmp_keys.insert(CkeyMap::value_type(key, matching));
414  else
415  it->second |= matching;
416  m_tmp_cutoff += sizeof(KeySpec) + key.row_len + key.column_qualifier_len;
417  }
418  }
419 
420  // reached EOS? then flush the mutator
421  if (scancells->get_eos()) {
422  if (m_mutator)
423  m_mutator.reset();
424  if (!m_tmp_table && m_tmp_keys.empty()) {
425  m_eos = true;
426  return;
427  }
428  }
429  // not EOS? then more keys will follow
430  else {
431  // store all buffered keys in a temp. table if we have too many
432  // results from the index
433  if (!m_tmp_table && m_tmp_cutoff > TMP_CUTOFF) {
435  for (CkeyMap::iterator it = m_tmp_keys.begin();
436  it != m_tmp_keys.end(); ++it)
437  m_mutator->set(it->first, &it->second, sizeof(it->second));
438  }
439  // if a temp table existed (or was just created): clear the buffered
440  // keys. they're no longer required
441  if (m_tmp_table) {
442  m_tmp_keys.clear();
443  m_strings.clear();
444  }
445 
446  return;
447  }
448 
449  // we've reached EOS. If there's a temporary table then create a
450  // scanner for this table. Otherwise immediately send the temporary
451  // results to the primary table for verification
452  ScanSpecBuilder ssb;
453 
454  std::lock_guard<std::mutex> lock(m_scanner_mutex);
455  if (m_shutdown)
456  return;
457 
459  if (m_tmp_table) {
460  s = m_tmp_table->create_scanner_async(this, ssb.get(),
462  }
463  else {
464  ssb.set_max_versions(primary_spec.max_versions);
465  ssb.set_return_deletes(primary_spec.return_deletes);
466  ssb.set_keys_only(primary_spec.keys_only);
467  ssb.set_row_regexp(primary_spec.row_regexp);
468 
469  // Fetch primary columns and restrict by time interval
470  for (auto col : primary_spec.columns)
471  ssb.add_column(col);
472  ssb.set_time_interval(primary_spec.time_interval.first,
473  primary_spec.time_interval.second);
474 
475  const char *last_row = "";
476 
477  if (primary_spec.and_column_predicates) {
478 
479  // Add row interval for each entry returned from index
480  for (CkeyMap::iterator it = m_tmp_keys.begin();
481  it != m_tmp_keys.end(); ++it) {
482  if (strcmp((const char *)it->first.row, last_row)) {
484  ssb.add_row(last_row);
485  last_row = (const char *)it->first.row;
486  m_cur_matching = it->second;
487  }
488  else
489  m_cur_matching |= it->second;
490  }
492  ssb.add_row(last_row);
493  m_cur_matching = 0;
494  }
495  else {
496  for (CkeyMap::iterator it = m_tmp_keys.begin();
497  it != m_tmp_keys.end(); ++it) {
498  if (strcmp((const char *)it->first.row, last_row)) {
499  if (*last_row)
500  ssb.add_row(last_row);
501  last_row = (const char *)it->first.row;
502  }
503  }
504  if (*last_row)
505  ssb.add_row(last_row);
506  }
507 
508  if (ssb.get().row_intervals.empty()) {
509  m_eos = true;
510  return;
511  }
512 
514 
515  s = m_primary_table->create_scanner_async(this, ssb.get(),
517 
518  // clean up
519  m_tmp_keys.clear();
520  m_strings.clear();
521  }
522 
523  m_scanners.push_back(s);
524  }
525 
526  /*
527  * the temporary table mimics the primary table: all column families
528  * with an index are also created for the temporary table
529  */
533 
534  std::string inner;
535  for (auto cf : m_primary_table->schema()->get_column_families()) {
536  if (m_qualifier_scan && !cf->get_qualifier_index())
537  continue;
538  if (!m_qualifier_scan && !cf->get_value_index())
539  continue;
540  inner += format(tmp_schema_inner, cf->get_name().c_str());
541  }
542 
544  NamespacePtr nstmp( client->open_namespace("/tmp") );
545  std::string guid = HyperAppHelper::generate_guid();
546  nstmp->create_table(guid, format(tmp_schema_outer, inner.c_str()));
547  m_tmp_table = nstmp->open_table(guid);
548 
549  m_mutator.reset(m_tmp_table->create_mutator_async(this));
550  }
551 
552  void verify_results(std::unique_lock<std::mutex> &lock, TableScannerAsync *scanner,
553  ScanCellsPtr &scancells) {
554  // no results from the primary table, or LIMIT/CELL_LIMIT exceeded?
555  // then return immediately
556  if ((scancells->get_eos() && scancells->empty() &&
558  sspecs_clear();
559  m_eos = true;
560  return;
561  }
562 
563  const ScanSpec &primary_spec = m_primary_spec.get();
564 
565  Cells cells;
566  scancells->get(cells);
567  const char *last = m_last_rowkey_verify.c_str();
568 
569  // this test code creates one ScanSpec for each single row that is
570  // received from the temporary table. As soon as the scan spec queue
571  // overflows it will block till the primary table scanners clear the
572  // queue.
573  //
574  // see below for more comments
575 #if defined (TEST_SSB_QUEUE)
576  for (auto &cell : cells) {
577  if (!strcmp(last, (const char *)cell.row_key))
578  continue;
579  last = (const char *)cell.row_key;
580 
581  ScanSpecBuilder *ssb = new ScanSpecBuilder;
582  for (const auto &s : primary_spec.columns)
583  ssb->add_column(s.c_str());
584  ssb->set_max_versions(primary_spec.max_versions);
585  ssb->set_return_deletes(primary_spec.return_deletes);
586  for (const auto &cp : primary_spec.column_predicates)
587  ssb->add_column_predicate(cp.column_family, cp.operation,
588  cp.value, cp.value_len);
589  if (primary_spec.value_regexp)
590  ssb->set_value_regexp(primary_spec.value_regexp);
591 
592  ssb->add_row(cell.row_key);
593 
594  m_last_rowkey_verify = last;
595 
596  m_sspecs_cond.wait(lock, [this](){
597  return m_sspecs.size() <= SSB_QUEUE_LIMIT || m_limits_reached; });
598 
599  if (m_limits_reached) {
600  delete ssb;
601  return;
602  }
603 
604  m_sspecs.push_back(ssb);
605  if (m_outstanding_scanners.load() <= 1)
606  readahead();
607  }
608 #else
609  // This is the "production-ready" code, using a single ScanSpec for all
610  // rows that are returned from the intermediate table
611  //
612  // Create a new ScanSpec
613  ScanSpecBuilder *ssb = new ScanSpecBuilder;
614  for (auto col : primary_spec.columns)
615  ssb->add_column(col);
616  ssb->set_max_versions(primary_spec.max_versions);
617  ssb->set_return_deletes(primary_spec.return_deletes);
618  ssb->set_keys_only(primary_spec.keys_only);
619  if (primary_spec.value_regexp)
620  ssb->set_value_regexp(primary_spec.value_regexp);
621 
622  uint32_t matching;
623 
624  // for each cell from the secondary index: verify that it exists in
625  // the primary table, but make sure that each rowkey is only inserted
626  // ONCE
627  if (primary_spec.and_column_predicates) {
628  for (auto &cell : cells) {
629 
630  HT_ASSERT(cell.value_len == sizeof(matching));
631  memcpy(&matching, cell.value, sizeof(matching));
632 
633  if (!strcmp(last, (const char *)cell.row_key)) {
634  m_cur_matching |= matching;
635  continue;
636  }
637 
638  // then add the key to the ScanSpec
640  ssb->add_row(last);
641 
642  last = (const char *)cell.row_key;
643 
644  m_cur_matching = matching;
645  }
646  if (scancells->get_eos() && m_cur_matching == m_all_matching) {
647  m_cur_matching = 0;
648  ssb->add_row(last);
649  last = "";
650  }
651  }
652  else {
653  for (auto &cell : cells) {
654  if (!strcmp(last, (const char *)cell.row_key))
655  continue;
656  // then add the key to the ScanSpec
657  ssb->add_row(last);
658  last = (const char *)cell.row_key;
659  }
660  if (scancells->get_eos()) {
661  ssb->add_row(last);
662  last = "";
663  }
664  }
665 
667 
668  // store the "last" pointer before it goes out of scope
669  m_last_rowkey_verify = last;
670 
671  m_sspecs_cond.wait(lock, [this](){
672  return m_sspecs.size() <= SSB_QUEUE_LIMIT || m_limits_reached; });
673 
674  // if, in the meantime, we reached any CELL_LIMIT/ROW_LIMIT then return
675  if (m_limits_reached || ssb->get().row_intervals.empty()) {
676  delete ssb;
677  return;
678  }
679 
680  // store ScanSpec in the queue
681  m_sspecs.push_back(ssb);
682 
683  // there should always at least be two scanners outstanding: this scanner
684  // from the intermediate table and one scanner from the primary table.
685  // If not then make sure to start another readahead scanner on the
686  // primary table.
687  if (m_outstanding_scanners.load() <= 0)
688  readahead();
689 #endif
690  }
691 
692  void readahead() {
693  HT_ASSERT(m_limits_reached == false);
694  HT_ASSERT(m_eos == false);
695 
696  if (m_sspecs.empty())
697  return;
698 
699  ScanSpecBuilder *ssb = m_sspecs[0];
700  m_sspecs.pop_front();
701  if (m_shutdown) {
702  delete ssb;
703  return;
704  }
705  TableScannerAsync *s =
708 
710  delete ssb;
711  m_sspecs_cond.notify_one();
712 
713  std::lock_guard<std::mutex> lock(m_scanner_mutex);
714  m_scanners.push_back(s);
715  }
716 
717  void track_predicates(ScanCellsPtr &scancells) {
718  // no results from the primary table, or LIMIT/CELL_LIMIT exceeded?
719  // then return immediately
720  if ((scancells->get_eos() && scancells->empty()) || m_limits_reached) {
721  sspecs_clear();
722  m_eos = true;
723  return;
724  }
725 
726  // count cells and rows; skip CELL_OFFSET/OFFSET cells/rows and reduce
727  // the results to CELL_LIMIT/LIMIT cells/rows
728  ScanCellsPtr scp = std::make_shared<ScanCells>();
729  Cells cells;
730  scancells->get(cells);
731  const char *last = m_last_rowkey_tracking.size()
732  ? m_last_rowkey_tracking.c_str()
733  : "";
734  bool skip_row = false;
735  for (auto &cell : cells) {
736  bool new_row = false;
737  if (strcmp(last, cell.row_key)) {
738  new_row = true;
739  skip_row = false;
740  last = cell.row_key;
742  m_cell_count = 0;
743  // adjust row offset
744  if (m_row_offset) {
745  m_row_offset--;
746  skip_row = true;
747  continue;
748  }
749  }
750  else if (skip_row)
751  continue;
752 
753  // check cell offset
754  if (m_cell_offset) {
755  m_cell_offset--;
756  continue;
757  }
758  // check row offset
759  if (m_row_offset)
760  continue;
761  // check cell limit
763  m_limits_reached = true;
764  break;
765  }
766  // check row limit
767  if (m_row_limit && new_row && m_row_count >= m_row_limit) {
768  m_limits_reached = true;
769  break;
770  }
771  // check cell limit per family
773  // cell pointers will go out of scope, therefore "own" is true
774  scp->add(cell, true);
775  }
776 
777  m_cell_count++;
778  if (new_row)
779  m_row_count++;
780  }
781 
782  // store the contents of "last" before it goes out of scope
783  m_last_rowkey_tracking = last;
784 
785  // send the results to the original callback
786  if (scp->size())
788  }
789 
790  bool row_intervals_match(const RowIntervals &rivec, const char *row) {
791  for (const auto &ri : rivec) {
792  if (ri.start && ri.start[0]) {
793  if (ri.start_inclusive) {
794  if (strcmp(row, ri.start)<0)
795  continue;
796  }
797  else {
798  if (strcmp(row, ri.start)<=0)
799  continue;
800  }
801  }
802  if (ri.end && ri.end[0]) {
803  if (ri.end_inclusive) {
804  if (strcmp(row, ri.end)>0)
805  continue;
806  }
807  else {
808  if (strcmp(row, ri.end)>=0)
809  continue;
810  }
811  }
812  return true;
813  }
814  return false;
815  }
816 
817  bool cell_intervals_match(const CellIntervals &civec, const char *row,
818  const char *column) {
819  for (const auto &ci : civec) {
820  if (ci.start_row && ci.start_row[0]) {
821  int s=strcmp(row, ci.start_row);
822  if (s>0)
823  return true;
824  if (s<0)
825  continue;
826  }
827  if (ci.start_column && ci.start_column[0]) {
828  if (ci.start_inclusive) {
829  if (strcmp(column, ci.start_column)<0)
830  continue;
831  }
832  else {
833  if (strcmp(column, ci.start_column)<=0)
834  continue;
835  }
836  }
837  if (ci.end_row && ci.end_row[0]) {
838  int s=strcmp(row, ci.end_row);
839  if (s<0)
840  return true;
841  if (s>0)
842  continue;
843  }
844  if (ci.end_column && ci.end_column[0]) {
845  if (ci.end_inclusive) {
846  if (strcmp(column, ci.end_column)>0)
847  continue;
848  }
849  else {
850  if (strcmp(column, ci.end_column)>=0)
851  continue;
852  }
853  }
854  return true;
855  }
856  return false;
857  }
858 
// Keys collected from the index, mapped to the bit pattern of the column
// predicates they matched. Ordering is defined by operator<(KeySpec, KeySpec)
// further down in this file, which compares the row only.
859  typedef std::map<KeySpec, uint32_t> CkeyMap;
// Map from String to String.
860  typedef std::map<String, String> CstrMap;
861 
862  // a weak pointer to the primary scanner
864 
865  // a pointer to the primary table
867 
868  // the original scan spec for the primary table
870 
871  // Vector of first-pass cell predicates
872  std::vector<CellPredicate> m_cell_predicates;
873 
874  // the original callback object specified by the user
876 
877  // the original timeout value specified by the user
878  uint32_t m_timeout_ms {};
879 
880  // a list of all scanners that are created in this object
881  std::vector<TableScannerAsync *> m_scanners;
882 
883  // a mutex for m_scanners
885 
886  // a deque of ScanSpecs, needed for readahead in the primary table
887  std::deque<ScanSpecBuilder *> m_sspecs;
888 
889  // a condition to wait if the sspecs-queue is too full
890  std::condition_variable m_sspecs_cond;
891 
892  // a mapping from column id to column name
893  std::map<uint32_t, String> m_column_map;
894 
895  // the temporary table; can be NULL
897 
898  // a mutator for the temporary table
899  std::unique_ptr<TableMutatorAsync> m_mutator;
900 
901  // limit and offset values from the original ScanSpec
902  int m_row_limit {};
903  int m_cell_limit {};
904  int m_cell_count {};
905  int m_row_offset {};
907  int m_row_count {};
909 
910  // we reached eos - no need to continue scanning
911  bool m_eos {};
912 
913  // track limits and offsets
914  bool m_track_limits {};
915 
916  // limits were reached, all following keys are discarded
918 
919  // a mutex
921 
922  // counting the read-ahead scans
924 
925  // temporary storage to persist pointer data before it goes out of scope
926  std::string m_last_rowkey_verify;
927 
928  // temporary storage to persist pointer data before it goes out of scope
930 
931  // Carry-over matching bits for last key
932  uint32_t m_cur_matching {};
933 
934  // Bit-pattern for all matching predicates
935  uint32_t m_all_matching {};
936 
937  // true if this index is a qualifier index
939 
940  // true if the row intervals have been applied to the index scan
942 
943  // buffer for accumulating keys from the index
944  CkeyMap m_tmp_keys;
945 
946  // buffer for accumulating keys from the index
948 
949  // accumulator; if > TMP_CUTOFF then store all index results in a
950  // temporary table
951  size_t m_tmp_cutoff {};
952 
953  // keep track whether we called final_decrement()
955 
956  // number of outstanding scanners (this is more precise than m_outstanding)
957  std::atomic<int> m_outstanding_scanners {0};
958 
959  // shutting down this scanner?
960  bool m_shutdown {};
961  };
962 
963  inline bool operator<(const KeySpec &lhs, const KeySpec &rhs) {
964  size_t len1 = strlen((const char *)lhs.row);
965  size_t len2 = strlen((const char *)rhs.row);
966  int cmp = memcmp(lhs.row, rhs.row, std::min(len1, len2));
967  if (cmp > 0)
968  return false;
969  if (cmp < 0)
970  return true;
971  if (len1 < len2)
972  return true;
973  return false;
974  }
975 }
976 
977 #endif // Hypertable_Lib_IndexScannerCallback_h
int64_t timestamp
Definition: KeySpec.h:130
std::unique_ptr< TableMutatorAsync > m_mutator
static std::mutex mutex
Definition: Logger.cc:43
TableScannerAsync * create_scanner_async(ResultCallback *cb, const ScanSpec &scan_spec, uint32_t timeout_ms=0, int32_t flags=0)
Creates an asynchronous scanner on this table.
Definition: Table.cc:213
const char * row_regexp
Definition: ScanSpec.h:279
std::vector< Cell, CellAlloc > Cells
Definition: Cells.h:37
ScanSpec & get()
Returns the built ScanSpec object.
Definition: ScanSpec.h:566
#define HT_WARNF(msg,...)
Definition: Logger.h:290
NamespacePtr open_namespace(const std::string &name, Namespace *base=NULL)
Opens a Namespace.
Definition: Client.cc:106
Namespace * get_namespace()
Definition: Table.h:247
std::string get_table_name() const
Returns the name of the table as it was when the scanner was created.
Abstract base class for a filesystem.
ColumnPredicates column_predicates
Definition: ScanSpec.h:277
String format(const char *fmt,...)
Returns a String using printf-like format facilities. Vanilla snprintf is about 1.5x faster than this...
Definition: String.cc:37
static const size_t TMP_CUTOFF
if more than TMP_CUTOFF bytes are received from the index then store all results in a temporary table...
Asynchronous table scanner.
bool row_intervals_match(const RowIntervals &rivec, const char *row)
pair< int64_t, int64_t > time_interval
Definition: ScanSpec.h:278
const char * value_regexp
Definition: ScanSpec.h:280
const char * column_qualifier
Definition: KeySpec.h:128
void add_row(const string &str)
Adds a row to be returned in the scan.
Definition: ScanSpec.h:441
bool cell_intervals_match(const CellIntervals &civec, const char *row, const char *column)
The Flyweight string set stores duplicate strings efficiently.
virtual void update_ok(TableMutatorAsync *mutator)
Callback method for successful update.
size_t column_qualifier_len
Definition: KeySpec.h:129
const void * row
Definition: KeySpec.h:125
void set_value_regexp(const char *regexp)
Sets the regexp to filter cell values by.
Definition: ScanSpec.h:401
virtual void update_error(TableMutatorAsync *mutator, int error, FailedMutations &failures)
Callback method for update errors.
std::map< uint32_t, String > m_column_map
String generate_guid()
Generates a new GUID.
Definition: Unique.h:44
virtual void scan_error(TableScannerAsync *scanner, int error, const std::string &error_msg, bool eos)=0
Callback method for scan errors.
std::shared_ptr< Namespace > NamespacePtr
Shared smart pointer to Namespace.
Definition: Namespace.h:333
#define HT_ASSERT(_e_)
Definition: Logger.h:396
IndexScannerCallback(TableScannerAsync *primary_scanner, Table *primary_table, const ScanSpec &primary_spec, std::vector< CellPredicate > &cell_predicates, ResultCallback *original_cb, uint32_t timeout_ms, bool qualifier_scan, bool row_intervals_applied)
Provides the ability to mutate a table in the form of adding and deleting rows and cells...
static const char * tmp_schema_outer
vector< RowInterval, RowIntervalAlloc > RowIntervals
Definition: ScanSpec.h:44
Scan predicate and control specification.
Definition: ScanSpec.h:56
Represents an open table.
Definition: Table.h:58
vector< CellInterval, CellIntervalAlloc > CellIntervals
Definition: ScanSpec.h:47
std::shared_ptr< ScanCells > ScanCellsPtr
Smart pointer to ScanCells.
Definition: ScanCells.h:143
Flyweight string set.
void set_row_regexp(const char *regexp)
Sets the regexp to filter rows by.
Definition: ScanSpec.h:394
virtual void update_error(TableMutatorAsync *mutator, int error, FailedMutations &failures)=0
Callback method for update errors.
void set_keys_only(bool val)
Return only keys (no values)
Definition: ScanSpec.h:514
std::deque< ScanSpecBuilder * > m_sspecs
void set_return_deletes(bool val)
Internal use only.
Definition: ScanSpec.h:521
virtual void scan_ok(TableScannerAsync *scanner, ScanCellsPtr &cells)=0
Callback method for successful scan.
Helper class for building a ScanSpec.
Definition: ScanSpec.h:318
static const char * tmp_schema_inner
Client * get_client()
Returns a pointer to the client object which created this Namespace.
Definition: Namespace.h:292
std::condition_variable m_sspecs_cond
std::map< String, String > CstrMap
static String basename(String name, char separator= '/')
A posix-compliant basename() which strips directory names from a filename.
Definition: Filesystem.cc:154
void track_predicates(ScanCellsPtr &scancells)
std::map< KeySpec, uint32_t > CkeyMap
bool operator<(const directory_entry< _Key, _Tp > &lhs, const directory_entry< _Key, _Tp > &rhs)
Definition: directory.h:128
Hypertable definitions
std::vector< TableScannerAsync * > m_scanners
virtual void register_scanner(TableScannerAsync *scanner)
Hook for derived classes which want to keep track of scanners/mutators.
SchemaPtr schema()
Definition: Table.h:144
void set_time_interval(int64_t start, int64_t end)
Sets the time interval of the scan.
Definition: ScanSpec.h:499
void add_column(const string &str)
Adds a column family to be returned by the scan.
Definition: ScanSpec.h:408
const char * get(const char *str)
Returns a copy of the string; this string is valid till the FlyweightString set is destructed...
virtual void scan_error(TableScannerAsync *scanner, int error, const std::string &error_msg, bool eos)
Callback method for scan errors.
void clear()
Clears and deallocates the set of strings.
RowIntervals row_intervals
Definition: ScanSpec.h:275
ResultCallback for secondary indices; used by TableScannerAsync.
void verify_results(std::unique_lock< std::mutex > &lock, TableScannerAsync *scanner, ScanCellsPtr &scancells)
std::vector< CellPredicate > m_cell_predicates
const std::string & get_name()
Definition: Table.h:139
CellIntervals cell_intervals
Definition: ScanSpec.h:276
void set_max_versions(uint32_t n)
Sets the maximum number of revisions of each cell to return in the scan.
Definition: ScanSpec.h:387
Represents an open table.
bool unescape(const char *in_buf, size_t in_len, const char **out_bufp, size_t *out_lenp)
const char * column_family
Definition: KeySpec.h:127
void set_scan_and_filter_rows(bool val)
Scan and filter rows.
Definition: ScanSpec.h:528
std::shared_ptr< Table > TablePtr
Definition: Table.h:53
void collect_indices(TableScannerAsync *scanner, ScanCellsPtr &scancells)
void wait_for_completion()
Blocks till outstanding == 0.
std::vector< FailedMutation > FailedMutations
Definition: Cells.h:39
virtual void scan_ok(TableScannerAsync *scanner, ScanCellsPtr &scancells)
Callback method for successful scan.