0.9.8.10
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
CellStoreV7.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2007-2015 Hypertable, Inc.
3  *
4  * This file is part of Hypertable.
5  *
6  * Hypertable is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; version 3 of the
9  * License.
10  *
11  * Hypertable is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19  * 02110-1301, USA.
20  */
21 
28 #include "Common/Compat.h"
29 #include <cassert>
30 
31 #include <boost/algorithm/string.hpp>
32 #include <boost/scoped_array.hpp>
33 
34 #include "Common/Config.h"
35 #include "Common/Error.h"
36 #include "Common/Logger.h"
37 #include "Common/System.h"
40 #include "Common/Time.h"
41 
42 #include "AsyncComm/Protocol.h"
43 
46 #include "Hypertable/Lib/Key.h"
47 #include "Hypertable/Lib/Schema.h"
48 
49 #include "CellStoreV7.h"
50 #include "CellStoreInfo.h"
51 #include "CellStoreTrailerV7.h"
52 #include "CellStoreScanner.h"
53 
54 #include "FileBlockCache.h"
55 #include "Global.h"
56 #include "Config.h"
57 #include "KeyCompressorPrefix.h"
58 #include "KeyDecompressorPrefix.h"
59 
60 using namespace std;
61 using namespace Hypertable;
62 
63 namespace {
    // Threshold compared against m_outstanding_appends in add() and finalize():
    // once this many appends are outstanding, the code waits on m_sync_handler
    // for a reply before continuing.
64  const uint32_t MAX_APPENDS_OUTSTANDING = 3;
    // Version passed to every BlockHeaderCellStore constructed in this file and
    // recorded in m_trailer.block_header_version by finalize().
65  const uint16_t BLOCK_HEADER_VERSION = 1;
66 }
67 
68 
69 CellStoreV7::CellStoreV7(Filesystem *filesys)
70  : m_filesys(filesys) {
72  assert(sizeof(float) == 4);
73 }
74 
76  : m_filesys(filesys), m_schema(schema) {
78  assert(sizeof(float) == 4);
79 }
80 
82  try {
83  delete m_compressor;
84  delete m_bloom_filter;
85  delete m_bloom_filter_items;
86  if (m_fd != -1)
88  delete [] m_column_ttl;
89  }
90  catch (Exception &e) {
91  HT_ERROR_OUT << e << HT_END;
92  }
93 
95 
96 }
97 
98 
102 }
103 
105  return new KeyDecompressorPrefix();
106 }
107 
109  lock_guard<mutex> lock(m_mutex);
112  if (m_trailer.index_entries == 0) {
113  HT_WARNF("%s has 0 index entries", m_filename.c_str());
114  return;
115  }
116  int32_t keys_per_block = (int32_t)(m_trailer.total_entries / m_trailer.index_entries);
117  if (m_64bit_index)
118  m_index_map64.unique_row_count_estimate(split_row_data, keys_per_block);
119  else
120  m_index_map32.unique_row_count_estimate(split_row_data, keys_per_block);
121 }
122 
124  lock_guard<mutex> lock(m_mutex);
128  }
129  if (m_trailer.index_entries == 0) {
130  HT_WARNF("%s has 0 index entries", m_filename.c_str());
131  return;
132  }
133  int32_t keys_per_block = m_trailer.total_entries / m_trailer.index_entries;
134  if (m_64bit_index)
136  keys_per_block, m_trailer.compression_ratio);
137  else
139  keys_per_block, m_trailer.compression_ratio);
140 }
141 
142 
144  bool need_index = m_restricted_range || scan_ctx->restricted_range ||
145  scan_ctx->single_row || scan_ctx->has_cell_interval;
146 
147  if (need_index) {
148  lock_guard<mutex> lock(m_mutex);
153  }
154 
155  if (m_64bit_index)
156  return make_shared<CellStoreScanner<CellStoreBlockIndexArray<int64_t>>>(shared_from_this(), scan_ctx, need_index ? &m_index_map64 : 0);
157  return make_shared<CellStoreScanner<CellStoreBlockIndexArray<uint32_t>>>(shared_from_this(), scan_ctx, need_index ? &m_index_map32 : 0);
158 }
159 
160 namespace {
    // Determines the replication value used when creating the cell store file.
    //
    // props:    per-store properties; its "replication" entry is read first,
    //           with -1 supplied as the fallback value.
    // table_id: optional table identifier (may be null); only consulted when
    //           the property lookup yielded -1.
    //
    // Returns the resolved replication value. It remains -1 when neither the
    // property nor the matching configuration key supplied one; create()
    // forwards the result unchanged to m_filesys->create().
    // NOTE(review): -1 presumably means "use the filesystem default" - confirm
    // against the Filesystem::create() contract.
161  int get_replication(PropertiesPtr &props, const TableIdentifier *table_id) {
162 
163  int32_t replication = props->get_i32("replication", int32_t(-1));
164 
    // Fall back to the global configuration only when the property gave no
    // value and the caller told us which table this store belongs to.
165  if (replication == -1 && table_id) {
    // User tables and non-user tables read different configuration keys; each
    // key is used only if it is actually present in the configuration.
166  if (table_id->is_user()) {
167  if (Config::has("Hypertable.RangeServer.Data.DefaultReplication"))
168  replication = Config::get_i32("Hypertable.RangeServer.Data.DefaultReplication");
169  }
170  else if (Config::has("Hypertable.Metadata.Replication"))
171  replication = Config::get_i32("Hypertable.Metadata.Replication");
172  }
173 
174  return replication;
175  }
176 }
177 
178 void
179 CellStoreV7::create(const char *fname, size_t max_entries,
180  PropertiesPtr &props, const TableIdentifier *table_id) {
181  int64_t blocksize = props->get("blocksize", 0);
182  String compressor = props->get("compressor", String());
183 
184  m_key_compressor = make_shared<KeyCompressorPrefix>();
185 
186  assert(Config::properties); // requires Config::init* first
187  int32_t replication = get_replication(props, table_id);
188 
189  if (blocksize == 0)
190  blocksize = Config::get_i32("Hypertable.RangeServer.CellStore"
191  ".DefaultBlockSize");
192  if (compressor.empty())
193  compressor = Config::get_str("Hypertable.RangeServer.CellStore"
194  ".DefaultCompressor");
195  if (!props->has("bloom-filter-mode")) {
196  // probably not called from AccessGroup
197  AccessGroupOptions::parse_bloom_filter(Config::get_str("Hypertable.RangeServer"
198  ".CellStore.DefaultBloomFilter"), props);
199  }
200 
201  m_buffer.reserve(blocksize*4);
202 
203  m_max_entries = max_entries;
204 
205  m_fd = -1;
206  m_offset = 0;
207 
209  m_index_builder.variable_buf().reserve(1024*1024);
210 
211  m_uncompressed_data = 0.0;
212  m_compressed_data = 0.0;
213 
214  m_trailer.clear();
215  m_trailer.blocksize = blocksize;
216  m_uncompressed_blocksize = blocksize;
217 
218  // set up the "column_ttl" vector
220  ColumnFamilySpecs &column_family_specs = m_schema->get_column_families();
221  for (size_t i=0; i<column_family_specs.size(); i++) {
222  if (column_family_specs[i]->get_option_ttl()) {
223  if (m_column_ttl == 0) {
224  m_column_ttl = new int64_t[256];
225  memset(m_column_ttl, 0, 256*8);
226  }
227  m_column_ttl[ column_family_specs[i]->get_id() ] = column_family_specs[i]->get_option_ttl() * 1000000000LL;
228  }
229  }
230 
231  m_filename = fname;
232 
233  m_start_row = "";
235 
237  compressor, m_compressor_args);
238 
242 
244  m_fd = m_filesys->create(m_filename, oflags, -1, replication, -1);
245 
246  m_bloom_filter_mode = props->get<BloomFilterMode>("bloom-filter-mode");
247  m_max_approx_items = props->get_i32("max-approx-items");
248 
250  bool has_num_hashes = props->has("num-hashes");
251  bool has_bits_per_item = props->has("bits-per-item");
252 
253  if (has_num_hashes || has_bits_per_item) {
254  if (!(has_num_hashes && has_bits_per_item)) {
255  HT_WARN("Bloom filter option --bits-per-item must be used with "
256  "--num-hashes, defaulting to false probability of 0.01");
258  }
259  else {
260  m_trailer.bloom_filter_hash_count = props->get_i32("num-hashes");
261  m_bloom_bits_per_item = props->get_f64("bits-per-item");
262  }
263  }
264  else
265  m_filter_false_positive_prob = props->get_f64("false-positive");
266  m_bloom_filter_items = new BloomFilterItems(); // aproximator items
267  }
268  HT_DEBUG_OUT <<"bloom-filter-mode="<< m_bloom_filter_mode
269  <<" max-approx-items="<< m_max_approx_items <<" false-positive="
271 }
272 
273 
274 void CellStoreV7::create_bloom_filter(bool is_approx) {
276 
277  HT_DEBUG_OUT << "Creating new BloomFilter for CellStore '"
278  << m_filename <<"' for "<< (is_approx ? "estimated " : "")
279  << m_trailer.filter_items_estimate << " items"<< HT_END;
280  try {
281  if (m_filter_false_positive_prob != 0.0)
284  else
288  }
289  catch(Exception &e) {
290  HT_FATAL_OUT << "Error creating new BloomFilter for CellStore '"
291  << m_filename <<"' for "<< (is_approx ? "estimated " : "")
292  << m_trailer.filter_items_estimate << " items - "<< e << HT_END;
293  }
294 
295  for (const auto &blob : *m_bloom_filter_items)
296  m_bloom_filter->insert(blob.start, blob.size);
297 
298  delete m_bloom_filter_items;
299  m_bloom_filter_items = 0;
300 
301  HT_DEBUG_OUT << "Created new BloomFilter for CellStore '"
302  << m_filename <<"'"<< HT_END;
303 }
304 
305 const std::vector<String> &CellStoreV7::get_replaced_files() {
306  lock_guard<mutex> lock(m_mutex);
309  return m_replaced_files;
310 }
311 
313  bool second_try = false;
314  int64_t amount = m_trailer.replaced_files_length;
315  int64_t len = 0;
316 
317  try_again:
318 
319  try {
320  DynamicBuffer buf(amount);
321 
323  len = m_filesys->pread(m_fd, buf.ptr, amount, m_trailer.replaced_files_offset, second_try);
324 
325  if (len != amount)
326  HT_THROWF(Error::FSBROKER_IO_ERROR, "Error loading replaced files for "
327  "CellStore '%s' : tried to read %lld but only got %lld",
328  m_filename.c_str(), (Lld)amount, (Lld)len);
331  StringDecompressorPrefix decompressor;
333  const uint8_t *ptr = buf.base;
334  for (uint32_t ii=0; ii < m_trailer.replaced_files_entries; ++ii) {
335  if (ptr - buf.base >= (ptrdiff_t) m_trailer.replaced_files_length)
337  "Bad replaced_files_offset in CellStore trailer fd=%u replaced_files_offset=%lld, "
338  "length=%llu, entries=%u, file='%s'", (unsigned)m_fd,
340  (unsigned)m_trailer.replaced_files_entries, m_filename.c_str());
341  ptr = decompressor.add(ptr);
342  decompressor.load(filename);
343  m_replaced_files.push_back(filename);
344  }
345  }
346  catch (Exception &e) {
347  String msg;
348  HT_ERROR_OUT << "pread(fd=" << m_fd << ", len=" << len << ", amount="
349  << amount << ")\n" << HT_END;
351  if (second_try)
352  HT_THROW2(e.code(), e, msg);
353  second_try = true;
354  goto try_again;
355  }
357 }
358 
360  size_t len;
361 
363 
364  HT_DEBUG_OUT << "Loading BloomFilter for CellStore '"
366  << " items"<< HT_END;
367  try {
372  }
373  catch(Exception &e) {
374  HT_FATAL_OUT << "Error loading BloomFilter for CellStore '"
376  << " items -"<< e << HT_END;
377  }
378 
379  if (m_bloom_filter->total_size() > 0) {
380 
381  bool second_try = false;
382 
383  while (true) {
384  try {
386  m_trailer.filter_offset, second_try);
387  }
388  catch (Exception &e) {
389  if (!second_try) {
390  second_try=true;
391  continue;
392  }
393  HT_THROW2(e.code(), e, format("Error loading BloomFilter for CellStore '%s'",
394  m_filename.c_str()));
395  }
396  break;
397  }
398 
399  if (len != m_bloom_filter->total_size())
400  HT_THROWF(Error::FSBROKER_IO_ERROR, "Problem loading bloomfilter for"
401  "CellStore '%s' : tried to read %lld but only got %lld",
402  m_filename.c_str(), (Lld)m_bloom_filter->total_size(), (Lld)len);
403 
404  m_bytes_read += len;
405 
407  }
408 
411 
412 }
413 
414 
415 
417  uint64_t memory_purged = 0;
418 
419  {
420  lock_guard<mutex> lock(m_mutex);
421 
423  memory_purged = m_index_stats.bloom_filter_memory;
424  delete m_bloom_filter;
425  m_bloom_filter = 0;
427  }
428 
430  memory_purged += m_index_stats.block_index_memory;
431  if (m_64bit_index)
433  else
436  }
437  }
438 
439  Global::memory_tracker->subtract( memory_purged );
440 
441  return memory_purged;
442 }
443 
444 
445 
446 void CellStoreV7::add(const Key &key, const ByteString value) {
447  EventPtr event_ptr;
448  DynamicBuffer zbuf;
449 
450  if (key.revision > m_trailer.revision)
452 
453  if (key.timestamp != TIMESTAMP_NULL) {
458  }
459 
460  if (m_buffer.fill() > (size_t)m_uncompressed_blocksize) {
461  BlockHeaderCellStore header(BLOCK_HEADER_VERSION, DATA_BLOCK_MAGIC);
462 
464 
465  m_uncompressed_data += (float)m_buffer.fill();
467  m_compressed_data += (float)zbuf.fill();
468  m_buffer.clear();
469 
470  uint64_t llval = ((uint64_t)m_trailer.blocksize
471  * (uint64_t)m_uncompressed_data) / (uint64_t)m_compressed_data;
472  m_uncompressed_blocksize = (int64_t)llval;
473 
474  if (m_outstanding_appends >= MAX_APPENDS_OUTSTANDING) {
475  if (!m_sync_handler.wait_for_reply(event_ptr)) {
476  if (event_ptr->type == Event::MESSAGE)
478  "Problem writing to FS file '%s' : %s", m_filename.c_str(),
480  HT_THROWF(event_ptr->error,
481  "Problem writing to FS file '%s'", m_filename.c_str());
482  }
484  }
485 
486  if (!HT_IO_ALIGNED(zbuf.fill())) {
487  memset(zbuf.ptr, 0, HT_IO_ALIGNMENT_PADDING(zbuf.fill()));
488  zbuf.ptr += HT_IO_ALIGNMENT_PADDING(zbuf.fill());
489  }
490 
491  size_t zlen = zbuf.fill();
492  StaticBuffer send_buf(zbuf);
493 
495  catch (Exception &e) {
496  HT_THROW2F(e.code(), e, "Problem writing to FS file '%s'",
497  m_filename.c_str());
498  }
500  m_offset += zlen;
501  m_key_compressor->reset();
502  }
503 
504  m_key_compressor->add(key);
505 
506  size_t key_len = m_key_compressor->length();
507  size_t value_len = value.length();
508 
509  m_trailer.key_bytes += key.length;
510  m_trailer.value_bytes += value_len;
511 
512  if (m_column_ttl && m_column_ttl[key.column_family_code] != 0) {
513  m_trailer.expirable_data += key_len + value_len;
516  }
517 
518  if (key.flag <= FLAG_DELETE_CELL_VERSION)
520 
521  m_buffer.ensure(key_len + value_len);
522 
523  m_key_compressor->write(m_buffer.ptr);
524  m_buffer.ptr += key_len;
525 
526  m_buffer.add_unchecked(value.ptr, value_len);
527 
530  m_bloom_filter_items->insert(key.row, key.row_len);
531 
533  m_bloom_filter_items->insert(key.row, key.row_len + 2);
534 
536  m_trailer.filter_items_estimate = (size_t)(((double)m_max_entries
537  / (double)m_max_approx_items) * m_bloom_filter_items->size());
540  create_bloom_filter(true);
541  }
542  }
543  else {
544  assert(!m_bloom_filter_items && m_bloom_filter);
545 
546  m_bloom_filter->insert(key.row);
547 
549  m_bloom_filter->insert(key.row, key.row_len + 2);
550  }
551  }
552 
554 }
555 
556 
557 void CellStoreV7::finalize(TableIdentifier *table_identifier) {
558  EventPtr event_ptr;
559  size_t zlen;
560  DynamicBuffer zbuf(0);
561  SerializedKey key;
562  StaticBuffer send_buf;
563  int64_t index_memory = 0;
564 
565  if (m_buffer.fill() > 0) {
566  BlockHeaderCellStore header(BLOCK_HEADER_VERSION, DATA_BLOCK_MAGIC);
567 
569 
570  m_uncompressed_data += (float)m_buffer.fill();
572  m_compressed_data += (float)zbuf.fill();
573 
574  if (!HT_IO_ALIGNED(zbuf.fill())) {
575  memset(zbuf.ptr, 0, HT_IO_ALIGNMENT_PADDING(zbuf.fill()));
576  zbuf.ptr += HT_IO_ALIGNMENT_PADDING(zbuf.fill());
577  }
578  zlen = zbuf.fill();
579  send_buf = zbuf;
580 
581  if (m_outstanding_appends >= MAX_APPENDS_OUTSTANDING) {
582  if (!m_sync_handler.wait_for_reply(event_ptr))
584  "Problem finalizing CellStore file '%s' : %s",
585  m_filename.c_str(),
586  Protocol::string_format_message(event_ptr).c_str());
588  }
589 
591 
593  m_offset += zlen;
594  }
595 
596  m_key_compressor = 0;
597 
598  m_buffer.free();
599 
601  if (m_uncompressed_data == 0)
603  else
605 
607 
612 
616  {
617  BlockHeaderCellStore header(BLOCK_HEADER_VERSION, INDEX_FIXED_BLOCK_MAGIC);
619  }
620 
621  if (!HT_IO_ALIGNED(zbuf.fill())) {
622  memset(zbuf.ptr, 0, HT_IO_ALIGNMENT_PADDING(zbuf.fill()));
623  zbuf.ptr += HT_IO_ALIGNMENT_PADDING(zbuf.fill());
624  }
625  zlen = zbuf.fill();
626  send_buf = zbuf;
627 
629 
631  m_offset += zlen;
632 
636  {
637  BlockHeaderCellStore header(BLOCK_HEADER_VERSION, INDEX_VARIABLE_BLOCK_MAGIC);
640  }
641 
642  delete m_compressor;
643  m_compressor = 0;
644 
645  if (!HT_IO_ALIGNED(zbuf.fill())) {
646  memset(zbuf.ptr, 0, HT_IO_ALIGNMENT_PADDING(zbuf.fill()));
647  zbuf.ptr += HT_IO_ALIGNMENT_PADDING(zbuf.fill());
648  }
649  zlen = zbuf.fill();
650  send_buf = zbuf;
651 
653 
655  m_offset += zlen;
656 
657  // write filter_offset
659 
660  // if bloom_items haven't been spilled to create a bloom filter yet, do it
663 
664  if (m_bloom_filter_items && m_bloom_filter_items->size() > 0) {
665  m_trailer.filter_items_estimate = m_bloom_filter_items->size();
667  }
668 
669  if (m_bloom_filter) {
674  m_bloom_filter->serialize(send_buf);
678  }
679  }
680 
681  // Write compressed replaced_file lists
682  // Coalesce with trailer block if possible
683  zbuf.clear();
684  size_t compressed_len = 0;
685  StringCompressorPrefix compressor;
686  bool coalesce_with_trailer =false;
687  for (size_t ii=0; ii < m_replaced_files.size();++ii) {
688  compressor.add(m_replaced_files[ii].c_str());
689  compressed_len += compressor.length();
690  }
691 
692  if (HT_IO_ALIGNMENT_PADDING(compressed_len) >= m_trailer.size()) {
693  coalesce_with_trailer = true;
694  zbuf.reserve(compressed_len + m_trailer.size() +
695  HT_IO_ALIGNMENT_PADDING(compressed_len+m_trailer.size()));
696  }
697  else
698  zbuf.reserve(compressed_len + HT_IO_ALIGNMENT_PADDING(compressed_len));
701  m_trailer.replaced_files_length = compressed_len;
702 
703  compressor.reset();
704  for (size_t ii=0; ii < m_replaced_files.size();++ii) {
705  compressor.add(m_replaced_files[ii].c_str());
706  compressor.write(zbuf.ptr);
707  zbuf.ptr += compressor.length();
708  }
709 
710  if (!coalesce_with_trailer) {
711  if (!HT_IO_ALIGNED(zbuf.fill())) {
712  memset(zbuf.ptr, 0, HT_IO_ALIGNMENT_PADDING(zbuf.fill()));
713  zbuf.ptr += HT_IO_ALIGNMENT_PADDING(zbuf.fill());
714  }
715  send_buf = zbuf;
718  zlen = zbuf.fill();
719  m_offset += zlen;
720  }
721 
723 
725  double fraction_covered;
726  if (m_64bit_index) {
731  index_memory = m_index_map64.memory_used();
734  fraction_covered = m_index_map64.fraction_covered();
736  }
737  else {
742  index_memory = m_index_map32.memory_used();
744  fraction_covered = m_index_map32.fraction_covered();
746  }
747 
748  // deallocate fix index data
750 
751  // Add table information
752  m_trailer.table_id = table_identifier->index();
753  m_trailer.table_generation = table_identifier->generation;
755 
756  m_trailer.block_header_version = BLOCK_HEADER_VERSION;
757 
758  // write trailer
759  if (!coalesce_with_trailer) {
760  zbuf.clear();
763  memset(zbuf.base, 0, HT_DIRECT_IO_ALIGNMENT);
764  zbuf.ptr = zbuf.base + (HT_DIRECT_IO_ALIGNMENT-m_trailer.size());
765  }
766  else {
768  memset(zbuf.ptr, 0, padding);
769  zbuf.ptr += padding;
770  }
771  m_trailer.serialize(zbuf.ptr);
772  zbuf.ptr += m_trailer.size();
773 
774  zlen = zbuf.fill();
775  send_buf = zbuf;
776 
777  m_filesys->append(m_fd, send_buf);
778 
780  m_offset += zlen;
781 
783  m_filesys->close(m_fd);
784 
787 
788  m_disk_usage +=
789  (int64_t)((double)(m_offset-m_trailer.fix_index_offset) * fraction_covered);
790 
793 
794  m_index_stats.block_index_memory = index_memory;
795 
796  if (m_bloom_filter)
798 
799  delete [] m_column_ttl;
800  m_column_ttl = 0;
801 
803 }
804 
805 
807  int64_t offset) {
808 
809  // switch to 64-bit offsets if offset being added is >= 2^32
810  if (!m_bigint && offset >= 4294967296LL) {
811  DynamicBuffer tmp_buf(m_fixed.size*2);
812  const uint8_t *src = m_fixed.base;
813  uint8_t *dst = tmp_buf.base;
814  size_t remaining = m_fixed.fill();
815  while (src < m_fixed.ptr)
816  Serialization::encode_i64(&dst, (uint64_t)Serialization::decode_i32(&src, &remaining));
817  delete [] m_fixed.release();
818  m_fixed.base = tmp_buf.base;
819  m_fixed.ptr = dst;
820  m_fixed.size = tmp_buf.size;
821  m_fixed.own = true;
822  tmp_buf.release();
823  m_bigint = true;
824  }
825 
826  // Add key to variable buffer
827  size_t key_len = key_compressor->length_uncompressed();
828  m_variable.ensure(key_len);
829  key_compressor->write_uncompressed(m_variable.ptr);
830  m_variable.ptr += key_len;
831 
832  // Serialize offset into fix index buffer
833  if (m_bigint) {
834  m_fixed.ensure(8);
835  memcpy(m_fixed.ptr, &offset, 8);
836  m_fixed.ptr += 8;
837  }
838  else {
839  m_fixed.ensure(4);
840  memcpy(m_fixed.ptr, &offset, 4);
841  m_fixed.ptr += 4;
842  }
843 }
844 
845 
847  uint8_t *base;
848  size_t len;
849 
850  base = m_fixed.release(&len);
851  m_fixed.reserve(len);
852  m_fixed.add_unchecked(base, len);
853  delete [] base;
854 
855  base = m_variable.release(&len);
856  m_variable.reserve(len);
857  m_variable.add_unchecked(base, len);
858  delete [] base;
859 }
860 
861 
862 
863 void
864 CellStoreV7::open(const String &fname, const String &start_row,
865  const String &end_row, int32_t fd, int64_t file_length,
866  CellStoreTrailer *trailer) {
867  m_filename = fname;
868  m_start_row = start_row;
869  m_end_row = end_row;
870  m_fd = fd;
871  m_file_length = file_length;
872 
874 
875  m_trailer = *static_cast<CellStoreTrailerV7 *>(trailer);
876 
878 
881 
883  m_64bit_index = true;
884 
888  "Bad index offsets in CellStore trailer fd=%u fix=%lld, var=%lld, "
889  "length=%llu, file='%s'", (unsigned)m_fd, (Lld)m_trailer.fix_index_offset,
890  (Lld)m_trailer.var_index_offset, (Llu)m_file_length, fname.c_str());
891 
892  // This is necessary to get m_disk_usage and m_block_count set properly
894 
895  Global::memory_tracker->add( sizeof(CellStoreV7) + sizeof(CellStoreInfo) );
896 
897 }
898 
899 
900 
901 void
902 CellStoreV7::rescope(const String &start_row, const String &end_row) {
903  lock_guard<mutex> lock(m_mutex);
904  HT_ASSERT(m_start_row.compare(start_row)<0 || m_end_row.compare(end_row)>0);
905  m_start_row = start_row;
906  m_end_row = end_row;
907  m_restricted_range = true;
910  if (m_64bit_index) {
911  m_index_map64.rescope(m_start_row, m_end_row);
914  (int64_t)((double)(m_file_length-m_trailer.fix_index_offset) *
917  }
918  else {
919  m_index_map32.rescope(m_start_row, m_end_row);
922  (int64_t)((double)(m_file_length-m_trailer.fix_index_offset) *
925  }
927  }
928  else
930 }
931 
932 
933 
935  int64_t amount, index_amount;
936  int64_t len = 0;
937  BlockHeaderCellStore header(BLOCK_HEADER_VERSION);
938  SerializedKey key;
939  bool inflating_fixed=true;
940  bool second_try = false;
941 
943 
944  unique_ptr<BlockCompressionCodec> compressor(create_block_compression_codec());
945 
946  amount = index_amount = m_trailer.filter_offset - m_trailer.fix_index_offset;
947 
948  try_again:
949 
950  try {
951  DynamicBuffer buf(amount);
952 
954  len = m_filesys->pread(m_fd, buf.ptr, amount, m_trailer.fix_index_offset, second_try);
955 
956  if (len != amount)
957  HT_THROWF(Error::FSBROKER_IO_ERROR, "Error loading index for "
958  "CellStore '%s' : tried to read %lld but only got %lld",
959  m_filename.c_str(), (Lld)amount, (Lld)len);
962  compressor->inflate(buf, m_index_builder.fixed_buf(), header);
963 
965 
966  inflating_fixed = false;
967 
970 
972  DynamicBuffer vbuf(0, false);
974  vbuf.base = buf.ptr;
975  vbuf.ptr = buf.ptr + amount;
976 
977  compressor->inflate(vbuf, m_index_builder.variable_buf(), header);
978 
980 
983  }
984  catch (Exception &e) {
985  String msg;
986  if (inflating_fixed) {
987  msg = String("Error inflating FIXED index for cellstore '")
988  + m_filename + "'";
989  HT_ERROR_OUT << msg << ": "<< e << HT_END;
990  }
991  else {
992  msg = "Error inflating VARIABLE index for cellstore '" + m_filename + "'";
993  HT_ERROR_OUT << msg << ": " << e << HT_END;
994  }
995  HT_ERROR_OUT << "pread(fd=" << m_fd << ", len=" << len << ", amount="
996  << index_amount << ")\n" << HT_END;
998  if (second_try)
999  HT_THROW2(e.code(), e, msg);
1000  second_try = true;
1001  goto try_again;
1002  }
1003 
1005  if (m_64bit_index) {
1011  (int64_t)((double)(m_file_length-m_trailer.fix_index_offset) *
1014  }
1015  else {
1021  (int64_t)((double)(m_file_length-m_trailer.fix_index_offset) *
1024  }
1025 
1027 
1029 }
1030 
1031 
1033 
1035  return true;
1036  else if (m_trailer.filter_length == 0) // bloom filter is empty
1037  return false;
1038 
1039  {
1040  lock_guard<mutex> lock(m_mutex);
1041  if (m_bloom_filter == 0)
1043 
1045 
1046  switch (m_bloom_filter_mode) {
1047  case BLOOM_FILTER_ROWS:
1049  return m_bloom_filter->may_contain(scan_ctx->start_row.data(),
1050  scan_ctx->start_row.size());
1053  if (m_bloom_filter->may_contain(scan_ctx->start_row.data(),
1054  scan_ctx->start_row.size())) {
1055  SchemaPtr &schema = scan_ctx->schema;
1056  size_t rowlen = scan_ctx->start_row.length();
1057  uint8_t column_family_id;
1058  const char *ptr;
1059  boost::scoped_array<char> rowcol(new char[rowlen + 2]);
1060  memcpy(rowcol.get(), scan_ctx->start_row.c_str(), rowlen + 1);
1061 
1062  for (auto col : scan_ctx->spec->columns) {
1063  if ((ptr = strchr(col, ':')) != 0) {
1064  String family(col, (size_t)(ptr-col));
1065  column_family_id = schema->get_column_family(family.c_str())->get_id();
1066  }
1067  else
1068  column_family_id = schema->get_column_family(col)->get_id();
1069 
1070  rowcol[rowlen + 1] = column_family_id;
1071 
1073  if (m_bloom_filter->may_contain(rowcol.get(), rowlen + 2))
1074  return true;
1075  }
1076  }
1077  return false;
1078  default:
1079  HT_ASSERT(!"unpossible bloom filter mode!");
1080  }
1081  }
1082  return false; // silence stupid compilers
1083 }
1084 
1085 
1086 
1088  lock_guard<mutex> lock(m_mutex);
1090  load_block_index();
1091  if (m_64bit_index)
1093  else
1095 }
1096 
1097 
1099  return BLOCK_HEADER_VERSION;
1100 }
size_t get_num_hashes()
Getter for the number of hash functions.
void free()
Frees resources.
size_t get_items_actual()
Getter for the actual number of items.
static const char INDEX_FIXED_BLOCK_MAGIC[10]
Definition: CellStore.h:326
#define HT_THROW2F(_code_, _ex_, _fmt_,...)
Definition: Error.h:494
A memory buffer of static size.
Definition: StaticBuffer.h:45
Retrieves system information (hardware, installation directory, etc)
virtual void clear()
Clears the contents of this trailer.
virtual void append(int fd, StaticBuffer &buffer, Flags flags, DispatchHandler *handler)=0
Appends data to a file asynchronously.
Abstract base class for cell store trailer.
uint32_t m_outstanding_appends
Definition: CellStoreV7.h:171
virtual void close(int fd, DispatchHandler *handler)=0
Closes a file asynchronously.
int64_t timestamp
Definition: Key.h:134
#define HT_IO_ALIGNMENT_PADDING(size)
Definition: Filesystem.h:54
Cell list scanner over a buffer of cells.
const char * row
Definition: Key.h:129
#define HT_WARNF(msg,...)
Definition: Logger.h:290
virtual void pread(int fd, size_t amount, uint64_t offset, bool verify_checksum, DispatchHandler *handler)=0
Reads data from a file at the specified position asynchronously.
CellStoreTrailerV7 m_trailer
Definition: CellStoreV7.h:166
static String filename
Definition: Config.cc:48
static const char INDEX_VARIABLE_BLOCK_MAGIC[10]
Definition: CellStore.h:327
static int32_t response_code(const Event *event)
Returns the response code from an event generated in response to a request message.
Definition: Protocol.cc:39
PropertiesPtr properties
This singleton map stores all options.
Definition: Config.cc:47
Declarations for CellStoreV7.
std::string String
A String is simply a typedef to std::string.
Definition: String.h:44
#define HT_IO_ALIGNED(size)
Definition: Filesystem.h:51
uint32_t length
Definition: Key.h:124
CellStoreV7(Filesystem *filesys)
Definition: CellStoreV7.cc:69
bool check_magic(const char *magic)
Compares a given character sequence with the magic field.
Definition: BlockHeader.h:86
void display_block_info() override
Displays block information to stdout.
static String string_format_message(const Event *event)
Returns the error message decoded from a standard error MESSAGE generated in response to a request message...
Definition: Protocol.cc:51
String format(const char *fmt,...)
Returns a String using printf-like format facilities. Vanilla snprintf is about 1.5x faster than this...
Definition: String.cc:37
std::shared_ptr< KeyCompressor > KeyCompressorPtr
Definition: KeyCompressor.h:45
uint16_t block_header_format() override
A class to decompress prefix-compressed strings.
std::string m_start_row
Definition: CellList.h:99
DynamicBuffer m_buffer
Definition: CellStoreV7.h:168
long long unsigned int Llu
Shortcut for printf formats.
Definition: String.h:50
BlockCompressionCodec * m_compressor
Definition: CellStoreV7.h:167
Declarations for CellStoreScanner.
static void parse_bloom_filter(const std::string &spec, PropertiesPtr &props)
Parses a bloom filter specification and sets properties.
BloomFilterMode m_bloom_filter_mode
Definition: CellStoreV7.h:181
std::shared_ptr< Event > EventPtr
Smart pointer to Event.
Definition: Event.h:228
BlobHashSet BloomFilterItems
Definition: CellStoreV7.h:159
STL namespace.
BlockCompressionCodec * create_block_compression_codec() override
Creates a block compression codec suitable for decompressing the cell store's blocks.
Definition: CellStoreV7.cc:99
Scan context information.
Definition: ScanContext.h:52
uint32_t m_index_refcount
Definition: CellStore.h:336
std::string m_end_row
Definition: CellList.h:100
KeyDecompressor * create_key_decompressor() override
Creates a key decompressor suitable for decompressing the keys stored in this cell store...
Definition: CellStoreV7.cc:104
Type
Enumeration for compression type.
virtual void serialize(uint8_t *buf)
Serializes this trailer to the given buffer.
uint8_t * ptr
Pointer to the end of the used part of the buffer.
bool wait_for_reply(EventPtr &event)
This method is used by a client to synchronize.
A dynamic, resizable and reference counted memory buffer.
Definition: DynamicBuffer.h:42
virtual void reset()
Clears the internal state.
void add_disk_read(int64_t amount)
Declarations for Schema.
uint32_t decode_i32(const uint8_t **bufp, size_t *remainp)
Decode a 32-bit integer in little-endian order.
bool has(const String &name)
Check existence of a configuration value.
Definition: Config.h:57
A class managing one or more serializable ByteStrings.
Definition: ByteString.h:47
#define HT_ASSERT(_e_)
Definition: Logger.h:396
uint32_t row_len
Definition: Key.h:131
void serialize(StaticBuffer &buf)
Serializes the BloomFilter into a static memory buffer.
IndexMemoryStats m_index_stats
Definition: CellStore.h:332
static BlockCompressionCodec::Type parse_block_codec_spec(const std::string &spec, BlockCompressionCodec::Args &args)
Given a block codec config string return its type and put config.
CellStoreBlockIndexArray< uint32_t > m_index_map32
32-bit block index
Definition: CellStoreV7.h:197
size_t total_size()
Getter for the total size (including checksum and metadata)
static Hypertable::MemoryTracker * memory_tracker
Definition: Global.h:94
Filesystem * m_filesys
Definition: CellStoreV7.h:161
std::map< const char *, int64_t, LtCstr, SplitRowDataAlloc > SplitRowDataMapT
Definition: CellList.h:66
int64_t m_uncompressed_blocksize
Definition: CellStoreV7.h:178
const ScanSpec * spec
Definition: ScanContext.h:55
static uint64_t access_counter
Definition: Global.h:106
void create(const char *fname, size_t max_entries, PropertiesPtr &props, const TableIdentifier *table_id=0) override
Creates a new cell store.
Definition: CellStoreV7.cc:179
DispatchHandlerSynchronizer m_sync_handler
Definition: CellStoreV7.h:170
std::shared_ptr< Properties > PropertiesPtr
Definition: Properties.h:447
uint32_t size
The size of the allocated memory buffer (base)
Logging routines and macros.
BloomFilterMode
Enumeration for bloom filter modes.
bool may_contain(ScanContext *scan_ctx) override
Bloom filter lookup.
Compatibility Macros for C/C++.
std::mutex m_mutex
Definition: CellStore.h:331
void insert(const void *key, size_t len)
Inserts a new blob into the hash.
void encode_i64(uint8_t **bufp, uint64_t val)
Encode a 64-bit integer in little-endian order.
#define HT_END
Definition: Logger.h:220
static const int64_t TIMESTAMP_NULL
Definition: KeySpec.h:36
size_t length() const
Retrieves the length of the serialized string.
Definition: ByteString.h:62
static BlockCompressionCodec * create_block_codec(BlockCompressionCodec::Type, const BlockCompressionCodec::Args &args=BlockCompressionCodec::Args())
#define HT_ERROR_OUT
Definition: Logger.h:301
Time related declarations.
CellStoreBlockIndexArray< int64_t > m_index_map64
64-bit block index
Definition: CellStoreV7.h:200
size_t get_length_bits()
Getter for the number of bits.
void finalize(TableIdentifier *table_identifier) override
Finalizes the creation of a cell store, by writing block index and metadata trailer.
Definition: CellStoreV7.cc:557
BloomFilterItems * m_bloom_filter_items
Definition: CellStoreV7.h:182
void add(const Key &key, const ByteString value) override
Inserts a key/value pair into the cell list.
Definition: CellStoreV7.cc:446
bool own
If true then the buffer (base) will be released when going out of scope; if false then the caller has...
const uint8_t * ptr
The pointer to the serialized data.
Definition: ByteString.h:121
Hypertable definitions
long long int Lld
Shortcut for printf formats.
Definition: String.h:53
void load(DynamicBuffer &fixed, DynamicBuffer &variable, int64_t end_of_data, const String &start_row="", const String &end_row="")
void clear()
Clears the buffer.
std::vector< String > m_replaced_files
Definition: CellStore.h:333
Declarations for BlockHeaderCellStore.
virtual void write(uint8_t *buf) const
Writes the compressed string to a buffer.
A class to decompress prefix-compressed strings.
void validate(String &filename)
Validates the checksum of the BloomFilter.
Represents the trailer for CellStore version 7.
Declarations for Protocol.
virtual size_t length() const
Retrieves the length of the compressed string.
uint64_t purge_indexes() override
Purges bloom filter and block indexes.
Definition: CellStoreV7.cc:416
#define HT_THROWF(_code_, _fmt_,...)
Definition: Error.h:490
Provides access to internal components of opaque key.
Definition: Key.h:40
void open(const String &fname, const String &start_row, const String &end_row, int32_t fd, int64_t file_length, CellStoreTrailer *trailer) override
Opens a cell store with possibly a restricted view.
Definition: CellStoreV7.cc:864
uint8_t * base
Pointer to the allocated memory buffer.
BlockCompressionCodec::Args m_compressor_args
Definition: CellStoreV7.h:179
size_t fill() const
Returns the size of the used portion.
Definition: DynamicBuffer.h:70
virtual void deflate(const DynamicBuffer &input, DynamicBuffer &output, BlockHeader &header, size_t reserve=0)=0
Compresses a buffer.
static const char DATA_BLOCK_MAGIC[10]
Definition: CellStore.h:325
Request/response message event.
Definition: Event.h:63
void subtract(int64_t amount)
Subtract from memory used.
Definition: MemoryTracker.h:60
virtual void add(const char *str)
Adds (and compresses) a string.
This is a generic exception class for Hypertable.
Definition: Error.h:314
void split_row_estimate_data(SplitRowDataMapT &split_row_data) override
Populates split_row_data with unique row and count estimates from block index.
Definition: CellStoreV7.cc:108
void create_bloom_filter(bool is_approx=false)
Definition: CellStoreV7.cc:274
Declarations for CellStoreTrailerV7.
int64_t revision
Definition: Key.h:135
void populate_pseudo_table_scanner(CellListScannerBuffer *scanner, const String &filename, int32_t keys_per_block, float compression_ratio)
Populates scanner with data for .cellstore.index pseudo table.
uint8_t * release(size_t *lenp=0)
Moves ownership of the buffer to the caller.
std::shared_ptr< Schema > SchemaPtr
Smart pointer to Schema.
Definition: Schema.h:465
uint8_t column_family_code
Definition: Key.h:127
std::shared_ptr< CellListScanner > CellListScannerPtr
Definition: CellList.h:35
virtual void open(const String &name, uint32_t flags, DispatchHandler *handler)=0
Opens a file asynchronously.
Configuration settings.
KeyCompressorPtr m_key_compressor
Definition: CellStoreV7.h:186
std::vector< ColumnFamilySpec * > ColumnFamilySpecs
Vector of ColumnFamilySpec pointers.
uint8_t flag
Definition: Key.h:125
void populate_index_pseudo_table_scanner(CellListScannerBuffer *scanner) override
Populates scanner with key/value pairs generated from CellStore index.
Definition: CellStoreV7.cc:123
A class to prefix-compress strings.
void rescope(const String &start_row="", const String &end_row="")
virtual size_t size()
Returns the serialized size of the trailer.
BasicBloomFilterWithChecksum BloomFilterWithChecksum
#define HT_WARN(msg)
Definition: Logger.h:289
virtual void inflate(const DynamicBuffer &input, DynamicBuffer &output, BlockHeader &header)=0
Decompresses a buffer.
Error codes, Exception handling, error logging.
#define HT_THROW(_code_, _msg_)
Definition: Error.h:478
static const char * END_ROW_MARKER
Definition: Key.h:49
static const uint32_t FLAG_DELETE_CELL_VERSION
Definition: KeySpec.h:43
#define HT_FATAL_OUT
Definition: Logger.h:347
void add(int64_t amount)
Add to memory used.
Definition: MemoryTracker.h:53
bool may_contain(const void *key, size_t len) const
Checks if the data set "may" contain the key.
IndexBuilder m_index_builder
Definition: CellStoreV7.h:169
void ensure(size_t len)
Ensure space for additional data. Will grow the space to 1.5 times the needed space with existing data un...
Definition: DynamicBuffer.h:82
uint8_t * add_unchecked(const void *data, size_t len)
Adds additional data without boundary checks.
BloomFilterWithChecksum * m_bloom_filter
Bloom filter.
Definition: CellStoreV7.h:194
A class to prefix-compress strings.
void unique_row_count_estimate(CellList::SplitRowDataMapT &split_row_data, int32_t keys_per_block)
Accumulates unique row estimates from block index entries.
CellListScannerPtr create_scanner(ScanContext *scan_ctx) override
Creates a scanner on this cell list.
Definition: CellStoreV7.cc:143
virtual void create(const String &name, uint32_t flags, int32_t bufsz, int32_t replication, int64_t blksz, DispatchHandler *handler)=0
Creates a file asynchronously.
#define HT_DEBUG_OUT
Definition: Logger.h:261
uint8_t * base()
Getter for the serialized bloom filter data, including metadata and checksums.
int64_t get_ts64()
Returns the current time in nanoseconds as a 64bit number.
Definition: Time.cc:40
#define HT_DIRECT_IO_ALIGNMENT
Definition: Filesystem.h:49
Abstract base class for block compression codecs.
Abstract base class for a filesystem.
Definition: Filesystem.h:72
int code() const
Returns the error code.
Definition: Error.h:391
void reserve(size_t len, bool nocopy=false)
Reserve space for additional data. Will grow the space to exactly what's needed.
Definition: DynamicBuffer.h:95
#define HT_THROW2(_code_, _ex_, _msg_)
Definition: Error.h:484
void add_entry(KeyCompressorPtr &key_compressor, int64_t offset)
Definition: CellStoreV7.cc:806
void rescope(const String &start_row, const String &end_row) override
Definition: CellStoreV7.cc:902