0.9.8.10
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
csvalidate.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2007-2015 Hypertable, Inc.
3  *
4  * This file is part of Hypertable.
5  *
6  * Hypertable is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; version 3 of the
9  * License, or any later version.
10  *
11  * Hypertable is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19  * 02110-1301, USA.
20  */
21 
22 #include <Common/Compat.h>
23 
30 
33 #include <Hypertable/Lib/Key.h>
35 
36 #include <FsBroker/Lib/Client.h>
37 
38 #include <AsyncComm/Comm.h>
41 
43 #include <Common/ByteString.h>
44 #include <Common/Checksum.h>
45 #include <Common/InetAddr.h>
46 #include <Common/Init.h>
47 #include <Common/Logger.h>
48 #include <Common/Serialization.h>
49 #include <Common/System.h>
50 #include <Common/Usage.h>
51 
52 #include <boost/algorithm/string.hpp>
53 #include <boost/any.hpp>
54 
55 #include <iostream>
56 #include <string>
57 #include <vector>
58 
59 using namespace Hypertable;
60 using namespace Config;
61 using namespace std;
62 
63 namespace {
64 
65  struct AppPolicy : Config::Policy {
66  static void init_options() {
67  cmdline_desc("Usage: %s [options] <filename>\n\n"
68  "Dumps the contents of the CellStore contained in the FS <filename>."
69  "\n\nOptions").add_options()
70  ("repair", "Repair any corruption that is found")
71  ;
72  cmdline_hidden_desc().add_options()("filename", str(), "");
73  cmdline_positional_desc().add("filename", -1);
74  }
75  static void init() {
76  if (!has("filename")) {
77  HT_ERROR_OUT <<"filename required" << HT_END;
78  cout << cmdline_desc() << endl;
79  exit(EXIT_FAILURE);
80  }
81  }
82  };
83 
84  typedef Meta::list<AppPolicy, FsClientPolicy, DefaultCommPolicy> Policies;
85 
/// Describes one data block, either as listed in the stored block index or
/// as discovered by scanning the data blocks themselves.
class BlockEntry {
public:
  size_t sequence = 0;        ///< Ordinal position of the entry
  int64_t offset = 0;         ///< Byte offset of the block within the file
  char *rowkey = nullptr;     ///< Row key associated with the block
  bool matched = false;       ///< Set by reconcile_block_index() when an entry with the same offset exists in the other list
  bool key_mismatch = false;  ///< Set by reconcile_block_index() when offsets agree but row keys differ
};
95 
96  class State {
97  public:
98  State() : block_index_is_bad(false), bloom_filter_is_bad(false) { }
99  String fname;
100  uint8_t *base;
101  uint8_t *end;
102  CellStoreTrailer *trailer;
103  vector<BlockEntry> index_block_info;
104  vector<BlockEntry> reconstructed_block_info;
105  BloomFilterWithChecksum *bloom_filter;
106  BlockCompressionCodec *compressor;
107  KeyDecompressor *key_decompressor;
108  bool block_index_is_bad;
109  bool bloom_filter_is_bad;
110  uint16_t block_header_format;
111  };
112 
113  //const char DATA_BLOCK_MAGIC[10]={ 'D','a','t','a','-','-','-','-','-','-' };
114 
/// Magic bytes checked against the header of the fixed-size index block.
const char INDEX_FIXED_BLOCK_MAGIC[10] = {
  'I', 'd', 'x', 'F', 'i', 'x', '-', '-', '-', '-'
};

/// Magic bytes checked against the header of the variable-size index block.
const char INDEX_VARIABLE_BLOCK_MAGIC[10] = {
  'I', 'd', 'x', 'V', 'a', 'r', '-', '-', '-', '-'
};
119 
120  void load_file(const String &fname, State &state) {
121  int64_t length = Global::dfs->length(fname.c_str());
122  uint8_t *base = new uint8_t[length];
123  const uint8_t *ptr = base;
124  uint8_t *end = base + length;
125  int fd;
126  int64_t nleft = length;
127  size_t nread, toread;
128  uint16_t version;
129  size_t remaining;
130 
131  fd = Global::dfs->open_buffered(fname.c_str(), 0, 1024*1024, 5);
132  while (nleft > 0) {
133  toread = (nleft > 1024*1024) ? 1024*1024 : nleft;
134  nread = Global::dfs->read(fd, (uint8_t *)ptr, toread);
135  nleft -= nread;
136  ptr += nread;
137  }
138 
139  // Read version
140  ptr = end-2;
141  remaining = 2;
142  version = Serialization::decode_i16(&ptr, &remaining);
143 
144  if (version == 6) {
145  state.trailer = new CellStoreTrailerV7();
146  state.block_header_format = 0; // hack
147  }
148  else {
149  cout << "unsupported CellStore version (" << version << ")" << endl;
150  quick_exit(EXIT_FAILURE);
151  }
152 
153  state.trailer->deserialize(end - state.trailer->size());
154  state.base = base;
155  state.end = end;
156 
157  uint16_t compression_type = boost::any_cast<uint16_t>(state.trailer->get("compression_type"));
158  state.compressor = CompressorFactory::create_block_codec((BlockCompressionCodec::Type)compression_type);
159  state.key_decompressor = new KeyDecompressorPrefix();
160  }
161 
162 
163  void read_block_index(State &state) {
164  uint32_t flags = boost::any_cast<uint32_t>(state.trailer->get("flags"));
165  int64_t fix_index_offset = boost::any_cast<int64_t>(state.trailer->get("fix_index_offset"));
166  int64_t var_index_offset = boost::any_cast<int64_t>(state.trailer->get("var_index_offset"));
167  int64_t filter_offset = boost::any_cast<int64_t>(state.trailer->get("filter_offset"));
168 
169  BlockHeaderCellStore header(state.block_header_format);
170 
171  DynamicBuffer input_buf(0, false);
172  DynamicBuffer output_buf(0, false);
174 
175  // FIXED
176  input_buf.base = state.base + fix_index_offset;
177  input_buf.ptr = input_buf.base + (var_index_offset - fix_index_offset);
178 
179  state.compressor->inflate(input_buf, output_buf, header);
180 
181  if (!header.check_magic(INDEX_FIXED_BLOCK_MAGIC)) {
182  cout << "corrupt fixed index" << endl;
183  quick_exit(EXIT_FAILURE);
184  }
185 
186  int64_t index_entries = output_buf.fill() / (bits64 ? 8 : 4);
187 
188  state.index_block_info.reserve(index_entries);
189 
190  BlockEntry be;
191  const uint8_t *ptr = (const uint8_t *)output_buf.base;
192  size_t remaining = output_buf.fill();
193  for (int64_t i=0; i<index_entries; i++) {
194  if (bits64)
195  be.offset = Serialization::decode_i64(&ptr, &remaining);
196  else
197  be.offset = Serialization::decode_i32(&ptr, &remaining);
198  be.sequence = (size_t)i;
199  state.index_block_info.push_back(be);
200  }
201 
202  // VARIABLE
203  input_buf.base = state.base + var_index_offset;
204  input_buf.ptr = input_buf.base + (filter_offset - var_index_offset);
205  output_buf.clear();
206 
207  state.compressor->inflate(input_buf, output_buf, header);
208 
209  if (!header.check_magic(INDEX_VARIABLE_BLOCK_MAGIC)) {
210  cout << "corrupt variable index" << endl;
211  quick_exit(EXIT_FAILURE);
212  }
213 
214  SerializedKey key;
215 
216  ptr = output_buf.base;
217 
218  for (size_t i=0; i<state.index_block_info.size(); i++) {
219  key.ptr = ptr;
220  ptr += key.length();
221  state.index_block_info[i].rowkey = (char *)key.row();
222  }
223 
224  }
225 
226  void read_bloom_filter(State &state) {
227  int64_t filter_offset = boost::any_cast<int64_t>(state.trailer->get("filter_offset"));
228  int64_t filter_length = boost::any_cast<int64_t>(state.trailer->get("filter_length"));
229  int64_t filter_items_actual = boost::any_cast<int64_t>(state.trailer->get("filter_items_actual"));
230  uint8_t bloom_filter_hash_count = boost::any_cast<uint8_t>(state.trailer->get("bloom_filter_hash_count"));
231  uint8_t bloom_filter_mode = boost::any_cast<uint8_t>(state.trailer->get("bloom_filter_mode"));
232 
233  if ((BloomFilterMode)bloom_filter_mode == BLOOM_FILTER_DISABLED) {
234  state.bloom_filter = 0;
235  return;
236  }
237 
238  if ((BloomFilterMode)bloom_filter_mode == BLOOM_FILTER_ROWS_COLS) {
239  cout << "Unsupported bloom filter type (BLOOM_FILTER_ROWS_COLS)" << endl;
240  quick_exit(EXIT_FAILURE);
241  }
242 
243  HT_ASSERT((BloomFilterMode)bloom_filter_mode == BLOOM_FILTER_ROWS);
244 
245  state.bloom_filter = new BloomFilterWithChecksum(filter_items_actual, filter_items_actual,
246  filter_length, bloom_filter_hash_count);
247  memcpy(state.bloom_filter->base(), state.base+filter_offset, state.bloom_filter->total_size());
248  try {
249  state.bloom_filter->validate(state.fname);
250  }
251  catch (Exception &e) {
252  state.bloom_filter_is_bad = true;
253  state.bloom_filter = 0;
254  }
255 
256  }
257 
258  template < class Operator >
259  bool process_blocks (State &state, Operator op) {
260  BlockHeaderCellStore header(state.block_header_format);
261  int64_t offset = 0;
262  int64_t end_offset = boost::any_cast<int64_t>(state.trailer->get("fix_index_offset"));
263  uint32_t alignment = boost::any_cast<uint32_t>(state.trailer->get("alignment"));
264  const uint8_t *ptr = state.base;
265  const uint8_t *end = state.base + end_offset;
266  size_t remaining;
267  size_t sequence = 0;
268  DynamicBuffer expand_buf(0);
269  DynamicBuffer input_buf(0, false);
270 
271  while (ptr < end) {
272 
273  if ((end-ptr) < (ptrdiff_t)header.encoded_length())
274  return false;
275 
276  offset = ptr - state.base;
277 
278  // decode header
279  remaining = end - ptr;
280  input_buf.base = input_buf.ptr = (uint8_t *)ptr;
281  input_buf.size = remaining;
282  header.decode((const uint8_t **)&input_buf.ptr, &remaining);
283 
284  size_t extra = 0;
285  if (alignment > 0) {
286  if ((header.encoded_length()+header.get_data_zlength())%alignment)
287  extra = alignment - ((header.encoded_length()+header.get_data_zlength())%alignment);
288  }
289 
290  // make sure we don't run off the end
291  if (input_buf.ptr + header.get_data_zlength() + extra > end)
292  return false;
293 
294  // Inflate block
295  input_buf.ptr += header.get_data_zlength() + extra;
296  state.compressor->inflate(input_buf, expand_buf, header);
297 
298  // call functor
299  if (!op(sequence, offset, expand_buf))
300  return false;
301 
302  ptr += input_buf.fill();
303  sequence++;
304  }
305 
306  return true;
307  }
308 
309  void check_bloom_filter(State &state, const char *row) {
310  if (state.bloom_filter && !state.bloom_filter_is_bad) {
311  if (!state.bloom_filter->may_contain(row, strlen(row)))
312  state.bloom_filter_is_bad = true;
313  }
314  }
315 
316  struct reconstruct_block_info {
317  reconstruct_block_info(State &s) : state(s) { }
318  bool operator()( size_t sequence, int64_t offset, DynamicBuffer &buf) {
319  Key key;
320  ByteString value;
321  BlockEntry be;
322  const uint8_t *ptr;
323  const uint8_t *end = buf.base + buf.fill();
324 
325  be.sequence = sequence;
326  be.offset = offset;
327 
328  state.key_decompressor->reset();
329  value.ptr = state.key_decompressor->add(buf.base);
330  ptr = value.ptr + value.length();
331  state.key_decompressor->load(key);
332  check_bloom_filter(state, key.row);
333 
334  while (ptr < end) {
335  value.ptr = state.key_decompressor->add(ptr);
336  state.key_decompressor->load(key);
337  check_bloom_filter(state, key.row);
338  ptr = value.ptr + value.length();
339  }
340 
341  if (ptr > end)
342  return false;
343 
344  state.key_decompressor->load(key);
345  be.rowkey = new char [ strlen(key.row)+1 ];
346  strcpy(be.rowkey, key.row);
347  state.reconstructed_block_info.push_back(be);
348  return true;
349  }
350  private:
351  State &state;
352  };
353 
354  struct ltbe {
355  bool operator()(const BlockEntry* b1, const BlockEntry* b2) const {
356  return b1->offset < b2->offset;
357  }
358  };
359 
360  void reconcile_block_index(State &state) {
361  std::set<BlockEntry *, ltbe> bset;
362  std::set<BlockEntry *, ltbe>::iterator iter;
363  int64_t last_offset = 0;
364 
365  for (size_t i=0; i<state.reconstructed_block_info.size(); i++)
366  bset.insert(&state.reconstructed_block_info[i]);
367 
368  for (size_t i=0; i<state.index_block_info.size(); i++) {
369  if (i > 0 && state.index_block_info[i].offset <= last_offset)
370  state.block_index_is_bad = true;
371  iter = bset.find(&state.index_block_info[i]);
372  if (iter != bset.end()) {
373  (*iter)->matched = true;
374  state.index_block_info[i].matched = true;
375  if (strcmp((*iter)->rowkey, state.reconstructed_block_info[i].rowkey)) {
376  (*iter)->key_mismatch = state.index_block_info[i].key_mismatch = true;
377  state.block_index_is_bad = true;
378  }
379  }
380  last_offset = state.index_block_info[i].offset;
381  }
382 
383  for (size_t i=0; i<state.reconstructed_block_info.size(); i++) {
384  if (!state.reconstructed_block_info[i].matched)
385  state.block_index_is_bad = true;
386  else if (state.reconstructed_block_info[i].key_mismatch)
387  state.block_index_is_bad = true;
388  }
389 
390  for (size_t i=0; i<state.index_block_info.size(); i++) {
391  if (!state.index_block_info[i].matched)
392  state.block_index_is_bad = true;
393  }
394  }
395 
396  void describe_block_index_corruption(State &state) {
397  size_t key_mismatches = 0;
398  int64_t last_offset = 0;
399 
400  for (size_t i=0; i<state.reconstructed_block_info.size(); i++) {
401  if (!state.reconstructed_block_info[i].matched) {
402  cout << "Missing block index entry (offset=" << state.reconstructed_block_info[i].offset;
403  cout << ", row=" << state.reconstructed_block_info[i].rowkey << ")" << endl;
404  }
405  else if (state.reconstructed_block_info[i].key_mismatch) {
406  key_mismatches++;
407  }
408  }
409 
410  for (size_t i=0; i<state.index_block_info.size(); i++) {
411  if (i > 0 && state.index_block_info[i].offset <= last_offset) {
412  cout << "Out-of-order block index entry (offset=" << state.index_block_info[i].offset;
413  cout << ", row=" << state.index_block_info[i].rowkey << ")" << endl;
414  }
415  if (!state.index_block_info[i].matched) {
416  cout << "Bogus block index entry (offset=" << state.index_block_info[i].offset;
417  cout << ", row=" << state.index_block_info[i].rowkey << ")" << endl;
418  }
419  last_offset = state.index_block_info[i].offset;
420  }
421  }
422 
423 
424 } // local namespace
425 
426 
427 int main(int argc, char **argv) {
428  State state;
429 
430  try {
431  init_with_policies<Policies>(argc, argv);
432 
433  int timeout = get_i32("timeout");
434  state.fname = get_str("filename");
435 
436  cout << "Checking " << state.fname << " ... " << flush;
437 
438  ConnectionManagerPtr conn_mgr = make_shared<ConnectionManager>();
439 
440  FsBroker::Lib::ClientPtr dfs = std::make_shared<FsBroker::Lib::Client>(conn_mgr, properties);
441 
442  if (!dfs->wait_for_connection(timeout)) {
443  cout << "timed out waiting for FS broker" << endl;
444  quick_exit(EXIT_FAILURE);
445  }
446 
447  Global::dfs = dfs;
449 
450  load_file(state.fname, state);
451  read_block_index(state);
452  read_bloom_filter(state);
453  reconstruct_block_info rbi(state);
454  process_blocks(state, rbi);
455  reconcile_block_index(state);
456 
457  if (state.block_index_is_bad || state.bloom_filter_is_bad) {
458  if (state.block_index_is_bad) {
459  cout << "block index is bad" << endl;
460  describe_block_index_corruption(state);
461  }
462  if (state.bloom_filter_is_bad)
463  cout << "bloom filter is bad" << endl;
464  }
465  else
466  cout << "valid" << endl;
467  /*
468  for (size_t i=0; i<state.index_block_info.size(); i++) {
469  cout << state.index_block_info[i].sequence << ": " << " (" << state.index_block_info[i].offset << ") ";
470  cout << state.index_block_info[i].rowkey << endl;
471  }
472  */
473 
474  }
475  catch (Exception &e) {
476  HT_ERROR_OUT << e << HT_END;
477  return 1;
478  }
479 
480  return 0;
481 }
Retrieves system information (hardware, installation directory, etc)
Abstract base class for cell store trailer.
Interface and base of config policy.
Definition: Config.h:149
const char * row
Definition: Key.h:129
Declarations for CellStoreFactory.
PropertiesPtr properties
This singleton map stores all options.
Definition: Config.cc:47
std::string String
A String is simply a typedef to std::string.
Definition: String.h:44
const char * row() const
Definition: SerializedKey.h:53
Helper class for printing usage banners on the command line.
A space-efficient probabilistic set for membership tests; false positives are possible, but false negatives are not.
void init(int argc, char *argv[], const Desc *desc=NULL)
Initialize with default policy.
Definition: Init.h:95
Po::typed_value< String > * str(String *v=0)
Definition: Properties.h:166
STL namespace.
Type
Enumeration for compression type.
A dynamic, resizable and reference counted memory buffer.
Definition: DynamicBuffer.h:42
Tracks range server memory used.
Definition: MemoryTracker.h:42
uint32_t decode_i32(const uint8_t **bufp, size_t *remainp)
Decode a 32-bit integer in little-endian order.
Desc & cmdline_desc(const char *usage)
A macro which defines global functions like get_bool(), get_str(), get_i16() etc. ...
Definition: Config.cc:72
bool has(const String &name)
Check existence of a configuration value.
Definition: Config.h:57
A class managing one or more serializable ByteStrings.
Definition: ByteString.h:47
#define HT_ASSERT(_e_)
Definition: Logger.h:396
Declarations for CellStore.
uint64_t decode_i64(const uint8_t **bufp, size_t *remainp)
Decode a 64-bit integer in little-endian order.
static Hypertable::MemoryTracker * memory_tracker
Definition: Global.h:94
uint16_t decode_i16(const uint8_t **bufp, size_t *remainp)
Decode a 16-bit integer in little-endian order.
std::shared_ptr< Client > ClientPtr
Smart pointer to Client.
Definition: Client.h:233
Logging routines and macros.
BloomFilterMode
Enumeration for bloom filter modes.
Compatibility Macros for C/C++.
Initialization helper for applications.
#define HT_END
Definition: Logger.h:220
Functions to serialize/deserialize primitives to/from a memory buffer.
static Hypertable::FilesystemPtr dfs
Definition: Global.h:64
size_t length() const
Retrieves the length of the serialized string.
Definition: ByteString.h:62
static BlockCompressionCodec * create_block_codec(BlockCompressionCodec::Type, const BlockCompressionCodec::Args &args=BlockCompressionCodec::Args())
#define HT_ERROR_OUT
Definition: Logger.h:301
const uint8_t * ptr
The pointer to the serialized data.
Definition: ByteString.h:121
Hypertable definitions
Implementation of checksum routines.
Declarations for BlockHeaderCellStore.
Declarations for ConnectionManager.
Declarations for Comm.
Represents the trailer for CellStore version 7.
int main(int argc, char **argv)
Definition: csvalidate.cc:427
Provides access to internal components of opaque key.
Definition: Key.h:40
uint8_t * base
Pointer to the allocated memory buffer.
size_t fill() const
Returns the size of the used portion.
Definition: DynamicBuffer.h:70
Internet address wrapper classes and utility functions.
Meta::list< MyPolicy, DefaultPolicy > Policies
Declarations for ReactorFactory.
This is a generic exception class for Hypertable.
Definition: Error.h:314
A serializable ByteString.
Declarations for CellStoreTrailerV7.
A Bloom Filter with Checksums.
BasicBloomFilterWithChecksum BloomFilterWithChecksum
std::shared_ptr< ConnectionManager > ConnectionManagerPtr
Smart pointer to ConnectionManager.
Desc & cmdline_hidden_desc()
Get the command line hidden options description (for positional options)
Definition: Config.cc:81
PositionalDesc & cmdline_positional_desc()
Get the command line positional options description.
Definition: Config.cc:90
Declarations for Client.
Abstract base class for block compression codecs.