0.9.8.10
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
MergeScannerAccessGroup.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2007-2015 Hypertable, Inc.
3  *
4  * This file is part of Hypertable.
5  *
6  * Hypertable is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; version 3 of the
9  * License, or any later version.
10  *
11  * Hypertable is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19  * 02110-1301, USA.
20  */
21 
26 
27 #include <Common/Compat.h>
28 
30 
31 #include <Hypertable/Lib/Key.h>
32 #include <Hypertable/Lib/Schema.h>
33 
34 #include "Common/Logger.h"
35 
36 using namespace Hypertable;
37 
38 
40  ScanContext *scan_ctx,
41  uint32_t flags)
42  : m_flags(flags), m_return_deletes(flags & RETURN_DELETES),
43  m_accumulate_counters(flags & ACCUMULATE_COUNTERS), m_prev_cf(-1),
44  m_counted_value(12), m_scan_context(scan_ctx)
45 {
46  m_start_timestamp = scan_ctx->time_interval.first;
47  m_end_timestamp = scan_ctx->time_interval.second;
48  m_revision = scan_ctx->revision;
49 
50  bool has_index = false;
51  bool has_qualifier_index = false;
52 
53  if (flags & IS_COMPACTION) {
54  // check if there are any indices in this schema
55  for (auto cf_spec : scan_ctx->schema->get_column_families()){
56  if (!cf_spec || cf_spec->get_deleted())
57  continue;
58  if (cf_spec->get_value_index()) {
59  HT_INFO("Compaction scan has cell value index");
60  has_index = true;
61  }
62  if (cf_spec->get_qualifier_index()) {
63  HT_INFO("Compaction scan has column qualifier index");
64  has_qualifier_index = true;
65  }
66 
67  if (has_index && has_qualifier_index)
68  break;
69  }
70  }
71 
72  if (has_index || has_qualifier_index)
73  m_index_updater = IndexUpdaterFactory::create(table_name, scan_ctx->schema,
74  has_index, has_qualifier_index);
75 }
76 
78  try {
81  }
82  catch (Hypertable::Exception &e) {
83  HT_ERROR_OUT << "Problem destroying MergeScannerAccessGroup : " << e
84  << HT_END;
85  }
86 }
87 
89  ScannerState sstate;
90  Key key;
91  bool counter;
92  int64_t cell_cutoff, cur_bytes = 0;
93 
94  if (m_queue.empty()) {
95  if (m_count_present)
96  finish_count();
97  else
98  m_no_forward = false;
99  return;
100  }
101 
102  sstate = m_queue.top();
103 
104  // while the queue is not empty: pop the top element, forward it,
105  // re-insert it back into the queue
106  while (true) {
107  while (true) {
108  m_queue.pop();
109 
110  // In some cases the forward might already be done and so the
111  // scanner shouldn't be forwarded again. For example you know a counter
112  // is done only after forwarding to the 1st post counter cell or
113  // reaching the end of the scan.
114  if (m_no_forward)
115  m_no_forward = false;
116  else
117  sstate.scanner->forward();
118 
119  if (sstate.scanner->get(sstate.key, sstate.value))
120  m_queue.push(sstate);
121 
122  if (m_queue.empty()) {
123  // scan ended on a counter
124  if (m_count_present)
125  finish_count();
126  return;
127  }
128 
129  sstate = m_queue.top();
130 
131  // update I/O tracking
132  cur_bytes = sstate.key.length + sstate.value.length();
133  io_add_input_cell(cur_bytes);
134 
135  CellPredicate &cp =
137 
138  // we only need to care about counters for a MergeScanner which is
139  // merging over a single access group since no counter will span
140  // multiple access groups
141  cell_cutoff = m_scan_context->cell_predicates[
142  sstate.key.column_family_code].cutoff_time;
143  counter = m_accumulate_counters &&
145 
146  // apply the various filters...
147  if (sstate.key.timestamp < cell_cutoff) {
148  if (m_index_updater && sstate.key.flag == FLAG_INSERT)
149  purge_from_index(sstate.key, sstate.value);
150  continue;
151  }
152  else if (sstate.key.timestamp < m_start_timestamp) {
153  if (m_index_updater && sstate.key.flag == FLAG_INSERT)
154  purge_from_index(sstate.key, sstate.value);
155  continue;
156  }
157  else if (sstate.key.revision > m_revision ||
158  (sstate.key.timestamp >= m_end_timestamp &&
159  sstate.key.flag == FLAG_INSERT)) {
160  if (m_index_updater && sstate.key.flag == FLAG_INSERT)
161  purge_from_index(sstate.key, sstate.value);
162  continue;
163  }
164  else if (sstate.key.flag == FLAG_DELETE_ROW) {
165  if (matches_deleted_row(sstate.key)) {
168  }
169  else
170  update_deleted_row(sstate.key);
171  if (m_return_deletes)
172  break;
173  }
174  else if (sstate.key.flag == FLAG_DELETE_COLUMN_FAMILY) {
175  if (matches_deleted_column_family(sstate.key)) {
178  }
179  else
181  if (m_return_deletes)
182  break;
183  }
184  else if (sstate.key.flag == FLAG_DELETE_CELL) {
185  if (matches_deleted_cell(sstate.key)) {
188  }
189  else
190  update_deleted_cell(sstate.key);
191  if (m_return_deletes)
192  break;
193  }
194  else if (sstate.key.flag == FLAG_DELETE_CELL_VERSION) {
195  if (matches_deleted_cell_version(sstate.key)) {
197  }
198  else
200  if (m_return_deletes)
201  break;
202  }
203  else if (sstate.key.flag == FLAG_INSERT) {
204  // this cell is not a delete and it is within the requested
205  // time interval.
206  if (m_delete_present) {
207  if (m_deleted_cell_version.fill() > 0) {
208  if (!matches_deleted_cell_version(sstate.key)) {
209  // we won't see the previously seen deleted cell version again
212  }
213  else if (m_deleted_cell_version_set.find(sstate.key.timestamp) !=
215  // apply previously seen delete cell version to this cell
216  if (m_index_updater)
217  purge_from_index(sstate.key, sstate.value);
218  continue;
219  }
220  }
221  if (m_deleted_cell.fill() > 0) {
222  if (!matches_deleted_cell(sstate.key))
223  // we won't see the previously seen deleted cell again
225  else if (sstate.key.timestamp <= m_deleted_cell_timestamp) {
226  // apply previously seen delete cell to this cell
227  if (m_index_updater)
228  purge_from_index(sstate.key, sstate.value);
229  continue;
230  }
231  }
232  if (m_deleted_column_family.fill() > 0) {
233  if (!matches_deleted_column_family(sstate.key))
234  // we won't see the previously seen deleted column family again
236  else if (sstate.key.timestamp <= m_deleted_column_family_timestamp){
237  // apply previously seen delete column family to this cell
238  if (m_index_updater)
239  purge_from_index(sstate.key, sstate.value);
240  continue;
241  }
242  }
243  if (m_deleted_row.fill() > 0) {
244  if (!matches_deleted_row(sstate.key))
245  // we won't see the previously seen deleted row again
247  else if (sstate.key.timestamp <= m_deleted_row_timestamp) {
248  // apply previously seen delete row to this cell
249  if (m_index_updater)
250  purge_from_index(sstate.key, sstate.value);
251  continue;
252  }
253  }
254  if (m_deleted_cell_version.fill() == 0
255  && m_deleted_cell.fill() == 0
256  && m_deleted_column_family.fill() == 0
257  && m_deleted_row.fill() == 0)
258  m_delete_present = false;
259  }
260 
261  // keep track of revisions
262  const uint8_t *latest_key = (const uint8_t *)sstate.key.row;
263  size_t latest_key_len = sstate.key.flag_ptr -
264  (const uint8_t *)sstate.key.row + 1;
265 
266  if (m_prev_key.fill()==0) {
267  m_prev_key.set(latest_key, latest_key_len);
269  m_revs_count=0;
271  }
272  else if (m_prev_key.fill() != latest_key_len ||
273  memcmp(latest_key, m_prev_key.base, latest_key_len)) {
274 
275  m_prev_key.set(latest_key, latest_key_len);
277  m_revs_count=0;
279  }
280  m_revs_count++;
281  if (m_revs_limit && m_revs_count > m_revs_limit && !counter)
282  continue;
283 
284  // row set
285  if (!m_scan_context->rowset.empty()) {
286  int cmp = 1;
287  while (!m_scan_context->rowset.empty()
288  && (cmp = strcmp(*m_scan_context->rowset.begin(),
289  sstate.key.row)) < 0)
290  m_scan_context->rowset.erase(m_scan_context->rowset.begin());
291  if (cmp > 0)
292  continue;
293  }
294  // cell predicate match
295 
296  const uint8_t *value;
297  size_t value_len = sstate.value.decode_length(&value);
298  if (!cp.matches(sstate.key.column_qualifier,
299  (size_t)sstate.key.column_qualifier_len,
300  (const char *)value, value_len))
301  continue;
302  // row regexp
303  if (m_scan_context->row_regexp) {
304  bool cached, match;
305  m_regexp_cache.check_rowkey(sstate.key.row, &cached, &match);
306  if (!cached) {
307  match = RE2::PartialMatch(sstate.key.row,
309  m_regexp_cache.set_rowkey(sstate.key.row, match);
310  }
311  if (!match)
312  continue;
313  }
314  // filter by value regexp last since it's probably the most expensive
315  if (m_scan_context->value_regexp && !counter) {
316  const uint8_t *dptr;
317  if (!RE2::PartialMatch(re2::StringPiece(sstate.value.str(),
318  sstate.value.decode_length(&dptr)),
320  continue;
321  }
322  break;
323  }
325  break;
326  }
327 
328  // deal with counters. apply row_limit but not revs/cell_limit_per_family
329  if (m_count_present) {
330  if(counter && matches_counted_key(sstate.key)) {
331  if (sstate.key.flag == FLAG_INSERT) {
332  // keep incrementing
333  increment_count(sstate.key, sstate.value);
334  continue;
335  }
336  }
337  else {
338  // count done, new count seen but not started
339  finish_count();
340  break;
341  }
342  }
343  else if (counter && sstate.key.flag == FLAG_INSERT) {
344  // start new count and loop
345  start_count(sstate.key, sstate.value);
346  continue;
347  }
348 
349  break;
350  }
351 
352  io_add_output_cell(cur_bytes);
353 }
354 
356  if (!m_initialized)
357  initialize();
358 
359  if (m_done)
360  return false;
361 
362  // check if we have a counter result ready
363  if (m_no_forward) {
364  key = m_counted_key;
365  value.ptr = m_counted_value.base;
366  return true;
367  }
368 
369  // otherwise pick the next key/value from the queue
370  if (!m_queue.empty()) {
371  const ScannerState &sstate = m_queue.top();
372  key = sstate.key;
373  value = sstate.value;
374  return true;
375  }
376 
377  return false;
378 }
379 
380 
381 
383  int64_t amount = m_disk_read;
384  for (size_t i=0; i<m_scanners.size(); i++)
385  amount += (int64_t)m_scanners[i]->get_disk_read();
386  return amount;
387 }
388 
389 
391  ScannerState sstate;
392 
393  assert(!m_initialized);
394 
395  while (!m_queue.empty())
396  m_queue.pop();
397 
398  for (size_t i=0; i<m_scanners.size(); i++) {
399  if (m_scanners[i]->get(sstate.key, sstate.value)) {
400  sstate.scanner = m_scanners[i].get();
401  m_queue.push(sstate);
402  }
403  }
404 
405  bool counter;
406  int64_t cell_cutoff, cur_bytes = 0;
407  const uint8_t *value;
408  size_t value_len;
409 
410  while (!m_queue.empty()) {
411  sstate = m_queue.top();
412 
413  CellPredicate &cp =
415 
416  // update I/O tracking
417  cur_bytes = sstate.key.length + sstate.value.length();
418  io_add_input_cell(cur_bytes);
419 
420  // Only need to worry about counters if this scanner scans over a
421  // single access group since no counter will span multiple access groups
422  cell_cutoff = m_scan_context->cell_predicates[
423  sstate.key.column_family_code].cutoff_time;
424  counter = m_accumulate_counters &&
426 
427  if (sstate.key.timestamp < cell_cutoff
428  || (sstate.key.timestamp < m_start_timestamp)) {
429  if (m_index_updater && sstate.key.flag == FLAG_INSERT)
430  purge_from_index(sstate.key, sstate.value);
431  m_queue.pop();
432  sstate.scanner->forward();
433  if (sstate.scanner->get(sstate.key, sstate.value))
434  m_queue.push(sstate);
435  continue;
436  }
437  else if (sstate.key.flag == FLAG_DELETE_ROW) {
438  update_deleted_row(sstate.key);
439  if (!m_return_deletes) {
440  forward();
441  m_initialized = true;
442  return;
443  }
444  }
445  else if (sstate.key.flag == FLAG_DELETE_COLUMN_FAMILY) {
447  if (!m_return_deletes) {
448  forward();
449  m_initialized = true;
450  return;
451  }
452  }
453  else if (sstate.key.flag == FLAG_DELETE_CELL) {
454  update_deleted_cell(sstate.key);
455  if (!m_return_deletes) {
456  forward();
457  m_initialized = true;
458  return;
459  }
460  }
461  else if (sstate.key.flag == FLAG_DELETE_CELL_VERSION) {
463  if (!m_return_deletes) {
464  forward();
465  m_initialized = true;
466  return;
467  }
468  }
469  else if (sstate.key.flag == FLAG_INSERT) {
470  if (sstate.key.revision > m_revision
471  || (sstate.key.timestamp >= m_end_timestamp
472  && (!m_return_deletes || sstate.key.flag == FLAG_INSERT))) {
473  if (m_index_updater && sstate.key.flag == FLAG_INSERT)
474  purge_from_index(sstate.key, sstate.value);
475  m_queue.pop();
476  sstate.scanner->forward();
477  if (sstate.scanner->get(sstate.key, sstate.value))
478  m_queue.push(sstate);
479  continue;
480  }
481 
482  // keep track of revisions
483  const uint8_t *latest_key = (const uint8_t *)sstate.key.row;
484  size_t latest_key_len = sstate.key.flag_ptr -
485  (const uint8_t *)sstate.key.row + 1;
486 
487  if (m_prev_key.fill()==0) {
488  m_prev_key.set(latest_key, latest_key_len);
490  m_revs_count=0;
492  }
493  else if (m_prev_key.fill() != latest_key_len ||
494  memcmp(latest_key, m_prev_key.base, latest_key_len)) {
495  m_prev_key.set(latest_key, latest_key_len);
497  m_revs_count=0;
499  }
500  m_revs_count++;
501  if (m_revs_limit && m_revs_count > m_revs_limit && !counter) {
502  if (m_index_updater && sstate.key.flag == FLAG_INSERT)
503  purge_from_index(sstate.key, sstate.value);
504  m_queue.pop();
505  sstate.scanner->forward();
506  if (sstate.scanner->get(sstate.key, sstate.value))
507  m_queue.push(sstate);
508  continue;
509  }
510 
511  // row set
512  if (!m_scan_context->rowset.empty()) {
513  int cmp = 1;
514  while (!m_scan_context->rowset.empty()
515  && (cmp = strcmp(*m_scan_context->rowset.begin(), sstate.key.row)) < 0)
516  m_scan_context->rowset.erase(m_scan_context->rowset.begin());
517  if (cmp > 0) {
518  m_queue.pop();
519  sstate.scanner->forward();
520  if (sstate.scanner->get(sstate.key, sstate.value))
521  m_queue.push(sstate);
522  continue;
523  }
524  }
525  // value match (exact match or prefix match)
526  value_len = sstate.value.decode_length(&value);
527  if (!cp.matches(sstate.key.column_qualifier,
528  (size_t)sstate.key.column_qualifier_len,
529  (const char *)value, value_len)) {
530  m_queue.pop();
531  sstate.scanner->forward();
532  if (sstate.scanner->get(sstate.key, sstate.value))
533  m_queue.push(sstate);
534  continue;
535  }
536  // row regexp
538  if (!RE2::PartialMatch(sstate.key.row,
539  *(m_scan_context->row_regexp))) {
540  m_queue.pop();
541  sstate.scanner->forward();
542  if (sstate.scanner->get(sstate.key, sstate.value))
543  m_queue.push(sstate);
544  continue;
545  }
546  // filter by value regexp last since it's probably the most expensive
547  if (m_scan_context->value_regexp && !counter) {
548  value_len = sstate.value.decode_length(&value);
549  if (!RE2::PartialMatch(re2::StringPiece((const char *)value, value_len),
551  m_queue.pop();
552  sstate.scanner->forward();
553  if (sstate.scanner->get(sstate.key, sstate.value))
554  m_queue.push(sstate);
555  continue;
556  }
557  }
558 
559  m_delete_present = false;
560  m_prev_key.set(sstate.key.row, sstate.key.flag_ptr
561  - (const uint8_t *)sstate.key.row + 1);
564 
565  // if counter then keep incrementing till we are ready with 1st kv pair
566  if (counter) {
567  start_count(sstate.key, sstate.value);
568  forward();
569  m_initialized = true;
570  return;
571  }
572  }
573 
574  break;
575  }
576 
577  io_add_output_cell(cur_bytes);
578 
579  m_initialized = true;
580 }
bool matches_deleted_column_family(const Key &key) const
virtual void forward()=0
uint32_t max_versions
Max versions (0 for all versions)
std::vector< CellListScannerPtr > m_scanners
int64_t timestamp
Definition: Key.h:134
void set_rowkey(const char *rowkey, bool match)
const char * row
Definition: Key.h:129
Declarations for MergeScannerAccessGroup.
static const uint32_t FLAG_DELETE_ROW
Definition: KeySpec.h:40
std::string String
A String is simply a typedef to std::string.
Definition: String.h:44
uint32_t length
Definition: Key.h:124
static const uint32_t FLAG_INSERT
Definition: KeySpec.h:47
MergeScannerAccessGroup(String &table_name, ScanContext *scan_ctx, uint32_t flags=0)
Constructor.
void start_count(const Key &key, const ByteString &value)
void purge_from_index(const Key &key, const ByteString &value)
static const uint32_t FLAG_DELETE_CELL
Definition: KeySpec.h:42
#define HT_INFO(msg)
Definition: Logger.h:271
Scan context information.
Definition: ScanContext.h:52
bool matches_counted_key(const Key &key) const
bool matches(const char *qualifier, size_t qualifier_len, const char *value, size_t value_len)
Evaluates predicate for the given cell.
static const uint32_t FLAG_DELETE_COLUMN_FAMILY
Definition: KeySpec.h:41
Declarations for Schema.
bool matches_deleted_row(const Key &key) const
bool matches_deleted_cell_version(const Key &key) const
A class managing one or more serializable ByteStrings.
Definition: ByteString.h:47
void check_rowkey(const char *rowkey, bool *cached, bool *match)
const char * str() const
Returns a pointer to the String's deserialized data.
Definition: ByteString.h:106
void increment_count(const Key &key, const ByteString &value)
const uint8_t * flag_ptr
Definition: Key.h:133
bool get(Key &key, ByteString &value)
Logging routines and macros.
vector< CellPredicate > cell_predicates
Definition: ScanContext.h:72
Compatibility Macros for C/C++.
#define HT_END
Definition: Logger.h:220
size_t length() const
Retrieves the length of the serialized string.
Definition: ByteString.h:62
bool matches_deleted_cell(const Key &key) const
#define HT_ERROR_OUT
Definition: Logger.h:301
pair< int64_t, int64_t > time_interval
Definition: ScanContext.h:70
const uint8_t * ptr
The pointer to the serialized data.
Definition: ByteString.h:121
Hypertable definitions
void clear()
Clears the buffer.
size_t decode_length(const uint8_t **dptr) const
Retrieves the decoded length and returns a pointer to the string.
Definition: ByteString.h:83
Provides access to internal components of opaque key.
Definition: Key.h:40
uint32_t column_qualifier_len
Definition: Key.h:132
uint8_t * base
Pointer to the allocated memory buffer.
std::priority_queue< ScannerState, std::vector< ScannerState >, LtScannerState > m_queue
size_t fill() const
Returns the size of the used portion.
Definition: DynamicBuffer.h:70
void set(const void *data, size_t len)
Overwrites the existing data.
This is a generic exception class for Hypertable.
Definition: Error.h:314
virtual bool get(Key &key, ByteString &value)=0
int64_t revision
Definition: Key.h:135
uint8_t column_family_code
Definition: Key.h:127
static IndexUpdaterPtr create(const String &table_id, SchemaPtr &schema, bool has_index, bool has_qualifier_index)
Factory function.
uint8_t flag
Definition: Key.h:125
const char * column_qualifier
Definition: Key.h:130
static const uint32_t FLAG_DELETE_CELL_VERSION
Definition: KeySpec.h:43