0.9.8.10
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
RangeServerCommandInterpreter.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2007-2015 Hypertable, Inc.
3  *
4  * This file is part of Hypertable.
5  *
6  * Hypertable is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; version 3 of the
9  * License, or any later version.
10  *
11  * Hypertable is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19  * 02110-1301, USA.
20  */
21 
22 #include <Common/Compat.h>
23 
25 
29 #include <Hypertable/Lib/Key.h>
35 
37 
38 #include <Common/Init.h>
39 #include <Common/Error.h>
40 #include <Common/FileUtils.h>
41 
42 #include <boost/algorithm/string.hpp>
43 #include <boost/progress.hpp>
44 
45 #include <cassert>
46 #include <cstdio>
47 #include <cstring>
48 
49 #define BUFFER_SIZE 65536
50 
51 using namespace Hql;
52 using namespace Hypertable::Config;
53 using namespace Hypertable::Lib;
54 using namespace Hypertable;
55 using namespace Serialization;
56 using namespace Tools::client::rangeserver;
57 using namespace std;
58 
59 namespace {
60 
61  void process_event(EventPtr &event) {
62  ::int32_t error;
63  const ::uint8_t *decode_ptr = event->payload + 4;
64  size_t decode_remain = event->payload_len - 4;
65  ::uint32_t offset, len;
66  if (decode_remain == 0)
67  cout << "success" << endl;
68  else {
69  while (decode_remain) {
70  try {
71  error = decode_i32(&decode_ptr, &decode_remain);
72  offset = decode_i32(&decode_ptr, &decode_remain);
73  len = decode_i32(&decode_ptr, &decode_remain);
74  }
75  catch (Exception &e) {
76  HT_ERROR_OUT << e << HT_END;
77  break;
78  }
79  cout << "rejected: offset=" << offset << " span=" << len << " "
80  << Error::get_text(error) << endl;
81  }
82  }
83  }
84 
85 }
86 
/**
 * Constructs a range server command interpreter.
 *
 * Stores the Hyperspace session, the server address and the range server
 * client handle, marks "no current scanner" by setting the current scanner
 * id to -1, and installs the range server client help text.  When a
 * Hyperspace session is supplied, the "Hypertable.Directory" property is
 * read into the top-level directory, '/' characters are trimmed from both
 * of its ends, and a NameIdMapper is created for it.
 *
 * @param hyperspace Hyperspace session; the name map is only set up when
 *        this is non-null
 * @param addr Address passed to every range server client call
 * @param range_server Range server client used to issue the commands
 */
87 RangeServerCommandInterpreter::RangeServerCommandInterpreter(
88  Hyperspace::SessionPtr &hyperspace, const sockaddr_in addr,
89  RangeServer::ClientPtr &range_server)
90  : m_hyperspace(hyperspace), m_addr(addr),
91  m_range_server(range_server), m_cur_scanner_id(-1) {
92  HqlHelpText::install_range_server_client_text();
93  if (m_hyperspace) {
94  m_toplevel_dir = properties->get_str("Hypertable.Directory");
// Strip every leading and trailing '/' from the configured directory.
95  boost::trim_if(m_toplevel_dir, boost::is_any_of("/"));
// NOTE(review): the listing numbering jumps from 95 to 97 here, so one
// source line may be missing from this rendering -- confirm against the
// original file before relying on this block.
97  m_namemap = make_shared<NameIdMapper>(m_hyperspace, m_toplevel_dir);
98  }
99  return;
100 }
101 
102 
104  TableIdentifier *table = 0;
105  RangeSpec range;
106  TableInfo *table_info;
107  String schema_str;
108  String out_str;
109  SchemaPtr schema;
110  Hql::ParserState state;
111  Hql::Parser parser(state);
112  parse_info<> info;
113  DispatchHandlerSynchronizer sync_handler;
114  ScanBlock scanblock;
115  ::int32_t scanner_id;
116  EventPtr event;
117 
118  info = parse(line.c_str(), parser, space_p);
119 
120  if (info.full) {
121 
122  // if table name specified, get associated objects
123  if (state.table_name != "") {
124  table_info = m_table_map[state.table_name];
125  if (table_info == 0) {
126  bool is_namespace = false;
127  String table_id;
128  if (!m_hyperspace)
129  HT_FATALF("Hyperspace is required to execute: %s", line.c_str());
130  if (!m_namemap->name_to_id(state.table_name, table_id, &is_namespace) ||
131  is_namespace)
133  table_info = new TableInfo(m_toplevel_dir, table_id);
134  table_info->load(m_hyperspace);
135  m_table_map[state.table_name] = table_info;
136  }
137  table = table_info->get_table_identifier();
138  table_info->get_schema_ptr(schema);
139  }
140 
141  // if end row is "??", transform it to 0xff 0xff
142  if (state.range_end_row == "??")
143  state.range_end_row = Key::END_ROW_MARKER;
144 
145  if (state.command == COMMAND_STATUS) {
146  Status status;
147  try {
148  m_range_server->status(m_addr, status);
149  }
150  catch (Exception &e) {
151  status.set(Status::Code::CRITICAL,
152  Hypertable::format("%s (%s)", Error::get_text(e.code()), e.what()));
153  }
154  string output;
155  Status::Code code;
156  status.get(&code, output);
157  if (!m_silent) {
158  cout << "RangeServer " << Status::code_to_string(code);
159  if (!output.empty())
160  cout << " - " << output;
161  cout << endl;
162  }
163  return static_cast<int>(code);
164  }
165  else if (state.command == COMMAND_LOAD_RANGE) {
166  RangeState range_state;
167 
168  cout << "TableName = " << state.table_name << endl;
169  cout << "StartRow = " << state.range_start_row << endl;
170  cout << "EndRow = " << state.range_end_row << endl;
171 
172  range.start_row = state.range_start_row.c_str();
173  range.end_row = state.range_end_row.c_str();
174 
175  range_state.soft_limit = 200000000LL;
176 
177  m_range_server->load_range(m_addr, *table, range, range_state, false);
178 
179  QualifiedRangeSpec qrs(*table, range);
180  vector<QualifiedRangeSpec *> range_vec;
181  map<QualifiedRangeSpec, int> response_map;
182  range_vec.push_back(&qrs);
183  m_range_server->acknowledge_load(m_addr, range_vec, response_map);
184  map<QualifiedRangeSpec, int>::iterator it = response_map.begin();
185  if (it->second != Error::OK)
186  HT_THROW(it->second, "Problem acknowledging load range");
187  }
188  else if (state.command == COMMAND_UPDATE) {
189  TestSource *tsource = 0;
190 
191  if (!FileUtils::exists(state.input_file.c_str()))
192  HT_THROW(Error::FILE_NOT_FOUND, String("Input file '")
193  + state.input_file + "' does not exist");
194 
195  tsource = new TestSource(state.input_file, schema.get());
196 
197  ::uint8_t *send_buf = 0;
198  size_t send_buf_len = 0;
200  SerializedKey key;
201  ByteString value;
202  size_t key_len, value_len;
203  ::uint32_t send_count = 0;
204  bool outstanding = false;
205 
206  while (true) {
207 
208  while (tsource->next(key, value)) {
209  key_len = key.length();
210  value_len = value.length();
211  buf.ensure(key_len + value_len);
212  buf.add_unchecked(key.ptr, key_len);
213  buf.add_unchecked(value.ptr, value_len);
214  if (buf.fill() > BUFFER_SIZE)
215  break;
216  }
217 
221  if (buf.fill()) {
222  std::vector<SerializedKey> keys;
223  const ::uint8_t *ptr;
224  size_t len;
225 
226  key.ptr = ptr = buf.base;
227 
228  while (ptr < buf.ptr) {
229  keys.push_back(key);
230  key.next();
231  key.next();
232  ptr = key.ptr;
233  }
234 
235  std::sort(keys.begin(), keys.end());
236 
237  send_buf = new ::uint8_t [buf.fill()];
238  ::uint8_t *sendp = send_buf;
239  for (size_t i=0; i<keys.size(); i++) {
240  key = keys[i];
241  key.next();
242  key.next();
243  len = key.ptr - keys[i].ptr;
244  memcpy(sendp, keys[i].ptr, len);
245  sendp += len;
246  }
247  send_buf_len = sendp - send_buf;
248  buf.clear();
249  send_count = keys.size();
250  }
251  else {
252  send_buf_len = 0;
253  send_count = 0;
254  }
255 
256  if (outstanding) {
257  if (!sync_handler.wait_for_reply(event))
258  HT_THROW(Protocol::response_code(event),
259  (Protocol::string_format_message(event)));
260  process_event(event);
261  }
262 
263  outstanding = false;
264 
265  if (send_buf_len > 0) {
266  StaticBuffer mybuf(send_buf, send_buf_len);
267  m_range_server->update(m_addr, ClusterId::get(), *table, send_count,
268  mybuf, 0, &sync_handler);
269  outstanding = true;
270  }
271  else
272  break;
273  }
274 
275  if (outstanding) {
276  if (!sync_handler.wait_for_reply(event))
277  HT_THROW(Protocol::response_code(event),
278  (Protocol::string_format_message(event)));
279  process_event(event);
280  }
281 
282  }
283  else if (state.command == COMMAND_CREATE_SCANNER) {
284  range.start_row = state.range_start_row.c_str();
285  range.end_row = state.range_end_row.c_str();
286  m_range_server->create_scanner(m_addr, *table, range,
287  state.scan.builder.get(), scanblock);
288  m_cur_scanner_id = scanblock.get_scanner_id();
289 
290  SerializedKey key;
291  ByteString value;
292 
293  while (scanblock.next(key, value))
294  display_scan_data(key, value, schema);
295 
296  if (scanblock.eos())
297  m_cur_scanner_id = -1;
298 
299  }
300  else if (state.command == COMMAND_FETCH_SCANBLOCK) {
301 
302  if (state.scanner_id == -1) {
303  if (m_cur_scanner_id == -1)
305  "No currently open scanner");
306  scanner_id = m_cur_scanner_id;
307  m_cur_scanner_id = -1;
308  }
309  else
310  scanner_id = state.scanner_id;
311 
314  m_range_server->fetch_scanblock(m_addr, scanner_id, scanblock);
315 
316  SerializedKey key;
317  ByteString value;
318 
319  while (scanblock.next(key, value))
320  display_scan_data(key, value, schema);
321 
322  if (scanblock.eos())
323  m_cur_scanner_id = -1;
324 
325  }
326  else if (state.command == COMMAND_DESTROY_SCANNER) {
327 
328  if (state.scanner_id == -1) {
329  if (m_cur_scanner_id == -1)
330  return 0;
331  scanner_id = m_cur_scanner_id;
332  m_cur_scanner_id = -1;
333  }
334  else
335  scanner_id = state.scanner_id;
336 
337  m_range_server->destroy_scanner(m_addr, scanner_id);
338 
339  }
340  else if (state.command == COMMAND_DROP_RANGE) {
341 
342  range.start_row = state.range_start_row.c_str();
343  range.end_row = state.range_end_row.c_str();
344 
345  m_range_server->drop_range(m_addr, *table, range, &sync_handler);
346 
347  if (!sync_handler.wait_for_reply(event))
348  HT_THROW(Protocol::response_code(event),
349  (Protocol::string_format_message(event)));
350 
351  }
352  else if (state.command == COMMAND_DUMP) {
353  m_range_server->dump(m_addr, state.output_file, state.nokeys);
354  cout << "success" << endl;
355  }
356  else if (state.command == COMMAND_DUMP_PSEUDO_TABLE) {
357  m_range_server->dump_pseudo_table(m_addr, *table, state.pseudo_table_name, state.output_file);
358  cout << "success" << endl;
359  }
360  else if (state.command == COMMAND_HELP) {
361  const char **text = HqlHelpText::get(state.str);
362  if (text) {
363  for (size_t i=0; text[i]; i++)
364  cout << text[i] << endl;
365  }
366  else
367  cout << endl << "no help for '" << state.str << "'" << endl << endl;
368  }
369  else if (state.command == COMMAND_WAIT_FOR_MAINTENANCE) {
370  m_range_server->wait_for_maintenance(m_addr);
371  }
372  else if (state.command == COMMAND_SHUTDOWN) {
373  m_range_server->shutdown(m_addr);
374  }
375  else if (state.command == COMMAND_HEAPCHECK) {
376  m_range_server->heapcheck(m_addr, state.output_file);
377  }
378  else if (state.command == COMMAND_COMPACT) {
379  if (table)
380  m_range_server->compact(m_addr, *table, state.str, 0);
381  else {
382  TableIdentifier empty_table;
383  HT_ASSERT(state.flags);
384  m_range_server->compact(m_addr, empty_table, "", state.flags);
385  }
386  }
387  else
389  Hypertable::format("unsupported command: %d", state.command));
390  }
391  else
392  HT_THROW(Error::HQL_PARSE_ERROR, String("parse error at: ") + info.stop);
393  return 0;
394 }
395 
396 
397 
401 void
403  const ByteString &value,
404  SchemaPtr &schema) {
405  Key key(serkey);
406  ColumnFamilySpec *cf_spec;
407 
408  if (key.flag == FLAG_DELETE_ROW) {
409  cout << key.timestamp << " " << key.row << " DELETE" << endl;
410  }
411  else if (key.flag == FLAG_DELETE_COLUMN_FAMILY) {
412  cf_spec = schema->get_column_family(key.column_family_code);
413  cout << key.timestamp << " " << key.row << " " << cf_spec->get_name() << " DELETE"
414  << endl;
415  }
416  else {
417  if (key.column_family_code > 0) {
418  cf_spec = schema->get_column_family(key.column_family_code);
419  if (key.flag == FLAG_DELETE_CELL)
420  cout << key.timestamp << " " << key.row << " " << cf_spec->get_name() << ":"
421  << key.column_qualifier << " DELETE" << endl;
422  else {
423  cout << key.timestamp << " " << key.row << " " << cf_spec->get_name() << ":"
424  << key.column_qualifier;
425  cout << endl;
426  }
427  }
428  else {
429  cerr << "Bad column family (" << (int)key.column_family_code
430  << ") for row key " << key.row;
431  cerr << endl;
432  }
433  }
434 }
A memory buffer of static size.
Definition: StaticBuffer.h:45
int get_scanner_id()
Returns scanner ID associated with this scanblock.
Definition: ScanBlock.h:109
int64_t timestamp
Definition: Key.h:134
ScanSpec & get()
Returns the built ScanSpec object.
Definition: ScanSpec.h:566
const char * row
Definition: Key.h:129
Range specification.
Definition: RangeSpec.h:40
Holds Nagios-style program status information.
Definition: Status.h:42
PropertiesPtr properties
This singleton map stores all options.
Definition: Config.cc:47
static const uint32_t FLAG_DELETE_ROW
Definition: KeySpec.h:40
std::string String
A String is simply a typedef to std::string.
Definition: String.h:44
String format(const char *fmt,...)
Returns a String using printf-like format facilities. Vanilla snprintf is about 1.5x faster than this...
Definition: String.cc:37
Column family specification.
bool eos()
Returns true if this is the final scanblock returned by the scanner.
Definition: ScanBlock.h:85
uint64_t soft_limit
Soft split size limit.
Definition: RangeState.h:108
static const uint32_t FLAG_DELETE_CELL
Definition: KeySpec.h:42
std::shared_ptr< Event > EventPtr
Smart pointer to Event.
Definition: Event.h:228
STL namespace.
uint8_t * ptr
Pointer to the end of the used part of the buffer.
bool wait_for_reply(EventPtr &event)
This method is used by a client to synchronize.
void set(Code code, const std::string &text)
Sets status code and text.
Definition: Status.h:101
A dynamic, resizable and reference counted memory buffer.
Definition: DynamicBuffer.h:42
Code
Enumeration for status codes.
Definition: Status.h:47
void display_scan_data(const SerializedKey &key, const ByteString &value, SchemaPtr &schema_ptr)
static const uint32_t FLAG_DELETE_COLUMN_FAMILY
Definition: KeySpec.h:41
uint32_t decode_i32(const uint8_t **bufp, size_t *remainp)
Decode a 32-bit integer in little-endian order.
ScanSpecBuilder builder
Definition: HqlParser.h:266
A class managing one or more serializable ByteStrings.
Definition: ByteString.h:47
#define HT_ASSERT(_e_)
Definition: Logger.h:396
#define BUFFER_SIZE
Declarations for RangeServerProtocol.
File system utility functions.
const char * end_row
Definition: RangeSpec.h:60
Declarations for RangeState.
std::shared_ptr< Session > SessionPtr
Definition: Session.h:734
const char * get_text(int error)
Returns a descriptive error message.
Definition: Error.cc:330
bool status(ContextPtr &context, Timer &timer, Status &status)
Runs a status check on the master.
Definition: Utility.cc:408
std::shared_ptr< Client > ClientPtr
Smart pointer to Client.
Definition: Client.h:594
void get_schema_ptr(SchemaPtr &schema)
Definition: TableInfo.h:43
Compatibility Macros for C/C++.
Initialization helper for applications.
#define HT_END
Definition: Logger.h:220
T get(const String &name)
Retrieves a configuration value.
Definition: Config.h:82
size_t length() const
Retrieves the length of the serialized string.
Definition: ByteString.h:62
#define HT_ERROR_OUT
Definition: Logger.h:301
Hypertable library.
Definition: CellInterval.h:30
const uint8_t * ptr
The pointer to the serialized data.
Definition: ByteString.h:121
Hypertable definitions
Encapsulates a block of scan results.
Definition: ScanBlock.h:50
#define HT_FATALF(msg,...)
Definition: Logger.h:343
const std::string & get_name() const
Gets column family name.
bool next(SerializedKey &key, ByteString &value)
Returns the next key/value pair in the scanblock.
Definition: ScanBlock.cc:84
DispatchHandler class used to synchronize with response messages.
const char * start_row
Definition: RangeSpec.h:59
void clear()
Clears the buffer.
Provides access to internal components of opaque key.
Definition: Key.h:40
uint8_t * base
Pointer to the allocated memory buffer.
void get(Code *code, std::string &text) const
Gets status code and text.
Definition: Status.h:111
size_t fill() const
Returns the size of the used portion.
Definition: DynamicBuffer.h:70
TableIdentifier * get_table_identifier()
Definition: TableInfo.h:42
This is a generic exception class for Hypertable.
Definition: Error.h:314
Declarations for ScanBlock.
Qualified (with table identifier) range specification.
std::shared_ptr< Schema > SchemaPtr
Smart pointer to Schema.
Definition: Schema.h:465
uint8_t column_family_code
Definition: Key.h:127
Range state.
Definition: RangeState.h:48
uint8_t flag
Definition: Key.h:125
Declarations for HqlHelpText.
void load(Hyperspace::SessionPtr &hyperspace)
Definition: TableInfo.cc:45
const char * column_qualifier
Definition: Key.h:130
Error codes, Exception handling, error logging.
#define HT_THROW(_code_, _msg_)
Definition: Error.h:478
void ensure(size_t len)
Ensures space for additional data. Will grow the space to 1.5 times the needed space, with existing data un...
Definition: DynamicBuffer.h:82
uint8_t * add_unchecked(const void *data, size_t len)
Adds additional data without boundary checks.
Declarations for ClusterId.
Declarations for DispatchHandlerSynchronizer.
const char * code_to_string(int var_code)
Converts variable code to variable string.
uint8_t * next()
Retrieves the next serialized String in the buffer.
Definition: ByteString.h:71
int code() const
Returns the error code.
Definition: Error.h:391
bool next(ByteString &key, ByteString &value)
Definition: TestSource.cc:39