0.9.8.10
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
CellStoreScannerIntervalBlockIndex.cc
Go to the documentation of this file.
1 /* -*- c++ -*-
2  * Copyright (C) 2007-2015 Hypertable, Inc.
3  *
4  * This file is part of Hypertable.
5  *
6  * Hypertable is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; version 3 of the
9  * License, or any later version.
10  *
11  * Hypertable is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19  * 02110-1301, USA.
20  */
21 
27 
28 #include <Common/Compat.h>
29 
31 
34 
36 
38 #include <AsyncComm/Event.h>
39 #include <AsyncComm/Protocol.h>
40 
41 #include <Common/Error.h>
42 #include <Common/System.h>
43 
44 #include <cassert>
45 #include <utility>
46 
47 using namespace Hypertable;
48 
49 template <typename IndexT>
51  IndexT *index, SerializedKey start_key, SerializedKey end_key, ScanContext *scan_ctx) :
52  m_cellstore(cellstore), m_index(index), m_start_key(start_key),
53  m_end_key(end_key), m_scan_ctx(scan_ctx), m_rowset(scan_ctx->rowset) {
54 
55  memset(&m_block, 0, sizeof(m_block));
56  m_file_id = m_cellstore->get_file_id();
57  m_zcodec = m_cellstore->create_block_compression_codec();
58  m_key_decompressor = m_cellstore->create_key_decompressor();
59 
61  m_fd = m_cellstore->get_fd();
62 
63  if (m_start_key && (m_iter = m_index->lower_bound(m_start_key)) == m_index->end())
64  return;
65 
66  if (!fetch_next_block()) {
67  m_iter = m_index->end();
68  return;
69  }
70 
71  if (m_start_key) {
72  const uint8_t *ptr;
75  if (ptr >= m_block.end) {
76  if (!fetch_next_block(true)) {
77  m_iter = m_index->end();
78  return;
79  }
80  }
81  else
83  }
84  }
85 
90  m_iter = m_index->end();
91  return;
92  }
93 
98  if (m_key.flag != FLAG_DELETE_ROW &&
100  forward();
101 
102 }
103 
104 
105 template <typename IndexT>
107  if (m_block.base != 0) {
108  if (m_cached)
109  Global::block_cache->checkin(m_file_id, m_block.offset);
110  else
111  delete [] m_block.base;
112  }
113  delete m_zcodec;
114  delete m_key_decompressor;
115 }
116 
117 template <typename IndexT>
119 
120  if (m_iter == m_index->end())
121  return false;
122 
123  key = m_key;
124  value = m_cur_value;
125 
126  return true;
127 }
128 
129 
130 
131 template <typename IndexT>
133  const uint8_t *ptr;
134 
135  while (true) {
136 
137  if (m_iter == m_index->end())
138  return;
139 
140  ptr = m_cur_value.ptr + m_cur_value.length();
141 
142  if (ptr >= m_block.end) {
143  if (!fetch_next_block(true)) {
144  m_iter = m_index->end();
145  return;
146  }
147  if (m_check_for_range_end && !m_key_decompressor->less_than(m_end_key)) {
148  m_iter = m_index->end();
149  return;
150  }
151  }
152  else {
153  m_cur_value.ptr = m_key_decompressor->add(ptr);
154  if (m_check_for_range_end && !m_key_decompressor->less_than(m_end_key)) {
155  m_iter = m_index->end();
156  return;
157  }
158  }
159 
163  m_key_decompressor->load(m_key);
164  if (m_key.flag == FLAG_DELETE_ROW
165  || m_scan_ctx->family_mask[m_key.column_family_code])
166  // forward to next row requested by scan and filter rows
167  if (m_rowset.empty() || strcmp(m_key.row, *m_rowset.begin()) >= 0)
168  break;
169  }
170 }
171 
172 
173 
186 template <typename IndexT>
188 
189  // If we're at the end of the current block, deallocate and move to next
190  if (m_block.base != 0 && eob) {
191  if (m_cached)
192  Global::block_cache->checkin(m_file_id, m_block.offset);
193  else
194  delete [] m_block.base;
195  memset(&m_block, 0, sizeof(m_block));
196  ++m_iter;
197 
198  // find next block requested by scan and filter rows
199  if (m_rowset.size()) {
200  while (m_iter != m_index->end() && strcmp(*m_rowset.begin(), m_iter.key().row()) > 0)
201  ++m_iter;
202  }
203  }
204 
205  if (m_block.base == 0 && m_iter != m_index->end()) {
206  DynamicBuffer expand_buf;
207  uint32_t len;
208 
209  m_block.offset = m_iter.value();
210 
211  IndexIteratorT it_next = m_iter;
212  ++it_next;
213  if (it_next == m_index->end()) {
214  m_block.zlength = m_index->end_of_last_block() - m_block.offset;
215  if (m_end_row[0] != (char)0xff)
216  m_check_for_range_end = true;
217  }
218  else {
219  if (strcmp(it_next.key().row(), m_end_row) >= 0)
220  m_check_for_range_end = true;
221  m_block.zlength = it_next.value() - m_block.offset;
222  }
223 
227  if (Global::block_cache == 0 || Global::block_cache->compressed() ||
228  !Global::block_cache->checkout(m_file_id, m_block.offset,
229  (uint8_t **)&m_block.base, &len)) {
230  bool second_try {};
231  bool checked_out {};
232 
233  try_again:
234  try {
235  DynamicBuffer buf;
236  EventPtr event;
237 
238  if (Global::block_cache == 0 || !Global::block_cache->compressed() ||
239  !Global::block_cache->checkout(m_file_id, m_block.offset,
240  (uint8_t **)&buf.base, &len)) {
241 
243  DispatchHandlerSynchronizer sync_handler;
244  Global::dfs->pread(m_fd, m_block.zlength, m_block.offset, second_try, &sync_handler);
245  if (!sync_handler.wait_for_reply(event))
246  HT_THROW(Protocol::response_code(event.get()),
247  Protocol::string_format_message(event).c_str());
248  {
249  uint32_t length;
250  uint64_t off;
251  const void *data;
252  Global::dfs->decode_response_read(event, &data, &off, &length);
253  buf.base = (uint8_t *)data;
254  buf.own = false;
255  }
256 
257  checked_out = false;
258  }
259  else {
260  HT_ASSERT(len == m_block.zlength);
261  buf.size = m_block.zlength;
262  buf.own = false;
263  checked_out = true;
264  }
265 
266  buf.ptr = buf.base + m_block.zlength;
267 
269  BlockHeaderCellStore header(m_cellstore->block_header_format());
270 
271  m_zcodec->inflate(buf, expand_buf, header);
272 
273  if (!checked_out)
274  m_disk_read += expand_buf.fill();
275 
276  if (!header.check_magic(CellStore::DATA_BLOCK_MAGIC))
278  "Error inflating cell store block - magic string mismatch");
279 
281  if (Global::block_cache && Global::block_cache->compressed()) {
282  if (checked_out)
283  Global::block_cache->checkin(m_file_id, m_block.offset);
284  else
285  Global::block_cache->insert(m_file_id, m_block.offset,
286  (uint8_t *)buf.base, m_block.zlength,
287  event, false);
288  }
289  }
290  catch (Exception &e) {
291  HT_WARN_OUT << "Error reading cell store (fd=" << m_fd << " file="
292  << m_cellstore->get_filename() << ") : "
293  << e << HT_END;
294  HT_WARN_OUT << "pread(fd=" << m_fd << ", zlen="
295  << m_block.zlength << ", offset=" << m_block.offset
296  << HT_END;
297  if (second_try)
298  throw;
299 
300  HT_INFO("Retrying with dfs checksum enabled");
301  second_try = true;
302  goto try_again;
303  }
304 
306  size_t fill;
307  m_block.base = expand_buf.release(&fill);
308  len = fill;
309 
312  Global::block_cache->insert(m_file_id, m_block.offset,
313  (uint8_t *)m_block.base, len, EventPtr(), true);
314  }
315  else
316  m_cached = true;
317 
318  m_key_decompressor->reset();
319  m_block.end = m_block.base + len;
320  m_cur_value.ptr = m_key_decompressor->add(m_block.base);
321 
322  return true;
323  }
324  return false;
325 }
326 
327 namespace Hypertable {
330 }
Retrieves system information (hardware, installation directory, etc.)
static int32_t response_code(const Event *event)
Returns the response code from an event generated in response to a request message.
Definition: Protocol.cc:39
static const uint32_t FLAG_DELETE_ROW
Definition: KeySpec.h:40
const char * row() const
Definition: SerializedKey.h:53
static String string_format_message(const Event *event)
Returns the error message decoded from a standard error MESSAGE generated in response to a request message...
Definition: Protocol.cc:51
Declarations for CellStoreScannerIntervalBlockIndex.
std::shared_ptr< Event > EventPtr
Smart pointer to Event.
Definition: Event.h:228
#define HT_INFO(msg)
Definition: Logger.h:271
Scan context information.
Definition: ScanContext.h:52
virtual void load(Key &key)=0
uint8_t * ptr
Pointer to the end of the used part of the buffer.
bool wait_for_reply(EventPtr &event)
This method is used by a client to synchronize.
A dynamic, resizable and reference counted memory buffer.
Definition: DynamicBuffer.h:42
Declarations for Event.
A class managing one or more serializable ByteStrings.
Definition: ByteString.h:47
#define HT_ASSERT(_e_)
Definition: Logger.h:396
uint32_t size
The size of the allocated memory buffer (base)
void checkin(int file_id, uint64_t file_offset)
Compatibility Macros for C/C++.
virtual bool less_than(SerializedKey serialized_key)=0
#define HT_END
Definition: Logger.h:220
static Hypertable::FilesystemPtr dfs
Definition: Global.h:64
size_t length() const
Retrieves the length of the serialized string.
Definition: ByteString.h:62
virtual const uint8_t * add(const uint8_t *ptr)=0
#define HT_WARN_OUT
Definition: Logger.h:291
bool own
If true then the buffer (base) will be released when going out of scope; if false then the caller has...
const uint8_t * ptr
The pointer to the serialized data.
Definition: ByteString.h:121
bool insert(int file_id, uint64_t file_offset, uint8_t *block, uint32_t length, const EventPtr &event, bool checkout)
std::shared_ptr< CellStore > CellStorePtr
Smart pointer to CellStore.
Definition: CellStore.h:340
Hypertable definitions
Provides the ability to scan over a portion of a cell store using its block index.
DispatchHandler class used to synchronize with response messages.
Declarations for BlockHeaderCellStore.
Declarations for Protocol.
Provides access to internal components of opaque key.
Definition: Key.h:40
uint8_t * base
Pointer to the allocated memory buffer.
size_t fill() const
Returns the size of the used portion.
Definition: DynamicBuffer.h:70
static const char DATA_BLOCK_MAGIC[10]
Definition: CellStore.h:325
This is a generic exception class for Hypertable.
Definition: Error.h:314
uint8_t * release(size_t *lenp=0)
Moves ownership of the buffer to the caller.
uint8_t column_family_code
Definition: Key.h:127
uint8_t flag
Definition: Key.h:125
CellStoreScannerIntervalBlockIndex(CellStorePtr &cellstore, IndexT *index, SerializedKey start_key, SerializedKey end_key, ScanContext *scan_ctx)
bool fetch_next_block(bool eob=false)
This method fetches the 'next' compressed block of key/value pairs from the underlying CellStore...
Error codes, Exception handling, error logging.
#define HT_THROW(_code_, _msg_)
Definition: Error.h:478
static const char * END_ROW_MARKER
Definition: Key.h:49
static Hypertable::FileBlockCache * block_cache
Definition: Global.h:90
Declarations for DispatchHandlerSynchronizer.
Declarations for CellStoreBlockIndexArray.