0.9.8.10
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
ScanCells.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2007-2015 Hypertable, Inc.
3  *
4  * This file is part of Hypertable.
5  *
6  * Hypertable is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; version 3 of the
9  * License, or any later version.
10  *
11  * Hypertable is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19  * 02110-1301, USA.
20  */
21 
22 #include <Common/Compat.h>
23 
25 
26 using namespace Hypertable;
27 using namespace Hypertable::Lib;
28 using namespace std;
29 
30 bool ScanCells::add(EventPtr &event, int *scanner_id) {
31  ScanBlockPtr scanblock = std::make_shared<ScanBlock>();
32  scanblock->load(event);
33 
35  m_profile_data += scanblock->profile_data();
36  m_profile_data.scanblocks++;
37 
38  m_scanblocks.push_back(scanblock);
39  *scanner_id = scanblock->get_scanner_id();
40  return scanblock->eos();
41 }
42 
43 bool
44 ScanCells::load(SchemaPtr &schema, const string &end_row, bool end_inclusive,
45  ScanLimitState *limit_state, CstrSet &rowset,
46  int64_t *bytes_scanned, Key *lastkey) {
47  SerializedKey serkey;
48  ByteString value;
49  Key key;
50  Cell cell;
51  ScanBlock *scanblock;
52  ColumnFamilySpec *cf_spec;
53  size_t total_cells=0;
54  bool skipping = lastkey->row != 0;
55 
56  for(size_t ii=0; ii < m_scanblocks.size(); ++ii)
57  total_cells += m_scanblocks[ii]->size();
58 
59  m_cells = make_shared<CellsBuilder>(total_cells);
60 
61  for (size_t ii=0; ii < m_scanblocks.size(); ++ii) {
62  scanblock = m_scanblocks[ii].get();
63  while (scanblock->next(serkey, value)) {
64 
65  if (skipping) {
66  if (serkey <= lastkey->serial)
67  continue;
68  skipping = false;
69  }
70 
71  if (!key.load(serkey))
73 
74  // check for end row
75  if (!strcmp(key.row, Key::END_ROW_MARKER)) {
76  return true;
77  }
78 
79  if (end_inclusive) {
80  if (strcmp(key.row, end_row.c_str()) > 0) {
81  return true;
82  }
83  }
84  else {
85  if (strcmp(key.row, end_row.c_str()) >= 0) {
86  return true;
87  }
88  }
89 
90  // check for row change and row limit
91  if (limit_state->row_limit > 0) {
92  if (strcmp(limit_state->last_row.c_str(), key.row)) {
93  if (!limit_state->last_row.empty() &&
94  limit_state->rows_seen < limit_state->rows_encountered) {
95  limit_state->rows_seen++;
96  if (limit_state->row_limit > 0 &&
97  limit_state->rows_seen >= limit_state->row_limit)
98  return true;
99  }
100  limit_state->rows_encountered++;
101  limit_state->last_row = key.row;
102  }
103  }
104 
105  cell.row_key = key.row;
107  if ((cf_spec = schema->get_column_family(key.column_family_code)) == 0) {
108  if (key.flag != FLAG_DELETE_ROW)
109  HT_THROWF(Error::BAD_KEY, "Unexpected column family code %d",
110  (int)key.column_family_code);
111  cell.column_family = "";
112  }
113  else
114  cell.column_family = cf_spec->get_name().c_str();
115 
116  cell.timestamp = key.timestamp;
117  cell.revision = key.revision;
118  cell.value_len = value.decode_length(&cell.value);
119  cell.flag = key.flag;
120  m_cells->add(cell, false);
121  *bytes_scanned += key.length + cell.value_len;
122 
123  // if rowset scan remove scanned row
124  while (!rowset.empty() && strcmp(*rowset.begin(), key.row) < 0)
125  rowset.erase(rowset.begin());
126 
127  // Check for cell limit
128  limit_state->cells_seen++;
129  if (limit_state->cell_limit > 0 &&
130  limit_state->cells_seen >= limit_state->cell_limit)
131  return true;
132  }
133 
134  // If at end of scan make sure last row encountered is reflected in rows seen
135  if (scanblock->eos() &&
136  limit_state->rows_encountered > limit_state->rows_seen) {
137  limit_state->rows_seen++;
138  HT_ASSERT(limit_state->rows_encountered == limit_state->rows_seen);
139  }
140 
141  // Check row limit
142  if (limit_state->row_limit > 0 &&
143  limit_state->rows_seen >= limit_state->row_limit)
144  return true;
145  }
146 
147  if (key.row)
148  *lastkey = key;
149 
150  return false;
151 }
152 
153 void ScanCells::add(Cell &cell, bool own) {
154  if (!m_cells)
155  m_cells = make_shared<CellsBuilder>();
156  m_cells->add(cell, own);
157 }
int64_t timestamp
Definition: Key.h:134
const char * row
Definition: Key.h:129
size_t rows_encountered
Number of unique rows seen, only populated if row_limit > 0.
static const uint32_t FLAG_DELETE_ROW
Definition: KeySpec.h:40
uint32_t length
Definition: Key.h:124
Column family specification.
bool eos()
Returns true if this is the final scanblock returned by the scanner.
Definition: ScanBlock.h:85
const char * column_qualifier
Definition: Cell.h:68
std::shared_ptr< Event > EventPtr
Smart pointer to Event.
Definition: Event.h:228
STL namespace.
std::string last_row
Last row processed, only populated if row_limit > 0.
A class managing one or more serializable ByteStrings.
Definition: ByteString.h:47
#define HT_ASSERT(_e_)
Definition: Logger.h:396
std::set< const char *, LtCstr > CstrSet
STL Set managing c-style strings.
Definition: StringExt.h:52
bool load(SchemaPtr &schema, const std::string &end_row, bool end_inclusive, ScanLimitState *limit_state, CstrSet &rowset, int64_t *bytes_scanned, Key *lastkey)
Definition: ScanCells.cc:44
uint64_t revision
Definition: Cell.h:70
Compatibility Macros for C/C++.
const char * row_key
Definition: Cell.h:66
bool load(const SerializedKey &key)
Parses the opaque key and loads the components into the member variables.
Definition: Key.cc:158
bool add(EventPtr &event, int *scanner_id)
Adds key/value pairs from scan result.
Definition: ScanCells.cc:30
std::shared_ptr< ScanBlock > ScanBlockPtr
Smart pointer to ScanBlock.
Definition: ScanBlock.h:130
Hypertable library.
Definition: CellInterval.h:30
Hypertable definitions
Encapsulates a block of scan results.
Definition: ScanBlock.h:50
const std::string & get_name() const
Gets column family name.
bool next(SerializedKey &key, ByteString &value)
Returns the next key/value pair in the scanblock.
Definition: ScanBlock.cc:84
Tracks row and cell limits used to enforce scan limit predicates.
const char * column_family
Definition: Cell.h:67
size_t decode_length(const uint8_t **dptr) const
Retrieves the decoded length and returns a pointer to the string.
Definition: ByteString.h:83
#define HT_THROWF(_code_, _fmt_,...)
Definition: Error.h:490
Provides access to internal components of opaque key.
Definition: Key.h:40
uint32_t value_len
Definition: Cell.h:72
int64_t revision
Definition: Key.h:135
std::shared_ptr< Schema > SchemaPtr
Smart pointer to Schema.
Definition: Schema.h:465
uint8_t column_family_code
Definition: Key.h:127
uint8_t flag
Definition: Cell.h:73
uint8_t flag
Definition: Key.h:125
Encapsulates decomposed key and value.
Definition: Cell.h:32
const char * column_qualifier
Definition: Key.h:130
size_t rows_seen
Number of complete rows seen, only populated if row_limit > 0.
#define HT_THROW(_code_, _msg_)
Definition: Error.h:478
static const char * END_ROW_MARKER
Definition: Key.h:49
const uint8_t * value
Definition: Cell.h:71
int64_t timestamp
Definition: Cell.h:69