0.9.8.10
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
MergeScannerAccessGroup.h
Go to the documentation of this file.
1 /* -*- c++ -*-
2  * Copyright (C) 2007-2015 Hypertable, Inc.
3  *
4  * This file is part of Hypertable.
5  *
6  * Hypertable is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; version 3 of the
9  * License, or any later version.
10  *
11  * Hypertable is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19  * 02110-1301, USA.
20  */
21 
26 
27 #ifndef Hypertable_RangeServer_MergeScannerAccessGroup_h
28 #define Hypertable_RangeServer_MergeScannerAccessGroup_h
29 
30 #include "CellListScanner.h"
32 #include "IndexUpdater.h"
33 #include "ScanContext.h"
34 
35 #include <Common/ByteString.h>
36 #include <Common/DynamicBuffer.h>
37 
38 #include <memory>
39 #include <queue>
40 #include <string>
41 #include <vector>
42 #include <set>
43 
44 namespace Hypertable {
45 
48 
51 
52  class RegexpInfo {
53  public:
55  : last_family(-1), last_rowkey_match(false), last_column_match(false) { }
56 
57  void check_rowkey(const char *rowkey, bool *cached, bool *match) {
58  *match = last_rowkey_match;
59  if (!strcmp(rowkey, last_rowkey.c_str()))
60  *cached = true;
61  else
62  *cached = false;
63  }
64 
65  void check_column(int family, const char *qualifier, bool *cached,
66  bool *match) {
67  *match = last_column_match;
68  if (last_family == family
69  && !strcmp(qualifier, last_qualifier.c_str()))
70  *cached = true;
71  else
72  *cached = false;
73  }
74 
75  void set_rowkey(const char *rowkey, bool match) {
76  last_rowkey = rowkey;
77  last_rowkey_match = match;
78  }
79 
80  void set_column(int family, const char *qualifier, bool match) {
81  last_family = family;
82  last_qualifier = qualifier;
83  last_column_match = match;
84  }
85 
86  private:
92  };
93 
94  struct ScannerState {
98  };
99 
100  struct LtScannerState {
101  bool operator()(const ScannerState &ss1, const ScannerState &ss2) const {
102  return ss1.key.serial > ss2.key.serial;
103  }
104  };
105 
106  public:
107 
/// Scanner option flags. Each enumerator occupies its own bit.
/// NOTE(review): presumably OR'ed together into the <code>flags</code>
/// constructor argument -- confirm against callers.
enum Flags {
  RETURN_DELETES      = 1 << 0,  ///< bit 0 (0x00000001)
  IS_COMPACTION       = 1 << 1,  ///< bit 1 (0x00000002)
  ACCUMULATE_COUNTERS = 1 << 2   ///< bit 2 (0x00000004)
};
113 
118  MergeScannerAccessGroup(String &table_name, ScanContext *scan_ctx,
119  uint32_t flags=0);
120 
123  virtual ~MergeScannerAccessGroup();
124 
126  m_scanners.push_back(scanner);
127  }
128 
129  void forward();
130 
131  bool get(Key &key, ByteString &value);
132 
133  uint32_t get_flags() { return m_flags; }
134 
136  m_release_callback = cb;
137  }
138 
139  void io_add_input_cell(int64_t cur_bytes) {
140  m_bytes_input += cur_bytes;
141  m_cells_input++;
142  }
143 
144  void io_add_output_cell(int64_t cur_bytes) {
145  m_bytes_output += cur_bytes;
146  m_cells_output++;
147  }
148 
149  int64_t get_input_cells() { return m_cells_input; }
150  int64_t get_output_cells() { return m_cells_output; }
151 
152  int64_t get_input_bytes() { return m_bytes_input; }
153  int64_t get_output_bytes() { return m_bytes_output; }
154 
155  void add_disk_read(int64_t amount) { m_disk_read += amount; }
156  int64_t get_disk_read();
157 
158  private:
159 
160  void initialize();
161 
162  inline bool matches_deleted_row(const Key& key) const {
163  size_t len = key.len_row();
164 
165  HT_DEBUG_OUT <<"filtering deleted row '"
166  << String((char*)m_deleted_row.base, m_deleted_row.fill()) <<"' vs '"
167  << String(key.row, len) <<"'" << HT_END;
168 
169  return (m_delete_present && m_deleted_row.fill() > 0
170  && m_deleted_row.fill() == len
171  && !memcmp(m_deleted_row.base, key.row, len));
172  }
173 
174  inline void update_deleted_row(const Key& key) {
175  size_t len = key.len_row();
177  m_deleted_row.ensure(len);
178  memcpy(m_deleted_row.base, key.row, len);
181  m_delete_present = true;
182  }
183 
184  inline bool matches_deleted_column_family(const Key& key) const {
185  size_t len = key.len_column_family();
186 
187  HT_DEBUG_OUT <<"filtering deleted row-column-family '"
190  <<"' vs '"<< String(key.row, len) <<"'" << HT_END;
191 
193  && m_deleted_column_family.fill() == len
194  && !memcmp(m_deleted_column_family.base, key.row, len));
195  }
196 
197  inline void update_deleted_column_family(const Key &key) {
198  size_t len = key.len_column_family();
201  memcpy(m_deleted_column_family.base, key.row, len);
204  m_delete_present = true;
205  }
206 
207  inline bool matches_deleted_cell(const Key& key) const {
208  size_t len = key.len_cell();
209 
210  HT_DEBUG_OUT <<"filtering deleted cell '"
212  <<"' vs '"<< String(key.row, len) <<"'" << HT_END;
213 
214  return (m_delete_present && m_deleted_cell.fill() > 0
215  && m_deleted_cell.fill() == len
216  && !memcmp(m_deleted_cell.base, key.row, len));
217  }
218 
219  inline void update_deleted_cell(const Key &key) {
220  size_t len = key.len_cell();
222  m_deleted_cell.ensure(len);
223  memcpy(m_deleted_cell.base, key.row, len);
226  m_delete_present = true;
227  }
228 
229  inline bool matches_deleted_cell_version(const Key& key) const {
230  size_t len = key.len_cell();
231 
232  HT_DEBUG_OUT <<"filtering deleted cell version'"
235  <<"' vs '"<< String(key.row, len) <<"'" << HT_END;
236 
238  && m_deleted_cell_version.fill() == len
239  && !memcmp(m_deleted_cell_version.base, key.row, len));
240  }
241 
242  inline void update_deleted_cell_version(const Key &key) {
243  size_t len = key.len_cell();
246  memcpy(m_deleted_cell_version.base, key.row, len);
249  m_delete_present = true;
250  }
251 
252  inline bool matches_counted_key(const Key& key) const {
253  size_t len = key.len_cell();
254  size_t len_counted_key = m_counted_key.len_cell();
255 
256  return (m_count_present && len == len_counted_key &&
257  !memcmp(m_counted_key.row, key.row, len));
258  }
259 
260  inline void increment_count(const Key &key, const ByteString &value) {
262  return;
263  const uint8_t *decode;
264  size_t remain = value.decode_length(&decode);
265  // value must be encoded 64 bit int
266  if (remain != 8 && remain != 9) {
267  HT_FATAL_OUT << "Expected counter to be encoded 64 bit int but "
268  "remain=" << remain << " ,key=" << key << HT_END;
269  }
270  m_count += Serialization::decode_i64(&decode, &remain);
271  if (remain == 1) {
272  if ((char)*decode != '=')
273  HT_FATAL_OUT << "Bad counter reset flag, expected '=' but "
274  "got " << (int)*decode << HT_END;
276  }
277  }
278 
279  inline void finish_count() {
280  uint8_t *ptr = m_counted_value.base;
281 
282  *ptr++ = 9; // length
284  *ptr++ = '=';
285 
288  m_no_forward = true;
289  m_count_present = false;
290  m_skip_remaining_counter = false;
291  }
292 
293  inline void start_count(const Key &key, const ByteString &value) {
294  SerializedKey serial;
295 
296  m_count_present = true;
297  m_count = 0;
298 
301  memcpy(m_counted_key_buffer.base, key.serial.ptr, key.length);
302  serial.ptr = m_counted_key_buffer.base;
303 
304  m_counted_key.load(serial);
305  increment_count(key, value);
306  }
307 
308  inline void purge_from_index(const Key &key, const ByteString &value) {
309  HT_ASSERT(key.flag == FLAG_INSERT);
311  m_index_updater->purge(key, value);
312  }
313 
315 
316  uint32_t m_flags {};
317  bool m_done {};
318  bool m_initialized {};
319 
320  std::vector<CellListScannerPtr> m_scanners;
321  std::priority_queue<ScannerState, std::vector<ScannerState>,
323 
324 
325  int64_t m_bytes_input {};
326  int64_t m_bytes_output {};
327  int64_t m_cells_input {};
328  int64_t m_cells_output {};
329  int64_t m_disk_read {};
330 
331  // if this is true, return a delete even if it doesn't satisfy
332  // the ScanSpec timestamp/version requirement
335 
336  // To avoid performance problems, the revision limit is enforced in this
337  // class rather than in MergeScannerRange; otherwise this class would send
338  // ALL revisions to the upper level, even when only one is required.
339  uint32_t m_revs_count {};
340  uint32_t m_revs_limit {};
341 
343  int32_t m_prev_cf;
344  bool m_no_forward {};
348  uint64_t m_count;
353  int64_t m_revision;
363  std::set<int64_t> m_deleted_cell_version_set;
365 
367 
368  };
369 
371  typedef std::shared_ptr<MergeScannerAccessGroup> MergeScannerAccessGroupPtr;
372 
374 
375 } // namespace Hypertable
376 
377 #endif // Hypertable_RangeServer_MergeScannerAccessGroup_h
bool matches_deleted_column_family(const Key &key) const
std::vector< CellListScannerPtr > m_scanners
int64_t timestamp
Definition: Key.h:134
void set_rowkey(const char *rowkey, bool match)
const char * row
Definition: Key.h:129
std::string String
A String is simply a typedef to std::string.
Definition: String.h:44
uint32_t length
Definition: Key.h:124
static const uint32_t FLAG_INSERT
Definition: KeySpec.h:47
std::shared_ptr< IndexUpdater > IndexUpdaterPtr
Smart pointer to IndexUpdater.
Definition: IndexUpdater.h:87
MergeScannerAccessGroup(String &table_name, ScanContext *scan_ctx, uint32_t flags=0)
Constructor.
void start_count(const Key &key, const ByteString &value)
void install_release_callback(CellStoreReleaseCallback &cb)
void purge_from_index(const Key &key, const ByteString &value)
Scan context information.
Definition: ScanContext.h:52
bool matches_counted_key(const Key &key) const
uint8_t * ptr
Pointer to the end of the used part of the buffer.
A dynamic, resizable and reference counted memory buffer.
Definition: DynamicBuffer.h:42
void set_column(int family, const char *qualifier, bool match)
bool matches_deleted_row(const Key &key) const
bool matches_deleted_cell_version(const Key &key) const
A class managing one or more serializable ByteStrings.
Definition: ByteString.h:47
#define HT_ASSERT(_e_)
Definition: Logger.h:396
void check_rowkey(const char *rowkey, bool *cached, bool *match)
uint64_t decode_i64(const uint8_t **bufp, size_t *remainp)
Decode a 64-bit integer in little-endian order.
void increment_count(const Key &key, const ByteString &value)
A dynamic, resizable memory buffer.
std::shared_ptr< MergeScannerAccessGroup > MergeScannerAccessGroupPtr
Shared pointer to MergeScannerAccessGroup.
Declarations for ScanContext.
vector< CellPredicate > cell_predicates
Definition: ScanContext.h:72
bool load(const SerializedKey &key)
Parses the opaque key and loads the components into the member variables.
Definition: Key.cc:158
void encode_i64(uint8_t **bufp, uint64_t val)
Encode a 64-bit integer in little-endian order.
#define HT_END
Definition: Logger.h:220
bool matches_deleted_cell(const Key &key) const
Declarations for IndexUpdater.
void add_scanner(CellListScannerPtr scanner)
const uint8_t * ptr
The pointer to the serialized data.
Definition: ByteString.h:121
Hypertable definitions
SerializedKey serial
Definition: Key.h:123
void clear()
Clears the buffer.
size_t len_column_family() const
Definition: Key.h:115
bool operator()(const ScannerState &ss1, const ScannerState &ss2) const
size_t decode_length(const uint8_t **dptr) const
Retrieves the decoded length and returns a pointer to the string.
Definition: ByteString.h:83
Provides access to internal components of opaque key.
Definition: Key.h:40
uint8_t * base
Pointer to the allocated memory buffer.
std::priority_queue< ScannerState, std::vector< ScannerState >, LtScannerState > m_queue
size_t fill() const
Returns the size of the used portion.
Definition: DynamicBuffer.h:70
void set(const void *data, size_t len)
Overwrites the existing data.
A serializable ByteString.
uint8_t column_family_code
Definition: Key.h:127
std::shared_ptr< CellListScanner > CellListScannerPtr
Definition: CellList.h:35
uint8_t flag
Definition: Key.h:125
void check_column(int family, const char *qualifier, bool *cached, bool *match)
size_t len_row() const
Definition: Key.h:113
size_t len_cell() const
Definition: Key.h:119
#define HT_FATAL_OUT
Definition: Logger.h:347
void ensure(size_t len)
Ensure space for additional data. Will grow the space to 1.5 times the needed space with existing data un...
Definition: DynamicBuffer.h:82
#define HT_DEBUG_OUT
Definition: Logger.h:261
Merge scanner for access groups.