0.9.8.10
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
MergeScannerRange.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2007-2015 Hypertable, Inc.
3  *
4  * This file is part of Hypertable.
5  *
6  * Hypertable is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; version 3 of the
9  * License, or any later version.
10  *
11  * Hypertable is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19  * 02110-1301, USA.
20  */
21 
26 
27 #include <Common/Compat.h>
28 
30 #include "MergeScannerRange.h"
31 
32 #include <Hypertable/Lib/Key.h>
33 
34 #include <Common/Logger.h>
35 
36 #include <cassert>
37 
38 using namespace Hypertable;
39 using namespace std;
40 
// Constructs a merge scanner over a range for the given scan context.
// When scan_ctx->spec is non-null, the cell/row limits and the row/cell
// offsets are copied from it.
// NOTE(review): when spec is null these members are not assigned here;
// presumably they have in-class defaults -- confirm in MergeScannerRange.h.
// If spec->rebuild_indices is set, the schema's column families are
// inspected (null or deleted ones are skipped) to find out whether any
// requested value index and/or qualifier index exists. If at least one
// exists, an index updater is created via IndexUpdaterFactory::create().
//
// @param table_id Table identifier; only passed on to
//                 IndexUpdaterFactory::create().
// @param scan_ctx Scan context; stored in m_scan_context, and its spec and
//                 schema are read here.
41 MergeScannerRange::MergeScannerRange(const string &table_id, ScanContextPtr &scan_ctx)
42  : m_scan_context(scan_ctx) {
43 
44  if (scan_ctx->spec != 0) {
45  m_cell_limit = scan_ctx->spec->cell_limit;
46  m_row_limit = scan_ctx->spec->row_limit;
47  m_cell_limit_per_family = scan_ctx->spec->cell_limit_per_family;
48  m_row_offset = scan_ctx->spec->row_offset;
49  m_cell_offset = scan_ctx->spec->cell_offset;
50 
51  if (scan_ctx->spec->rebuild_indices) {
52  bool has_index = false;
53  bool has_qualifier_index = false;
54 
// Look for a live column family carrying each requested index kind; stop
// scanning as soon as both kinds have been found.
55  for (auto cf_spec : scan_ctx->schema->get_column_families()){
56  if (!cf_spec || cf_spec->get_deleted())
57  continue;
58  if (scan_ctx->spec->rebuild_indices.value_index() && cf_spec->get_value_index()) {
59  has_index = true;
60  if (has_qualifier_index)
61  break;
62  }
63  if (scan_ctx->spec->rebuild_indices.qualifier_index() && cf_spec->get_qualifier_index()) {
64  has_qualifier_index = true;
65  if (has_index)
66  break;
67  }
68  }
// NOTE(review): listing line 70 (between the condition below and the
// create() call) is missing from this listing; presumably it is the target
// that receives the returned IndexUpdaterPtr (e.g. m_index_updater) -- confirm.
69  if (has_index || has_qualifier_index)
71  IndexUpdaterFactory::create(table_id, scan_ctx->schema,
72  has_index, has_qualifier_index);
73  }
74  }
75 }
76 
// Destructor body. NOTE(review): the signature line (listing line 77) is
// missing from this listing; presumably `MergeScannerRange::~MergeScannerRange() {`.
// Deletes every scanner pointer held in m_scanners, so this object treats
// the scanners as owned. A Hypertable::Exception raised while deleting is
// logged rather than propagated.
// NOTE(review): the try block wraps the whole loop, so an exception from one
// delete skips the remaining scanners. Exception types other than
// Hypertable::Exception are not caught here.
78  try {
79  for (auto scanner : m_scanners)
80  delete scanner;
81  }
82  catch (Hypertable::Exception &e) {
83  HT_ERROR_OUT << "Problem destroying MergeScannerRange : " << e
84  << HT_END;
85  }
86 }
87 
88 
// MergeScannerRange::forward() body. NOTE(review): the signature line
// (listing line 89) is missing; presumably `void MergeScannerRange::forward() {`.
// This listing also omits listing lines 142-143, 150, 160-161, 170, 174, 189,
// 192, 203-204 and 214. The comments below do not describe their content.
//
// Advances the merge to the next cell that get() should return. Each loop
// iteration does the following:
//  - pops the scanner at the top of m_queue, forwards it, and re-pushes it
//    if it still yields a cell;
//  - returns when the queue becomes empty;
//  - in a rebuild-indices scan (m_index_updater set), hands every cell to
//    m_index_updater->add() and keeps looping, so such a scan only ends via
//    the empty-queue return;
//  - for FLAG_INSERT cells, compares the key against m_prev_key to detect a
//    new row / column family / column qualifier, and skips an exact repeat
//    (same key bytes and same timestamp);
//  - applies row offset (m_skip_this_row) and the row / per-family limit
//    bookkeeping.
// After the loop, the cell offset and cell limit are applied. An emitted
// cell is counted in m_cells_output and m_bytes_output.
90  int64_t cur_bytes;
91  ScannerState sstate;
// NOTE(review): `key` is not referenced in any visible line of this
// function; possibly unused (some lines are missing from this listing).
92  Key key;
93 
// Re-entry point for the CELL_OFFSET skip at the end of this function.
94 forward:
95  // empty queue? return to caller
96  if (m_queue.empty())
97  return;
98  sstate = m_queue.top();
99 
100  // while the queue is not empty: pop the top element, forward it
101  // and then re-insert it back into the queue
102  while (true) {
103  bool new_row = false;
104  bool new_cf = false;
105  bool new_cq = false;
106 
107  m_queue.pop();
108 
109  sstate.scanner->forward();
110  if (sstate.scanner->get(sstate.key, sstate.value))
111  m_queue.push(sstate);
112 
113  // empty queue? return to caller
114  if (m_queue.empty())
115  return;
116 
117  sstate = m_queue.top();
118 
119  // update the I/O tracking
// Size of the candidate cell; it is added to m_bytes_output only if the
// cell is actually emitted (see the end of this function).
120  cur_bytes = sstate.key.length + sstate.value.length();
121 
122  // If this scan is for rebuilding indexes, update indexes and continue
// (every cell is consumed here; the loop then only exits through the
// empty-queue return above)
123  if (m_index_updater) {
124  assert(sstate.key.flag == FLAG_INSERT);
125  m_index_updater->add(sstate.key, sstate.value);
126  continue;
127  }
128 
129  // check if this insert starts a new row, a new column family
130  // or a new column qualifier, and make sure that the limits
131  // are respected
132  //
133  // if the MergeScannerAccessGroup returns deleted keys then they will
134  // be processed below.
135  if (sstate.key.flag == FLAG_INSERT) {
// latest_key spans the serialized key from the row up to and including
// the flag byte; this span is what m_prev_key stores and is compared against.
136  const uint8_t *latest_key = (const uint8_t *)sstate.key.row;
137  size_t latest_key_len = (sstate.key.flag_ptr+1) -
138  (const uint8_t *)sstate.key.row;
139 
// No previous key yet: row, column family and qualifier are all new.
140  if (m_prev_key.fill()==0) {
141  new_row = new_cf = new_cq = true;
// NOTE(review): listing lines 142-143 are missing here; presumably the
// ROW_OFFSET condition that decides whether this row is skipped -- confirm.
144  m_skip_this_row = true;
145  m_row_skipped++;
146  }
147  else
148  m_skip_this_row = false;
149  m_prev_key.set(latest_key, latest_key_len);
151  m_prev_timestamp = sstate.key.timestamp;
152  }
// Key bytes differ from the previous key: at least a new column qualifier.
153  else if (m_prev_key.fill() != latest_key_len ||
154  memcmp(latest_key, m_prev_key.base, latest_key_len)) {
155  new_cq = true;
156 
// Compare rows. This relies on the row being NUL-terminated at the start
// of both the current key and m_prev_key.base, as strcmp requires.
157  if (strcmp(sstate.key.row, (const char *)m_prev_key.base)) {
158  new_row = true;
159  new_cf = true;
// NOTE(review): listing lines 160-161 are missing here; presumably the same
// ROW_OFFSET condition as above -- confirm.
162  m_skip_this_row = true;
163  m_row_skipped++;
164  }
165  else
166  m_skip_this_row = false;
167  }
// Same row but a different column family.
168  else if (sstate.key.column_family_code != m_prev_cf) {
169  new_cf = true;
171  }
172 
173  m_prev_key.set(latest_key, latest_key_len);
175  m_prev_timestamp = sstate.key.timestamp;
176  }
// Identical key bytes: keep the cell only if its timestamp differs; an
// exact repeat (same key, same timestamp) is skipped.
177  else if (m_prev_timestamp != sstate.key.timestamp)
178  m_prev_timestamp = sstate.key.timestamp;
179  else
180  continue;
181  }
182 
183  bool incr_cf_count = false;
184 
185  // check ROW_OFFSET
186  if (m_skip_this_row)
187  continue;
188 
// Row-limit / per-family bookkeeping. NOTE(review): listing lines 189 and
// 192 are missing, so the exact conditions guarding this block and the
// m_done/break are not visible (presumably a ROW_LIMIT check) -- confirm.
190  if (new_row) {
191  m_row_count++;
193  m_done = true;
194  break;
195  }
196  else if (m_cell_limit_per_family) {
197  if (!new_cf)
198  incr_cf_count = true;
199  }
200  }
201 
// NOTE(review): listing lines 203-204 are missing here; presumably the
// per-family counter update and limit test -- confirm.
202  if (incr_cf_count) {
205  continue;
206  }
207 
// Candidate cell accepted: leave the loop.
208  break;
209  }
210 
211  if (!m_done) {
212  // check CELL_OFFSET and jump back to the forward: label if we need to skip
213  // more cells (the offset test itself, listing line 214, is missing here)
215  m_cell_skipped++;
216  goto forward;
217  }
218 
219  if (m_cell_limit)
220  m_cell_count++;
221  }
222 
// CELL_LIMIT: once the counter exceeds the limit the scan is finished.
223  if (m_cell_limit != 0 && m_cell_count > m_cell_limit)
224  m_done = true;
225 
// Account for the cell that will be returned by get().
226  if (!m_done) {
227  m_cells_output++;
228  m_bytes_output += cur_bytes;
229  }
230 }
231 
233 
// MergeScannerRange::get() body. NOTE(review): the signature line (listing
// line 232) is missing from this listing; presumably
// `bool MergeScannerRange::get(Key &key, ByteString &value) {`.
//
// Returns the current cell without advancing the scan:
//  - runs initialize() lazily on the first call;
//  - returns false once the scan is done (m_done);
//  - otherwise copies the key/value at the top of m_queue into the
//    out-parameters and returns true;
//  - returns false when the queue is empty.
234  if (!m_initialized)
235  initialize();
236 
237  if (m_done)
238  return false;
239 
240  if (!m_queue.empty()) {
241  const ScannerState &sstate = m_queue.top();
242  key = sstate.key;
243  value = sstate.value;
244  return true;
245  }
246 
247  return false;
248 }
249 
250 
// MergeScannerRange::initialize() body. NOTE(review): the signature line
// (listing line 251) and listing lines 280, 283, 296 and 308 are missing
// from this listing.
//
// Primes the merge. It empties m_queue, pushes the first cell of every
// scanner in m_scanners that has one, and treats the top of the queue as
// the first candidate cell.
// For a FLAG_INSERT cell it records the key (row through flag byte) and the
// timestamp, so forward() can detect new rows / column families /
// qualifiers, and it seeds the row/cell offset and limit counters.
// A rebuild-indices scan also hands this first cell to m_index_updater.
// Depending on the condition on listing line 308 (not visible; presumably
// "offset or index rebuild requested"), it either calls forward() to skip
// ahead or counts this cell in m_cells_output / m_bytes_output.
// Intended to run once (asserts !m_initialized); sets m_initialized at the end.
252  ScannerState sstate;
253 
254  assert(!m_initialized);
255 
256  while (!m_queue.empty())
257  m_queue.pop();
258 
259  for (size_t i=0; i<m_scanners.size(); i++) {
260  if (m_scanners[i]->get(sstate.key, sstate.value)) {
261  sstate.scanner = m_scanners[i];
262  m_queue.push(sstate);
263  }
264  }
265 
266  if (m_queue.empty())
267  return;
268 
269  sstate = m_queue.top();
270 
271  // update I/O tracking
272  int64_t cur_bytes = sstate.key.length + sstate.value.length();
273 
274  // if a new cell was inserted then store the column family and the
275  // key; this is needed in forward() to figure out if the next row
276  // has a new column family or column qualifier
// NOTE(review): the statement storing the column family (listing line 280
// or 283) is missing from this listing.
277  if (sstate.key.flag == FLAG_INSERT) {
278  assert(m_prev_key.fill()==0);
279 
281  m_prev_key.set(sstate.key.row, (sstate.key.flag_ptr+1)
282  - (const uint8_t *)sstate.key.row);
284  m_prev_timestamp = sstate.key.timestamp;
285 
// The first row counts as skipped when a row offset is requested.
286  if (m_row_offset) {
287  m_skip_this_row = true;
288  m_row_skipped++;
289  }
290 
// The first cell counts as skipped when a cell offset is requested.
// Otherwise it counts toward the cell limit, but only if no row offset
// applies.
291  if (m_cell_offset)
292  m_cell_skipped = 1;
293  else if (!m_row_offset && m_cell_limit)
294  m_cell_count = 1;
295 
// NOTE(review): listing line 296 (presumably the condition guarding this
// assignment) is missing -- confirm.
297  m_row_count = 1;
298  }
299 
300  // If this scan is for rebuilding indexes, update index with first key/value
301  if (m_index_updater) {
302  assert(sstate.key.flag == FLAG_INSERT);
303  m_index_updater->add(sstate.key, sstate.value);
304  }
305 
306  // was OFFSET or CELL_OFFSET or index rebuild specified? then move forward and
307  // skip (the condition itself, listing line 308, is missing from this listing)
309  forward();
310  else {
311  m_cells_output++;
312  m_bytes_output += cur_bytes;
313  }
314 
315  m_initialized = true;
316 }
317 
318 
319 
// Returns the sum of get_input_cells() over all scanners in m_scanners.
// NOTE(review): the signature line (listing line 320) is missing from this
// listing; presumably `int64_t MergeScannerRange::get_input_cells() {`.
321  int64_t cells = 0;
322  for (auto scanner : m_scanners)
323  cells += scanner->get_input_cells();
324  return cells;
325 }
326 
// Returns the sum of get_input_bytes() over all scanners in m_scanners.
// NOTE(review): the signature line (listing line 327) is missing from this
// listing; presumably `int64_t MergeScannerRange::get_input_bytes() {`.
328  int64_t bytes = 0;
329  for (auto scanner : m_scanners)
330  bytes += scanner->get_input_bytes();
331  return bytes;
332 }
333 
// Returns the sum of get_disk_read() over all scanners in m_scanners. Each
// scanner's value is cast to int64_t before it is added.
// NOTE(review): the signature line (listing line 334) is missing from this
// listing; presumably a `get_disk_read()` accessor returning int64_t -- confirm.
335  int64_t amount = 0;
336  for (auto scanner : m_scanners)
337  amount += (int64_t)scanner->get_disk_read();
338  return amount;
339 }
int64_t timestamp
Definition: Key.h:134
bool m_initialized
Flag indicating if scan has been initialized.
const char * row
Definition: Key.h:129
Declarations for MergeScannerAccessGroup.
uint32_t length
Definition: Key.h:124
static const uint32_t FLAG_INSERT
Definition: KeySpec.h:47
STL namespace.
IndexUpdaterPtr m_index_updater
Index updater for rebuild indices scan.
A class managing one or more serializable ByteStrings.
Definition: ByteString.h:47
std::priority_queue< ScannerState, std::vector< ScannerState >, LtScannerState > m_queue
bool m_done
Flag indicating scan is finished.
const uint8_t * flag_ptr
Definition: Key.h:133
bool get(Key &key, ByteString &value)
bool get(Key &key, ByteString &value)
Logging routines and macros.
Compatibility Macros for C/C++.
#define HT_END
Definition: Logger.h:220
size_t length() const
Retrieves the length of the serialized string.
Definition: ByteString.h:62
#define HT_ERROR_OUT
Definition: Logger.h:301
int64_t get_input_bytes()
Returns number of bytes input.
MergeScannerRange(const std::string &table_id, ScanContextPtr &scan_ctx)
Hypertable definitions
std::vector< MergeScannerAccessGroup * > m_scanners
int64_t get_input_cells()
Returns number of cells input.
Provides access to internal components of opaque key.
Definition: Key.h:40
uint8_t * base
Pointer to the allocated memory buffer.
size_t fill() const
Returns the size of the used portion.
Definition: DynamicBuffer.h:70
void set(const void *data, size_t len)
Overwrites the existing data.
This is a generic exception class for Hypertable.
Definition: Error.h:314
uint8_t column_family_code
Definition: Key.h:127
static IndexUpdaterPtr create(const String &table_id, SchemaPtr &schema, bool has_index, bool has_qualifier_index)
Factory function.
uint8_t flag
Definition: Key.h:125
virtual ~MergeScannerRange()
Destructor.
Declarations for MergeScannerRange.
std::shared_ptr< ScanContext > ScanContextPtr
Definition: ScanContext.h:169