0.9.8.10
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
CellStoreV5.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2007-2015 Hypertable, Inc.
3  *
4  * This file is part of Hypertable.
5  *
6  * Hypertable is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; version 3 of the
9  * License.
10  *
11  * Hypertable is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19  * 02110-1301, USA.
20  */
21 
22 #include "Common/Compat.h"
23 #include <cassert>
24 
25 #include <boost/algorithm/string.hpp>
26 #include <boost/scoped_array.hpp>
27 
28 #include "Common/Config.h"
29 #include "Common/Error.h"
30 #include "Common/Logger.h"
31 #include "Common/System.h"
34 #include "Common/Time.h"
35 
36 #include "AsyncComm/Protocol.h"
37 
40 #include "Hypertable/Lib/Key.h"
41 #include "Hypertable/Lib/Schema.h"
42 
43 #include "CellStoreV5.h"
44 #include "CellStoreInfo.h"
45 #include "CellStoreTrailerV5.h"
46 #include "CellStoreScanner.h"
47 
48 #include "FileBlockCache.h"
49 #include "Global.h"
50 #include "Config.h"
51 #include "KeyCompressorPrefix.h"
52 #include "KeyDecompressorPrefix.h"
53 
54 using namespace std;
55 using namespace Hypertable;
56 
namespace {

  /// Upper bound on asynchronous block appends kept in flight.  Once
  /// m_outstanding_appends reaches this value, add() and finalize() block
  /// on m_sync_handler.wait_for_reply() before issuing another append.
  constexpr uint32_t MAX_APPENDS_OUTSTANDING {3};

  /// Format code handed to every BlockHeaderCellStore written by this cell
  /// store and reported back by block_header_format().
  constexpr uint16_t BLOCK_HEADER_FORMAT {0};

}
61 
62 
63 CellStoreV5::CellStoreV5(Filesystem *filesys)
64  : m_filesys(filesys) {
66  assert(sizeof(float) == 4);
67 }
68 
70  : m_filesys(filesys), m_schema(schema) {
72  assert(sizeof(float) == 4);
73 }
74 
76  try {
77  delete m_compressor;
78  delete m_bloom_filter;
79  delete m_bloom_filter_items;
80  if (m_fd != -1)
82  delete [] m_column_ttl;
83  }
84  catch (Exception &e) {
85  HT_ERROR_OUT << e << HT_END;
86  }
87 
89 
90 }
91 
92 
96 }
97 
99  return new KeyDecompressorPrefix();
100 }
101 
105  if (m_trailer.index_entries == 0) {
106  HT_WARNF("%s has 0 index entries", m_filename.c_str());
107  return;
108  }
109  int32_t keys_per_block = m_trailer.total_entries / m_trailer.index_entries;
110  if (m_64bit_index)
111  m_index_map64.unique_row_count_estimate(split_row_data, keys_per_block);
112  else
113  m_index_map32.unique_row_count_estimate(split_row_data, keys_per_block);
114 }
115 
117  bool need_index = m_restricted_range || scan_ctx->restricted_range || scan_ctx->single_row;
118 
119  if (need_index) {
123  }
124 
125  if (m_64bit_index)
126  return make_shared<CellStoreScanner<CellStoreBlockIndexArray<int64_t>>>(shared_from_this(), scan_ctx, need_index ? &m_index_map64 : 0);
127  return make_shared<CellStoreScanner<CellStoreBlockIndexArray<uint32_t>>>(shared_from_this(), scan_ctx, need_index ? &m_index_map32 : 0);
128 }
129 
130 namespace {
131  int get_replication(PropertiesPtr &props, const TableIdentifier *table_id) {
132 
133  int32_t replication = props->get_i32("replication", int32_t(-1));
134 
135  if (replication == -1 && table_id) {
136  if (table_id->is_user()) {
137  if (Config::has("Hypertable.RangeServer.Data.DefaultReplication"))
138  replication = Config::get_i32("Hypertable.RangeServer.Data.DefaultReplication");
139  }
140  else if (Config::has("Hypertable.Metadata.Replication"))
141  replication = Config::get_i32("Hypertable.Metadata.Replication");
142  }
143 
144  return replication;
145  }
146 }
147 
148 void
149 CellStoreV5::create(const char *fname, size_t max_entries,
150  PropertiesPtr &props, const TableIdentifier *table_id) {
151  int64_t blocksize = props->get("blocksize", 0);
152  String compressor = props->get("compressor", String());
153 
154  m_key_compressor = make_shared<KeyCompressorPrefix>();
155 
156  assert(Config::properties); // requires Config::init* first
157  int32_t replication = get_replication(props, table_id);
158 
159  if (blocksize == 0)
160  blocksize = Config::get_i32("Hypertable.RangeServer.CellStore"
161  ".DefaultBlockSize");
162  if (compressor.empty())
163  compressor = Config::get_str("Hypertable.RangeServer.CellStore"
164  ".DefaultCompressor");
165  if (!props->has("bloom-filter-mode")) {
166  // probably not called from AccessGroup
167  AccessGroupOptions::parse_bloom_filter(Config::get_str("Hypertable.RangeServer"
168  ".CellStore.DefaultBloomFilter"), props);
169  }
170 
171  m_buffer.reserve(blocksize*4);
172 
173  m_max_entries = max_entries;
174 
175  m_fd = -1;
176  m_offset = 0;
177 
179  m_index_builder.variable_buf().reserve(1024*1024);
180 
181  m_uncompressed_data = 0.0;
182  m_compressed_data = 0.0;
183 
184  m_trailer.clear();
185  m_trailer.blocksize = blocksize;
186  m_uncompressed_blocksize = blocksize;
187 
188  // set up the "column_ttl" vector
190  ColumnFamilySpecs &column_family_specs = m_schema->get_column_families();
191  for (size_t i=0; i<column_family_specs.size(); i++) {
192  if (column_family_specs[i]->get_option_ttl()) {
193  if (m_column_ttl == 0) {
194  m_column_ttl = new int64_t[256];
195  memset(m_column_ttl, 0, 256*8);
196  }
197  m_column_ttl[ column_family_specs[i]->get_id() ] = column_family_specs[i]->get_option_ttl() * 1000000000LL;
198  }
199  }
200 
201  m_filename = fname;
202 
203  m_start_row = "";
205 
207  compressor, m_compressor_args);
208 
212 
214  m_fd = m_filesys->create(m_filename, oflags, -1, replication, -1);
215 
216  m_bloom_filter_mode = props->get<BloomFilterMode>("bloom-filter-mode");
217  m_max_approx_items = props->get_i32("max-approx-items");
218 
220  bool has_num_hashes = props->has("num-hashes");
221  bool has_bits_per_item = props->has("bits-per-item");
222 
223  if (has_num_hashes || has_bits_per_item) {
224  if (!(has_num_hashes && has_bits_per_item)) {
225  HT_WARN("Bloom filter option --bits-per-item must be used with "
226  "--num-hashes, defaulting to false probability of 0.01");
228  }
229  else {
230  m_trailer.bloom_filter_hash_count = props->get_i32("num-hashes");
231  m_bloom_bits_per_item = props->get_f64("bits-per-item");
232  }
233  }
234  else
235  m_filter_false_positive_prob = props->get_f64("false-positive");
236  m_bloom_filter_items = new BloomFilterItems(); // aproximator items
237  }
238  HT_DEBUG_OUT <<"bloom-filter-mode="<< m_bloom_filter_mode
239  <<" max-approx-items="<< m_max_approx_items <<" false-positive="
241 }
242 
243 
244 void CellStoreV5::create_bloom_filter(bool is_approx) {
246 
247  HT_DEBUG_OUT << "Creating new BloomFilter for CellStore '"
248  << m_filename <<"' for "<< (is_approx ? "estimated " : "")
249  << m_trailer.filter_items_estimate << " items"<< HT_END;
250  try {
251  if (m_filter_false_positive_prob != 0.0)
254  else
258  }
259  catch(Exception &e) {
260  HT_FATAL_OUT << "Error creating new BloomFilter for CellStore '"
261  << m_filename <<"' for "<< (is_approx ? "estimated " : "")
262  << m_trailer.filter_items_estimate << " items - "<< e << HT_END;
263  }
264 
265  for (const auto &blob : *m_bloom_filter_items)
266  m_bloom_filter->insert(blob.start, blob.size);
267 
268  delete m_bloom_filter_items;
269  m_bloom_filter_items = 0;
270 
271  HT_DEBUG_OUT << "Created new BloomFilter for CellStore '"
272  << m_filename <<"'"<< HT_END;
273 }
274 
275 const std::vector<String> &CellStoreV5::get_replaced_files() {
278  return m_replaced_files;
279 }
280 
282  bool second_try = false;
283  int64_t amount = m_trailer.replaced_files_length;
284  int64_t len = 0;
285 
286  try_again:
287 
288  try {
289  DynamicBuffer buf(amount);
290 
292  len = m_filesys->pread(m_fd, buf.ptr, amount, m_trailer.replaced_files_offset, second_try);
293 
294  if (len != amount)
295  HT_THROWF(Error::FSBROKER_IO_ERROR, "Error loading replaced files for "
296  "CellStore '%s' : tried to read %lld but only got %lld",
297  m_filename.c_str(), (Lld)amount, (Lld)len);
300  StringDecompressorPrefix decompressor;
302  const uint8_t *ptr = buf.base;
303  for (uint32_t ii=0; ii < m_trailer.replaced_files_entries; ++ii) {
304  if (ptr - buf.base >= (ptrdiff_t) m_trailer.replaced_files_length)
306  "Bad replaced_files_offset in CellStore trailer fd=%u replaced_files_offset=%lld, "
307  "length=%llu, entries=%u, file='%s'", (unsigned)m_fd,
309  (unsigned)m_trailer.replaced_files_entries, m_filename.c_str());
310  ptr = decompressor.add(ptr);
311  decompressor.load(filename);
312  m_replaced_files.push_back(filename);
313  }
314  }
315  catch (Exception &e) {
316  String msg;
317  HT_ERROR_OUT << "pread(fd=" << m_fd << ", len=" << len << ", amount="
318  << amount << ")\n" << HT_END;
320  if (second_try)
321  HT_THROW2(e.code(), e, msg);
322  second_try = true;
323  goto try_again;
324  }
326 }
327 
329  size_t len;
330 
332 
333  HT_DEBUG_OUT << "Loading BloomFilter for CellStore '"
335  << " items"<< HT_END;
336  try {
341  }
342  catch(Exception &e) {
343  HT_FATAL_OUT << "Error loading BloomFilter for CellStore '"
345  << " items -"<< e << HT_END;
346  }
347 
348  if (m_bloom_filter->total_size() > 0) {
349 
350  bool second_try = false;
351 
352  while (true) {
353  try {
355  m_trailer.filter_offset, second_try);
356  }
357  catch (Exception &e) {
358  if (!second_try) {
359  second_try=true;
360  continue;
361  }
362  HT_THROW2(e.code(), e, format("Error loading BloomFilter for CellStore '%s'",
363  m_filename.c_str()));
364  }
365  break;
366  }
367 
368  if (len != m_bloom_filter->total_size())
369  HT_THROWF(Error::FSBROKER_IO_ERROR, "Problem loading bloomfilter for"
370  "CellStore '%s' : tried to read %lld but only got %lld",
371  m_filename.c_str(), (Lld)m_bloom_filter->total_size(), (Lld)len);
372 
373  m_bytes_read += len;
374 
376  }
377 
380 
381 }
382 
383 
384 
386  uint64_t memory_purged = 0;
387 
389  memory_purged = m_index_stats.bloom_filter_memory;
390  delete m_bloom_filter;
391  m_bloom_filter = 0;
393  }
394 
396  memory_purged += m_index_stats.block_index_memory;
397  if (m_64bit_index)
399  else
402  }
403 
404  Global::memory_tracker->subtract( memory_purged );
405 
406  return memory_purged;
407 }
408 
409 
410 
411 void CellStoreV5::add(const Key &key, const ByteString value) {
412  EventPtr event_ptr;
413  DynamicBuffer zbuf;
414 
415  if (key.revision > m_trailer.revision)
417 
418  if (key.timestamp != TIMESTAMP_NULL) {
421  else if (key.timestamp > m_trailer.timestamp_max)
423  }
424 
425  if (m_buffer.fill() > (size_t)m_uncompressed_blocksize) {
426  BlockHeaderCellStore header(BLOCK_HEADER_FORMAT, DATA_BLOCK_MAGIC);
427 
429 
430  m_uncompressed_data += (float)m_buffer.fill();
432  m_compressed_data += (float)zbuf.fill();
433  m_buffer.clear();
434 
435  uint64_t llval = ((uint64_t)m_trailer.blocksize
436  * (uint64_t)m_uncompressed_data) / (uint64_t)m_compressed_data;
437  m_uncompressed_blocksize = (int64_t)llval;
438 
439  if (m_outstanding_appends >= MAX_APPENDS_OUTSTANDING) {
440  if (!m_sync_handler.wait_for_reply(event_ptr)) {
441  if (event_ptr->type == Event::MESSAGE)
443  "Problem writing to FS file '%s' : %s", m_filename.c_str(),
445  HT_THROWF(event_ptr->error,
446  "Problem writing to FS file '%s'", m_filename.c_str());
447  }
449  }
450 
451  if (!HT_IO_ALIGNED(zbuf.fill())) {
452  memset(zbuf.ptr, 0, HT_IO_ALIGNMENT_PADDING(zbuf.fill()));
453  zbuf.ptr += HT_IO_ALIGNMENT_PADDING(zbuf.fill());
454  }
455 
456  size_t zlen = zbuf.fill();
457  StaticBuffer send_buf(zbuf);
458 
460  catch (Exception &e) {
461  HT_THROW2F(e.code(), e, "Problem writing to FS file '%s'",
462  m_filename.c_str());
463  }
465  m_offset += zlen;
466  m_key_compressor->reset();
467  }
468 
469  m_key_compressor->add(key);
470 
471  size_t key_len = m_key_compressor->length();
472  size_t value_len = value.length();
473 
474  m_trailer.key_bytes += key.length;
475  m_trailer.value_bytes += value_len;
476 
477  if (m_column_ttl && m_column_ttl[key.column_family_code] != 0) {
478  m_trailer.expirable_data += key_len + value_len;
481  }
482 
483  if (key.flag <= FLAG_DELETE_CELL_VERSION)
485 
486  m_buffer.ensure(key_len + value_len);
487 
488  m_key_compressor->write(m_buffer.ptr);
489  m_buffer.ptr += key_len;
490 
491  m_buffer.add_unchecked(value.ptr, value_len);
492 
495  m_bloom_filter_items->insert(key.row, key.row_len);
496 
498  m_bloom_filter_items->insert(key.row, key.row_len + 2);
499 
501  m_trailer.filter_items_estimate = (size_t)(((double)m_max_entries
502  / (double)m_max_approx_items) * m_bloom_filter_items->size());
503  if (m_trailer.filter_items_estimate == 0) {
504  HT_INFOF("max_entries = %lld, max_approx_items = %lld, bloom_filter_items_size = %lld",
505  (Lld)m_max_entries, (Lld)m_max_approx_items, (Lld)m_bloom_filter_items->size());
507  }
508  create_bloom_filter(true);
509  }
510  }
511  else {
512  assert(!m_bloom_filter_items && m_bloom_filter);
513 
514  m_bloom_filter->insert(key.row);
515 
517  m_bloom_filter->insert(key.row, key.row_len + 2);
518  }
519  }
520 
522 }
523 
524 
525 void CellStoreV5::finalize(TableIdentifier *table_identifier) {
526  EventPtr event_ptr;
527  size_t zlen;
528  DynamicBuffer zbuf(0);
529  SerializedKey key;
530  StaticBuffer send_buf;
531  int64_t index_memory = 0;
532 
533  if (m_buffer.fill() > 0) {
534  BlockHeaderCellStore header(BLOCK_HEADER_FORMAT, DATA_BLOCK_MAGIC);
535 
537 
538  m_uncompressed_data += (float)m_buffer.fill();
540  m_compressed_data += (float)zbuf.fill();
541 
542  if (!HT_IO_ALIGNED(zbuf.fill())) {
543  memset(zbuf.ptr, 0, HT_IO_ALIGNMENT_PADDING(zbuf.fill()));
544  zbuf.ptr += HT_IO_ALIGNMENT_PADDING(zbuf.fill());
545  }
546  zlen = zbuf.fill();
547  send_buf = zbuf;
548 
549  if (m_outstanding_appends >= MAX_APPENDS_OUTSTANDING) {
550  if (!m_sync_handler.wait_for_reply(event_ptr))
552  "Problem finalizing CellStore file '%s' : %s",
553  m_filename.c_str(),
554  Protocol::string_format_message(event_ptr).c_str());
556  }
557 
559 
561  m_offset += zlen;
562  }
563 
564  m_key_compressor = 0;
565 
566  m_buffer.free();
567 
569  if (m_uncompressed_data == 0)
571  else
573 
575 
580 
584  {
585  BlockHeaderCellStore header(BLOCK_HEADER_FORMAT, INDEX_FIXED_BLOCK_MAGIC);
587  }
588 
589  if (!HT_IO_ALIGNED(zbuf.fill())) {
590  memset(zbuf.ptr, 0, HT_IO_ALIGNMENT_PADDING(zbuf.fill()));
591  zbuf.ptr += HT_IO_ALIGNMENT_PADDING(zbuf.fill());
592  }
593  zlen = zbuf.fill();
594  send_buf = zbuf;
595 
597 
599  m_offset += zlen;
600 
604  {
605  BlockHeaderCellStore header(BLOCK_HEADER_FORMAT, INDEX_VARIABLE_BLOCK_MAGIC);
608  }
609 
610  delete m_compressor;
611  m_compressor = 0;
612 
613  if (!HT_IO_ALIGNED(zbuf.fill())) {
614  memset(zbuf.ptr, 0, HT_IO_ALIGNMENT_PADDING(zbuf.fill()));
615  zbuf.ptr += HT_IO_ALIGNMENT_PADDING(zbuf.fill());
616  }
617  zlen = zbuf.fill();
618  send_buf = zbuf;
619 
621 
623  m_offset += zlen;
624 
625  // write filter_offset
627 
628  // if bloom_items haven't been spilled to create a bloom filter yet, do it
631 
632  if (m_bloom_filter_items && m_bloom_filter_items->size() > 0) {
633  m_trailer.filter_items_estimate = m_bloom_filter_items->size();
635  }
636 
637  if (m_bloom_filter) {
642  m_bloom_filter->serialize(send_buf);
646  }
647  }
648 
649  // Write compressed replaced_file lists
650  // Coalesce with trailer block if possible
651  zbuf.clear();
652  size_t compressed_len = 0;
653  StringCompressorPrefix compressor;
654  bool coalesce_with_trailer =false;
655  for (size_t ii=0; ii < m_replaced_files.size();++ii) {
656  compressor.add(m_replaced_files[ii].c_str());
657  compressed_len += compressor.length();
658  }
659 
660  if (HT_IO_ALIGNMENT_PADDING(compressed_len) >= m_trailer.size()) {
661  coalesce_with_trailer = true;
662  zbuf.reserve(compressed_len + m_trailer.size() +
663  HT_IO_ALIGNMENT_PADDING(compressed_len+m_trailer.size()));
664  }
665  else
666  zbuf.reserve(compressed_len + HT_IO_ALIGNMENT_PADDING(compressed_len));
669  m_trailer.replaced_files_length = compressed_len;
670 
671  compressor.reset();
672  for (size_t ii=0; ii < m_replaced_files.size();++ii) {
673  compressor.add(m_replaced_files[ii].c_str());
674  compressor.write(zbuf.ptr);
675  zbuf.ptr += compressor.length();
676  }
677 
678  if (!coalesce_with_trailer) {
679  if (!HT_IO_ALIGNED(zbuf.fill())) {
680  memset(zbuf.ptr, 0, HT_IO_ALIGNMENT_PADDING(zbuf.fill()));
681  zbuf.ptr += HT_IO_ALIGNMENT_PADDING(zbuf.fill());
682  }
683  send_buf = zbuf;
686  zlen = zbuf.fill();
687  m_offset += zlen;
688  }
689 
691 
693  if (m_64bit_index) {
698  index_memory = m_index_map64.memory_used();
700  }
701  else {
706  index_memory = m_index_map32.memory_used();
707  }
708 
709  // deallocate fix index data
711 
712  // Add table information
713  m_trailer.table_id = table_identifier->index();
714  m_trailer.table_generation = table_identifier->generation;
716 
717  // write trailer
718  if (!coalesce_with_trailer) {
719  zbuf.clear();
722  memset(zbuf.base, 0, HT_DIRECT_IO_ALIGNMENT);
723  zbuf.ptr = zbuf.base + (HT_DIRECT_IO_ALIGNMENT-m_trailer.size());
724  }
725  else {
727  memset(zbuf.ptr, 0, padding);
728  zbuf.ptr += padding;
729  }
730  m_trailer.serialize(zbuf.ptr);
731  zbuf.ptr += m_trailer.size();
732 
733  zlen = zbuf.fill();
734  send_buf = zbuf;
735 
736  m_filesys->append(m_fd, send_buf);
737 
739  m_offset += zlen;
740 
742  m_filesys->close(m_fd);
743 
746 
749 
750  // If compacting due to a split, estimate the disk usage at 1/2
753  else
755 
756  m_index_stats.block_index_memory = index_memory;
757 
758  if (m_bloom_filter)
760 
761  delete [] m_column_ttl;
762  m_column_ttl = 0;
763 
765 }
766 
767 
769  int64_t offset) {
770 
771  // switch to 64-bit offsets if offset being added is >= 2^32
772  if (!m_bigint && offset >= 4294967296LL) {
773  DynamicBuffer tmp_buf(m_fixed.size*2);
774  const uint8_t *src = m_fixed.base;
775  uint8_t *dst = tmp_buf.base;
776  size_t remaining = m_fixed.fill();
777  while (src < m_fixed.ptr)
778  Serialization::encode_i64(&dst, (uint64_t)Serialization::decode_i32(&src, &remaining));
779  delete [] m_fixed.release();
780  m_fixed.base = tmp_buf.base;
781  m_fixed.ptr = dst;
782  m_fixed.size = tmp_buf.size;
783  m_fixed.own = true;
784  tmp_buf.release();
785  m_bigint = true;
786  }
787 
788  // Add key to variable buffer
789  size_t key_len = key_compressor->length_uncompressed();
790  m_variable.ensure(key_len);
791  key_compressor->write_uncompressed(m_variable.ptr);
792  m_variable.ptr += key_len;
793 
794  // Serialize offset into fix index buffer
795  if (m_bigint) {
796  m_fixed.ensure(8);
797  memcpy(m_fixed.ptr, &offset, 8);
798  m_fixed.ptr += 8;
799  }
800  else {
801  m_fixed.ensure(4);
802  memcpy(m_fixed.ptr, &offset, 4);
803  m_fixed.ptr += 4;
804  }
805 }
806 
807 
809  uint8_t *base;
810  size_t len;
811 
812  base = m_fixed.release(&len);
813  m_fixed.reserve(len);
814  m_fixed.add_unchecked(base, len);
815  delete [] base;
816 
817  base = m_variable.release(&len);
818  m_variable.reserve(len);
819  m_variable.add_unchecked(base, len);
820  delete [] base;
821 }
822 
823 
824 
825 void
826 CellStoreV5::open(const String &fname, const String &start_row,
827  const String &end_row, int32_t fd, int64_t file_length,
828  CellStoreTrailer *trailer) {
829  m_filename = fname;
830  m_start_row = start_row;
831  m_end_row = end_row;
832  m_fd = fd;
833  m_file_length = file_length;
834 
836 
837  m_trailer = *static_cast<CellStoreTrailerV5 *>(trailer);
838 
839  // If compacting due to a split, estimate the disk usage at 1/2
842  else
844 
846 
849 
851  m_64bit_index = true;
852 
856  "Bad index offsets in CellStore trailer fd=%u fix=%lld, var=%lld, "
857  "length=%llu, file='%s'", (unsigned)m_fd, (Lld)m_trailer.fix_index_offset,
858  (Lld)m_trailer.var_index_offset, (Llu)m_file_length, fname.c_str());
859 
860  Global::memory_tracker->add( sizeof(CellStoreV5) + sizeof(CellStoreInfo) );
861 
862 }
863 
864 
866  int64_t amount, index_amount;
867  int64_t len = 0;
868  BlockHeaderCellStore header(BLOCK_HEADER_FORMAT);
869  SerializedKey key;
870  bool inflating_fixed=true;
871  bool second_try = false;
872 
874 
875  unique_ptr<BlockCompressionCodec> compressor(create_block_compression_codec());
876 
877  amount = index_amount = m_trailer.filter_offset - m_trailer.fix_index_offset;
878 
879  try_again:
880 
881  try {
882  DynamicBuffer buf(amount);
883 
885  len = m_filesys->pread(m_fd, buf.ptr, amount, m_trailer.fix_index_offset, second_try);
886 
887  if (len != amount)
888  HT_THROWF(Error::FSBROKER_IO_ERROR, "Error loading index for "
889  "CellStore '%s' : tried to read %lld but only got %lld",
890  m_filename.c_str(), (Lld)amount, (Lld)len);
893  compressor->inflate(buf, m_index_builder.fixed_buf(), header);
894 
896 
897  inflating_fixed = false;
898 
901 
903  DynamicBuffer vbuf(0, false);
905  vbuf.base = buf.ptr;
906  vbuf.ptr = buf.ptr + amount;
907 
908  compressor->inflate(vbuf, m_index_builder.variable_buf(), header);
909 
911 
914  }
915  catch (Exception &e) {
916  String msg;
917  if (inflating_fixed) {
918  msg = String("Error inflating FIXED index for cellstore '")
919  + m_filename + "'";
920  HT_ERROR_OUT << msg << ": "<< e << HT_END;
921  }
922  else {
923  msg = "Error inflating VARIABLE index for cellstore '" + m_filename + "'";
924  HT_ERROR_OUT << msg << ": " << e << HT_END;
925  }
926  HT_ERROR_OUT << "pread(fd=" << m_fd << ", len=" << len << ", amount="
927  << index_amount << ")\n" << HT_END;
929  if (second_try)
930  HT_THROW2(e.code(), e, msg);
931  second_try = true;
932  goto try_again;
933  }
934 
936  if (m_64bit_index) {
941  }
942  else {
947  }
948 
950 
952 }
953 
954 
956 
958  return true;
959  else if (m_trailer.filter_length == 0) // bloom filter is empty
960  return false;
961  else if (m_bloom_filter == 0)
963 
965 
966  switch (m_bloom_filter_mode) {
967  case BLOOM_FILTER_ROWS:
968  return may_contain(scan_ctx->start_row);
970  if (may_contain(scan_ctx->start_row)) {
971  SchemaPtr &schema = scan_ctx->schema;
972  size_t rowlen = scan_ctx->start_row.length();
973  boost::scoped_array<char> rowcol(new char[rowlen + 2]);
974  memcpy(rowcol.get(), scan_ctx->start_row.c_str(), rowlen + 1);
975 
976  for (auto col : scan_ctx->spec->columns) {
977  uint8_t column_family_id = schema->get_column_family(col)->get_id();
978  rowcol[rowlen + 1] = column_family_id;
979 
980  if (may_contain(rowcol.get(), rowlen + 2))
981  return true;
982  }
983  }
984  return false;
985  default:
986  HT_ASSERT(!"unpossible bloom filter mode!");
987  }
988  return false; // silence stupid compilers
989 }
990 
991 
992 bool CellStoreV5::may_contain(const void *ptr, size_t len) {
993 
995  return true;
996  else if (m_trailer.filter_length == 0) // bloom filter is empty
997  return false;
998  else if (m_bloom_filter == 0)
1000 
1002  bool may_contain = m_bloom_filter->may_contain(ptr, len);
1003  return may_contain;
1004 }
1005 
1006 
1007 
1010  load_block_index();
1011  if (m_64bit_index)
1013  else
1015 }
1016 
1017 
1019  return BLOCK_HEADER_FORMAT;
1020 }
size_t get_num_hashes()
Getter for the number of hash functions.
void free()
Frees resources.
size_t get_items_actual()
Getter for the actual number of items.
static const char INDEX_FIXED_BLOCK_MAGIC[10]
Definition: CellStore.h:326
#define HT_THROW2F(_code_, _ex_, _fmt_,...)
Definition: Error.h:494
A memory buffer of static size.
Definition: StaticBuffer.h:45
Retrieves system information (hardware, installation directory, etc)
void create_bloom_filter(bool is_approx=false)
Definition: CellStoreV5.cc:244
virtual void append(int fd, StaticBuffer &buffer, Flags flags, DispatchHandler *handler)=0
Appends data to a file asynchronously.
Abstract base class for cell store trailer.
virtual void close(int fd, DispatchHandler *handler)=0
Closes a file asynchronously.
int64_t timestamp
Definition: Key.h:134
#define HT_IO_ALIGNMENT_PADDING(size)
Definition: Filesystem.h:54
const char * row
Definition: Key.h:129
#define HT_WARNF(msg,...)
Definition: Logger.h:290
void add(const Key &key, const ByteString value) override
Inserts a key/value pair into the cell list.
Definition: CellStoreV5.cc:411
virtual void pread(int fd, size_t amount, uint64_t offset, bool verify_checksum, DispatchHandler *handler)=0
Reads data from a file at the specified position asynchronously.
BlockCompressionCodec * create_block_compression_codec() override
Creates a block compression codec suitable for decompressing the cell store's blocks.
Definition: CellStoreV5.cc:93
static String filename
Definition: Config.cc:48
void add_entry(KeyCompressorPtr &key_compressor, int64_t offset)
Definition: CellStoreV5.cc:768
static const char INDEX_VARIABLE_BLOCK_MAGIC[10]
Definition: CellStore.h:327
static int32_t response_code(const Event *event)
Returns the response code from an event generated in response to a request message.
Definition: Protocol.cc:39
PropertiesPtr properties
This singleton map stores all options.
Definition: Config.cc:47
std::string String
A String is simply a typedef to std::string.
Definition: String.h:44
#define HT_IO_ALIGNED(size)
Definition: Filesystem.h:51
uint32_t length
Definition: Key.h:124
bool check_magic(const char *magic)
Compares a given character sequence with the magic field.
Definition: BlockHeader.h:86
static String string_format_message(const Event *event)
Returns the error message decoded from a standard error MESSAGE generated in response to a request message...
Definition: Protocol.cc:51
String format(const char *fmt,...)
Returns a String using printf-like format facilities. Vanilla snprintf is about 1.5x faster than this...
Definition: String.cc:37
CellStoreV5(Filesystem *filesys)
Definition: CellStoreV5.cc:63
std::shared_ptr< KeyCompressor > KeyCompressorPtr
Definition: KeyCompressor.h:45
DynamicBuffer m_buffer
Definition: CellStoreV5.h:143
A class to decompress prefix-compressed strings.
std::string m_start_row
Definition: CellList.h:99
long long unsigned int Llu
Shortcut for printf formats.
Definition: String.h:50
bool may_contain(const void *ptr, size_t len)
Definition: CellStoreV5.cc:992
Declarations for CellStoreScanner.
static void parse_bloom_filter(const std::string &spec, PropertiesPtr &props)
Parses a bloom filter specification and sets properties.
void create(const char *fname, size_t max_entries, PropertiesPtr &props, const TableIdentifier *table_id=0) override
Creates a new cell store.
Definition: CellStoreV5.cc:149
void finalize(TableIdentifier *table_identifier) override
Finalizes the creation of a cell store, by writing block index and metadata trailer.
Definition: CellStoreV5.cc:525
DispatchHandlerSynchronizer m_sync_handler
Definition: CellStoreV5.h:145
std::shared_ptr< Event > EventPtr
Smart pointer to Event.
Definition: Event.h:228
STL namespace.
BloomFilterItems * m_bloom_filter_items
Definition: CellStoreV5.h:158
Scan context information.
Definition: ScanContext.h:52
std::string m_end_row
Definition: CellList.h:100
Type
Enumeration for compression type.
uint8_t * ptr
Pointer to the end of the used part of the buffer.
bool wait_for_reply(EventPtr &event)
This method is used by a client to synchronize.
A dynamic, resizable and reference counted memory buffer.
Definition: DynamicBuffer.h:42
Filesystem * m_filesys
Definition: CellStoreV5.h:134
virtual void reset()
Clears the internal state.
Declarations for Schema.
uint32_t decode_i32(const uint8_t **bufp, size_t *remainp)
Decode a 32-bit integer in little-endian order.
bool has(const String &name)
Check existence of a configuration value.
Definition: Config.h:57
A class managing one or more serializable ByteStrings.
Definition: ByteString.h:47
#define HT_ASSERT(_e_)
Definition: Logger.h:396
CellStoreTrailerV5 m_trailer
Definition: CellStoreV5.h:141
uint32_t row_len
Definition: Key.h:131
void serialize(StaticBuffer &buf)
Serializes the BloomFilter into a static memory buffer.
IndexMemoryStats m_index_stats
Definition: CellStore.h:332
void display_block_info() override
Displays block information to stdout.
static BlockCompressionCodec::Type parse_block_codec_spec(const std::string &spec, BlockCompressionCodec::Args &args)
Given a block codec config string return its type and put config.
size_t total_size()
Getter for the total size (including checksum and metadata).
virtual void serialize(uint8_t *buf)
Serializes this trailer to the given buffer.
static Hypertable::MemoryTracker * memory_tracker
Definition: Global.h:94
virtual size_t size()
Returns the serialized size of the trailer.
std::map< const char *, int64_t, LtCstr, SplitRowDataAlloc > SplitRowDataMapT
Definition: CellList.h:66
BloomFilterMode m_bloom_filter_mode
Definition: CellStoreV5.h:156
virtual void clear()
Clears the contents of this trailer.
BlockCompressionCodec::Args m_compressor_args
Definition: CellStoreV5.h:154
const ScanSpec * spec
Definition: ScanContext.h:55
uint32_t m_outstanding_appends
Definition: CellStoreV5.h:146
static uint64_t access_counter
Definition: Global.h:106
std::shared_ptr< Properties > PropertiesPtr
Definition: Properties.h:447
uint32_t size
The size of the allocated memory buffer (base).
Logging routines and macros.
KeyCompressorPtr m_key_compressor
Definition: CellStoreV5.h:162
BloomFilterMode
Enumeration for bloom filter modes.
CellListScannerPtr create_scanner(ScanContext *scan_ctx) override
Creates a scanner on this cell list.
Definition: CellStoreV5.cc:116
Compatibility Macros for C/C++.
uint16_t block_header_format() override
void insert(const void *key, size_t len)
Inserts a new blob into the hash.
void encode_i64(uint8_t **bufp, uint64_t val)
Encode a 64-bit integer in little-endian order.
#define HT_END
Definition: Logger.h:220
static const int64_t TIMESTAMP_NULL
Definition: KeySpec.h:36
CellStoreBlockIndexArray< uint32_t > m_index_map32
Definition: CellStoreV5.h:138
size_t length() const
Retrieves the length of the serialized string.
Definition: ByteString.h:62
static BlockCompressionCodec * create_block_codec(BlockCompressionCodec::Type, const BlockCompressionCodec::Args &args=BlockCompressionCodec::Args())
#define HT_ERROR_OUT
Definition: Logger.h:301
void open(const String &fname, const String &start_row, const String &end_row, int32_t fd, int64_t file_length, CellStoreTrailer *trailer) override
Opens a cell store with possibly a restricted view.
Definition: CellStoreV5.cc:826
Time related declarations.
size_t get_length_bits()
Getter for the number of bits.
bool own
If true, the buffer (base) will be released when going out of scope; if false, the caller has...
const uint8_t * ptr
The pointer to the serialized data.
Definition: ByteString.h:121
BlobHashSet BloomFilterItems
Definition: CellStoreV5.h:132
Hypertable definitions
void split_row_estimate_data(SplitRowDataMapT &split_row_data) override
Populates split_row_data with unique row and count estimates from block index.
Definition: CellStoreV5.cc:102
KeyDecompressor * create_key_decompressor() override
Creates a key decompressor suitable for decompressing the keys stored in this cell store...
Definition: CellStoreV5.cc:98
long long int Lld
Shortcut for printf formats.
Definition: String.h:53
void load(DynamicBuffer &fixed, DynamicBuffer &variable, int64_t end_of_data, const String &start_row="", const String &end_row="")
void clear()
Clears the buffer.
std::vector< String > m_replaced_files
Definition: CellStore.h:333
Declarations for BlockHeaderCellStore.
virtual void write(uint8_t *buf) const
Writes the compressed string to a buffer.
BloomFilterWithChecksum * m_bloom_filter
Definition: CellStoreV5.h:157
A class to decompress prefix-compressed strings.
void validate(String &filename)
Validates the checksum of the BloomFilter.
Declarations for Protocol.
#define HT_INFOF(msg,...)
Definition: Logger.h:272
virtual size_t length() const
Retrieves the length of the compressed string.
#define HT_THROWF(_code_, _fmt_,...)
Definition: Error.h:490
BlockCompressionCodec * m_compressor
Definition: CellStoreV5.h:142
Provides access to internal components of opaque key.
Definition: Key.h:40
uint8_t * base
Pointer to the allocated memory buffer.
size_t fill() const
Returns the size of the used portion.
Definition: DynamicBuffer.h:70
virtual void deflate(const DynamicBuffer &input, DynamicBuffer &output, BlockHeader &header, size_t reserve=0)=0
Compresses a buffer.
static const char DATA_BLOCK_MAGIC[10]
Definition: CellStore.h:325
Request/response message event.
Definition: Event.h:63
void subtract(int64_t amount)
Subtracts from the memory used.
Definition: MemoryTracker.h:60
virtual void add(const char *str)
Adds (and compresses) a string.
This is a generic exception class for Hypertable.
Definition: Error.h:314
uint64_t purge_indexes() override
Purges bloom filter and block indexes.
Definition: CellStoreV5.cc:385
int64_t revision
Definition: Key.h:135
uint8_t * release(size_t *lenp=0)
Moves ownership of the buffer to the caller.
std::shared_ptr< Schema > SchemaPtr
Smart pointer to Schema.
Definition: Schema.h:465
uint8_t column_family_code
Definition: Key.h:127
std::shared_ptr< CellListScanner > CellListScannerPtr
Definition: CellList.h:35
virtual void open(const String &name, uint32_t flags, DispatchHandler *handler)=0
Opens a file asynchronously.
Configuration settings.
std::vector< ColumnFamilySpec * > ColumnFamilySpecs
Vector of ColumnFamilySpec pointers.
uint8_t flag
Definition: Key.h:125
IndexBuilder m_index_builder
Definition: CellStoreV5.h:144
CellStoreBlockIndexArray< int64_t > m_index_map64
Definition: CellStoreV5.h:139
A class to prefix-compress strings.
BasicBloomFilterWithChecksum BloomFilterWithChecksum
#define HT_WARN(msg)
Definition: Logger.h:289
int64_t m_uncompressed_blocksize
Definition: CellStoreV5.h:153
virtual void inflate(const DynamicBuffer &input, DynamicBuffer &output, BlockHeader &header)=0
Decompresses a buffer.
Error codes, Exception handling, error logging.
#define HT_THROW(_code_, _msg_)
Definition: Error.h:478
static const char * END_ROW_MARKER
Definition: Key.h:49
static const uint32_t FLAG_DELETE_CELL_VERSION
Definition: KeySpec.h:43
#define HT_FATAL_OUT
Definition: Logger.h:347
void add(int64_t amount)
Adds to the memory used.
Definition: MemoryTracker.h:53
bool may_contain(const void *key, size_t len) const
Checks if the data set "may" contain the key.
void ensure(size_t len)
Ensures space for additional data. Will grow the space to 1.5 times the needed space, keeping the existing data unchanged.
Definition: DynamicBuffer.h:82
uint8_t * add_unchecked(const void *data, size_t len)
Adds additional data without boundary checks.
A class to prefix-compress strings.
void unique_row_count_estimate(CellList::SplitRowDataMapT &split_row_data, int32_t keys_per_block)
Accumulates unique row estimates from block index entries.
virtual void create(const String &name, uint32_t flags, int32_t bufsz, int32_t replication, int64_t blksz, DispatchHandler *handler)=0
Creates a file asynchronously.
#define HT_DEBUG_OUT
Definition: Logger.h:261
uint8_t * base()
Getter for the serialized bloom filter data, including metadata and checksums.
int64_t get_ts64()
Returns the current time in nanoseconds as a 64bit number.
Definition: Time.cc:40
#define HT_DIRECT_IO_ALIGNMENT
Definition: Filesystem.h:49
Abstract base class for block compression codecs.
Abstract base class for a filesystem.
Definition: Filesystem.h:72
int code() const
Returns the error code.
Definition: Error.h:391
void reserve(size_t len, bool nocopy=false)
Reserves space for additional data. Will grow the space to exactly what's needed.
Definition: DynamicBuffer.h:95
#define HT_THROW2(_code_, _ex_, _msg_)
Definition: Error.h:484