0.9.8.10
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
CommitLogBlockStream.cc
Go to the documentation of this file.
1 /* -*- c++ -*-
2  * Copyright (C) 2007-2015 Hypertable, Inc.
3  *
4  * This file is part of Hypertable.
5  *
6  * Hypertable is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; version 3 of the
9  * License, or any later version.
10  *
11  * Hypertable is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19  * 02110-1301, USA.
20  */
21 
26 
27 #include <Common/Compat.h>
28 
29 #include "CommitLogBlockStream.h"
30 
31 #include <FsBroker/Lib/Utility.h>
32 
33 #include <Common/Checksum.h>
34 #include <Common/Config.h>
35 #include <Common/Error.h>
36 #include <Common/Logger.h>
37 #include <Common/Path.h>
38 #include <Common/StatusPersister.h>
39 
40 #include <cctype>
41 #include <cstdlib>
42 #include <cstring>
43 
44 using namespace Hypertable;
45 using namespace std;
46 
47 namespace {
48  const uint32_t READAHEAD_BUFFER_SIZE = 131072;
49  const uint32_t LatestVersion = 1;
50  const uint32_t BlockHeaderVersions[LatestVersion+1] = { 0, 1 };
51 }
52 
54 
55 
57  : m_fs(fs), m_fd(-1), m_cur_offset(0), m_file_length(0) {
58 }
59 
60 
62  const string &log_dir, const string &fragment)
63  : m_fs(fs), m_fd(-1), m_cur_offset(0), m_file_length(0) {
64  load(log_dir, fragment);
65 }
66 
67 
69  close();
70 }
71 
72 
73 void CommitLogBlockStream::load(const string &log_dir, const string &fragment) {
74  if (m_fd != -1)
75  close();
76  m_fragment = fragment;
77  m_fname = log_dir + "/" + fragment;
78  m_log_dir = log_dir;
79  m_file_length = m_fs->length(m_fname);
81  READAHEAD_BUFFER_SIZE, 2);
82 
84  // If invalid header, assume file is in original format w/o header
85  m_fs->close(m_fd);
87  READAHEAD_BUFFER_SIZE, 2);
88  }
89 
90  if (m_version > LatestVersion)
92  "Unsupported commit log file version %u", (unsigned)m_version);
93 
94  BlockHeaderCommitLog temp(BlockHeaderVersions[m_version]);
96 }
97 
98 
100  if (m_fd != -1) {
101  try {
102  m_fs->close(m_fd);
103  }
104  catch (Hypertable::Exception &e) {
105  HT_ERRORF("Problem closing file %s - %s (%s)",
106  m_fname.c_str(), e.what(), Error::get_text(e.code()));
107  }
108  m_fd = -1;
109  }
110 }
111 
112 
113 bool
115  BlockHeaderCommitLog *header) {
116  uint32_t nread;
117 
118  assert(m_fd != -1);
119 
121  return false;
122 
123  memset(infop, 0, sizeof(CommitLogBlockInfo));
124  infop->log_dir = m_log_dir.c_str();
125  infop->file_fragment = m_fragment.c_str();
126  infop->start_offset = m_cur_offset;
127 
128  *header = BlockHeaderCommitLog( BlockHeaderVersions[m_version] );
129 
130  if ((infop->error = load_next_valid_header(header)) != Error::OK) {
131  infop->end_offset = m_cur_offset;
132  return false;
133  }
134 
135  // check for truncation
136  if ((m_file_length - m_cur_offset) < header->get_data_zlength()) {
137  HT_WARNF("Commit log fragment '%s' truncated (block offset %llu)",
138  m_fname.c_str(), (Llu)(m_cur_offset-header->encoded_length()));
139  infop->end_offset = m_file_length;
141  string archive_fname;
142  archive_bad_fragment(m_fname, archive_fname);
144  format("Truncated commit log file %s", m_fname.c_str()));
145  StatusPersister::set(status,
146  {format("File archived to %s",archive_fname.c_str())});
147  return false;
148  }
149 
150  m_block_buffer.ensure(header->encoded_length() + header->get_data_zlength());
151 
152  nread = m_fs->read(m_fd, m_block_buffer.ptr, header->get_data_zlength());
153 
154  if (nread != header->get_data_zlength()) {
155  HT_WARNF("Commit log fragment '%s' truncated (entry start position %llu)",
156  m_fname.c_str(), (Llu)(m_cur_offset-header->encoded_length()));
157  infop->end_offset = m_file_length;
159  string archive_fname;
160  archive_bad_fragment(m_fname, archive_fname);
162  format("Truncated commit log file %s", m_fname.c_str()));
163  StatusPersister::set(status,
164  {format("File archived to %s",archive_fname.c_str())});
165  return false;
166  }
167 
168  m_block_buffer.ptr += nread;
169  m_cur_offset += nread;
170  infop->end_offset = m_cur_offset;
171  infop->block_ptr = m_block_buffer.base;
172  infop->block_len = m_block_buffer.fill();
173 
174  return true;
175 }
176 
177 namespace {
178  const size_t HEADER_SIZE=8;
179 }
180 
181 
182 bool
184  uint32_t *versionp, uint64_t *next_offset) {
185  char buf[HEADER_SIZE];
186 
187  if (fs->read(fd, buf, HEADER_SIZE) != HEADER_SIZE) {
188  *versionp = 0;
189  return false;
190  }
191 
192  // Sanity check header
193  if (buf[0] != 'C' || buf[1] != 'L' || buf[6] != '\f' || buf[7] != '\n' ||
194  !isdigit(buf[2]) || !isdigit(buf[3]) || !isdigit(buf[4]) ||
195  !isdigit(buf[5])) {
196  *versionp = 0;
197  *next_offset = 0;
198  return false;
199  }
200 
201  *versionp = (uint32_t)atoi((const char *)&buf[2]);
202  *next_offset = HEADER_SIZE;
203  return true;
204 }
205 
206 
207 
208 void
210  string header_str = format("CL%04u\f\n", (unsigned)LatestVersion);
211  StaticBuffer buf(HEADER_SIZE);
212  HT_ASSERT(header_str.size() == HEADER_SIZE);
213  memcpy(buf.base, header_str.c_str(), HEADER_SIZE);
214  fs->append(fd, buf, Filesystem::Flags::FLUSH);
215 }
216 
218  return HEADER_SIZE;
219 }
220 
221 
222 
224  size_t remaining = header->encoded_length();
225  try {
226  size_t nread = 0;
227  size_t toread = header->encoded_length();
228 
230 
231  while ((nread = m_fs->read(m_fd, m_block_buffer.ptr, toread)) < toread) {
232  if (nread == 0)
234  toread -= nread;
235  m_block_buffer.ptr += nread;
236  }
237 
239  header->decode((const uint8_t **)&m_block_buffer.ptr, &remaining);
240  m_cur_offset += header->encoded_length();
241  }
242  catch (Exception &e) {
243  HT_WARN_OUT << e << HT_END;
244  if (e.code() == Error::FSBROKER_EOF ||
248  string archive_fname;
249  archive_bad_fragment(m_fname, archive_fname);
250  string text;
252  text = format("Corruption detected in commit log file %s at offset %lld",
253  m_fname.c_str(), (Lld)m_cur_offset);
254  else
255  text = format("Truncated block header in commit log file %s",
256  m_fname.c_str());
258  StatusPersister::set(status,
259  { Error::get_text(e.code()),
260  e.what(),
261  format("File archived to %s", archive_fname.c_str()) });
262  }
263  else if (ms_assert_on_error)
264  HT_ABORT;
265  return e.code();
266  }
267  return Error::OK;
268 }
269 
271  string &archive_fname) {
272  string archive_directory = Config::properties->get_str("Hypertable.Directory");
273  if (archive_directory.front() != '/')
274  archive_directory = string("/") + archive_directory;
275  if (archive_directory.back() != '/')
276  archive_directory.append("/");
277  archive_directory.append("backup");
278  archive_fname = archive_directory + fname;
279  archive_directory.append(Path(fname).parent_path().string());
280 
281  // Create archive directory
282  try {
283  m_fs->mkdirs(archive_directory);
284  }
285  catch (Exception &e) {
286  HT_ERRORF("Problem creating backup directory %s - %s (%s)",
287  archive_directory.c_str(), Error::get_text(e.code()), e.what());
288  return false;
289  }
290 
291  // Remove archive file if it already exists
292  try {
293  if (m_fs->exists(archive_fname))
294  m_fs->remove(archive_fname);
295  }
296  catch (Exception &e) {
297  HT_ERRORF("Problem checking existence of backup file %s - %s (%s)",
298  archive_fname.c_str(), Error::get_text(e.code()), e.what());
299  return false;
300  }
301 
302  FsBroker::Lib::ClientPtr client = dynamic_pointer_cast<FsBroker::Lib::Client>(m_fs);
303  HT_ASSERT(client);
304 
305  // Archive file
306  try {
307  FsBroker::Lib::copy(client, fname, archive_fname);
308  }
309  catch (Exception &e) {
310  HT_ERRORF("Problem copying file %s to %s - %s (%s)",
311  fname.c_str(), archive_fname.c_str(),
312  Error::get_text(e.code()), e.what());
313  return false;
314  }
315 
316  return true;
317 }
A memory buffer of static size.
Definition: StaticBuffer.h:45
#define HT_WARNF(msg,...)
Definition: Logger.h:290
virtual size_t encoded_length()
Returns length of serizlized block header.
Holds information about an individual block.
virtual void decode(const uint8_t **bufp, size_t *remainp)
Decodes commit log block header from memory location.
Holds Nagios-style program status information.
Definition: Status.h:42
PropertiesPtr properties
This singleton map stores all options.
Definition: Config.cc:47
std::string m_fname
Full pathname of commit log fragment file.
Compatibility class for boost::filesystem::path.
String format(const char *fmt,...)
Returns a String using printf like format facilities Vanilla snprintf is about 1.5x faster than this...
Definition: String.cc:37
std::string m_fragment
Fragment file name within commit log directory.
long long unsigned int Llu
Shortcut for printf formats.
Definition: String.h:50
#define HT_ABORT
Definition: Logger.h:175
STL namespace.
static void write_header(FilesystemPtr &fs, int32_t fd)
Writes commit log file header.
void copy(ClientPtr &client, const std::string &from, const std::string &to, int64_t offset=0)
Definition: Utility.cc:45
uint8_t * ptr
Pointer to the end of the used part of the buffer.
uint64_t m_cur_offset
Current read offset within the fragment file.
Declarations for CommitLogBlockStream.
#define HT_ASSERT(_e_)
Definition: Logger.h:396
int load_next_valid_header(BlockHeaderCommitLog *header)
Compatibility class for boost::filesystem::path.
Definition: Path.h:45
FilesystemPtr m_fs
Pointer to filesystem.
const char * get_text(int error)
Returns a descriptive error message.
Definition: Error.cc:330
bool status(ContextPtr &context, Timer &timer, Status &status)
Runs a status check on the master.
Definition: Utility.cc:408
void grow(size_t new_size, bool nocopy=false)
Grows the buffer and copies the data unless nocopy is true.
std::shared_ptr< Client > ClientPtr
Smart pointer to Client.
Definition: Client.h:233
int error
Error (if any) encountered while reading block
Logging routines and macros.
Compatibility Macros for C/C++.
#define HT_END
Definition: Logger.h:220
uint8_t * block_ptr
Pointer to beginning of compressed block.
bool archive_bad_fragment(const std::string &fname, std::string &archive_fname)
#define HT_WARN_OUT
Definition: Logger.h:291
Hypertable definitions
long long int Lld
Shortcut for printf formats.
Definition: String.h:53
static void set(const Status &status, std::vector< std::string > additional_lines)
Sets persistent status.
Implementation of checksum routines.
static bool read_header(FilesystemPtr &fs, int32_t fd, uint32_t *versionp, uint64_t *next_offset)
Reads commit log file header.
std::shared_ptr< Filesystem > FilesystemPtr
Smart pointer to Filesystem.
Definition: Filesystem.h:572
#define HT_THROWF(_code_, _fmt_,...)
Definition: Error.h:490
uint8_t * base
Pointer to the allocated memory buffer.
static uint64_t header_size()
Size of header.
size_t fill() const
Returns the size of the used portion.
Definition: DynamicBuffer.h:70
uint64_t m_file_length
Length of commit log fragment file.
const char * log_dir
Log directory.
This is a generic exception class for Hypertable.
Definition: Error.h:314
Proxy class for FS broker.
Definition: Client.h:58
Declarations for StatusPersister.
uint64_t start_offset
Starting offset of block within fragment file.
#define HT_ERRORF(msg,...)
Definition: Logger.h:300
uint32_t get_data_zlength()
Gets the compressed data length field.
Definition: BlockHeader.h:106
uint64_t end_offset
Ending offset of block within fragment file.
size_t block_len
Length of block.
bool next(CommitLogBlockInfo *, BlockHeaderCommitLog *)
Configuration settings.
Declarations of utility functions.
void load(const std::string &log_dir, const std::string &fragment)
Error codes, Exception handling, error logging.
#define HT_THROW(_code_, _msg_)
Definition: Error.h:478
void ensure(size_t len)
Ensure space for additional data Will grow the space to 1.5 of the needed space with existing data un...
Definition: DynamicBuffer.h:82
DynamicBuffer m_block_buffer
Buffer holding most recently loaded block.
int code() const
Returns the error code.
Definition: Error.h:391
uint32_t m_version
Version of commit log fragment file format.
const char * file_fragment
File name of log fragment within log_dir.
std::string m_log_dir
Directory containing commit log fragment file.