0.9.8.10
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
OperationCompact.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2007-2015 Hypertable, Inc.
3  *
4  * This file is part of Hypertable.
5  *
6  * Hypertable is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; version 3 of the
9  * License, or any later version.
10  *
11  * Hypertable is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19  * 02110-1301, USA.
20  */
21 
28 #include <Common/Compat.h>
29 
31 #include "OperationCompact.h"
32 #include "Utility.h"
33 
34 #include <Hypertable/Lib/Key.h>
36 
37 #include <Hyperspace/Session.h>
38 
39 #include <Common/Error.h>
40 #include <Common/FailureInducer.h>
41 #include <Common/ScopeGuard.h>
42 #include <Common/Serialization.h>
43 
44 #include <boost/algorithm/string.hpp>
45 
46 #include <chrono>
47 #include <thread>
48 
49 using namespace Hypertable;
50 using namespace Hyperspace;
51 using namespace std;
52 
54  const MetaLog::EntityHeader &header_)
55  : Operation(context, header_) {
56 }
57 
59  : Operation(context, event, MetaLog::EntityType::OPERATION_COMPACT) {
60  const uint8_t *ptr = event->payload;
61  size_t remaining = event->payload_len;
62  m_params.decode(&ptr, &remaining);
63  if (!m_params.table_name().empty())
66 }
67 
115  bool is_namespace;
116  StringSet servers;
117  DispatchHandlerOperationPtr op_handler;
118  TableIdentifier table;
119  int32_t state = get_state();
120 
121  HT_INFOF("Entering Compact-%lld (table=%s, row=%s) state=%s",
122  (Lld)header.id, m_params.table_name().c_str(), m_params.row().c_str(),
123  OperationState::get_text(state));
124 
125  switch (state) {
126 
128  if (!m_params.table_name().empty()) {
129  if (m_context->namemap->name_to_id(m_params.table_name(), m_id, &is_namespace)) {
130  if (is_namespace) {
131  complete_error(Error::TABLE_NOT_FOUND, format("%s is a namespace", m_params.table_name().c_str()));
132  return;
133  }
135  }
136  else {
138  return;
139  }
140  }
141  {
142  lock_guard<mutex> lock(m_mutex);
143  m_dependencies.clear();
145  }
146  HT_MAYBE_FAIL("compact-INITIAL");
147 
149  servers.clear();
150  if (!m_params.table_name().empty())
152  else
153  m_context->get_available_servers(servers);
154  {
155  lock_guard<mutex> lock(m_mutex);
156  m_servers.clear();
157  m_dependencies.clear();
158  for (StringSet::iterator iter=servers.begin(); iter!=servers.end(); ++iter) {
159  if (m_completed.count(*iter) == 0) {
160  m_dependencies.insert(*iter);
161  m_servers.insert(*iter);
162  }
163  }
165  }
166  m_context->mml_writer->record_state(shared_from_this());
167  return;
168 
170  table.id = m_id.c_str();
171  table.generation = 0;
172  op_handler = make_shared<DispatchHandlerOperationCompact>(m_context, table, m_params.row(), m_params.range_types());
173  op_handler->start(m_servers);
174  if (!op_handler->wait_for_completion()) {
175  std::set<DispatchHandlerOperation::Result> results;
176  op_handler->get_results(results);
177  for (const auto &result : results) {
178  if (result.error == Error::OK ||
179  result.error == Error::TABLE_NOT_FOUND) {
180  lock_guard<mutex> lock(m_mutex);
181  m_completed.insert(result.location);
182  }
183  else
184  HT_WARNF("Compact error at %s - %s (%s)", result.location.c_str(),
185  Error::get_text(result.error), result.msg.c_str());
186  }
187 
188  {
189  lock_guard<mutex> lock(m_mutex);
190  m_servers.clear();
191  m_dependencies.clear();
194  }
195  // Sleep a little bit to prevent busy wait
196  this_thread::sleep_for(chrono::milliseconds(5000));
197  m_context->mml_writer->record_state(shared_from_this());
198  return;
199  }
200  complete_ok();
201  break;
202 
203  default:
204  HT_FATALF("Unrecognized state %d", state);
205  }
206 
207  HT_INFOF("Leaving Compact-%lld (table=%s, row=%s) state=%s",
208  (Lld)header.id, m_params.table_name().c_str(), m_params.row().c_str(),
210 }
211 
212 
213 void OperationCompact::display_state(std::ostream &os) {
214  os << " name=" << m_params.table_name() << " id=" << m_id << " "
215  << " row=" << m_params.row() << " flags="
216  << RangeServer::Protocol::compact_flags_to_string(m_params.range_types());
217 }
218 
220  return 1;
221 }
222 
224  size_t length = m_params.encoded_length() +
226  length += 4;
227  for (auto &location : m_completed)
228  length += Serialization::encoded_length_vstr(location);
229  length += 4;
230  for (auto &location : m_servers)
231  length += Serialization::encoded_length_vstr(location);
232  return length;
233 }
234 
235 void OperationCompact::encode_state(uint8_t **bufp) const {
236  m_params.encode(bufp);
239  for (auto &location : m_completed)
240  Serialization::encode_vstr(bufp, location);
241  Serialization::encode_i32(bufp, m_servers.size());
242  for (auto &location : m_servers)
243  Serialization::encode_vstr(bufp, location);
244 }
245 
246 void OperationCompact::decode_state(uint8_t version, const uint8_t **bufp,
247  size_t *remainp) {
248  m_params.decode(bufp, remainp);
249  m_id = Serialization::decode_vstr(bufp, remainp);
250  size_t length = Serialization::decode_i32(bufp, remainp);
251  for (size_t i=0; i<length; i++)
252  m_completed.insert( Serialization::decode_vstr(bufp, remainp) );
253  length = Serialization::decode_i32(bufp, remainp);
254  for (size_t i=0; i<length; i++)
255  m_servers.insert( Serialization::decode_vstr(bufp, remainp) );
256 }
257 
258 void OperationCompact::decode_state_old(uint8_t version, const uint8_t **bufp,
259  size_t *remainp) {
260  {
261  string name = Serialization::decode_vstr(bufp, remainp);
262  string row = Serialization::decode_vstr(bufp, remainp);
263  int32_t range_types = Serialization::decode_i32(bufp, remainp);
264  m_params = Lib::Master::Request::Parameters::Compact(name, row, range_types);
265  }
266  m_id = Serialization::decode_vstr(bufp, remainp);
267  size_t length = Serialization::decode_i32(bufp, remainp);
268  for (size_t i=0; i<length; i++)
269  m_completed.insert( Serialization::decode_vstr(bufp, remainp) );
270  if (version >= 2) {
271  length = Serialization::decode_i32(bufp, remainp);
272  for (size_t i=0; i<length; i++)
273  m_servers.insert( Serialization::decode_vstr(bufp, remainp) );
274  }
275 }
276 
278  return "OperationCompact";
279 }
280 
282  return format("Compact table='%s' row='%s'", m_params.table_name().c_str(), m_params.row().c_str());
283 }
284 
std::set< String > StringSet
STL Set managing Strings.
Definition: StringExt.h:42
char * decode_vstr(const uint8_t **bufp, size_t *remainp)
Decode a vstr (vint64, data, null).
void execute() override
Carries out the manual compaction operation.
#define HT_WARNF(msg,...)
Definition: Logger.h:290
The FailureInducer simulates errors.
void encode_state(uint8_t **bufp) const override
Writes serialized encoding of object state.
ContextPtr m_context
Pointer to Master context.
Definition: Operation.h:553
Declarations for DispatchHandlerOperationCompact.
std::string String
A String is simply a typedef to std::string.
Definition: String.h:44
String format(const char *fmt,...)
Returns a String using printf-like format facilities. Vanilla snprintf is about 1.5x faster than this...
Definition: String.cc:37
virtual size_t encoded_length() const
Returns serialized object length.
Definition: Serializable.cc:37
int64_t id
Unique ID of entity.
std::mutex m_mutex
Mutex for serializing access to members
OperationCompact(ContextPtr &context, const MetaLog::EntityHeader &header_)
Constructor for constructing object from MetaLog entry.
std::shared_ptr< Event > EventPtr
Smart pointer to Event.
Definition: Event.h:228
StringSet m_completed
Set of range servers that have completed operation.
STL namespace.
Request parameters for compact operation.
Definition: Compact.h:46
EntityHeader header
Entity header
size_t encoded_length_vstr(size_t len)
Computes the encoded length of vstr (vint64, data, null)
size_t encoded_length_state() const override
Returns serialized state length.
const std::string & row() const
Gets row identifying range to be compacted.
Definition: Compact.h:69
const char * get_text(int32_t state)
Definition: Operation.cc:609
uint32_t decode_i32(const uint8_t **bufp, size_t *remainp)
Decode a 32-bit integer in little-endian order.
std::shared_ptr< Context > ContextPtr
Smart pointer to Context.
Definition: Context.h:265
Hyperspace definitions
std::string m_id
Table identifier
Lib::Master::Request::Parameters::Compact m_params
Request parameters.
Declarations for RangeServerProtocol.
Declarations for OperationCompact.
virtual void encode(uint8_t **bufp) const
Writes serialized representation of object to a buffer.
Definition: Serializable.cc:64
StringSet m_servers
Set of participating range servers.
void set_state(int32_t state)
Definition: Operation.h:473
const char * get_text(int error)
Returns a descriptive error message.
Definition: Error.cc:330
void add_dependency(const String &dependency)
Definition: Operation.h:462
const char * INIT
Definition: Operation.cc:45
void encode_i32(uint8_t **bufp, uint32_t val)
Encode a 32-bit integer in little-endian order.
Compatibility Macros for C/C++.
Functions to serialize/deserialize primitives to/from a memory buffer.
const char * METADATA
Definition: Operation.cc:48
virtual void decode(const uint8_t **bufp, size_t *remainp)
Reads serialized representation of object from a buffer.
Definition: Serializable.cc:70
void lock()
Locks the entity's mutex.
Definition: MetaLogEntity.h:97
void encode_vstr(uint8_t **bufp, const void *buf, size_t len)
Encode a buffer as variable length string (vint64, data, null)
Hypertable definitions
#define HT_FATALF(msg,...)
Definition: Logger.h:343
long long int Lld
Shortcut for printf formats.
Definition: String.h:53
void decode_state_old(uint8_t version, const uint8_t **bufp, size_t *remainp) override
Declarations for general-purpose utility functions.
const std::string label() override
Returns descriptive label for operation.
DependencySet m_dependencies
Set of dependencies.
Definition: Operation.h:595
#define HT_INFOF(msg,...)
Definition: Logger.h:272
const std::string & table_name() const
Gets name of table to compact.
Definition: Compact.h:65
void get_table_server_set(ContextPtr &context, const String &id, const String &row, StringSet &servers)
Gets set of servers holding ranges for a given table.
Definition: Utility.cc:57
uint8_t encoding_version_state() const override
Returns version of encoding format of state.
Abstract base class for master operations.
Definition: Operation.h:124
DependencySet m_exclusivities
Set of exclusivities.
Definition: Operation.h:592
void decode_state(uint8_t version, const uint8_t **bufp, size_t *remainp) override
Reads serialized encoding of object state.
const std::string name() override
Returns name of operation ("OperationCompact")
#define HT_MAYBE_FAIL(_label_)
void complete_ok(std::vector< MetaLog::EntityPtr > &additional)
Definition: Operation.cc:436
void display_state(std::ostream &os) override
Writes human readable representation of object to output stream.
Error codes, Exception handling, error logging.
void complete_error(int error, const String &msg, std::vector< MetaLog::EntityPtr > &additional)
Completes operation with error.
Definition: Operation.cc:400
std::shared_ptr< DispatchHandlerOperation > DispatchHandlerOperationPtr
Smart pointer to DispatchHandlerOperation.
int32_t range_types()
Gets range types to be compacted.
Definition: Compact.h:73
Executes user-defined functions when leaving the current scope.