0.9.8.10
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
IndexUpdater.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2007-2015 Hypertable, Inc.
3  *
4  * This file is part of Hypertable.
5  *
6  * Hypertable is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; either version 3
9  * of the License, or any later version.
10  *
11  * Hypertable is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19  * 02110-1301, USA.
20  */
21 
26 
27 
28 #include <Common/Compat.h>
29 
30 #include "IndexUpdater.h"
31 
33 
37 #include <Hypertable/Lib/Schema.h>
38 
39 #include <Common/Filesystem.h>
40 #include <Common/Config.h>
41 
42 using namespace std;
43 
44 namespace Hypertable {
45 
47 public:
48  virtual void scan_ok(TableScannerAsync *scanner, ScanCellsPtr &cells) { }
49  virtual void scan_error(TableScannerAsync *scanner, int error,
50  const String &error_msg, bool eos) { }
51  virtual void update_ok(TableMutatorAsync *mutator) { }
52  virtual void update_error(TableMutatorAsync *mutator, int error,
53  FailedMutations &failedMutations) { }
54 };
55 
56 IndexUpdater::IndexUpdater(SchemaPtr &primary_schema, TablePtr index_table,
57  TablePtr qualifier_index_table)
58  : m_index_mutator(0), m_qualifier_index_mutator(0) {
59  m_cb = new IndexUpdaterCallback();
60  if (index_table)
61  m_index_mutator = index_table->create_mutator_async(m_cb);
62  if (qualifier_index_table)
63  m_qualifier_index_mutator =qualifier_index_table->create_mutator_async(m_cb);
64  memset(&m_index_map[0], 0, sizeof(m_index_map));
65  memset(&m_qualifier_index_map[0], 0, sizeof(m_qualifier_index_map));
66 
67  for (auto cf_spec : primary_schema->get_column_families()) {
68  if (!cf_spec || cf_spec->get_deleted())
69  continue;
70  if (cf_spec->get_value_index())
71  m_index_map[cf_spec->get_id()] = true;
72  if (cf_spec->get_qualifier_index())
73  m_qualifier_index_map[cf_spec->get_id()] = true;
74  }
75 }
76 
77 void IndexUpdater::purge(const Key &key, const ByteString &value)
78 {
79  const uint8_t *vptr = value.ptr;
80  size_t value_len = Serialization::decode_vi32(&vptr);
81 
82  HT_ASSERT(key.column_family_code != 0);
83 
84  TableMutatorAsync *value_index_mutator = 0;
85  TableMutatorAsync *qualifier_index_mutator = 0;
86 
87  try {
89  value_index_mutator = m_index_mutator;
91  qualifier_index_mutator = m_qualifier_index_mutator;
92 
93  IndexTables::add(key, FLAG_DELETE_CELL_VERSION, vptr, value_len,
94  value_index_mutator, qualifier_index_mutator);
95 
96  }
97  // log errors, but don't re-throw them; otherwise the whole compaction
98  // will stop
99  catch (Exception &e) {
100  HT_ERROR_OUT << e << HT_END;
101  }
102 }
103 
104 
105 void IndexUpdater::add(const Key &key, const ByteString &value) {
106  const uint8_t *vptr = value.ptr;
107  size_t value_len = Serialization::decode_vi32(&vptr);
108 
109  HT_ASSERT(key.column_family_code != 0);
110 
111  TableMutatorAsync *value_index_mutator = 0;
112  TableMutatorAsync *qualifier_index_mutator = 0;
113 
115  value_index_mutator = m_index_mutator;
117  qualifier_index_mutator = m_qualifier_index_mutator;
118 
119  IndexTables::add(key, FLAG_INSERT, vptr, value_len,
120  value_index_mutator, qualifier_index_mutator);
121 
122 }
123 
124 
126  SchemaPtr &schema, bool has_index, bool has_qualifier_index)
127 {
128  TablePtr index_table;
129  TablePtr qualifier_index_table;
130 
131  HT_ASSERT(has_index || has_qualifier_index);
132 
133  lock_guard<mutex> lock(ms_mutex);
134 
135  // check if we've cached Table pointers for the indices
136  if (has_index)
137  index_table = ms_index_cache[table_id];
138  if (has_qualifier_index)
139  qualifier_index_table = ms_qualifier_index_cache[table_id];
140 
141  if ((has_index && index_table)
142  && (has_qualifier_index && qualifier_index_table)) {
143  return std::make_shared<IndexUpdater>(schema, index_table,
144  qualifier_index_table);
145  }
146 
147  // at least one index table was not cached: load it
148  if (!ms_namemap)
149  ms_namemap = make_shared<NameIdMapper>(Global::hyperspace, Global::toplevel_dir);
150 
151  String table_name;
152  if (!ms_namemap->id_to_name(table_id, table_name)) {
153  HT_WARNF("Failed to map table id %s to table name", table_id.c_str());
154  return 0;
155  }
156 
157  if (has_index && !index_table) {
158  String dir = Filesystem::dirname(table_name);
159  String base = Filesystem::basename(table_name);
160  String indexname = dir != "."
161  ? dir + "/^" + base
162  : "^" + base;
163  index_table = load_table(indexname);
164  HT_ASSERT(index_table != 0);
165  ms_index_cache[table_id] = index_table;
166  }
167 
168  if (has_qualifier_index && !qualifier_index_table) {
169  String dir = Filesystem::dirname(table_name);
170  String base = Filesystem::basename(table_name);
171  String indexname = dir != "."
172  ? dir + "/^^" + base
173  : "^^" + base;
174  qualifier_index_table = load_table(indexname);
175  HT_ASSERT(qualifier_index_table != 0);
176  ms_qualifier_index_cache[table_id] = qualifier_index_table;
177  }
178 
179  if (index_table || qualifier_index_table)
180  return std::make_shared<IndexUpdater>(schema, index_table,
181  qualifier_index_table);
182  else
183  return IndexUpdaterPtr(0);
184 }
185 
187 {
188  lock_guard<mutex> lock(ms_mutex);
189 
190  ms_namemap = 0;
191 
192  ms_index_cache.clear();
193  ms_qualifier_index_cache.clear();
194 }
195 
197  lock_guard<mutex> lock(ms_mutex);
198  ms_index_cache.clear();
199  ms_qualifier_index_cache.clear();
200 }
201 
203 {
205  return make_shared<Table>(Config::properties, Global::range_locator,
207  ms_namemap, table_name);
208 }
209 
214 
215 } // namespace Hypertable
216 
static std::mutex mutex
Definition: Logger.cc:43
#define HT_WARNF(msg,...)
Definition: Logger.h:290
void purge(const Key &key, const ByteString &value)
Purges a key from index tables.
Definition: IndexUpdater.cc:77
Abstract base class for a filesystem.
PropertiesPtr properties
This singleton map stores all options.
Definition: Config.cc:47
std::string String
A String is simply a typedef to std::string.
Definition: String.h:44
static ConnectionManagerPtr conn_manager
Definition: Global.h:113
static const uint32_t FLAG_INSERT
Definition: KeySpec.h:47
Asynchronous table scanner.
std::shared_ptr< IndexUpdater > IndexUpdaterPtr
Smart pointer to IndexUpdater.
Definition: IndexUpdater.h:87
static void clear_cache()
Clears both value and qualifier caches.
virtual void scan_ok(TableScannerAsync *scanner, ScanCellsPtr &cells)
Callback method for successful scan.
Definition: IndexUpdater.cc:48
static TableMap ms_qualifier_index_cache
Definition: IndexUpdater.h:110
STL namespace.
virtual void update_ok(TableMutatorAsync *mutator)
Callback method for successful update.
Definition: IndexUpdater.cc:51
static void close()
Cleanup function; called before leaving main()
void add(const Key &key, uint8_t flag, const void *value, uint32_t value_len, TableMutatorAsync *value_index_mutator, TableMutatorAsync *qualifier_index_mutator)
Definition: IndexTables.cc:34
Declarations for Schema.
std::map< String, TablePtr > TableMap
Definition: IndexUpdater.h:106
A class managing one or more serializable ByteStrings.
Definition: ByteString.h:47
#define HT_ASSERT(_e_)
Definition: Logger.h:396
Provides the ability to mutate a table in the form of adding and deleting rows and cells...
TableMutatorAsync * m_qualifier_index_mutator
Mutator for qualifier index table.
Definition: IndexUpdater.h:78
static TablePtr load_table(const String &table_name)
virtual void update_error(TableMutatorAsync *mutator, int error, FailedMutations &failedMutations)
Callback method for update errors.
Definition: IndexUpdater.cc:52
std::shared_ptr< ScanCells > ScanCellsPtr
Smart pointer to ScanCells.
Definition: ScanCells.h:143
static Hyperspace::SessionPtr hyperspace
Definition: Global.h:63
static std::string toplevel_dir
Definition: Global.h:108
TableMutatorAsync * m_index_mutator
Mutator for value index table.
Definition: IndexUpdater.h:75
static NameIdMapperPtr ms_namemap
Definition: IndexUpdater.h:109
virtual void scan_error(TableScannerAsync *scanner, int error, const String &error_msg, bool eos)
Callback method for scan errors.
Definition: IndexUpdater.cc:49
Compatibility Macros for C/C++.
#define HT_END
Definition: Logger.h:220
std::shared_ptr< ApplicationQueueInterface > ApplicationQueueInterfacePtr
Smart pointer to ApplicationQueueInterface.
Declarations for IndexUpdater.
#define HT_ERROR_OUT
Definition: Logger.h:301
static Hypertable::RangeLocatorPtr range_locator
Definition: Global.h:69
static String basename(String name, char separator= '/')
A posix-compliant basename() which strips directory names from a filename.
Definition: Filesystem.cc:154
const uint8_t * ptr
The pointer to the serialized data.
Definition: ByteString.h:121
Hypertable definitions
bool m_qualifier_index_map[256]
Definition: IndexUpdater.h:83
Provides access to internal components of opaque key.
Definition: Key.h:40
This is a generic exception class for Hypertable.
Definition: Error.h:314
static String dirname(String name, char separator= '/')
A posix-compliant dirname() which strips the last component from a file name.
Definition: Filesystem.cc:127
std::shared_ptr< Schema > SchemaPtr
Smart pointer to Schema.
Definition: Schema.h:465
uint8_t column_family_code
Definition: Key.h:127
Configuration settings.
static IndexUpdaterPtr create(const String &table_id, SchemaPtr &schema, bool has_index, bool has_qualifier_index)
Factory function.
ResultCallback * m_cb
Async mutator callback object.
Definition: IndexUpdater.h:81
static Hypertable::ApplicationQueuePtr app_queue
Definition: Global.h:66
Represents an open table.
std::shared_ptr< NameIdMapper > NameIdMapperPtr
Smart pointer to NameIdMapper.
Definition: NameIdMapper.h:121
static const uint32_t FLAG_DELETE_CELL_VERSION
Definition: KeySpec.h:43
uint32_t decode_vi32(const uint8_t **bufp, size_t *remainp)
Decode a variable length encoded integer up to 32-bit.
void add(const Key &key, const ByteString &value)
Adds a key to index tables.
std::shared_ptr< Table > TablePtr
Definition: Table.h:53
std::vector< FailedMutation > FailedMutations
Definition: Cells.h:39