0.9.8.10
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
CommitLogReader.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2007-2015 Hypertable, Inc.
3  *
4  * This file is part of Hypertable.
5  *
6  * Hypertable is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; version 3 of the
9  * License, or any later version.
10  *
11  * Hypertable is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19  * 02110-1301, USA.
20  */
21 
26 
27 #include <Common/Compat.h>
28 
29 #include "CommitLogReader.h"
30 
31 #include <Common/Config.h>
32 #include <Common/Error.h>
33 #include <Common/FileUtils.h>
34 #include <Common/Logger.h>
35 #include <Common/StringExt.h>
36 #include <Common/md5.h>
37 
42 
43 #include <boost/algorithm/string/predicate.hpp>
44 
45 #include <cassert>
46 #include <vector>
47 
48 extern "C" {
49 #include <dirent.h>
50 #include <errno.h>
51 #include <limits.h>
52 #include <stdlib.h>
53 #include <sys/stat.h>
54 #include <sys/types.h>
55 #include <sys/uio.h>
56 #include <unistd.h>
57 }
58 
59 using namespace Hypertable;
60 using namespace Hypertable::Config;
61 using namespace std;
62 
// File-local helpers (anonymous namespace).
63 namespace {
// Comparison functor that orders directory entries by the integer value
// parsed from the start of each entry's name, ascending.  It is passed to
// sort() for the directory listing in this file.
64  struct ByFragmentNumber {
// Returns true when x's parsed number is strictly less than y's.
// atoi() reads only the leading decimal digits of the name and yields 0
// when the name does not start with a number, so all such names compare
// as equal to one another (and to a fragment named "0").
65  bool operator()(const Filesystem::Dirent &x, const Filesystem::Dirent &y) const {
66  int num_x = atoi(x.name.c_str());
67  int num_y = atoi(y.name.c_str());
68  return num_x < num_y;
69  }
70  };
71 }
72 
74  : CommitLogBase(log_dir), m_fs(fs), m_block_buffer(256),
75  m_revision(TIMESTAMP_MIN), m_last_fragment_id(-1) {
76  if (get_bool("Hypertable.CommitLog.SkipErrors"))
79  reset();
80 }
81 
83  const std::vector<int32_t> &fragment_filter)
84  : CommitLogBase(log_dir), m_fs(fs), m_block_buffer(256),
85  m_revision(TIMESTAMP_MIN),
86  m_fragment_filter(fragment_filter.begin(), fragment_filter.end()),
87  m_last_fragment_id(-1) {
88  if (get_bool("Hypertable.CommitLog.SkipErrors"))
90  load_fragments(log_dir, 0);
91  reset();
92 }
93 
94 bool
96  BlockHeaderCommitLog *header) {
97  LogFragmentQueue::iterator fragment_queue_iter;
98 
99  try_again:
100  fragment_queue_iter = m_fragment_queue.begin() + m_fragment_queue_offset;
101  if (fragment_queue_iter == m_fragment_queue.end())
102  return false;
103 
104  if ((*fragment_queue_iter)->block_stream == 0) {
105  (*fragment_queue_iter)->block_stream =
106  new CommitLogBlockStream(m_fs, (*fragment_queue_iter)->log_dir,
107  format("%u", (*fragment_queue_iter)->num));
108  m_last_fragment_fname = (*fragment_queue_iter)->block_stream->get_fname();
109  m_last_fragment_id = (int32_t)toplevel_fragment_id(*fragment_queue_iter);
110  }
111 
112  if (!(*fragment_queue_iter)->block_stream->next(infop, header)) {
113  CommitLogFileInfo *info = *fragment_queue_iter;
114  delete info->block_stream;
115  info->block_stream = 0;
116 
117  if (m_revision == TIMESTAMP_MIN) {
118  if (m_verbose)
119  HT_INFOF("Skipping log fragment '%s/%u' because unable to read any "
120  " valid blocks", info->log_dir.c_str(), info->num);
121  m_fragment_queue.erase(fragment_queue_iter);
122  }
123  else {
124  info->revision = m_revision;
126  }
128  goto try_again;
129  }
130 
131  if (header->check_magic(CommitLog::MAGIC_LINK)) {
133  string log_dir = (const char *)(infop->block_ptr + header->encoded_length());
134  boost::trim_right_if(log_dir, boost::is_any_of("/"));
135  m_linked_log_hashes.insert(md5_hash(log_dir.c_str()));
136  m_linked_logs.insert(log_dir);
137  load_fragments(log_dir, *fragment_queue_iter);
138  if (header->get_revision() > m_latest_revision)
139  m_latest_revision = header->get_revision();
140  if (header->get_revision() > m_revision)
141  m_revision = header->get_revision();
142  goto try_again;
143  }
144 
145  if (m_verbose)
146  HT_INFOF("Replaying commit log fragment %s/%u", (*fragment_queue_iter)->log_dir.c_str(),
147  (*fragment_queue_iter)->num);
148 
149  return true;
150 }
151 
// Appends every entry of m_init_fragments to ids, converting each one to
// uint32_t.  ids is not cleared first, so any elements the caller already
// placed in it are kept and the fragment ids are added after them.
152 void CommitLogReader::get_init_fragment_ids(vector<uint32_t> &ids) {
153  for (auto id : m_init_fragments) {
154  ids.push_back((uint32_t)id);
155  }
156 }
157 
// Fetches the next block that can be inflated successfully.
//
// blockp  - out: set to m_block_buffer.base on success.
// lenp    - out: set to m_block_buffer.fill() on success.
// header  - passed to next_raw_block() and to the compressor's inflate();
//           its revision is read afterwards.
//
// Returns true when a block was produced, false once next_raw_block()
// returns false.  Blocks that carry an error code or fail to inflate are
// logged and skipped rather than reported to the caller.
// NOTE(review): this listing omits source lines 168 and 173, so statements
// between the visible ones may be missing here - confirm against the
// original file.
158 bool
159 CommitLogReader::next(const uint8_t **blockp, size_t *lenp,
160  BlockHeaderCommitLog *header) {
161  CommitLogBlockInfo binfo;
162 
163  while (next_raw_block(&binfo, header)) {
164 
165  if (binfo.error == Error::OK) {
// presumably the 'false' argument makes zblock a non-owning view - TODO confirm
166  DynamicBuffer zblock(0, false);
167 
// Point zblock at the raw block: base at its first byte, ptr block_len
// bytes past it.
169  zblock.base = binfo.block_ptr;
170  zblock.ptr = binfo.block_ptr + binfo.block_len;
171 
172  try {
// Inflate the raw block into the member buffer m_block_buffer.
174  m_compressor->inflate(zblock, m_block_buffer, *header);
175  }
176  catch (Exception &e) {
// Report the failing fragment file and byte range, then move on to the
// next raw block instead of propagating the exception.
177  LogFragmentQueue::iterator iter = m_fragment_queue.begin() + m_fragment_queue_offset;
178  HT_ERRORF("Inflate error in CommitLog fragment %s starting at "
179  "postion %lld (block len = %lld) - %s",
180  (*iter)->block_stream->get_fname().c_str(),
181  (Lld)binfo.start_offset, (Lld)(binfo.end_offset
182  - binfo.start_offset), Error::get_text(e.code()));
183  continue;
184  }
185 
// Raise both revision high-water marks if this block's revision is newer.
186  if (header->get_revision() > m_latest_revision)
187  m_latest_revision = header->get_revision();
188 
189  if (header->get_revision() > m_revision)
190  m_revision = header->get_revision();
191 
// The returned pointer refers to m_block_buffer, which is written again by
// the next successful inflate.
192  *blockp = m_block_buffer.base;
193  *lenp = m_block_buffer.fill();
194  return true;
195  }
196 
// binfo.error was not Error::OK: log the affected byte range and keep
// looping to the next raw block.
197  LogFragmentQueue::iterator iter = m_fragment_queue.begin() + m_fragment_queue_offset;
198  HT_WARNF("Corruption detected in CommitLog fragment %s starting at "
199  "postion %lld for %lld bytes - %s",
200  (*iter)->block_stream->get_fname().c_str(),
201  (Lld)binfo.start_offset, (Lld)(binfo.end_offset
202  - binfo.start_offset), Error::get_text(binfo.error));
203  }
204 
// No more raw blocks: re-sort the whole fragment queue with the LtClfip
// comparator (its ordering criterion is defined outside this file) before
// reporting end of log.
205  struct LtClfip swo;
206  sort(m_fragment_queue.begin(), m_fragment_queue.end(), swo);
207 
208  return false;
209 }
210 
211 
213  vector<Filesystem::Dirent> listing;
214  CommitLogFileInfo *fi;
215  int mark = -1;
216 
217 #if 0
218  HT_DEBUG_OUT << "Reading fragments in " << log_dir
219  << " which is part of CommitLog " << m_log_dir
220  << " mark_for_deletion=" << mark_for_deletion
221  << " m_fragment_filter.size()=" << m_fragment_filter.size() << HT_END;
222 #endif
223  try {
224  m_fs->readdir(log_dir, listing);
225  }
226  catch (Hypertable::Exception &e) {
227  if (e.code() == Error::FSBROKER_BAD_FILENAME) {
228  if (m_verbose)
229  HT_INFOF("Skipping directory '%s' because it does not exist",
230  log_dir.c_str());
231  return;
232  }
233  HT_THROW2(e.code(), e, e.what());
234  }
235 
236  if (listing.size() == 0)
237  return;
238 
239  sort(listing.begin(), listing.end(), ByFragmentNumber());
240 
241  for (size_t i = 0; i < listing.size(); i++) {
242  if (boost::ends_with(listing[i].name, ".tmp"))
243  continue;
244 
245  if (boost::ends_with(listing[i].name, ".mark")) {
246  mark = atoi(listing[i].name.c_str());
247  continue;
248  }
249 
250  char *endptr;
251  int32_t num = (int32_t)strtol(listing[i].name.c_str(), &endptr, 10);
252  if (m_fragment_filter.size() && log_dir == m_log_dir &&
253  m_fragment_filter.find(num) == m_fragment_filter.end()) {
254  if (m_verbose)
255  HT_INFOF("Dropping log fragment %s/%d because it is filtered",
256  log_dir.c_str(), num);
257  //HT_DEBUG_OUT << "Fragments " << num <<" in " << log_dir
258  // << " is part of CommitLog "
259  // << m_log_dir << " is not in fragment filter of size()="
260  // << m_fragment_filter.size() << " num_listings=" << listing.size()
261  // << " added_fragments=" << added_fragments << HT_END;
262  continue;
263  }
264 
265  if (*endptr != 0) {
266  HT_WARNF("Invalid file '%s' found in commit log directory '%s'",
267  listing[i].name.c_str(), log_dir.c_str());
268  }
269  else {
270  fi = new CommitLogFileInfo();
271  fi->num = (uint32_t)num;
272  fi->log_dir = log_dir;
273  fi->log_dir_hash = md5_hash(log_dir.c_str());
274  fi->size = m_fs->length(log_dir + "/" + listing[i].name);
275  fi->parent = parent;
277  if (parent)
278  parent->references++;
279  m_fragment_queue.push_back(fi);
280  }
281  else {
282  delete fi;
283  fi = 0;
284  }
285  }
286  }
287 
288  if (mark != -1) {
289  if (m_fragment_queue.empty() || mark < (int)m_fragment_queue.front()->num) {
290  string mark_filename;
291  try {
292  mark_filename = log_dir + "/" + mark + ".mark";
293  m_fs->remove(mark_filename);
294  }
295  catch (Hypertable::Exception &e) {
296  HT_FATALF("Problem removing mark file '%s' - %s",
297  mark_filename.c_str(), e.what());
298  }
299  }
300  else
302  }
303 
304  // Add this log dir to the parent's purge_dirs set or
305  // initialize m_init_fragments vector if no parent
306  if (parent)
307  parent->purge_dirs.insert(log_dir);
308  else {
309  m_init_fragments.clear();
310  for (const auto fragment : m_fragment_queue)
311  m_init_fragments.push_back(fragment->num);
312  }
313 
314 }
315 
316 void CommitLogReader::load_compressor(uint16_t ztype) {
317 
318  if (m_compressor && ztype == m_compressor_type)
319  return;
320 
323  "Invalid compression type '%d'", (int)ztype);
324 
326 
327  if (!m_compressor) {
330  }
331 
332  m_compressor_type = ztype;
333 }
334 
CommitLogReader(FilesystemPtr &fs, const std::string &log_dir)
#define HT_WARNF(msg,...)
Definition: Logger.h:290
int64_t md5_hash(const char *input)
Returns a 64-bit hash checksum of a null terminated input buffer.
Definition: md5.cc:388
virtual size_t encoded_length()
Returns length of serialized block header.
Holds information about an individual block.
std::string String
A String is simply a typedef to std::string.
Definition: String.h:44
bool check_magic(const char *magic)
Compares a given character sequence with the magic field.
Definition: BlockHeader.h:86
String format(const char *fmt,...)
Returns a String using printf-like format facilities. Vanilla snprintf is about 1.5x faster than this...
Definition: String.cc:37
Declarations for CommitLogReader.
uint16_t get_compression_type()
Gets the compression type field.
Definition: BlockHeader.h:128
STL namespace.
bool next_raw_block(CommitLogBlockInfo *, BlockHeaderCommitLog *)
std::vector< int32_t > m_init_fragments
Abstraction for reading a stream of blocks from a commit log file.
Declarations for BlockCompressionCodec.
Type
Enumeration for compression type.
uint8_t * ptr
Pointer to the end of the used part of the buffer.
A dynamic, resizable and reference counted memory buffer.
Definition: DynamicBuffer.h:42
static const int64_t TIMESTAMP_MIN
Definition: KeySpec.h:34
Declarations for CommitLogBlockStream.
File system utility functions.
void load_fragments(String log_dir, CommitLogFileInfo *parent)
static const char MAGIC_LINK[10]
Definition: CommitLog.h:200
void get_init_fragment_ids(std::vector< uint32_t > &ids)
const char * get_text(int error)
Returns a descriptive error message.
Definition: Error.cc:330
int error
Error (if any) encountered while reading block
Logging routines and macros.
Compatibility Macros for C/C++.
CommitLogBlockStream * block_stream
Definition: CommitLogBase.h:65
#define HT_END
Definition: Logger.h:220
static BlockCompressionCodec * create_block_codec(BlockCompressionCodec::Type, const BlockCompressionCodec::Args &args=BlockCompressionCodec::Args())
uint8_t * block_ptr
Pointer to beginning of compressed block.
BlockCompressionCodecPtr m_compressor
Hypertable definitions
#define HT_FATALF(msg,...)
Definition: Logger.h:343
long long int Lld
Shortcut for printf formats.
Definition: String.h:53
void clear()
Clears the buffer.
std::shared_ptr< Filesystem > FilesystemPtr
Smart pointer to Filesystem.
Definition: Filesystem.h:572
CommitLogFileInfo * parent
Definition: CommitLogBase.h:66
#define HT_INFOF(msg,...)
Definition: Logger.h:272
String name
File or directory name.
Definition: Filesystem.h:96
#define HT_THROWF(_code_, _fmt_,...)
Definition: Error.h:490
uint8_t * base
Pointer to the allocated memory buffer.
static uint64_t header_size()
Size of header.
size_t fill() const
Returns the size of the used portion.
Definition: DynamicBuffer.h:70
This is a generic exception class for Hypertable.
Definition: Error.h:314
std::set< uint32_t > m_fragment_filter
bool next(const uint8_t **blockp, size_t *lenp, BlockHeaderCommitLog *)
uint64_t start_offset
Starting offset of block within fragment file.
LogFragmentQueue m_fragment_queue
#define HT_ERRORF(msg,...)
Definition: Logger.h:300
Declarations for CommitLog.
uint64_t end_offset
Ending offset of block within fragment file.
size_t block_len
Length of block.
Configuration settings.
std::set< int64_t > m_linked_log_hashes
uint32_t toplevel_fragment_id(CommitLogFileInfo *finfo)
void load_compressor(uint16_t ztype)
String extensions and helpers: sets, maps, append operators etc.
Error codes, Exception handling, error logging.
int64_t get_revision()
Gets the revision number field.
md5 digest routines.
#define HT_DEBUG_OUT
Definition: Logger.h:261
int code() const
Returns the error code.
Definition: Error.h:391
#define HT_THROW2(_code_, _ex_, _msg_)
Definition: Error.h:484