0.9.8.10
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
IntervalScannerAsync.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2007-2015 Hypertable, Inc.
3  *
4  * This file is part of Hypertable.
5  *
6  * Hypertable is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; version 3 of the
9  * License, or any later version.
10  *
11  * Hypertable is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19  * 02110-1301, USA.
20  */
21 
22 #include <Common/Compat.h>
23 
24 #include "IntervalScannerAsync.h"
25 
26 #include <Hypertable/Lib/Key.h>
27 #include <Hypertable/Lib/Table.h>
28 
29 #include <Common/Error.h>
30 #include <Common/String.h>
31 
32 #include <cassert>
33 #include <chrono>
34 #include <vector>
35 
36 using namespace Hypertable;
37 using namespace Hypertable::Lib;
38 using namespace std;
39 
40 namespace { // file-local scan state codes
41  enum { // values stored in m_state; 0 (set in the constructor and after a restart) means neither applies
42  ABORTED = 1, // set when the scan is aborted (see abort() and the failure paths of retry_or_abort())
43  RESTART = 2 // set when the scan has to be restarted; restart_scan() asserts this state
44  };
45 }
46 
48  Table *table, RangeLocatorPtr &range_locator, const ScanSpec &scan_spec,
49  uint32_t timeout_ms, bool current, TableScannerAsync *scanner, int id)
50  : m_table(table), m_range_locator(range_locator),
51  m_loc_cache(range_locator->location_cache()),
52  m_scan_limit_state(scan_spec), m_range_server(comm, timeout_ms), m_eos(false),
53  m_fetch_outstanding(false), m_create_outstanding(false),
54  m_end_inclusive(false), m_timeout_ms(timeout_ms),
55  m_current(current), m_bytes_scanned(0),
56  m_create_handler(app_queue, scanner, id, true),
57  m_fetch_handler(app_queue, scanner, id, false),
58  m_create_timer(timeout_ms), m_fetch_timer(timeout_ms),
59  m_cur_scanner_finished(false), m_cur_scanner_id(0), m_state(0),
60  m_create_event_saved(false), m_invalid_scanner_id_ok(false) {
61 
63 
65  init(scan_spec);
66 }
67 
68 
69 void IntervalScannerAsync::init(const ScanSpec &scan_spec) {
70  const char *start_row, *end_row;
71  bool start_row_inclusive=true;
72 
73  if (!scan_spec.row_intervals.empty() && !scan_spec.cell_intervals.empty())
75  "ROW predicates and CELL predicates can't be combined");
76 
78  m_rowset.clear();
79 
91 
92  for (const auto &cp : scan_spec.column_predicates)
94  cp.column_qualifier, cp.operation, cp.value, cp.value_len);
95 
96  string family;
97  const char *colon;
98  for (size_t i=0; i<scan_spec.columns.size(); i++) {
99  colon = strchr(scan_spec.columns[i], ':');
100  family = colon ? String(scan_spec.columns[i], colon-scan_spec.columns[i]) :
101  String(scan_spec.columns[i]);
102  if (m_schema->get_column_family(family.c_str()) == 0)
104  (String)"Table= " + m_table->get_name() + " , Column family=" + scan_spec.columns[i]);
105  m_scan_spec_builder.add_column(scan_spec.columns[i]);
106  }
107 
108  HT_ASSERT(scan_spec.row_intervals.size() <= 1 || scan_spec.scan_and_filter_rows);
109  if (!scan_spec.row_intervals.empty()) {
110  if (!scan_spec.scan_and_filter_rows) {
111  start_row = (scan_spec.row_intervals[0].start == 0) ? ""
112  : scan_spec.row_intervals[0].start;
113  start_row_inclusive = scan_spec.row_intervals[0].start_inclusive;
114  if (scan_spec.row_intervals[0].end == 0 ||
115  scan_spec.row_intervals[0].end[0] == 0)
116  end_row = Key::END_ROW_MARKER;
117  else
118  end_row = scan_spec.row_intervals[0].end;
119  int cmpval = strcmp(start_row, end_row);
120  if (cmpval > 0)
121  HT_THROW(Error::BAD_SCAN_SPEC, format("start_row (%s) > end_row (%s)", start_row, end_row));
122  if (cmpval == 0 && !scan_spec.row_intervals[0].start_inclusive
123  && !scan_spec.row_intervals[0].end_inclusive)
124  HT_THROW(Error::BAD_SCAN_SPEC, "empty row interval");
125  m_start_row = start_row;
126 
127  m_end_row = end_row;
128  m_end_inclusive = scan_spec.row_intervals[0].end_inclusive;
130  scan_spec.row_intervals[0].start_inclusive, end_row,
131  scan_spec.row_intervals[0].end_inclusive);
132  }
133  else {
135  // order and filter duplicated rows
136  CstrSet rowset;
137  for (const auto &ri : scan_spec.row_intervals)
138  rowset.insert(ri.start); // ri.start always equals to ri.end
139  // setup ordered row intervals and rowset
140  m_scan_spec_builder.reserve_rows(rowset.size());
141  for (auto r : rowset) {
142  // end is set to "" in order to save space
143  m_scan_spec_builder.add_row_interval(r, true, "", true);
144  // Cstr's must be taken from m_scan_spec_builder and not from scan_spec
145  m_rowset.insert(m_scan_spec_builder.get().row_intervals.back().start);
146  }
147  m_start_row = *m_rowset.begin();
148 
149  m_end_row = *m_rowset.rbegin();
150  m_end_inclusive = true;
151  }
152  }
153  else if (!scan_spec.cell_intervals.empty()) {
154  start_row = (scan_spec.cell_intervals[0].start_row == 0) ? ""
155  : scan_spec.cell_intervals[0].start_row;
156 
157  if (scan_spec.cell_intervals[0].start_column == 0)
159  "Bad cell interval (start_column == NULL)");
160  if (scan_spec.cell_intervals[0].end_row == 0 ||
161  scan_spec.cell_intervals[0].end_row[0] == 0)
162  end_row = Key::END_ROW_MARKER;
163  else
164  end_row = scan_spec.cell_intervals[0].end_row;
165  if (scan_spec.cell_intervals[0].end_column == 0)
167  "Bad cell interval (end_column == NULL)");
168  int cmpval = strcmp(start_row, end_row);
169  if (cmpval > 0)
170  HT_THROW(Error::BAD_SCAN_SPEC, "start_row > end_row");
171  if (cmpval == 0) {
172  int cmpval = strcmp(scan_spec.cell_intervals[0].start_column,
173  scan_spec.cell_intervals[0].end_column);
174  if (cmpval == 0 && !scan_spec.cell_intervals[0].start_inclusive
175  && !scan_spec.cell_intervals[0].end_inclusive)
176  HT_THROW(Error::BAD_SCAN_SPEC, "empty cell interval");
177  }
179  scan_spec.cell_intervals[0].start_column,
180  scan_spec.cell_intervals[0].start_inclusive,
181  end_row, scan_spec.cell_intervals[0].end_column,
182  scan_spec.cell_intervals[0].end_inclusive);
183 
184  m_start_row = start_row;
185  m_end_row = end_row;
186  m_end_inclusive = true;
187  }
188  else {
189  m_start_row = "";
192  }
194  scan_spec.time_interval.second);
195 
198 
199  // If offset or limit specified, defer readahead until outstanding result is
200  // processed
206 
207  // start scan asynchronously (can trigger table not found exceptions)
209  if (!start_row_inclusive)
210  m_create_scanner_row.append(1,1);
213 }
214 
216  try {
217  // destroy dangling scanner
220  }
221  catch(Exception &e) {
222  HT_ERROR_OUT << e << HT_END;
223  }
224 }
225 
226 // caller is responsible for state of m_create_timer
227 void IntervalScannerAsync::find_range_and_start_scan(const char *row_key, bool hard) {
228 
229  RangeSpec range;
230  DynamicBuffer dbuf(0);
231 
233 
234  // if rowset scan adjust row intervals
235  if (!m_rowset.empty()) {
237  while (row_intervals.size() && strcmp(row_intervals.front().start, row_key) < 0)
238  row_intervals.erase(row_intervals.begin());
239  }
240 
241  try_again:
242 
243  try {
244  m_range_locator->find_loop(&m_table_identifier, row_key,
246  }
247  catch (Exception &e) {
248  if (e.code() == Error::REQUEST_TIMEOUT)
249  HT_THROW2(e.code(), e, e.what());
250  this_thread::sleep_for(chrono::milliseconds(1000));
251  if (m_create_timer.expired())
252  HT_THROW(Error::REQUEST_TIMEOUT, e.what());
253  goto try_again;
254  }
255 
256  while (true) {
257  set_range_spec(dbuf, range);
258  try {
260  // create scanner asynchronously
261  m_create_outstanding = true;
266 
267  }
268  catch (Exception &e) {
269  string msg = format("Problem creating scanner at %s on %s[%s..%s] - %s",
271  range.start_row, range.end_row, e.what());
272  reset_outstanding_status(true, false);
273  if (e.code() != Error::REQUEST_TIMEOUT &&
277  HT_ERROR_OUT << e << HT_END;
278  HT_THROW2(e.code(), e, msg);
279  }
280  else if (m_create_timer.remaining() <= 1000) {
281  uint32_t duration = m_create_timer.duration();
282  HT_ERRORF("Scanner creation request will time out. Initial timer "
283  "duration %d (last error = %s - %s)", (int)duration,
284  Error::get_text(e.code()), e.what());
285  HT_THROW2(Error::REQUEST_TIMEOUT, e, msg + format(". Unable to "
286  "complete request within %d ms", (int)duration));
287  }
288 
289  this_thread::sleep_for(chrono::milliseconds(1000));
290 
291  // try again, the hard way
292  m_range_locator->find_loop(&m_table_identifier, row_key,
294  continue;
295  }
296  break;
297  }
298 }
299 
300 void IntervalScannerAsync::reset_outstanding_status(bool is_create, bool reset_timer) {
301  if (is_create) {
303  m_create_outstanding = false;
304  if (reset_timer)
306  }
307  else {
309  m_fetch_outstanding = false;
310  if (reset_timer)
312  }
313 }
314 
315 bool IntervalScannerAsync::abort(bool is_create) { // Aborts this interval scan; is_create selects which outstanding request (create vs. fetch) is being cleared
316  reset_outstanding_status(is_create, true); // clear the matching outstanding flag (reset_timer = true)
317  m_eos = true; // mark this interval scanner as finished
318  m_state = ABORTED;
319  bool move_to_next = m_current && !has_outstanding_requests(); // true only if this scanner is current and nothing is still in flight
320  if (move_to_next)
321  m_current = false; // relinquish "current" status when handing over
322  return move_to_next; // true tells the caller it may move on to the next scanner
323 }
324 
326  // handle case where row limit was hit and scanner was cancelled but fetch request is
327  // still outstanding
328  reset_outstanding_status(is_create, true);
330  HT_ASSERT(!is_create);
332  m_invalid_scanner_id_ok = false;
333  return true;
334  }
335  return false;
336 }
337 
338 bool IntervalScannerAsync::retry_or_abort(bool refresh, bool hard, bool is_create,
339  bool *move_to_next, int last_error) {
340  uint32_t wait_time = 1000;
341  reset_outstanding_status(is_create, false);
342 
343  if (m_eos) {
344  *move_to_next = !has_outstanding_requests() && m_current;
345  if (*move_to_next)
346  m_current = false;
347  return true;
348  }
349 
350  // RangeServer has already destroyed scanner so we can't refresh
351  if (!is_create && refresh) {
352  HT_ERROR_OUT << "Table schema can't be refreshed when schema changes after scanner creation"
353  << HT_END;
354  // retry failed, abort scanner
355  *move_to_next = !has_outstanding_requests() && m_current;
356  if (*move_to_next)
357  m_current = false;
358  m_eos = true;
359  m_state = ABORTED;
360  return false;
361  }
362 
363  // no point refreshing, since the wait will cause a timeout
364  if (is_create && refresh && m_create_timer.remaining() < wait_time) {
365  uint32_t duration = m_create_timer.duration();
366  HT_ERRORF("Scanner creation request will time out. Initial timer "
367  "duration %d", (int)duration);
368  // retry failed, abort scanner
369  *move_to_next = !has_outstanding_requests() && m_current;
370  if (*move_to_next)
371  m_current = false;
372  m_eos = true;
373  m_state = ABORTED;
374  return false;
375  }
376 
377  if (last_error == Error::COMM_NOT_CONNECTED ||
378  last_error == Error::COMM_BROKEN_CONNECTION ||
379  last_error == Error::COMM_INVALID_PROXY)
380  m_state = RESTART;
381 
382  if (m_state == RESTART) {
384  restart_scan(refresh);
385  *move_to_next = (m_state==ABORTED) &&
387  return m_state != ABORTED;
388  }
389 
390  try {
391  if (refresh)
393  // wait a bit before kicking off the scan again
394  // this_thread::sleep_for(chrono::milliseconds(wait_time));
396  }
397  catch (Exception &e) {
398  HT_ERROR_OUT << e << HT_END;
399  if (m_create_outstanding) {
400  reset_outstanding_status(is_create, false);
401  }
402  // retry failed, abort scanner
403  *move_to_next = !has_outstanding_requests() && m_current;
404  if (*move_to_next)
405  m_current = false;
406  m_eos = true;
407  m_state = ABORTED;
408  return false;
409  }
410 
412  *move_to_next = !has_outstanding_requests() && m_current;
413  if (*move_to_next)
414  m_current = false;
415  return true;
416 }
417 
419  dbuf.ensure(m_next_range_info.start_row.length()
420  + m_next_range_info.end_row.length() + 2);
421  range.start_row = (char *)dbuf.add_unchecked(m_next_range_info.start_row.c_str(),
422  m_next_range_info.start_row.length()+1);
423  range.end_row = (char *)dbuf.add_unchecked(m_next_range_info.end_row.c_str(),
424  m_next_range_info.end_row.length()+1);
425 }
426 
427 bool IntervalScannerAsync::handle_result(bool *show_results, ScanCellsPtr &cells,
428  EventPtr &event, bool is_create) {
429 
430  reset_outstanding_status(is_create, true);
431 
432  // deal with outstanding fetch/create for aborted scanner
433  if (m_eos) {
434  if (m_state == ABORTED)
435  // scan was aborted; caller should have shown the error on first occurrence
436  *show_results = false;
437  else {
438  // scan is over but there was a create/fetch outstanding, send a ScanCells with 0 cells
439  // and just the eos bit set
440  *show_results = true;
441  cells = make_shared<ScanCells>();
442  }
443  return !has_outstanding_requests();
444  }
445 
446  *show_results = m_current;
447  // if this event is from a fetch scanblock
448  if (!is_create) {
449  set_result(event, cells, is_create);
450  load_result(cells);
451  if (!has_outstanding_requests()) {
452  if (m_state == RESTART)
453  restart_scan();
454  else if (m_eos)
455  m_current = false;
456  }
457  }
458  // else this event is from a create scanner result
459  else {
460  // if this scanner is current
461  if (m_current) {
462  // if there is a fetch that is outstanding
463  if (m_fetch_outstanding) {
464  *show_results = false;
465  // save this event for now
466  m_create_event = event;
467  m_create_event_saved = true;
468  }
469  // got results from create_scanner and there's no outstanding fetch request
470  else {
471  // set the range_info and load cells
472  if (m_state != RESTART) {
474  set_result(event, cells, is_create);
475  load_result(cells);
476  }
477  else {
478  // Send back an empty ScanCells object
479  cells = make_shared<ScanCells>();
480  }
481  if (!has_outstanding_requests()) {
482  if (m_state == RESTART)
483  restart_scan();
484  else if (m_eos)
485  m_current = false;
486  }
487  }
488  }
489  else {
490  // this scanner is not current, just enqueue for now
492  m_create_event = event;
493  m_create_event_saved = true;
494  }
495  }
497  return (m_eos && !has_outstanding_requests());
498 }
499 
501  bool is_create) {
502  cells = make_shared<ScanCells>();
503  m_cur_scanner_finished = cells->add(event, &m_cur_scanner_id);
504 
505  if (is_create)
507 
508  // if there was an OFFSET (or CELL_OFFSET) predicate in the query and the
509  // RangeServer actually skipped rows (or cells) because of this predicate
510  // then adjust the ScanSpec for the next scanner.
511  int skipped_rows = 0;
512  int skipped_cells = 0;
513  if (is_create) {
514  skipped_rows = cells->get_skipped_rows();
515  skipped_cells = cells->get_skipped_cells();
516  if (skipped_rows) {
517  HT_ASSERT(skipped_cells == 0);
518  HT_ASSERT(m_scan_spec_builder.get().row_offset >= skipped_rows);
519  m_scan_spec_builder.get().row_offset -= skipped_rows;
520  }
521  if (skipped_cells) {
522  HT_ASSERT(skipped_rows == 0);
523  HT_ASSERT(m_scan_spec_builder.get().cell_offset >= skipped_cells);
524  m_scan_spec_builder.get().cell_offset -= skipped_cells;
525  }
526  }
527 
528  // current scanner is finished but we have results saved from the next scanner
529  if (m_cur_scanner_finished && m_create_event_saved) {
530  HT_ASSERT(skipped_rows == 0 && skipped_cells == 0);
532  m_create_event_saved = false;
534  m_cur_scanner_finished = cells->add(m_create_event, &m_cur_scanner_id);
536  }
537 
538  m_profile_data += cells->profile_data();
539 }
540 
542  Key last_key;
543 
544  // if scan is not over, current scanner is finished and next create scanner results
545  // arrived then add them to cells
546  bool eos;
547 
548  if (m_state != RESTART && !m_defer_readahead)
549  readahead();
550 
551  if (m_last_key.row)
552  last_key = m_last_key;
553 
555  m_rowset, &m_bytes_scanned, &last_key);
556 
557  m_eos = m_eos || eos;
558 
559  // if scan is over but current scanner is not finished then destroy it
560  if (m_eos) {
561  if (!m_cur_scanner_finished) {
562  try {
564  }
565  catch (Exception &e) {
566  HT_ERROR_OUT << e << HT_END;
567  }
569  m_cur_scanner_id = 0;
570  }
571  }
572  else {
573  // Record the last key seen in case we need to restart
574  if (last_key.row && last_key.serial.ptr != m_last_key.serial.ptr) {
576  m_last_key_buf.add(last_key.serial.ptr, last_key.length);
578  }
579  }
580 
581  if (!m_eos && m_state != RESTART && m_defer_readahead)
582  readahead();
583 
584  return;
585 }
586 
587 bool IntervalScannerAsync::set_current(bool *show_results, ScanCellsPtr &cells, bool abort) {
588 
590  m_current = true;
591  *show_results = false;
593  return false;
594 
595  if (abort) {
596  m_eos = true;
597  }
598  if (m_eos) {
599  m_current = false;
600  *show_results = false;
602  return true;
603  }
604  *show_results = true;
606  m_create_event_saved = false;
608  set_result(m_create_event, cells);
609  HT_ASSERT(m_state != RESTART);
610  load_result(cells);
611  if (!has_outstanding_requests()) {
612  if (m_state == RESTART)
613  restart_scan();
614  else if (m_eos)
615  m_current = false;
616  }
618  return m_eos && !(has_outstanding_requests());
619 }
620 
622  if (m_eos)
623  return;
624 
625  // if the current scanner is not finished
626  if (!m_cur_scanner_finished) {
628  // request next scanblock and block
629  try {
631  m_fetch_outstanding = true;
634  }
635  catch (Exception &e) {
636  m_fetch_outstanding = false;
638  if (e.code() == Error::COMM_NOT_CONNECTED ||
641  HT_ASSERT(m_state == 0);
642  m_state = RESTART;
643  return;
644  }
645  HT_THROW2F(e.code(), e, "Problem calling RangeServer::fetch_scanblock(%s, sid=%d)",
646  m_range_info.addr.proxy.c_str(), (int)m_cur_scanner_id);
647  }
648 
649  if (m_defer_readahead)
650  return;
651  }
652  // if the current scanner is finished
653  else {
654  // if this range ends with END_ROW_MARKER OR the end row of scan is beyond the range end
655  if (!strcmp(m_range_info.end_row.c_str(), Key::END_ROW_MARKER) ||
656  m_end_row.compare(m_range_info.end_row) <= 0) {
657  // this interval scanner is finished
658  m_eos = true;
659  }
660  }
661  // if there is no create outstanding or saved, scan is not over,
662  // and the range being scanned by the current scanner is not the last range in the scan
663  // then create a scanner for the next range
665  m_end_row.compare(m_range_info.end_row) > 0 &&
666  strcmp(m_range_info.end_row.c_str(), Key::END_ROW_MARKER)) {
668  // if rowset scan update start row for next range
669  if (!m_rowset.empty())
670  if (m_create_scanner_row.compare(*m_rowset.begin()) < 0)
671  m_create_scanner_row = *m_rowset.begin();
672 
673  m_create_scanner_row.append(1,1); // construct row key in next range
674 
676  }
677  return;
678 }
679 
680 
682 
683  HT_ASSERT(m_state == RESTART);
684 
685  try {
686  if (refresh)
688 
689  m_create_event_saved = false;
690  m_create_event = 0;
691 
692  if (m_last_key.row)
694 
695  m_state = 0;
697  }
698  catch (Exception &e) {
699  HT_ERROR_OUT << e << HT_END;
700  m_eos = true;
701  m_state = ABORTED;
702  }
703 }
#define HT_THROW2F(_code_, _ex_, _fmt_,...)
Definition: Error.h:494
const char * row_regexp
Definition: ScanSpec.h:279
void restart_scan(bool refresh=false)
void set_row_offset(int32_t n)
Sets the number of rows to be skipped at the beginning of the query.
Definition: ScanSpec.h:363
bool retry_or_abort(bool refresh, bool hard, bool is_create, bool *move_to_next, int last_error)
void create_scanner(const CommAddress &addr, const TableIdentifier &table, const RangeSpec &range, const ScanSpec &scan_spec, DispatchHandler *handler)
Issues a "create scanner" request asynchronously.
Definition: Client.cc:217
ScanSpec & get()
Returns the built ScanSpec object.
Definition: ScanSpec.h:566
const char * row
Definition: Key.h:129
Range specification.
Definition: RangeSpec.h:40
String proxy
Proxy name.
Definition: CommAddress.h:175
void clear()
Clears the state.
Definition: ScanSpec.h:555
void load_result(ScanCellsPtr &cells)
std::string String
A String is simply a typedef to std::string.
Definition: String.h:44
ColumnPredicates column_predicates
Definition: ScanSpec.h:277
uint32_t length
Definition: Key.h:124
int32_t subscanners
Number of RangeServer::create_scanner() calls.
String format(const char *fmt,...)
Returns a String using printf-like format facilities. Vanilla snprintf is about 1.5x faster than this...
Definition: String.cc:37
std::shared_ptr< RangeLocator > RangeLocatorPtr
Smart pointer to RangeLocator.
Definition: RangeLocator.h:198
Asynchronous table scanner.
ProfileDataScanner m_profile_data
Accumulated profile data.
pair< int64_t, int64_t > time_interval
Definition: ScanSpec.h:278
const char * value_regexp
Definition: ScanSpec.h:280
std::shared_ptr< Event > EventPtr
Smart pointer to Event.
Definition: Event.h:228
TableIdentifierManaged m_table_identifier
void set_range_spec(DynamicBuffer &dbuf, RangeSpec &range)
STL namespace.
void get(TableIdentifierManaged &table_identifier, SchemaPtr &schema)
Get a copy of table identifier and schema atomically.
Definition: Table.cc:140
uint32_t remaining()
Returns the remaining time till expiry.
Definition: Timer.h:101
void set_result(EventPtr &event, ScanCellsPtr &cells, bool is_create=false)
bool expired()
Returns true if the timer is expired.
Definition: Timer.h:112
void set_value_regexp(const char *regexp)
Sets the regexp to filter cell values by.
Definition: ScanSpec.h:401
A dynamic, resizable and reference counted memory buffer.
Definition: DynamicBuffer.h:42
void reset(bool start_timer=false)
Resets the timer.
Definition: Timer.h:89
#define HT_ASSERT(_e_)
Definition: Logger.h:396
vector< RowInterval, RowIntervalAlloc > RowIntervals
Definition: ScanSpec.h:44
Scan predicate and control specification.
Definition: ScanSpec.h:56
void reset_outstanding_status(bool is_create, bool reset_timer)
Represents an open table.
Definition: Table.h:58
void set_cell_limit(int32_t n)
Sets the maximum number of cells to return.
Definition: ScanSpec.h:349
std::shared_ptr< ScanCells > ScanCellsPtr
Smart pointer to ScanCells.
Definition: ScanCells.h:143
std::set< const char *, LtCstr > CstrSet
STL Set managing c-style strings.
Definition: StringExt.h:52
bool set_current(bool *show_results, ScanCellsPtr &cells, bool abort)
const char * end_row
Definition: RangeSpec.h:60
void fetch_scanblock(const CommAddress &addr, int32_t scanner_id, DispatchHandler *handler)
Issues a "fetch scanblock" request asynchronously.
Definition: Client.cc:344
Lib::RangeServer::Client m_range_server
std::set< std::string > servers
Set of server proxy names participating in scan.
const char * get_text(int error)
Returns a descriptive error message.
Definition: Error.cc:330
void set_row_regexp(const char *regexp)
Sets the regexp to filter rows by.
Definition: ScanSpec.h:394
void set_keys_only(bool val)
Return only keys (no values)
Definition: ScanSpec.h:514
uint8_t * add(const void *data, size_t len)
Adds more data WITH boundary checks; if required the buffer is resized and existing data is preserved...
TableParts rebuild_indices
Definition: ScanSpec.h:286
uint32_t duration()
Returns the duration of the timer.
Definition: Timer.h:127
Compatibility Macros for C/C++.
void set_return_deletes(bool val)
Internal use only.
Definition: ScanSpec.h:521
bool load(const SerializedKey &key)
Parses the opaque key and loads the components into the member variables.
Definition: Key.cc:158
#define HT_END
Definition: Logger.h:220
std::shared_ptr< ApplicationQueueInterface > ApplicationQueueInterfacePtr
Smart pointer to ApplicationQueueInterface.
void destroy_scanner(const CommAddress &addr, int32_t scanner_id, DispatchHandler *handler)
Issues a "destroy scanner" request asynchronously.
Definition: Client.cc:290
void set_cell_limit_per_family(int32_t n)
Sets the maximum number of cells to return per column family.
Definition: ScanSpec.h:356
void set_do_not_cache(bool val)
Don't cache.
Definition: ScanSpec.h:535
void set_default_timeout(int32_t timeout_ms)
Sets the default client connection timeout.
Definition: Client.h:72
#define HT_ERROR_OUT
Definition: Logger.h:301
String to_str() const
Returns string representation of address.
Definition: CommAddress.cc:34
Hypertable library.
Definition: CellInterval.h:30
const uint8_t * ptr
The pointer to the serialized data.
Definition: ByteString.h:121
Hypertable definitions
SerializedKey serial
Definition: Key.h:123
void set_time_interval(int64_t start, int64_t end)
Sets the time interval of the scan.
Definition: ScanSpec.h:499
void add_column(const string &str)
Adds a column family to be returned by the scan.
Definition: ScanSpec.h:408
void add_row_interval(const string &start, bool start_inclusive, const string &end, bool end_inclusive)
Adds a row interval to be returned in the scan.
Definition: ScanSpec.h:455
Entry point to AsyncComm service.
Definition: Comm.h:61
const char * start_row
Definition: RangeSpec.h:59
void set_rebuild_indices(TableParts parts)
Rebuild indices.
Definition: ScanSpec.h:541
void clear()
Clears the buffer.
void start()
Starts the timer.
Definition: Timer.h:64
IntervalScannerAsync(Comm *comm, ApplicationQueueInterfacePtr &app_queue, Table *table, RangeLocatorPtr &range_locator, const ScanSpec &scan_spec, uint32_t timeout_ms, bool current, TableScannerAsync *scanner, int id)
Constructs an IntervalScannerAsync object.
Provides access to internal components of opaque key.
Definition: Key.h:40
uint8_t * base
Pointer to the allocated memory buffer.
RowIntervals row_intervals
Definition: ScanSpec.h:275
TableScannerDispatchHandler m_fetch_handler
This is a generic exception class for Hypertable.
Definition: Error.h:314
A String class based on std::string.
TableScannerDispatchHandler m_create_handler
#define HT_ERRORF(msg,...)
Definition: Logger.h:300
const std::string & get_name()
Definition: Table.h:139
void add_cell_interval(const string &start_row, const string &start_column, bool start_inclusive, const string &end_row, const string &end_column, bool end_inclusive)
Adds a cell interval to be returned in the scan.
Definition: ScanSpec.h:485
void find_range_and_start_scan(const char *row_key, bool hard=false)
void add_column_predicate(const string &column_family, const char *column_qualifier, uint32_t operation, const char *value, uint32_t value_len=0)
Adds a column predicate to the scan.
Definition: ScanSpec.h:426
CellIntervals cell_intervals
Definition: ScanSpec.h:276
void set_row_limit(int32_t n)
Sets the maximum number of rows to return in the scan.
Definition: ScanSpec.h:342
bool is_proxy() const
Returns true if address is of type CommAddress::PROXY.
Definition: CommAddress.h:147
void set_max_versions(uint32_t n)
Sets the maximum number of revisions of each cell to return in the scan.
Definition: ScanSpec.h:387
bool handle_result(bool *show_results, ScanCellsPtr &cells, EventPtr &event, bool is_create)
Error codes, Exception handling, error logging.
#define HT_THROW(_code_, _msg_)
Definition: Error.h:478
static const char * END_ROW_MARKER
Definition: Key.h:49
void set_scan_and_filter_rows(bool val)
Scan and filter rows.
Definition: ScanSpec.h:528
void ensure(size_t len)
Ensure space for additional data. Will grow the space to 1.5 of the needed space with existing data un...
Definition: DynamicBuffer.h:82
uint8_t * add_unchecked(const void *data, size_t len)
Adds additional data without boundary checks.
void set_cell_offset(int32_t n)
Sets the number of cells to be skipped at the beginning of the query.
Definition: ScanSpec.h:375
void refresh()
Refresh schema etc.
Definition: Table.cc:132
int code() const
Returns the error code.
Definition: Error.h:391
#define HT_THROW2(_code_, _ex_, _msg_)
Definition: Error.h:484