0.9.8.10
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
CellStoreScannerIntervalReadahead.cc
Go to the documentation of this file.
1 /* -*- c++ -*-
2  * Copyright (C) 2007-2015 Hypertable, Inc.
3  *
4  * This file is part of Hypertable.
5  *
6  * Hypertable is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; version 3 of the
9  * License, or any later version.
10  *
11  * Hypertable is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19  * 02110-1301, USA.
20  */
21 
27 
28 #include <Common/Compat.h>
30 
33 
35 
36 #include <Common/Error.h>
37 #include <Common/Filesystem.h>
38 #include <Common/System.h>
39 
40 #include <cassert>
41 
42 using namespace Hypertable;
43 
44 namespace {
45  const uint32_t MINIMUM_READAHEAD_AMOUNT = 524288; // 512 KiB; lower bound applied to buf_size (the cell store block size) before it is passed to Global::dfs->open_buffered()
46 }
47 
48 
49 template <typename IndexT>
51  IndexT *index, SerializedKey start_key, SerializedKey end_key, ScanContext *scan_ctx) :
52  m_cellstore(cellstore), m_end_key(end_key), m_scan_ctx(scan_ctx) {
53  int64_t start_offset;
54 
55  memset(&m_block, 0, sizeof(m_block));
56  m_zcodec = m_cellstore->create_block_compression_codec();
57  m_key_decompressor = m_cellstore->create_key_decompressor();
58 
59  uint16_t csversion = boost::any_cast<uint16_t>(cellstore->get_trailer()->get("version"));
60  if (csversion >= 4)
62 
63  if (index) {
64  IndexIteratorT iter, end_iter;
65 
66  iter = (start_key) ? index->lower_bound(start_key) : index->begin();
67  if (iter == index->end()) {
68  m_eos = true;
69  return;
70  }
71 
72  start_offset = iter.value();
73 
74  if (!end_key || (end_iter = index->upper_bound(end_key)) == index->end())
75  m_end_offset = index->end_of_last_block();
76  else {
77  ++end_iter;
78  if (end_iter == index->end())
79  m_end_offset = index->end_of_last_block();
80  else
81  m_end_offset = end_iter.value();
82  }
83  }
84  else {
85  start_offset = 0;
86  m_end_offset = cellstore->end_of_last_block();
87  }
88  m_offset = start_offset;
89 
90  uint32_t buf_size = cellstore->get_blocksize();
91 
92  if (buf_size < MINIMUM_READAHEAD_AMOUNT)
93  buf_size = MINIMUM_READAHEAD_AMOUNT;
94 
95  try {
96  m_fd = Global::dfs->open_buffered(cellstore->get_filename(), m_oflags,
97  buf_size, 5, start_offset, m_end_offset);
98  }
99  catch (Exception &e) {
100  m_eos = true;
101  HT_THROW2F(e.code(), e, "Problem opening cell store in "
102  "readahead mode: %s", e.what());
103  }
104 
106  m_eos = true;
107  return;
108  }
109 
114  if (start_key) {
115  const uint8_t *ptr;
116  while (m_key_decompressor->less_than(start_key)) {
117  ptr = m_cur_value.ptr + m_cur_value.length();
118  if (ptr >= m_block.end) {
119  if (!fetch_next_block_readahead(true)) {
120  m_eos = true;
121  return;
122  }
123  }
124  else
126  }
127  }
128 
133  m_eos = true;
134  return;
135  }
136 
141  if (m_key.flag != FLAG_DELETE_ROW &&
143  forward();
144 }
145 
146 
147 template <typename IndexT>
149  try {
150  if (m_fd != -1)
151  Global::dfs->close(m_fd);
152  delete [] m_block.base;
153  delete m_zcodec;
154  delete m_key_decompressor;
155  }
156  catch (Exception &e) {
157  HT_ERROR_OUT << e << HT_END;
158  }
159  catch (...) {
160  HT_ERRORF("Unknown exception caught in %s", HT_FUNC);
161  }
162 }
163 
164 
165 
166 template <typename IndexT>
168 
169  if (m_eos)
170  return false;
171 
172  key = m_key;
173  value = m_cur_value;
174 
175  return true;
176 }
177 
178 
179 
180 template <typename IndexT>
182  const uint8_t *ptr;
183 
184  while (true) {
185 
186  if (m_eos)
187  return;
188 
189  ptr = m_cur_value.ptr + m_cur_value.length();
190 
191  if (ptr >= m_block.end) {
192  if (!fetch_next_block_readahead(true)) {
193  m_eos = true;
194  return;
195  }
196  if (m_check_for_range_end && !m_key_decompressor->less_than(m_end_key)) {
197  m_eos = true;
198  return;
199  }
200  }
201  else {
202  m_cur_value.ptr = m_key_decompressor->add(ptr);
203  if (m_check_for_range_end && !m_key_decompressor->less_than(m_end_key)) {
204  m_eos = true;
205  return;
206  }
207  }
208 
212  m_key_decompressor->load(m_key);
213  if (m_key.flag == FLAG_DELETE_ROW
214  || m_scan_ctx->family_mask[m_key.column_family_code])
215  break;
216  }
217 }
218 
219 
220 
221 
236 template <typename IndexT>
238 
239  // If we're at the end of the current block, deallocate and move to next
240  if (m_block.base != 0 && eob) {
241  delete [] m_block.base;
242  memset(&m_block, 0, sizeof(m_block));
243  }
244 
245  if (m_offset >= m_end_offset)
246  m_eos = true;
247 
248  if (m_block.base == 0 && !m_eos) {
249  DynamicBuffer expand_buf(0);
250  uint32_t len;
251  uint32_t nread;
252 
253  m_block.offset = m_offset;
254 
256  try {
257  BlockHeaderCellStore header(m_cellstore->block_header_format());
258  DynamicBuffer input_buf( header.encoded_length() );
259 
260  nread = Global::dfs->read(m_fd, input_buf.base, header.encoded_length() );
261  HT_EXPECT(nread == header.encoded_length(), Error::RANGESERVER_SHORT_CELLSTORE_READ);
262 
263  size_t remaining = nread;
264 
265  header.decode((const uint8_t **)&input_buf.ptr, &remaining);
266 
267  size_t extra = 0;
268  if (m_oflags & Filesystem::OPEN_FLAG_DIRECTIO) {
269  if ((header.encoded_length()+header.get_data_zlength())%HT_DIRECT_IO_ALIGNMENT)
270  extra = HT_DIRECT_IO_ALIGNMENT - ((header.encoded_length()+header.get_data_zlength())%HT_DIRECT_IO_ALIGNMENT);
271  }
272 
273  input_buf.grow( input_buf.fill() + header.get_data_zlength() + extra );
274  nread = Global::dfs->read(m_fd, input_buf.ptr, header.get_data_zlength()+extra);
275  HT_EXPECT(nread == header.get_data_zlength()+extra, Error::RANGESERVER_SHORT_CELLSTORE_READ);
276  input_buf.ptr += header.get_data_zlength() + extra;
277 
278  if (m_offset + (int64_t)input_buf.fill() >= m_end_offset && m_end_key)
279  m_check_for_range_end = true;
280  m_offset += input_buf.fill();
281 
282  m_zcodec->inflate(input_buf, expand_buf, header);
283 
284  m_disk_read += expand_buf.fill();
285 
286  if (!header.check_magic(CellStore::DATA_BLOCK_MAGIC))
288  "Error inflating cell store block - magic string mismatch");
289  }
290  catch (Exception &e) {
291  HT_ERROR_OUT <<"Error reading cell store ( fd=" << m_fd << " file="
292  << m_cellstore->get_filename() <<") block: "
293  << e << HT_END;
294  HT_THROW2(e.code(), e, e.what());
295  }
296 
298  size_t fill;
299  m_block.base = expand_buf.release(&fill);
300  len = fill;
301 
302  m_key_decompressor->reset();
303  m_block.end = m_block.base + len;
304  m_cur_value.ptr = m_key_decompressor->add(m_block.base);
305 
306  return true;
307  }
308  return false;
309 }
310 
311 namespace Hypertable {
314 }
#define HT_THROW2F(_code_, _ex_, _fmt_,...)
Definition: Error.h:494
Retrieves system information (hardware, installation directory, etc)
#define HT_FUNC
Definition: compat-c.h:65
CellStoreScannerIntervalReadahead(CellStorePtr &cellstore, IndexT *index, SerializedKey start_key, SerializedKey end_key, ScanContext *scan_ctx)
Abstract base class for a filesystem.
static const uint32_t FLAG_DELETE_ROW
Definition: KeySpec.h:40
Provides ability to efficiently scan over a portion of a cell store.
Scan context information.
Definition: ScanContext.h:52
Declarations for CellStoreScannerIntervalReadahead.
virtual void load(Key &key)=0
A dynamic, resizable and reference counted memory buffer.
Definition: DynamicBuffer.h:42
#define HT_EXPECT(_e_, _code_)
Definition: Logger.h:388
A class managing one or more serializable ByteStrings.
Definition: ByteString.h:47
bool fetch_next_block_readahead(bool eob=false)
This method fetches the 'next' compressed block of key/value pairs from the underlying CellStore...
Compatibility Macros for C/C++.
virtual bool less_than(SerializedKey serialized_key)=0
#define HT_END
Definition: Logger.h:220
static Hypertable::FilesystemPtr dfs
Definition: Global.h:64
size_t length() const
Retrieves the length of the serialized string.
Definition: ByteString.h:62
#define HT_ERROR_OUT
Definition: Logger.h:301
virtual const uint8_t * add(const uint8_t *ptr)=0
const uint8_t * ptr
The pointer to the serialized data.
Definition: ByteString.h:121
std::shared_ptr< CellStore > CellStorePtr
Smart pointer to CellStore.
Definition: CellStore.h:340
Hypertable definitions
Declarations for BlockHeaderCellStore.
Provides access to internal components of opaque key.
Definition: Key.h:40
size_t fill() const
Returns the size of the used portion.
Definition: DynamicBuffer.h:70
static const char DATA_BLOCK_MAGIC[10]
Definition: CellStore.h:325
This is a generic exception class for Hypertable.
Definition: Error.h:314
#define HT_ERRORF(msg,...)
Definition: Logger.h:300
uint8_t * release(size_t *lenp=0)
Moves ownership of the buffer to the caller.
uint8_t column_family_code
Definition: Key.h:127
uint8_t flag
Definition: Key.h:125
Error codes, Exception handling, error logging.
#define HT_THROW(_code_, _msg_)
Definition: Error.h:478
#define HT_DIRECT_IO_ALIGNMENT
Definition: Filesystem.h:49
Declarations for CellStoreBlockIndexArray.
int code() const
Returns the error code.
Definition: Error.h:391
#define HT_THROW2(_code_, _ex_, _msg_)
Definition: Error.h:484