0.9.8.10
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
MetaLogWriter.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2007-2015 Hypertable, Inc.
3  *
4  * This file is part of Hypertable.
5  *
6  * Hypertable is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; version 3 of the
9  * License, or any later version.
10  *
11  * Hypertable is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19  * 02110-1301, USA.
20  */
21 
28 #include <Common/Compat.h>
29 
30 #include "MetaLog.h"
31 #include "MetaLogWriter.h"
32 
33 #include <Common/Config.h>
34 #include <Common/FileUtils.h>
35 #include <Common/Path.h>
36 #include <Common/StringExt.h>
37 
38 #include <boost/algorithm/string.hpp>
39 
40 #include <algorithm>
41 #include <cassert>
42 #include <chrono>
43 #include <memory>
44 #include <thread>
45 
46 extern "C" {
47 #include <sys/types.h>
48 #include <sys/stat.h>
49 #include <fcntl.h>
50 }
51 
52 using namespace Hypertable;
53 using namespace Hypertable::MetaLog;
54 using namespace std;
55 
56 namespace {
57  const int32_t FS_BUFFER_SIZE = -1;
58  const int64_t FS_BLOCK_SIZE = -1;
59 }
60 
61 bool Writer::skip_recover_entry = false;
62 
63 
// Constructs a MetaLog writer.
// - Ensures the log directory `path` (trailing '/' trimmed) exists in `fs`.
// - Reads HistorySize, MaxFileSize, Metadata.Replication and
//   LogFlushMethod.Meta from Config::properties.
// - Opens a new log file `<m_path>/<next_id>` in the filesystem and a local
//   backup file `<m_backup_path>/<next_id>`, then writes the file header.
// - Records `initial_entities` and, unless `skip_recover_entry` is set, an
//   EntityRecover marker.
// NOTE(review): this rendered listing skips several source lines inside this
// constructor (the embedded line numbers jump). No visible line populates
// `m_file_ids` before it is read below, and none creates the backup directory;
// confirm against the real source that the omitted lines do this.
64 Writer::Writer(FilesystemPtr &fs, DefinitionPtr &definition, const string &path,
65  std::vector<EntityPtr> &initial_entities)
66  : m_fs(fs), m_definition(definition) {
67 
69 
70  // Setup FS path name
71  m_path = path;
72  boost::trim_right_if(m_path, boost::is_any_of("/"));
73  if (!m_fs->exists(m_path))
74  m_fs->mkdirs(m_path);
75 
76  // Setup local backup path name
// Backup path: <Hypertable.DataDirectory>/run/log_backup/<definition name>/<backup label>
77  Path data_dir = Config::properties->get_str("Hypertable.DataDirectory");
78  m_backup_path = (data_dir /= String("/run/log_backup/") + String(m_definition->name()) + "/" +
79  String(m_definition->backup_label())).string();
82 
84 
85  m_history_size = Config::properties->get_i32("Hypertable.MetaLog.HistorySize");
86 
87  m_max_file_size = Config::properties->get_i64("Hypertable.MetaLog.MaxFileSize");
88 
89  // get replication
90  m_replication = Config::properties->get_i32("Hypertable.Metadata.Replication");
91 
92  // get flush method
93  m_flush_method = convert(Config::properties->get_str("Hypertable.LogFlushMethod.Meta"));
94 
// The newest file id is kept at the front of m_file_ids (new ids are
// push_front'ed below); start at 0 when no prior files are known.
95  int32_t next_id = m_file_ids.empty() ? 0 : m_file_ids.front()+1;
96 
97  // Open FS file
98  m_filename = m_path + "/" + next_id;
99  m_fd = m_fs->create(m_filename, 0, FS_BUFFER_SIZE, m_replication, FS_BLOCK_SIZE);
100 
101  // Open backup file
// NOTE(review): the ::open() result is not checked in the visible code; on
// failure it returns -1, which is stored in m_backup_fd unnoticed.
102  m_backup_filename = m_backup_path + "/" + next_id;
103  m_backup_fd = ::open(m_backup_filename.c_str(), O_CREAT|O_TRUNC|O_WRONLY, 0644);
104 
105  m_file_ids.push_front(next_id);
106 
108 
109  write_header();
110 
// The scheduler must exist before record_state() is called, because
// record_state() invokes m_write_scheduler->schedule().
111  m_write_scheduler = make_shared<WriteScheduler>(this);
112 
113  // Write existing entries
114  record_state(initial_entities);
115 
116  // Write "Recover" entity
117  if (!skip_recover_entry)
118  record_state(make_shared<EntityRecover>());
119 
120 }
121 
123  m_write_scheduler.reset();
124  close();
125 }
126 
128  lock_guard<mutex> lock(m_mutex);
129  try {
130  if (m_fd != -1) {
131  m_fs->close(m_fd);
132  m_fd = -1;
134  m_backup_fd = -1;
135  }
136  }
137  catch (Exception &e) {
138  HT_THROW2F(e.code(), e, "Error closing metalog: %s (fd=%d)", m_filename.c_str(), m_fd);
139  }
140 
141 }
142 
143 
145 
146  while (m_file_ids.size() > m_history_size) {
147 
148  // remove from brokered FS
149  string tmp_name = m_path + String("/") + m_file_ids.back();
150  m_fs->remove(tmp_name);
151 
152  // remove local backup
153  tmp_name = m_backup_path + String("/") + m_file_ids.back();
154  if (FileUtils::exists(tmp_name))
155  FileUtils::unlink(tmp_name);
156 
157  m_file_ids.pop_back();
158  }
159 
160 }
161 
// Rolls the MetaLog over to a new file.
// Steps:
// - Closes the current FS descriptor.
// - Opens file `<next_id>` in both m_path and m_backup_path, where next_id is
//   one past the newest id.
// - Resets m_offset and writes a fresh header.
// - Writes a snapshot of every serialized entity in m_entity_map, followed by
//   an EntityRecover entry.
// NOTE(review): this rendered listing skips a few source lines inside this
// function (the embedded line numbers jump); confirm against the real source.
162 void Writer::roll() {
163 
164  // Close descriptors
165  if (m_fd != -1) {
166  m_fs->close(m_fd);
167  m_fd = -1;
// NOTE(review): no visible line closes m_backup_fd before it is reset to -1
// (one listing line is missing just above). If the backup descriptor is not
// closed there, it leaks on every roll.
169  m_backup_fd = -1;
170  }
171 
// Assumes m_file_ids is non-empty; the constructor push_front's an id.
172  int32_t next_id = m_file_ids.front() + 1;
173 
174  // Open next brokered FS file
175  m_filename = m_path + "/" + next_id;
176  m_fd = m_fs->create(m_filename, 0, FS_BUFFER_SIZE, m_replication, FS_BLOCK_SIZE);
177 
178  // Open next backup file
// NOTE(review): as in the constructor, the ::open() result is not checked.
179  m_backup_filename = m_backup_path + "/" + next_id;
180  m_backup_fd = ::open(m_backup_filename.c_str(), O_CREAT|O_TRUNC|O_WRONLY, 0644);
181 
182  m_file_ids.push_front(next_id);
183 
185 
186  m_offset = 0;
187 
188  write_header();
189 
190  // Compute total length
// NOTE(review): entry.second.first is a size_t (SerializedEntityT), but it is
// accumulated into a uint32_t, so the sum narrows silently.
191  uint32_t total_length {};
192  for (auto &entry : m_entity_map)
193  total_length += entry.second.first;
194  total_length += EntityHeader::LENGTH; // For Recover entity
195 
196  // Create initial file contents
// NOTE(review): assumes StaticBuffer takes ownership of (and later frees)
// the new[]'d memory passed here; confirm against StaticBuffer.h.
197  StaticBuffer buf (new uint8_t [total_length], total_length);
198  uint8_t *ptr = buf.base;
199  for (auto &entry : m_entity_map) {
200  memcpy(ptr, entry.second.second.get(), entry.second.first);
201  ptr += entry.second.first;
202  }
203  EntityRecover er;
204  er.encode_entry(&ptr);
205  HT_ASSERT((ptr-buf.base) == (ptrdiff_t)buf.size);
206 
207  m_offset += buf.size;
208 
209  // Write contents to file(s)
// NOTE(review): only the FS append is visible; one listing line is missing
// just above it. Confirm the backup file is also written there.
211  m_fs->append(m_fd, buf, m_flush_method);
212 
213 }
214 
216 
217  if (!m_write_ready)
218  return;
219 
220  m_write_ready = false;
221 
222  if (!m_write_queue.empty()) {
223  size_t total {};
224  for (auto & sb : m_write_queue)
225  total += sb->size;
226 
227  StaticBuffer buf(total);
228  uint8_t *ptr = buf.base;
229 
230  for (auto & sb : m_write_queue) {
231  memcpy(ptr, sb->base, sb->size);
232  ptr += sb->size;
233  }
234  HT_ASSERT((ptr-buf.base) == (ptrdiff_t)buf.size);
235 
236  m_offset += buf.size;
237 
239  m_fs->append(m_fd, buf, m_flush_method);
240 
241  m_write_queue.clear();
242 
244  roll();
245  }
246 
247 }
248 
249 
251  : m_writer(writer) {
252  m_interval = Config::properties->get_i32("Hypertable.MetaLog.WriteInterval");
254 }
255 
257  unique_lock<mutex> lock(m_mutex);
258  if (m_scheduled)
259  m_cond.wait(lock);
260 }
261 
262 
264  lock_guard<mutex> lock(m_mutex);
265  if (m_scheduled)
266  return;
267  auto duration = chrono::system_clock::now().time_since_epoch();
268  auto millis = chrono::duration_cast<chrono::milliseconds>(duration).count();
269  uint32_t duration_millis = m_interval - (millis % m_interval);
270  int error;
271  if ((error = m_comm->set_timer(duration_millis, shared_from_this())) != Error::OK)
272  HT_FATALF("Problem setting MetaLog timer - %s", Error::get_text(error));
273  m_scheduled = true;
274 }
275 
276 
278  {
279  lock_guard<mutex> lock(m_mutex);
280  m_scheduled = false;
281  }
282  m_writer->signal_write_ready();
283  m_cond.notify_all();
284 }
285 
286 
289  uint8_t backup_buf[Header::LENGTH];
290  Header header;
291 
292  assert(strlen(m_definition->name()) < sizeof(header.name));
293 
294  header.version = m_definition->version();
295  memset(header.name, 0, sizeof(header.name));
296  strcpy(header.name, m_definition->name());
297 
298  uint8_t *ptr = buf.base;
299 
300  header.encode(&ptr);
301 
302  assert((ptr-buf.base) == Header::LENGTH);
303  memcpy(backup_buf, buf.base, Header::LENGTH);
304 
306  if (m_fs->append(m_fd, buf, m_flush_method) != Header::LENGTH)
307  HT_THROWF(Error::FSBROKER_IO_ERROR, "Error writing %s "
308  "metalog header to file: %s", m_definition->name(),
309  m_filename.c_str());
310 
312 }
313 
314 
316  unique_lock<mutex> lock(m_mutex);
317  size_t length;
318  StaticBufferPtr buf;
319 
320  if (m_fd == -1)
321  HT_THROWF(Error::CLOSED, "MetaLog '%s' has been closed", m_path.c_str());
322 
323  {
324  lock_guard<Entity> lock(*entity);
325  length = EntityHeader::LENGTH + (entity->marked_for_removal() ? 0 : entity->encoded_length());
326  buf = make_shared<StaticBuffer>(length);
327  uint8_t *ptr = buf->base;
328 
329  if (entity->marked_for_removal())
330  entity->header.encode( &ptr );
331  else
332  entity->encode_entry( &ptr );
333 
334  HT_ASSERT((ptr-buf->base) == (ptrdiff_t)buf->size);
335  }
336 
337  shared_ptr<uint8_t> backup_buf(new uint8_t [length], default_delete<uint8_t[]>());
338  memcpy(backup_buf.get(), buf->base, buf->size);
339 
340  // Add to entity map
341  if (dynamic_cast<EntityRecover *>(entity.get()) == nullptr) {
342  m_entity_map.erase(entity->header.id);
343  if (!entity->marked_for_removal())
344  m_entity_map[entity->header.id] = SerializedEntityT(length, backup_buf);
345  }
346 
347  m_write_queue.push_back(buf);
348 
349  m_write_scheduler->schedule();
350 
351  m_cond.wait(lock);
352 
354 }
355 
// Persists the state of several entities in a single queued write.
// Steps, with m_mutex held:
// - Serializes each entity: header only if marked_for_removal(), otherwise
//   header plus encoded state.
// - Replaces or erases its entry in m_entity_map.
// - Concatenates all encodings into one buffer and appends it to
//   m_write_queue.
// - Schedules a write and blocks on m_cond.
// Returns immediately if `entities` is empty.
// Throws Error::CLOSED if the log has been closed (m_fd == -1).
// EntityRecover instances are rejected via HT_ASSERT.
356 void Writer::record_state(std::vector<EntityPtr> &entities) {
357  unique_lock<mutex> lock(m_mutex);
// NOTE(review): allocated before the empty() check below; harmless, but it
// could be moved after that check.
358  shared_ptr<StaticBuffer> buffers( new StaticBuffer[entities.size()], default_delete<StaticBuffer[]>() );
359  uint8_t *ptr;
360  size_t length = 0;
361  size_t total_length = 0;
362 
363  if (entities.empty())
364  return;
365 
366  if (m_fd == -1)
367  HT_THROWF(Error::CLOSED, "MetaLog '%s' has been closed", m_path.c_str());
368 
369  size_t i=0;
370  for (auto & entity : entities) {
// Per-entity lock, held while the entity is encoded. Note that it shadows
// the outer m_mutex lock's name inside the loop body.
371  lock_guard<Entity> lock(*entity);
372  length = EntityHeader::LENGTH + (entity->marked_for_removal() ? 0 : entity->encoded_length());
373  buffers.get()[i].set(new uint8_t [length], length);
374  ptr = buffers.get()[i].base;
375  if (entity->marked_for_removal())
376  entity->header.encode( &ptr );
377  else
378  entity->encode_entry( &ptr );
379 
380  // Add to entity map
381  HT_ASSERT(dynamic_cast<EntityRecover *>(entity.get()) == nullptr);
382  m_entity_map.erase(entity->header.id);
383  if (!entity->marked_for_removal()) {
// Keep a private copy of the serialized entity; roll() replays
// m_entity_map into each new log file.
384  shared_ptr<uint8_t> backup_buf(new uint8_t [length], default_delete<uint8_t[]>());
385  memcpy(backup_buf.get(), buffers.get()[i].base, length);
386  m_entity_map[entity->header.id] = SerializedEntityT(length, backup_buf);
387  }
388 
389  HT_ASSERT((ptr-buffers.get()[i].base) == (ptrdiff_t)buffers.get()[i].size);
390  total_length += length;
391  i++;
392  }
393 
// Concatenate the per-entity encodings into one write-queue buffer.
394  StaticBufferPtr buf = make_shared<StaticBuffer>(total_length);
395  ptr = buf->base;
396  for (i=0; i<entities.size(); i++) {
397  memcpy(ptr, buffers.get()[i].base, buffers.get()[i].size);
398  ptr += buffers.get()[i].size;
399  }
400  HT_ASSERT((ptr-buf->base) == (ptrdiff_t)buf->size);
401 
402  m_write_queue.push_back(buf);
403 
404  m_write_scheduler->schedule();
405 
// NOTE(review): wait() without a predicate can return on a spurious wakeup,
// without any notify. One listing line is missing after this wait; confirm
// the code that follows tolerates resuming before the scheduled write is
// ready.
406  m_cond.wait(lock);
407 
409 
410 }
411 
413  unique_lock<mutex> lock(m_mutex);
414  StaticBufferPtr buf = make_shared<StaticBuffer>(EntityHeader::LENGTH);
415  uint8_t *ptr = buf->base;
416 
417  if (m_fd == -1)
418  HT_THROWF(Error::CLOSED, "MetaLog '%s' has been closed", m_path.c_str());
419 
420  entity->header.flags |= EntityHeader::FLAG_REMOVE;
421  entity->header.length = 0;
422  entity->header.checksum = 0;
423 
424  entity->header.encode( &ptr );
425 
426  HT_ASSERT((ptr-buf->base) == (ptrdiff_t)buf->size);
427 
428  // Remove from entity map
429  m_entity_map.erase(entity->header.id);
430 
431  m_write_queue.push_back(buf);
432 
433  m_write_scheduler->schedule();
434 
435  m_cond.wait(lock);
436 
438 }
439 
440 
// Records the removal of several entities in a single queued write.
// For each entity, with m_mutex held:
// - Sets FLAG_REMOVE on its header and zeroes the header length and checksum.
//   This mutates the caller's entities.
// - Encodes the header only.
// - Erases the entity from m_entity_map.
// The encoded headers are then appended to m_write_queue as one buffer, a
// write is scheduled, and the caller blocks on m_cond.
// Returns immediately if `entities` is empty.
// Throws Error::CLOSED if the log has been closed (m_fd == -1).
441 void Writer::record_removal(std::vector<EntityPtr> &entities) {
442  unique_lock<mutex> lock(m_mutex);
443 
444  if (entities.empty())
445  return;
446 
447  if (m_fd == -1)
448  HT_THROWF(Error::CLOSED, "MetaLog '%s' has been closed", m_path.c_str());
449 
// A removal record is just a header, so the size is known up front.
450  size_t length = entities.size() * EntityHeader::LENGTH;
451  StaticBufferPtr buf = make_shared<StaticBuffer>(length);
452  uint8_t *ptr = buf->base;
453 
// NOTE(review): unlike the vector record_state(), this loop does not take
// lock_guard<Entity> while it modifies entity->header. Confirm whether that
// is intentional.
454  for (auto &entity : entities) {
455  entity->header.flags |= EntityHeader::FLAG_REMOVE;
456  entity->header.length = 0;
457  entity->header.checksum = 0;
458  entity->header.encode( &ptr );
459  // Remove from entity map
460  m_entity_map.erase(entity->header.id);
461  }
462 
463  HT_ASSERT((ptr-buf->base) == (ptrdiff_t)buf->size);
464 
465  m_write_queue.push_back(buf);
466 
467  m_write_scheduler->schedule();
468 
// NOTE(review): wait() without a predicate can return on a spurious wakeup.
// One listing line is missing after this wait; confirm the code that follows
// tolerates resuming before the scheduled write is ready.
469  m_cond.wait(lock);
470 
472 
473 }
474 
475 
477  lock_guard<mutex> lock(m_mutex);
478  m_write_ready = true;
479  m_cond.notify_all();
480 }
static Comm * instance()
Creates/returns singleton instance of the Comm class.
Definition: Comm.h:72
#define HT_THROW2F(_code_, _ex_, _fmt_,...)
Definition: Error.h:494
A memory buffer of static size.
Definition: StaticBuffer.h:45
std::string m_backup_filename
Pathname of local log backup file.
int m_backup_fd
File descriptor of backup MetaLog file in local filesystem.
char name[14]
MetaLog definition name (e.g. "mml" or "rsml")
Definition: MetaLog.h:88
Filesystem::Flags m_flush_method
Log flush method (FLUSH or SYNC)
void encode(uint8_t **bufp) const
Encodes MetaLog file header.
Definition: MetaLog.cc:40
void record_removal(EntityPtr entity)
Records the removal of an entity.
PropertiesPtr properties
This singleton map stores all options.
Definition: Config.cc:47
std::string String
A String is simply a typedef to std::string.
Definition: String.h:44
Compatibility class for boost::filesystem::path.
std::shared_ptr< Entity > EntityPtr
Smart pointer to Entity.
static bool unlink(const String &fname)
Unlinks (deletes) a file or directory.
Definition: FileUtils.cc:427
Filesystem::Flags convert(std::string str)
Converts string mnemonic to corresponding Filesystem::Flags value.
Definition: Filesystem.cc:180
int32_t m_offset
Current write offset of MetaLog file.
void purge_old_log_files()
Purges old MetaLog files.
static ssize_t write(const String &fname, const std::string &contents)
Writes a String buffer to a file; the file is overwritten if it already exists.
Definition: FileUtils.cc:124
static bool exists(const String &fname)
Checks if a file or directory exists.
Definition: FileUtils.cc:420
Writer(FilesystemPtr &fs, DefinitionPtr &definition, const std::string &path, std::vector< EntityPtr > &initial_entities)
Constructor.
std::shared_ptr< Event > EventPtr
Smart pointer to Event.
Definition: Event.h:228
STL namespace.
static bool skip_recover_entry
Global flag to force writer to skip writing EntityRecover (testing)
static bool mkdirs(const String &dirname)
Creates a directory (with all parent directories, if required)
Definition: FileUtils.cc:366
WriteSchedulerPtr m_write_scheduler
Write scheduler.
DefinitionPtr m_definition
Smart pointer to MetaLog Definition.
#define HT_EXPECT(_e_, _code_)
Definition: Logger.h:388
void write_header()
Writes MetaLog file header.
#define HT_ASSERT(_e_)
Definition: Logger.h:396
bool m_write_ready
Flag indicating that the write scheduler has signaled that the pending write queue may be serviced.
Compatibility class for boost::filesystem::path.
Definition: Path.h:45
File system utility functions.
Static length of entity header.
Recover entity used for sanity checking.
const char * get_text(int error)
Returns a descriptive error message.
Definition: Error.cc:330
int64_t m_max_file_size
Maximum file size.
Writes a MetaLog.
Definition: MetaLogWriter.h:66
Compatibility Macros for C/C++.
condition_variable m_cond
Condition variable to signal completion of deferred writes.
Static header length.
Definition: MetaLog.h:81
std::string m_backup_path
Pathname of local log backup directory.
void scan_log_directory(FilesystemPtr &fs, const std::string &path, std::deque< int32_t > &file_ids)
Scans MetaLog directory for numeric file names.
Definition: MetaLog.cc:54
Comm * m_comm
Pointer to Comm layer.
std::string m_path
Path name of MetaLog directory.
Hypertable definitions
#define HT_FATALF(msg,...)
Definition: Logger.h:343
int32_t m_replication
Replication factor.
Declarations for MetaLog::Writer.
uint16_t version
MetaLog definition version number
Definition: MetaLog.h:85
std::pair< size_t, std::shared_ptr< uint8_t > > SerializedEntityT
std::mutex m_mutex
Mutex for serializing access to members
std::shared_ptr< Filesystem > FilesystemPtr
Smart pointer to Filesystem.
Definition: Filesystem.h:572
WriteScheduler(Writer *writer)
Constructor.
void close()
Closes open file descriptors.
size_t m_history_size
Number of old MetaLog files to retain for historical purposes.
void encode_entry(uint8_t **bufp)
Encodes entity header plus serialized state.
#define HT_THROWF(_code_, _fmt_,...)
Definition: Error.h:490
int m_fd
File descriptor of MetaLog file in FS.
std::map< int64_t, SerializedEntityT > m_entity_map
Map of current serialized entity data.
This is a generic exception class for Hypertable.
Definition: Error.h:314
Declarations for MetaLog.
std::deque< int32_t > m_file_ids
Deque of existing file name IDs.
Configuration settings.
FilesystemPtr m_fs
Smart pointer to Filesystem object.
void record_state(EntityPtr entity)
Persists an Entity to the log.
std::vector< StaticBufferPtr > m_write_queue
Vector of pending writes.
String extensions and helpers: sets, maps, append operators etc.
Metalog file header.
Definition: MetaLog.h:57
void handle(EventPtr &event) override
Callback method.
std::shared_ptr< StaticBuffer > StaticBufferPtr
Smart pointer to StaticBuffer.
Definition: StaticBuffer.h:232
std::string m_filename
Full pathname of MetaLog file open for writing.
std::shared_ptr< Definition > DefinitionPtr
Smart pointer to Definition.
MetaLog framework.
Definition: MetaLog.h:44
int code() const
Returns the error code.
Definition: Error.h:391