0.9.8.10
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
LoadClient.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2007-2015 Hypertable, Inc.
3  *
4  * This file is part of Hypertable.
5  *
6  * Hypertable is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; either version 3
9  * of the License, or any later version.
10  *
11  * Hypertable is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19  * 02110-1301, USA.
20  */
21 
22 #include <Common/Compat.h>
23 
24 #include "LoadClient.h"
25 
26 #ifdef HT_WITH_THRIFT
27 #include <ThriftBroker/Client.h>
28 #include <ThriftBroker/Config.h>
29 #endif
30 
31 using namespace std;
32 
33 LoadClient::LoadClient(const String &config_file, bool thrift)
34  : m_thrift(thrift), m_native_client(0), m_ns(0), m_native_table(0),
35  m_native_table_open(false), m_native_mutator(0), m_native_scanner(0) {
36 #ifdef HT_WITH_THRIFT
37  m_thrift_namespace = 0;
38  m_thrift_mutator = 0;
39  m_thrift_scanner = 0;
40 #endif
41 
42  if (m_thrift) {
43 #ifdef HT_WITH_THRIFT
44  m_thrift_client.reset(new Thrift::Client("localhost", 15867));
45  m_thrift_namespace = m_thrift_client->open_namespace("/");
46 #else
47  HT_FATAL("Thrift support not installed");
48 #endif
49  }
50  else {
51  m_native_client = make_shared<Hypertable::Client>(config_file);
52  m_ns = m_native_client->open_namespace("/");
53  }
54 }
55 
57  : m_thrift(thrift), m_native_client(0), m_ns(0), m_native_table(0),
58  m_native_table_open(false), m_native_mutator(0), m_native_scanner(0) {
59 #ifdef HT_WITH_THRIFT
60  m_thrift_namespace = 0;
61  m_thrift_mutator = 0;
62  m_thrift_scanner = 0;
63 #endif
64 
65  if (m_thrift) {
66 #ifdef HT_WITH_THRIFT
67  m_thrift_client.reset(new Thrift::Client("localhost", 15867));
68  m_thrift_namespace = m_thrift_client->open_namespace("/");
69 #else
70  HT_FATAL("Thrift support not installed");
71 #endif
72  }
73  else {
74  m_native_client = make_shared<Hypertable::Client>();
75  m_ns = m_native_client->open_namespace("/");
76  }
77 }
78 
79 void
80 LoadClient::create_mutator(const String &tablename, int mutator_flags,
81  ::uint64_t shared_mutator_flush_interval)
82 {
83  if (m_thrift) {
84 #ifdef HT_WITH_THRIFT
85  m_thrift_mutator = m_thrift_client->open_mutator(m_thrift_namespace,
86  tablename, mutator_flags, 0);
87 #endif
88  }
89  else {
90  if (!m_native_table_open) {
91  m_native_table = m_ns->open_table(tablename);
92  m_native_table_open = true;
93  }
94  m_native_mutator.reset(m_native_table->create_mutator(0, mutator_flags,
95  shared_mutator_flush_interval));
96  }
97 }
98 
99 void
101 {
102  if (m_thrift) {
103 #ifdef HT_WITH_THRIFT
104  vector<ThriftGen::Cell> thrift_cells;
105  for (const auto &cell : cells) {
106  thrift_cells.push_back(ThriftGen::make_cell((const char*)cell.row_key,
107  (const char*)cell.column_family,(const char*)cell.column_qualifier,
108  string((const char*)cell.value, cell.value_len), cell.timestamp,
109  cell.revision, (ThriftGen::KeyFlag::type) cell.flag));
110  }
111  m_thrift_client->mutator_set_cells(m_thrift_mutator, thrift_cells);
112 #endif
113  }
114  else {
115  m_native_mutator->set_cells(cells);
116  }
117 }
118 
119 void
121  if (m_thrift) {
122 #ifdef HT_WITH_THRIFT
123  vector<ThriftGen::Cell> thrift_cells;
124  ThriftGen::KeyFlag::type flag = ThriftGen::KeyFlag::INSERT;
125 
126  if (key.column_family == 0 || *key.column_family == '\0')
127  flag = ThriftGen::KeyFlag::DELETE_ROW;
128  if (key.column_qualifier == 0 || *key.column_qualifier == '\0')
129  flag = ThriftGen::KeyFlag::DELETE_CF;
130  else
131  flag = ThriftGen::KeyFlag::DELETE_CELL;
132 
133  thrift_cells.push_back(ThriftGen::make_cell((const char *)key.row,
134  key.column_family, key.column_qualifier, std::string(""),
135  key.timestamp, key.revision, flag));
136 
137  m_thrift_client->mutator_set_cells(m_thrift_mutator, thrift_cells);
138 #endif
139  }
140  else {
141  m_native_mutator->set_delete(key);
142  }
143 }
144 
145 void
147 {
148  if (m_thrift) {
149 #ifdef HT_WITH_THRIFT
150  m_thrift_client->flush_mutator(m_thrift_mutator);
151 #endif
152  }
153  else {
154  m_native_mutator->flush();
155  }
156 }
157 
158 void
159 LoadClient::create_scanner(const String &tablename, const ScanSpec &scan_spec)
160 {
161  if (m_thrift) {
162 #ifdef HT_WITH_THRIFT
163  //copy scanspec column and first row interval
164  ThriftGen::ScanSpec thrift_scan_spec;
165  ThriftGen::RowInterval thrift_row_interval;
166  thrift_row_interval.start_row = scan_spec.row_intervals[0].start;
167  thrift_row_interval.end_row = scan_spec.row_intervals[0].end;
168  thrift_row_interval.start_inclusive = scan_spec.row_intervals[0].start_inclusive;
169  thrift_row_interval.end_inclusive = scan_spec.row_intervals[0].end_inclusive;
170  thrift_row_interval.__isset.start_row = thrift_row_interval.__isset.end_row = true;
171  thrift_row_interval.__isset.start_inclusive = thrift_row_interval.__isset.end_inclusive = true;
172 
173  thrift_scan_spec.columns.push_back(scan_spec.columns[0]);
174  thrift_scan_spec.row_intervals.push_back(thrift_row_interval);
175  thrift_scan_spec.__isset.columns = thrift_scan_spec.__isset.row_intervals = true;
176 
177  m_thrift_scanner = m_thrift_client->open_scanner(m_thrift_namespace,
178  tablename, thrift_scan_spec);
179 #endif
180  }
181  else {
182  if (!m_native_table_open) {
183  m_native_table = m_ns->open_table(tablename);
184  m_native_table_open = true;
185  }
186  m_native_scanner.reset(m_native_table->create_scanner(scan_spec));
187  }
188 }
189 
190 uint64_t
192 {
193  uint64_t bytes_scanned = 0;
194  if (m_thrift) {
195 #ifdef HT_WITH_THRIFT
196  vector<ThriftGen::Cell> cells;
197 
198  do {
199  m_thrift_client->next_cells(cells, m_thrift_scanner);
200  for (const auto &cell : cells) {
201  bytes_scanned += cell.key.row.size() + cell.key.column_family.size()
202  + cell.key.column_qualifier.size() + 8 + 8 + 2
203  + cell.value.size();
204  }
205  } while (cells.size());
206 #endif
207  return bytes_scanned;
208  }
209  else {
210  Cell cell;
211  // see issue #802 why we can#t return m_native_scanner->bytes_scanned()
212  while (m_native_scanner->next(cell)) {
213  bytes_scanned += strlen(cell.row_key) + strlen(cell.column_family) +
214  8 + 8 + 2 + cell.value_len;
215  if (cell.column_qualifier)
216  bytes_scanned += strlen(cell.column_qualifier);
217  }
218  return bytes_scanned;
219  }
220 }
221 
222 void
224 {
225  if (m_thrift) {
226 #ifdef HT_WITH_THRIFT
227  m_thrift_client->close_scanner(m_thrift_scanner);
228 #endif
229  }
230  else {
231  m_native_scanner = 0;
232  }
233 }
234 
236 {
237  if (m_thrift) {
238 #ifdef HT_WITH_THRIFT
239  m_thrift_client->close_namespace(m_thrift_namespace);
240 #endif
241  }
242 }
int64_t timestamp
Definition: KeySpec.h:130
std::vector< Cell, CellAlloc > Cells
Definition: Cells.h:37
std::string String
A String is simply a typedef to std::string.
Definition: String.h:44
bool m_thrift
Definition: LoadClient.h:77
void set_cells(const Cells &cells)
Definition: LoadClient.cc:100
uint64_t get_all_cells()
Gets all cells that match the spec in the current scanner and returns the total number of bytes scanned.
Definition: LoadClient.cc:191
const char * column_qualifier
Definition: KeySpec.h:128
void close_scanner()
Definition: LoadClient.cc:223
const char * column_qualifier
Definition: Cell.h:68
int64_t revision
Definition: KeySpec.h:131
STL namespace.
#define HT_FATAL(msg)
Definition: Logger.h:339
LoadClient(const String &config_file, bool thrift=false)
Definition: LoadClient.cc:33
const void * row
Definition: KeySpec.h:125
NamespacePtr m_ns
Definition: LoadClient.h:79
void set_delete(const KeySpec &key)
Definition: LoadClient.cc:120
Compatibility Macros for C/C++.
bool m_native_table_open
Definition: LoadClient.h:81
const char * row_key
Definition: Cell.h:66
TablePtr m_native_table
Definition: LoadClient.h:80
Cell make_cell(const char *row, const char *cf, const char *cq, const std::string &value, int64_t ts, int64_t rev, KeyFlag::type flag)
Definition: ThriftHelper.cc:63
void flush()
Definition: LoadClient.cc:146
ClientPtr m_native_client
Definition: LoadClient.h:78
TableMutatorPtr m_native_mutator
Definition: LoadClient.h:82
const char * column_family
Definition: Cell.h:67
uint32_t value_len
Definition: Cell.h:72
void create_mutator(const String &tablename, int mutator_flags,::uint64_t shared_mutator_flush_interval)
Definition: LoadClient.cc:80
TableScannerPtr m_native_scanner
Definition: LoadClient.h:83
Encapsulates decomposed key and value.
Definition: Cell.h:32
void create_scanner(const String &tablename, const ScanSpec &scan_spec)
Create a scanner.
Definition: LoadClient.cc:159
const char * column_family
Definition: KeySpec.h:127
A client for the ThriftBroker.
Definition: Client.h:59