0.9.8.10
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
TableMutator.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2007-2015 Hypertable, Inc.
3  *
4  * This file is part of Hypertable.
5  *
6  * Hypertable is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; version 3 of the
9  * License, or any later version.
10  *
11  * Hypertable is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19  * 02110-1301, USA.
20  */
21 
22 #include <Common/Compat.h>
23 
24 #include "Key.h"
25 #include "TableMutator.h"
26 
27 #include <Common/Config.h>
28 #include <Common/StringExt.h>
29 
30 #include <boost/algorithm/string.hpp>
31 
32 #include <chrono>
33 #include <cstring>
34 #include <thread>
35 
36 using namespace Hypertable;
37 using namespace Hypertable::Config;
38 using namespace std;
39 
41  try {
42  throw;
43  }
44  catch (std::bad_alloc &e) {
45  set_last_error(Error::BAD_MEMORY_ALLOCATION);
46  HT_ERROR("caught bad_alloc here");
47  }
48  catch (std::exception &e) {
49  set_last_error(Error::EXTERNAL);
50  HT_ERRORF("caught std::exception: %s", e.what());
51  }
52  catch (...) {
53  set_last_error(Error::EXTERNAL);
54  HT_ERROR("caught unknown exception here");
55  }
56 }
57 
58 
59 TableMutator::TableMutator(PropertiesPtr &props, Comm *comm, Table *table, RangeLocatorPtr &range_locator,
60  uint32_t timeout_ms, uint32_t flags)
61  : m_callback(this), m_timeout_ms(timeout_ms), m_flags(flags), m_flush_delay(0),
62  m_last_error(Error::OK), m_last_op(0), m_unflushed_updates(false) {
63  HT_ASSERT(timeout_ms);
64  m_queue = make_shared<TableMutatorQueue>(m_queue_mutex, m_cond);
66 
67  m_flush_delay = props->get_i32("Hypertable.Mutator.FlushDelay");
68  m_mutator =
69  make_shared<TableMutatorAsync>(m_queue_mutex, m_cond, props, comm, app_queue,
70  table, range_locator, timeout_ms, &m_callback,
71  flags, false, this);
72 }
73 
75  try {
77  flush();
78  m_mutator->cancel();
79  }
80  catch (Exception &e) {
81  HT_ERROR_OUT << e << HT_END;
82  }
83 }
84 
85 void
86 TableMutator::set(const KeySpec &key, const void *value, uint32_t value_len) {
87 
88  try {
89  m_last_op = SET;
90  auto_flush();
91  m_mutator->set(key, value, value_len);
92  m_unflushed_updates = true;
93  }
94  catch (Exception &e) {
95  HT_ERROR_OUT << e << HT_END;
96  save_last(key, value, value_len);
97  throw;
98  }
99  catch (...) {
101  save_last(key, value, value_len);
102  throw;
103  }
104 }
105 
106 
107 void
108 TableMutator::set_cells(Cells::const_iterator it, Cells::const_iterator end) {
109 
110  try {
112  auto_flush();
113  m_mutator->set_cells(it, end);
114  m_unflushed_updates = true;
115  }
116  catch (Exception &e) {
117  HT_ERROR_OUT << e << HT_END;
118  save_last(it, end);
119  throw;
120  }
121  catch (...) {
123  save_last(it, end);
124  throw;
125  }
126 }
127 
128 
130 
131  try {
133  auto_flush();
134  m_mutator->set_delete(key);
135  m_unflushed_updates = true;
136  }
137  catch (Exception &e) {
138  HT_ERROR_OUT << e << HT_END;
139  m_last_key = key;
140  throw;
141  }
142  catch (...) {
144  m_last_key = key;
145  throw;
146  }
147 }
148 
150  try {
151 
152  if (!m_mutator->needs_flush())
153  return;
154 
156 
157  if (m_flush_delay)
158  this_thread::sleep_for(chrono::milliseconds(m_flush_delay));
159 
160  m_mutator->flush_with_tablequeue(this,
162  }
163  catch (...) {
164  m_last_op = FLUSH;
166  throw;
167  }
168 }
169 
170 
172  try {
174 
176 
177  m_mutator->flush_with_tablequeue(this);
178 
180 
181  m_unflushed_updates = false;
182  }
183  catch (Exception &e) {
184  HT_ERROR_OUT << e << HT_END;
185  m_last_op = FLUSH;
186  throw;
187  }
188  catch (...) {
189  m_last_op = FLUSH;
191  throw;
192  }
193 }
194 
196  int last_error = 0;
197  ApplicationHandler *app_handler = 0;
198  while (true) {
199  {
200  unique_lock<mutex> lock(m_queue_mutex);
201  if (mutator->has_outstanding_unlocked()) {
202  m_queue->wait_for_buffer(lock, &app_handler);
203  {
204  lock_guard<mutex> lock(m_mutex);
205  last_error = m_last_error;
206  }
207  }
208  else {
209  lock_guard<mutex> lock(m_mutex);
210  if (m_last_error != Error::OK)
211  HT_THROW(m_last_error, "");
212  return;
213  }
214  }
215  app_handler->run();
216  delete app_handler;
217 
218  if (last_error != Error::OK)
219  HT_THROW(m_last_error, "");
220  }
221 }
222 
223 bool TableMutator::retry(uint32_t timeout_ms) {
224  uint32_t save_timeout = m_timeout_ms;
225 
226  /* issue 938: don't continue if there's a row overflow */
228  return true;
229 
230  {
231  lock_guard<mutex> lock(m_mutex);
232  if (m_last_error == Error::OK)
233  return true;
235  }
236 
237  try {
238  if (timeout_ms != 0)
239  m_timeout_ms = timeout_ms;
240 
241  switch (m_last_op) {
243  case SET_DELETE: set_delete(m_last_key); break;
245  case FLUSH: retry_flush(); break;
246  }
247  }
248  catch(...) {
249  m_timeout_ms = save_timeout;
250  return false;
251  }
252  m_timeout_ms = save_timeout;
253  return true;
254 }
255 
257  FailedMutations failed_mutations;
258  CellsBuilderPtr failed_cells;
259 
260  {
261  lock_guard<mutex> lock(m_mutex);
262  if (m_failed_cells && m_failed_cells->size() > 0) {
263  failed_mutations.swap(m_failed_mutations);
264  failed_cells = m_failed_cells;
265  m_failed_cells.reset();
266  }
267  }
268 
269  if (failed_cells && failed_cells->size() > 0)
270  set_cells(failed_cells->get());
271 
272  this_thread::sleep_for(chrono::milliseconds(2000));
273 
274  flush();
275 }
276 
277 std::ostream &
278 TableMutator::show_failed(const Exception &e, std::ostream &out) {
279  lock_guard<mutex> lock(m_mutex);
280 
281  if (!m_failed_mutations.empty()) {
282  for (const auto &v : m_failed_mutations) {
283  out << "Failed: (" << v.first.row_key << "," << v.first.column_family;
284 
285  if (v.first.column_qualifier && *(v.first.column_qualifier))
286  out << ":" << v.first.column_qualifier;
287 
288  out << "," << v.first.timestamp << ") - "
289  << Error::get_text(v.second) << '\n';
290  }
291  out.flush();
292  }
293  else throw e;
294 
295  return out;
296 }
297 
299  lock_guard<mutex> lock(m_mutex);
300  if (m_failed_cells && m_failed_cells->size() > 0) {
301  m_failed_mutations.clear();
302  m_failed_cells.reset();
303  }
304 }
305 
306 void TableMutator::update_error(int error, FailedMutations &failures) {
307  lock_guard<mutex> lock(m_mutex);
308  // copy all failed updates
309  m_last_error = error;
310  if (!m_failed_cells)
311  m_failed_cells = make_shared<CellsBuilder>(failures.size());
312  m_failed_cells->copy_failed_mutations(failures, m_failed_mutations);
313 }
void wait_for_flush_completion(TableMutatorAsync *mutator)
FailedMutations m_failed_mutations
Definition: TableMutator.h:252
void set_last_error(int32_t error)
Definition: TableMutator.h:196
CellsBuilderPtr m_failed_cells
Definition: TableMutator.h:253
std::ostream & show_failed(const Exception &, std::ostream &=std::cout)
Show failed mutations.
std::shared_ptr< RangeLocator > RangeLocatorPtr
Smart pointer to RangeLocator.
Definition: RangeLocator.h:198
void save_last(const KeySpec &key, const void *value, size_t value_len)
Definition: TableMutator.h:208
virtual void set_delete(const KeySpec &key)
Deletes an entire row, a column family in a particular row, or a specific cell within a row...
STL namespace.
virtual void set(const KeySpec &key, const void *value, uint32_t value_len)
Inserts a cell into the table.
Definition: TableMutator.cc:86
#define HT_ASSERT(_e_)
Definition: Logger.h:396
Provides the ability to mutate a table in the form of adding and deleting rows and cells...
Cells::const_iterator m_last_cells_it
Definition: TableMutator.h:247
Represents an open table.
Definition: Table.h:58
virtual void set_cells(const Cells &cells)
Insert a bunch of cells into the table (atomically if cells are in the same range/row) ...
Definition: TableMutator.h:124
virtual ~TableMutator()
Destructor for TableMutator object. Makes sure buffers are flushed and unsynced rangeservers get synced...
Definition: TableMutator.cc:74
const char * get_text(int error)
Returns a descriptive error message.
Definition: Error.cc:330
std::shared_ptr< Properties > PropertiesPtr
Definition: Properties.h:447
Compatibility Macros for C/C++.
#define HT_END
Definition: Logger.h:220
std::shared_ptr< ApplicationQueueInterface > ApplicationQueueInterfacePtr
Smart pointer to ApplicationQueueInterface.
#define HT_ERROR_OUT
Definition: Logger.h:301
virtual bool retry(uint32_t timeout_ms=0)
Retries the last operation.
std::shared_ptr< CellsBuilder > CellsBuilderPtr
Smart pointer to CellsBuilder.
Definition: Cells.h:137
Hypertable definitions
Entry point to AsyncComm service.
Definition: Comm.h:61
#define HT_ERROR(msg)
Definition: Logger.h:299
virtual void run()=0
Carries out the request.
TableMutatorAsyncPtr m_mutator
Definition: TableMutator.h:238
This is a generic exception class for Hypertable.
Definition: Error.h:314
Base class for application handlers.
TableCallback m_callback
Definition: TableMutator.h:236
std::condition_variable m_cond
Definition: TableMutator.h:227
#define HT_ERRORF(msg,...)
Definition: Logger.h:300
TableMutator(PropertiesPtr &props, Comm *comm, Table *table, RangeLocatorPtr &range_locator, uint32_t timeout_ms, uint32_t flags=0)
Constructs the TableMutator object.
Definition: TableMutator.cc:59
virtual void flush()
Flushes the accumulated mutations to their respective range servers.
Configuration settings.
void update_error(int error, FailedMutations &failures)
TableMutatorQueuePtr m_queue
Definition: TableMutator.h:237
String extensions and helpers: sets, maps, append operators etc.
#define HT_THROW(_code_, _msg_)
Definition: Error.h:478
std::vector< FailedMutation > FailedMutations
Definition: Cells.h:39
Cells::const_iterator m_last_cells_end
Definition: TableMutator.h:248