0.9.8.10
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
TableMutatorAsync.h
Go to the documentation of this file.
1 /* -*- c++ -*-
2  * Copyright (C) 2007-2015 Hypertable, Inc.
3  *
4  * This file is part of Hypertable.
5  *
6  * Hypertable is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; version 3 of the
9  * License, or any later version.
10  *
11  * Hypertable is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19  * 02110-1301, USA.
20  */
21 
22 #ifndef Hypertable_Lib_TableMutatorAsync_h
23 #define Hypertable_Lib_TableMutatorAsync_h
24 
25 #include "Cells.h"
26 #include "ClientObject.h"
27 #include "KeySpec.h"
28 #include "Table.h"
30 #include "RangeLocator.h"
31 #include "Schema.h"
32 #include "TableIdentifier.h"
33 
35 
36 #include <Common/Properties.h>
37 #include <Common/StringExt.h>
38 #include <Common/Timer.h>
39 
40 #include <condition_variable>
41 #include <iostream>
42 #include <mutex>
43 
44 namespace Hypertable {
45 
47  typedef std::shared_ptr<TableMutatorAsync> TableMutatorAsyncPtr;
48 
50  typedef std::shared_ptr<IndexMutatorCallback> IndexMutatorCallbackPtr;
51 
52  class TableMutator;
53 
62 
63  public:
64 
80  TableMutatorAsync(PropertiesPtr &props, Comm *comm,
81  ApplicationQueueInterfacePtr &app_queue, Table *table,
82  RangeLocatorPtr &range_locator, uint32_t timeout_ms, ResultCallback *cb,
83  uint32_t flags = 0, bool explicit_block_only = false);
84 
85  TableMutatorAsync(std::mutex &mutex, std::condition_variable &cond, PropertiesPtr &props, Comm *comm,
86  ApplicationQueueInterfacePtr &app_queue, Table *table,
87  RangeLocatorPtr &range_locator, uint32_t timeout_ms, ResultCallback *cb,
88  uint32_t flags = 0, bool explicit_block_only = false,
89  TableMutator *mutator = 0);
90 
95  virtual ~TableMutatorAsync();
96 
102  virtual uint64_t memory_used() { return m_memory_used; }
103 
113  uint64_t get_resend_count() { return m_resends; }
114 
122  void set(const KeySpec &key, const void *value, uint32_t value_len);
123 
127  void set(const KeySpec &key, const char *value) {
128  if (value)
129  set(key, value, strlen(value));
130  else
131  set(key, 0, 0);
132  }
133 
137  void set(const KeySpec &key, const std::string &value) {
138  set(key, value.data(), value.length());
139  }
140 
147  void set_delete(const KeySpec &key);
148 
155  void set_cells(const Cells &cells) {
156  set_cells(cells.begin(), cells.end());
157  }
158 
166  void set_cells(Cells::const_iterator start, Cells::const_iterator end);
167 
172  void flush(bool sync=true);
173 
180  void buffer_finish(uint32_t id, int error, bool retry);
181  void cancel();
182  bool is_cancelled();
183  void get_unsynced_rangeservers(std::vector<CommAddress> &unsynced);
185  bool retry(uint32_t timeout_ms);
187  void get_failed_mutations(FailedMutations &failed_mutations) {
188  std::lock_guard<std::mutex> lock(m_member_mutex);
189  failed_mutations = m_failed_mutations;
190  }
192  std::lock_guard<std::mutex> lock(m_mutex);
193  return !m_outstanding_buffers.empty();
194  }
196  return !m_outstanding_buffers.empty();
197  }
198  bool needs_flush();
199 
200  SchemaPtr schema() { std::lock_guard<std::mutex> lock(m_mutex); return m_schema; }
201 
202  protected:
203  void wait_for_completion();
204 
205  private:
207  friend class TableMutator;
208  void flush_with_tablequeue(TableMutator *mutator, bool sync=true);
209 
210  void initialize(PropertiesPtr &props);
211 
212  void initialize_indices(PropertiesPtr &props);
213 
214  friend class IndexMutatorCallback;
215  void update_without_index(const Cell &cell);
216 
217  void update_without_index(const Key &full_key, const ColumnFamilySpec *cf, const Cell &cell);
218 
219  void update_without_index(const Key &full_key, const ColumnFamilySpec *cf, const void *value,
220  size_t value_len);
221 
222  enum Operation {
223  SET = 1,
227  };
228 
232  void do_sync();
233 
234  void to_full_key(const void *row, const char *cf, const void *cq,
235  int64_t ts, int64_t rev, uint8_t flag, Key &full_key,
236  ColumnFamilySpec **pcf = 0);
237 
238  void to_full_key(const KeySpec &key, Key &full_key,
239  ColumnFamilySpec **cf = 0) {
241  key.timestamp, key.revision, key.flag, full_key, cf);
242  }
243 
244  void to_full_key(const Cell &cell, Key &full_key,
245  ColumnFamilySpec **cf = 0) {
247  cell.timestamp, cell.revision, cell.flag, full_key, cf);
248  }
249 
250  void update_unsynced_rangeservers(const CommAddressSet &unsynced);
251 
252  void handle_send_exceptions(const String& info);
253 
254  bool mutated() {
255  std::lock_guard<std::mutex> lock(m_member_mutex);
256  return m_mutated;
257  }
258 
259  bool key_uses_index(Key &key);
260 
261  void update_with_index(Key &key, const ColumnFamilySpec *cf, const void *value,
262  uint32_t value_len);
263 
264  typedef std::map<uint32_t, TableMutatorAsyncScatterBufferPtr> ScatterBufferAsyncMap;
265 
266  const static uint32_t ms_max_sync_retries = 5;
267 
272  SchemaPtr m_schema; // needs mutex
275  uint64_t m_memory_used {}; // protected by buffer_mutex
276  uint64_t m_max_memory {};
277  ScatterBufferAsyncMap m_outstanding_buffers; // protected by buffer mutex
279  uint64_t m_resends {}; // needs mutex
280  uint32_t m_timeout_ms {};
282  uint32_t m_flags {};
287  std::condition_variable m_buffer_cond;
288  std::condition_variable &m_cond;
289  uint32_t m_next_buffer_id {}; // needs mutex
291  TableMutatorAsyncPtr m_index_mutator;
292  TableMutatorAsyncPtr m_qualifier_index_mutator;
293  IndexMutatorCallbackPtr m_imc;
296  bool m_cancelled {};
297  bool m_mutated {}; // needs mutex
298  bool m_use_index {};
299 
300  };
301 
302 }
303 
304 #endif // Hypertable_Lib_TableMutatorAsync_h
int64_t timestamp
Definition: KeySpec.h:130
static std::mutex mutex
Definition: Logger.cc:43
std::vector< Cell, CellAlloc > Cells
Definition: Cells.h:37
void initialize(PropertiesPtr &props)
void initialize_indices(PropertiesPtr &props)
std::string String
A String is simply a typedef to std::string.
Definition: String.h:44
std::shared_ptr< RangeLocator > RangeLocatorPtr
Smart pointer to RangeLocator.
Definition: RangeLocator.h:198
ScatterBufferAsyncMap m_outstanding_buffers
Declarations for TableIdentifier and TableIdentifierManaged.
TableIdentifierManaged m_table_identifier
const char * column_qualifier
Definition: KeySpec.h:128
void set(const KeySpec &key, const void *value, uint32_t value_len)
Inserts a cell into the table.
Program options handling.
std::shared_ptr< TableMutatorAsyncScatterBuffer > TableMutatorAsyncScatterBufferPtr
Smart pointer to TableMutatorAsyncScatterBuffer.
Column family specification.
TableMutatorAsyncPtr m_qualifier_index_mutator
const char * column_qualifier
Definition: Cell.h:68
void set(const KeySpec &key, const std::string &value)
Convenient helper for std::string values.
void to_full_key(const void *row, const char *cf, const void *cq, int64_t ts, int64_t rev, uint8_t flag, Key &full_key, ColumnFamilySpec **pcf=0)
void set_delete(const KeySpec &key)
Deletes an entire row, a column family in a particular row, or a specific cell within a row...
void flush_with_tablequeue(TableMutator *mutator, bool sync=true)
int64_t revision
Definition: KeySpec.h:131
void to_full_key(const KeySpec &key, Key &full_key, ColumnFamilySpec **cf=0)
void flush(bool sync=true)
Flushes the current buffer accumulated mutations to their respective range servers.
const void * row
Definition: KeySpec.h:125
std::shared_ptr< IndexMutatorCallback > IndexMutatorCallbackPtr
void handle_send_exceptions(const String &info)
Declarations for Schema.
void update_with_index(Key &key, const ColumnFamilySpec *cf, const void *value, uint32_t value_len)
Provides the ability to mutate a table in the form of adding and deleting rows and cells...
Wrapper for TableIdentifier providing member storage.
std::shared_ptr< TableMutatorAsync > TableMutatorAsyncPtr
Represents an open table.
Definition: Table.h:58
Provides the ability to mutate a table in the form of adding and deleting rows and cells...
Definition: TableMutator.h:55
IndexMutatorCallbackPtr m_imc
void set(const KeySpec &key, const char *value)
Convenient helper for null-terminated values.
void set_cells(const Cells &cells)
Insert a bunch of cells into the table (atomically if cells are in the same range/row) ...
bool retry(uint32_t timeout_ms)
virtual ~TableMutatorAsync()
Destructor for TableMutatorAsync object Make sure buffers are flushed and unsynced rangeservers get s...
uint64_t revision
Definition: Cell.h:70
void buffer_finish(uint32_t id, int error, bool retry)
This is where buffers call back into when their outstanding operations are complete.
std::shared_ptr< Properties > PropertiesPtr
Definition: Properties.h:447
void update_unsynced_rangeservers(const CommAddressSet &unsynced)
A timer class to keep timeout states across AsyncComm related calls.
TableMutatorAsyncPtr m_index_mutator
void update_without_index(const Cell &cell)
Declarations for ClientObject.
static const uint32_t ms_max_sync_retries
const char * row_key
Definition: Cell.h:66
std::shared_ptr< ApplicationQueueInterface > ApplicationQueueInterfacePtr
Smart pointer to ApplicationQueueInterface.
TableMutatorAsyncScatterBufferPtr m_current_buffer
TableMutatorAsync(PropertiesPtr &props, Comm *comm, ApplicationQueueInterfacePtr &app_queue, Table *table, RangeLocatorPtr &range_locator, uint32_t timeout_ms, ResultCallback *cb, uint32_t flags=0, bool explicit_block_only=false)
Constructs the TableMutatorAsync object.
virtual uint64_t memory_used()
Returns the amount of memory used by the collected mutations in the current buffer.
void do_sync()
Calls sync on any unsynced rangeservers and waits for completion.
Base class for Hypertable client objects.
Definition: ClientObject.h:44
Hypertable definitions
std::set< CommAddress > CommAddressSet
Set of CommAddress objects.
Definition: CommAddress.h:212
void get_failed_mutations(FailedMutations &failed_mutations)
Entry point to AsyncComm service.
Definition: Comm.h:61
TableMutatorAsyncScatterBufferPtr get_outstanding_buffer(size_t id)
Declarations for ConnectionManager.
void get_unsynced_rangeservers(std::vector< CommAddress > &unsynced)
void update_outstanding(TableMutatorAsyncScatterBufferPtr &buffer)
const char * column_family
Definition: Cell.h:67
std::condition_variable & m_cond
ResultCallback for secondary indices; used by TableMutatorAsync.
Provides access to internal components of opaque key.
Definition: Key.h:40
Declarations for TableMutatorAsyncScatterBuffer.
std::shared_ptr< Schema > SchemaPtr
Smart pointer to Schema.
Definition: Schema.h:465
void to_full_key(const Cell &cell, Key &full_key, ColumnFamilySpec **cf=0)
uint8_t flag
Definition: Cell.h:73
ApplicationQueueInterfacePtr m_app_queue
std::map< uint32_t, TableMutatorAsyncScatterBufferPtr > ScatterBufferAsyncMap
Encapsulates decomposed key and value.
Definition: Cell.h:32
Represents an open table.
String extensions and helpers: sets, maps, append operators etc.
const char * column_family
Definition: KeySpec.h:127
std::condition_variable m_buffer_cond
uint64_t get_resend_count()
There are certain circumstances when mutations get flushed to the wrong range server due to stale ran...
std::vector< FailedMutation > FailedMutations
Definition: Cells.h:39
int64_t timestamp
Definition: Cell.h:69