0.9.8.10
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
IndexMutatorCallback.h
Go to the documentation of this file.
1 /* -*- c++ -*-
2  * Copyright (C) 2007-2015 Hypertable, Inc.
3  *
4  * This file is part of Hypertable.
5  *
6  * Hypertable is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; version 3 of the
9  * License, or any later version.
10  *
11  * Hypertable is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19  * 02110-1301, USA.
20  */
21 
22 #ifndef Hypertable_Lib_IndexMutatorCallback_h
23 #define Hypertable_Lib_IndexMutatorCallback_h
24 
25 #include "Key.h"
26 #include "KeySpec.h"
27 #include "ResultCallback.h"
28 #include "TableMutatorAsync.h"
29 
30 #include <vector>
31 #include <deque>
32 #include <map>
33 #include <memory>
34 #include <mutex>
35 
36 namespace Hypertable {
37 
/// Ordering functor for C strings: compares the characters the pointers
/// refer to (via strcmp) instead of the pointer values themselves.
struct ltStringPtr {
  /// @return true when lhs sorts strictly before rhs
  bool operator()(const char *lhs, const char *rhs) const {
    const int order = strcmp(lhs, rhs);
    return order < 0;
  }
};
43 
47 
48  public:
49 
50  typedef std::multimap<const char *, Cell, ltStringPtr> KeyMap;
51  typedef std::pair<String, int> FailedRow;
52 
54  ResultCallback *original_cb, uint64_t max_memory)
55  : ResultCallback(), m_primary_mutator(primary_mutator),
56  m_original_cb(original_cb), m_max_memory(max_memory), m_used_memory(0) {
57  }
58 
60  }
61 
// Buffers one cell while holding m_mutex: builds a Cell from the fields of
// `key` plus the supplied value bytes, adds it to m_cellbuffer, increases
// m_used_memory by a figure derived from struct sizes, string lengths and
// value_len, and inserts the cell into m_keymap under cell.row_key.
// NOTE(review): `tmp` is used below but its declaration (listing line 67) is
// absent from this extract, as is listing line 80 which follows the
// "retrieve the inserted cell" comment -- confirm both against the real header.
63  void buffer_key(Key &key, const void *value, uint32_t value_len) {
64  std::lock_guard<std::mutex>lock(m_mutex);
65 
66  // to avoid Schema lookups: use the numeric ID as the column family
68 
69  // create a cell and add it to the cell-buffer
70  Cell cell(key.row, tmp.c_str(), key.column_qualifier,
71  key.timestamp, key.revision, (uint8_t *)value,
72  value_len, key.flag);
73  m_cellbuffer.add(cell, true);
74  m_used_memory += (2*sizeof(Cell)) + sizeof(char *) + (4*sizeof(void *)) +
75  strlen(key.row) + tmp.size() + 20 + value_len;
76  if (key.column_qualifier)
77  m_used_memory += strlen(key.column_qualifier);
78 
79  // retrieve the inserted cell, then store it in the keymap as well
81  m_keymap.insert(KeyMap::value_type(cell.row_key, cell));
82  }
83 
90  virtual void scan_ok(TableScannerAsync *scanner, ScanCellsPtr &scancells) {
91  if (m_original_cb)
92  m_original_cb->scan_ok(scanner, scancells);
93  }
94 
103  virtual void scan_error(TableScannerAsync *scanner, int error,
104  const std::string &error_msg, bool eos) {
105  if (m_original_cb)
106  m_original_cb->scan_error(scanner, error, error_msg, eos);
107  }
108 
109  virtual void update_ok(TableMutatorAsync *mutator) {
110  // only propagate if event is from the primary table
111  if (mutator == m_primary_mutator && m_original_cb)
112  m_original_cb->update_ok(mutator);
113  }
114 
115  virtual void update_error(TableMutatorAsync *mutator, int error,
116  FailedMutations &failures) {
117  // check mutator; if the failures are from the primary table then
118  // propagate them directly to the caller; if failures come from
119  // updating the index table(s) then continue below
120  if (mutator == m_primary_mutator && m_original_cb) {
121  m_original_cb->update_error(mutator, error, failures);
122  return;
123  }
124 
125  std::lock_guard<std::mutex>lock(m_mutex);
126 
127  m_error = error;
128 
129  // remove failed updates from the keymap
130  for (const auto &fm : failures) {
131  const Cell &cell = fm.first;
132 
133  // get the original row key
134  const char *row = cell.row_key + strlen(cell.row_key);
135  while (*row != '\t' && row > cell.row_key)
136  --row;
137  if (*row != '\t') {
138  m_failed_rows.push_back(FailedRow(cell.row_key, error));
139  continue;
140  }
141 
142  // and store it for later
143  m_failed_rows.push_back(FailedRow(row + 1, error));
144  }
145  }
146 
148  FailedMutations propagate;
149 
150  std::lock_guard<std::mutex>lock(m_mutex);
151 
152  for (const auto &fail : m_failed_rows) {
153  // add all cells with this row key to the failed mutations and
154  // purge them from the keymap
155  std::pair<KeyMap::iterator, KeyMap::iterator> it =
156  m_keymap.equal_range(fail.first.c_str());
157  for (KeyMap::iterator i = it.first; i != it.second; ++i) {
158  Cell &cell = i->second;
159  propagate.push_back(std::pair<Cell, int>(cell, fail.second));
160  }
161  m_keymap.erase(it.first, it.second);
162  }
163 
164  if (m_original_cb && !propagate.empty())
166  }
167 
168  bool needs_flush() {
169  std::lock_guard<std::mutex>lock(m_mutex);
170  return m_used_memory > m_max_memory;
171  }
172 
174  std::lock_guard<std::mutex>lock(m_mutex);
175 
176  IndexMutatorCallback::KeyMap::iterator it;
177  for (it = m_keymap.begin(); it != m_keymap.end(); ++it) {
178  mutator->update_without_index(it->second);
179  }
180 
181  // then clear the internal buffers in the callback
182  clear();
183  }
184 
// Empties m_keymap and resets the m_used_memory counter to zero.
// Takes no lock of its own.
// NOTE(review): listing line 187 is absent from this extract -- confirm
// against the real header whether another buffer is cleared there.
185  void clear() {
186  m_keymap.clear();
188  m_used_memory = 0;
189  }
190 
191  private:
192  // the original mutator for the primary table
194 
195  // the original callback object specified by the user
197 
198  // a flat memory buffer for storing the original keys
200 
201  // a map for storing the original keys
202  KeyMap m_keymap;
203 
204  // a list of failed keys and their error code
205  std::vector<FailedRow> m_failed_rows;
206 
207  // a mutex to protect the members
209 
210  // maximum size of buffered keys
211  uint64_t m_max_memory;
212 
213  // currently used memory
214  uint64_t m_used_memory;
215 
216  // last error code returned from the RangeServer
217  int m_error;
218  };
219 
220  typedef std::shared_ptr<IndexMutatorCallback> IndexMutatorCallbackPtr;
221 
222 } // namespace Hypertable
223 
224 #endif // Hypertable_Lib_IndexMutatorCallback_h
static std::mutex mutex
Definition: Logger.cc:43
int64_t timestamp
Definition: Key.h:134
IndexMutatorCallback(TableMutatorAsync *primary_mutator, ResultCallback *original_cb, uint64_t max_memory)
const char * row
Definition: Key.h:129
Asynchronous table scanner.
bool operator()(const char *lhs, const char *rhs) const
virtual void update_ok(TableMutatorAsync *mutator)
Callback method for successful update.
void get_cell(Cell &cc, size_t ii)
Definition: Cells.h:65
std::shared_ptr< IndexMutatorCallback > IndexMutatorCallbackPtr
virtual void scan_error(TableScannerAsync *scanner, int error, const std::string &error_msg, bool eos)=0
Callback method for scan errors.
Provides the ability to mutate a table in the form of adding and deleting rows and cells...
size_t size() const
Definition: Cells.h:57
std::shared_ptr< ScanCells > ScanCellsPtr
Smart pointer to ScanCells.
Definition: ScanCells.h:143
virtual void scan_ok(TableScannerAsync *scanner, ScanCellsPtr &scancells)
Callback method for successful scan.
virtual void update_error(TableMutatorAsync *mutator, int error, FailedMutations &failures)=0
Callback method for update errors.
virtual void update_ok(TableMutatorAsync *mutator)=0
Callback method for successful update.
void update_without_index(const Cell &cell)
const char * row_key
Definition: Cell.h:66
virtual void scan_ok(TableScannerAsync *scanner, ScanCellsPtr &cells)=0
Callback method for successful scan.
virtual void update_error(TableMutatorAsync *mutator, int error, FailedMutations &failures)
Callback method for update errors.
std::pair< String, int > FailedRow
Hypertable definitions
void buffer_key(Key &key, const void *value, uint32_t value_len)
buffers a cell in the cellbuffer and the keymap
virtual void scan_error(TableScannerAsync *scanner, int error, const std::string &error_msg, bool eos)
Callback method for scan errors.
ResultCallback for secondary indices; used by TableMutatorAsync.
Provides access to internal components of opaque key.
Definition: Key.h:40
std::multimap< const char *, Cell, ltStringPtr > KeyMap
int64_t revision
Definition: Key.h:135
void consume_keybuffer(TableMutatorAsync *mutator)
uint8_t column_family_code
Definition: Key.h:127
uint8_t flag
Definition: Key.h:125
void add(const Cell &cell, bool own=true)
Definition: Cells.h:69
size_t size() const
Returns the number of characters written to the output buffer.
Definition: String.h:156
std::vector< FailedRow > m_failed_rows
Encapsulates decomposed key and value.
Definition: Cell.h:32
const char * column_qualifier
Definition: Key.h:130
Represents an open table.
const char * c_str() const
Returns a pointer to the output buffer content with terminating null character appended.
Definition: String.h:168
std::vector< FailedMutation > FailedMutations
Definition: Cells.h:39