0.9.8.10
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
HqlInterpreter.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2007-2015 Hypertable, Inc.
3  *
4  * This file is part of Hypertable.
5  *
6  * Hypertable is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; version 3 of the
9  * License.
10  *
11  * Hypertable is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19  * 02110-1301, USA.
20  */
21 
22 #include <Common/Compat.h>
23 
24 #include "HqlInterpreter.h"
25 
26 #include <Hypertable/Lib/Client.h>
29 #include <Hypertable/Lib/Key.h>
36 #include <Hypertable/Lib/Schema.h>
38 
40 
41 #include <Common/Config.h>
42 #include <Common/Error.h>
43 #include <Common/FileUtils.h>
44 #include <Common/ScopeGuard.h>
45 #include <Common/Status.h>
46 #include <Common/Stopwatch.h>
47 #include <Common/String.h>
48 
49 #include <boost/algorithm/string.hpp>
50 #include <boost/iostreams/device/file_descriptor.hpp>
51 #include <boost/iostreams/filtering_stream.hpp>
52 #include <boost/iostreams/filter/gzip.hpp>
53 #include <boost/iostreams/device/null.hpp>
54 
55 #include <cstdio>
56 #include <cstring>
57 #include <ctime>
58 #include <sstream>
59 #include <iostream>
60 
61 using namespace std;
62 using namespace Hypertable;
63 using namespace Hql;
64 
65 namespace {
66 
/// Closes a file descriptor, ignoring negative (i.e. "not opened") values.
/// Used as a scope-exit action, so callers may pass -1 when no descriptor
/// was ever duplicated.
/// @param fd Descriptor to close; nothing happens when it is negative
void close_file(int fd) {
  if (fd < 0)
    return;
  close(fd);
}
71 
72 int cmd_help(ParserState &state, HqlInterpreter::Callback &cb) {
73  const char **text = HqlHelpText::get(state.str);
74 
75  if (text) {
76  for (; *text; ++text)
77  cb.on_return(*text);
78  }
79  else
80  cb.on_return("\nno help for '" + state.str + "'");
81  return 0;
82 }
83 
84 int
85 cmd_create_namespace(Client *client, NamespacePtr &ns, ParserState &state,
87 
88  if (!ns || state.ns.find('/') == 0)
89  client->create_namespace(state.ns, NULL, false, state.if_exists);
90  else
91  client->create_namespace(state.ns, ns.get(), false, state.if_exists);
92  cb.on_finish();
93  return 0;
94 }
95 
96 int
97 cmd_use_namespace(Client *client, NamespacePtr &ns, bool immutable_namespace,
99 
100  if (ns && immutable_namespace)
101  HT_THROW(Error::BAD_NAMESPACE, (String)"Attempting to modify immutable namespace " +
102  ns->get_name() + " to " + state.ns);
103 
104  NamespacePtr tmp_ns;
105  if (!ns || state.ns.find('/') == 0)
106  tmp_ns = client->open_namespace(state.ns);
107  else
108  tmp_ns = client->open_namespace(state.ns, ns.get());
109  if (tmp_ns)
110  ns = tmp_ns;
111  else
112  HT_THROW(Error::NAMESPACE_DOES_NOT_EXIST, (String)"Couldn't open namespace " + state.ns);
113 
114  return 0;
115 }
116 
117 int
118 cmd_drop_namespace(Client *client, NamespacePtr &ns, ParserState &state,
120 
121  if (!ns || state.ns.find('/') == 0 )
122  client->drop_namespace(state.ns, NULL, state.if_exists);
123  else
124  client->drop_namespace(state.ns, ns.get(), state.if_exists);
125  cb.on_finish();
126  return 0;
127 }
128 
129 int
130 cmd_rename_table(NamespacePtr &ns, ParserState &state, HqlInterpreter::Callback &cb) {
131  if (!ns)
132  HT_THROW(Error::BAD_NAMESPACE, "Null namespace");
133 
134  ns->rename_table(state.table_name, state.new_table_name);
135  cb.on_finish();
136  return 0;
137 }
138 
139 int
140 cmd_exists_table(NamespacePtr &ns, ParserState &state, HqlInterpreter::Callback &cb) {
141  string exists = (String)"true";
142 
143  if (!ns)
144  HT_THROW(Error::BAD_NAMESPACE, "Null namespace");
145 
146  if (!ns->exists_table(state.table_name))
147  exists = (String)"false";
148  cb.on_return(exists);
149  cb.on_finish();
150  return 0;
151 }
152 
153 int
154 cmd_show_create_table(NamespacePtr &ns, ParserState &state,
156  if (!ns)
157  HT_THROW(Error::BAD_NAMESPACE, "Null namespace");
158  string schema_str = ns->get_schema_str(state.table_name, true);
159  SchemaPtr schema( Schema::new_instance(schema_str) );
160  string out_str = schema->render_hql(state.table_name);
161  out_str += ";\n";
162  cb.on_return(out_str);
163  cb.on_finish();
164  return 0;
165 }
166 
167 
168 int
169 cmd_create_table(NamespacePtr &ns, ParserState &state,
171  if (!state.input_file.empty()) {
172  if (state.input_file_src != LOCAL_FILE)
173  HT_THROW(Error::SYNTAX_ERROR, "Schema file must reside in local FS");
174  string schema_str;
175  if (!FileUtils::read(state.input_file, schema_str))
177  state.create_schema.reset(Schema::new_instance(schema_str));
178  }
179  ns->create_table(state.table_name, state.create_schema);
180  cb.on_finish();
181  return 0;
182 }
183 
184 int
185 cmd_alter_table(NamespacePtr &ns, ParserState &state,
187  bool force {};
188 
189  if (!ns)
190  HT_THROW(Error::BAD_NAMESPACE, "Null namespace");
191 
192  if (!state.input_file.empty()) {
193  if (state.input_file_src != LOCAL_FILE)
194  HT_THROW(Error::SYNTAX_ERROR, "Schema file must reside in local FS");
195  string schema_str;
196  if (!FileUtils::read(state.input_file, schema_str))
198  state.alter_schema.reset(Schema::new_instance(schema_str));
199  force = true;
200  }
201 
202  ns->alter_table(state.table_name, state.alter_schema, force);
203  ns->refresh_table(state.table_name);
204  cb.on_finish();
205  return 0;
206 }
207 
208 int
209 cmd_compact(NamespacePtr &ns, ParserState &state,
211  if (!ns)
212  HT_THROW(Error::BAD_NAMESPACE, "Null namespace");
213  ns->compact(state.table_name, state.str, state.flags);
214  cb.on_finish();
215  return 0;
216 }
217 
218 
219 int
220 cmd_describe_table(NamespacePtr &ns, ParserState &state,
222  if (!ns)
223  HT_THROW(Error::BAD_NAMESPACE, "Null namespace");
224  string schema_str = ns->get_schema_str(state.table_name, state.with_ids);
225  cb.on_return(schema_str);
226  cb.on_finish();
227  return 0;
228 }
229 
230 int
231 cmd_select(NamespacePtr &ns, ConnectionManagerPtr &conn_manager,
233  if (!ns)
234  HT_THROW(Error::BAD_NAMESPACE, "Null namespace");
235  TablePtr table;
236  boost::iostreams::filtering_ostream fout;
237  FILE *outf = cb.output;
238  int out_fd = -1;
239  string localfs = "file://";
240  char fs = state.field_separator ? state.field_separator : '\t';
241 
242  table = ns->open_table(state.table_name);
243  TableScannerPtr scanner( table->create_scanner(state.scan.builder.get(), 0, true) );
244 
245  // whether it's select into file
246  if (!state.scan.outfile.empty()) {
247  FileUtils::expand_tilde(state.scan.outfile);
248 
249  if (boost::algorithm::ends_with(state.scan.outfile, ".gz"))
250  fout.push(boost::iostreams::gzip_compressor());
251 
252  if (boost::algorithm::starts_with(state.scan.outfile, "dfs://") ||
253  boost::algorithm::starts_with(state.scan.outfile, "fs://")) {
254  // init Fs client if not done yet
255  if (!fs_client)
256  fs_client = std::make_shared<FsBroker::Lib::Client>(conn_manager, Config::properties);
257  if (boost::algorithm::starts_with(state.scan.outfile, "dfs://"))
258  fout.push(FsBroker::Lib::FileSink(fs_client, state.scan.outfile.substr(6)));
259  else
260  fout.push(FsBroker::Lib::FileSink(fs_client, state.scan.outfile.substr(5)));
261  }
262  else if (boost::algorithm::starts_with(state.scan.outfile, localfs))
263  fout.push(boost::iostreams::file_descriptor_sink(state.scan.outfile.substr(localfs.size())));
264  else
265  fout.push(boost::iostreams::file_descriptor_sink(state.scan.outfile));
266 
267  fout << "#";
268  if (state.scan.display_revisions)
269  fout << "revision" << fs;
270  if (state.scan.display_timestamps)
271  fout << "timestamp" << fs;
272  if (state.scan.keys_only)
273  fout << "row\n";
274  else
275  fout << "row" << fs << "column" << fs << "value\n";
276  }
277  else if (!outf) {
278  cb.on_scan(scanner);
279  return 0;
280  }
281  else {
282  out_fd = dup(fileno(outf));
283  fout.push(boost::iostreams::file_descriptor_sink(out_fd));
284  }
285 
286  HT_ON_SCOPE_EXIT(&close_file, out_fd);
287  Cell cell;
288  ::uint32_t nsec;
289  time_t unix_time;
290  struct tm tms;
291  LoadDataEscape row_escaper;
292  LoadDataEscape escaper;
293  const char *unescaped_buf, *row_unescaped_buf;
294  size_t unescaped_len, row_unescaped_len;
295 
296  if (fs != '\t') {
297  row_escaper.set_field_separator(fs);
298  escaper.set_field_separator(fs);
299  }
300 
301  while (scanner->next(cell)) {
302  if (cb.normal_mode) {
303  // do some stats
304  ++cb.total_cells;
305  cb.total_keys_size += strlen(cell.row_key);
306 
307  if (cell.column_family && cell.column_qualifier)
308  cb.total_keys_size += strlen(cell.column_qualifier) + 1;
309 
310  cb.total_values_size += cell.value_len;
311  }
312  if (state.scan.display_revisions) {
313  if (cb.format_ts_in_nanos)
314  fout << cell.revision << fs;
315  else {
316  nsec = cell.revision % 1000000000LL;
317  unix_time = cell.revision / 1000000000LL;
318  localtime_r(&unix_time, &tms);
319  fout << Hypertable::format("%d-%02d-%02d %02d:%02d:%02d.%09d%c",
320  tms.tm_year + 1900, tms.tm_mon + 1, tms.tm_mday,
321  tms.tm_hour, tms.tm_min, tms.tm_sec, nsec, fs);
322  }
323  }
324  if (state.scan.display_timestamps) {
325  if (cb.format_ts_in_nanos)
326  fout << cell.timestamp << fs;
327  else {
328  nsec = cell.timestamp % 1000000000LL;
329  unix_time = cell.timestamp / 1000000000LL;
330  localtime_r(&unix_time, &tms);
331  fout << Hypertable::format("%d-%02d-%02d %02d:%02d:%02d.%09d%c",
332  tms.tm_year + 1900, tms.tm_mon + 1, tms.tm_mday,
333  tms.tm_hour, tms.tm_min, tms.tm_sec, nsec, fs);
334  }
335  }
336  if (state.escape)
337  row_escaper.escape(cell.row_key, strlen(cell.row_key),
338  &row_unescaped_buf, &row_unescaped_len);
339  else
340  row_unescaped_buf = cell.row_key;
341  if (!state.scan.keys_only) {
342  if (cell.column_family && *cell.column_family) {
343  fout << row_unescaped_buf << fs << cell.column_family;
344  if (cell.column_qualifier && *cell.column_qualifier) {
345  if (state.escape)
346  escaper.escape(cell.column_qualifier, strlen(cell.column_qualifier),
347  &unescaped_buf, &unescaped_len);
348  else
349  unescaped_buf = cell.column_qualifier;
350  fout << ":" << unescaped_buf;
351  }
352 
353  }
354  else
355  fout << row_unescaped_buf;
356 
357  if (state.escape)
358  escaper.escape((const char *)cell.value, (size_t)cell.value_len,
359  &unescaped_buf, &unescaped_len);
360  else {
361  unescaped_buf = (const char *)cell.value;
362  unescaped_len = (size_t)cell.value_len;
363  }
364 
365  fout << fs;
366  switch(cell.flag) {
367  case FLAG_INSERT:
368  fout.write(unescaped_buf, unescaped_len);
369  fout << "\n";
370  break;
371  case FLAG_DELETE_ROW:
372  fout << fs << "DELETE ROW\n";
373  break;
375  fout.write(unescaped_buf, unescaped_len);
376  fout << fs << "DELETE COLUMN FAMILY\n";
377  break;
378  case FLAG_DELETE_CELL:
379  fout.write(unescaped_buf, unescaped_len);
380  fout << fs << "DELETE CELL\n";
381  break;
383  fout.write(unescaped_buf, unescaped_len);
384  fout << fs << "DELETE CELL VERSION\n";
385  break;
386  default:
387  fout << fs << "BAD KEY FLAG\n";
388  }
389  }
390  else
391  fout << row_unescaped_buf << "\n";
392  }
393 
394  fout.strict_sync();
395 
396  cb.on_finish(scanner);
397  return 0;
398 }
399 
400 
401 int
402 cmd_dump_table(NamespacePtr &ns,
403  ConnectionManagerPtr &conn_manager, FsBroker::Lib::ClientPtr &fs_client,
405  if (!ns)
406  HT_THROW(Error::BAD_NAMESPACE, "Null namespace");
407  TablePtr table;
408  boost::iostreams::filtering_ostream fout;
409  FILE *outf = cb.output;
410  int out_fd = -1;
411  string localfs = "file://";
412  char fs = state.field_separator ? state.field_separator : '\t';
413 
414  // verify parameters
415 
416  TableDumperPtr dumper = make_shared<TableDumper>(ns, state.table_name, state.scan.builder.get());
417 
418  // whether it's select into file
419  if (!state.scan.outfile.empty()) {
420  FileUtils::expand_tilde(state.scan.outfile);
421 
422  if (boost::algorithm::ends_with(state.scan.outfile, ".gz"))
423  fout.push(boost::iostreams::gzip_compressor());
424 
425  if (boost::algorithm::starts_with(state.scan.outfile, "dfs://") ||
426  boost::algorithm::starts_with(state.scan.outfile, "fs://")) {
427  // init Fs client if not done yet
428  if (!fs_client)
429  fs_client = std::make_shared<FsBroker::Lib::Client>(conn_manager, Config::properties);
430  if (boost::algorithm::starts_with(state.scan.outfile, "dfs://"))
431  fout.push(FsBroker::Lib::FileSink(fs_client, state.scan.outfile.substr(6)));
432  else
433  fout.push(FsBroker::Lib::FileSink(fs_client, state.scan.outfile.substr(5)));
434  }
435  else if (boost::algorithm::starts_with(state.scan.outfile, localfs))
436  fout.push(boost::iostreams::file_descriptor_sink(state.scan.outfile.substr(localfs.size())));
437  else
438  fout.push(boost::iostreams::file_descriptor_sink(state.scan.outfile));
439 
440  if (state.scan.display_timestamps)
441  fout << "#timestamp" << fs << "row" << fs << "column" << fs << "value\n";
442  else
443  fout << "#row" << fs << "column" << fs << "value\n";
444  }
445  else if (!outf) {
446  cb.on_dump(*dumper.get());
447  return 0;
448  }
449  else {
450  out_fd = dup(fileno(outf));
451  fout.push(boost::iostreams::file_descriptor_sink(out_fd));
452  }
453 
454  HT_ON_SCOPE_EXIT(&close_file, out_fd);
455  Cell cell;
456  LoadDataEscape row_escaper;
457  LoadDataEscape escaper;
458  const char *unescaped_buf, *row_unescaped_buf;
459  size_t unescaped_len, row_unescaped_len;
460 
461  if (fs != '\t') {
462  row_escaper.set_field_separator(fs);
463  escaper.set_field_separator(fs);
464  }
465 
466  while (dumper->next(cell)) {
467  if (cb.normal_mode) {
468  // do some stats
469  ++cb.total_cells;
470  cb.total_keys_size += strlen(cell.row_key);
471 
472  if (cell.column_family && cell.column_qualifier)
473  cb.total_keys_size += strlen(cell.column_qualifier) + 1;
474 
475  cb.total_values_size += cell.value_len;
476  }
477 
478  if (state.scan.display_timestamps)
479  fout << cell.timestamp << fs;
480 
481  if (state.escape)
482  row_escaper.escape(cell.row_key, strlen(cell.row_key),
483  &row_unescaped_buf, &row_unescaped_len);
484  else
485  row_unescaped_buf = cell.row_key;
486 
487  if (cell.column_family) {
488  fout << row_unescaped_buf << fs << cell.column_family;
489  if (cell.column_qualifier && *cell.column_qualifier) {
490  if (state.escape)
491  escaper.escape(cell.column_qualifier, strlen(cell.column_qualifier),
492  &unescaped_buf, &unescaped_len);
493  else
494  unescaped_buf = cell.column_qualifier;
495  fout << ":" << unescaped_buf;
496  }
497  }
498  else
499  fout << row_unescaped_buf;
500 
501  if (state.escape)
502  escaper.escape((const char *)cell.value, (size_t)cell.value_len,
503  &unescaped_buf, &unescaped_len);
504  else {
505  unescaped_buf = (const char *)cell.value;
506  unescaped_len = (size_t)cell.value_len;
507  }
508 
509  HT_ASSERT(cell.flag == FLAG_INSERT);
510 
511  fout << fs ;
512  fout.write(unescaped_buf, unescaped_len);
513  fout << "\n";
514  }
515 
516  fout.strict_sync();
517 
518  cb.on_finish();
519  return 0;
520 }
521 
522 int
523 cmd_load_data(NamespacePtr &ns, ::uint32_t mutator_flags,
524  ConnectionManagerPtr &conn_manager,
525  FsBroker::Lib::ClientPtr &fs_client,
527  if (!ns)
528  HT_THROW(Error::BAD_NAMESPACE, "Null namespace");
529  TablePtr table;
530  TableMutatorPtr mutator;
531  bool into_table = true;
532  bool display_timestamps = false;
533  boost::iostreams::filtering_ostream fout;
534  FILE *outf = cb.output;
535  int out_fd = -1;
536  bool largefile_mode = false;
537  ::uint64_t running_total = 0;
538  ::uint64_t consume_threshold = 0;
539  bool ignore_unknown_columns = false;
540  char fs = state.field_separator ? state.field_separator : '\t';
541 
543  ignore_unknown_columns = true;
544 
545  // Turn on no-log-sync unconditionally for LOAD DATA INFILE
547  mutator_flags |= Table::MUTATOR_FLAG_NO_LOG;
548  else
549  mutator_flags |= Table::MUTATOR_FLAG_NO_LOG_SYNC;
550 
551  if (state.table_name.empty()) {
552  if (state.output_file.empty())
554  "LOAD DATA INFILE ... INTO FILE - bad filename");
555  fout.push(boost::iostreams::file_descriptor_sink(state.output_file));
556  into_table = false;
557  }
558  else {
559  if (outf) {
560  out_fd = dup(fileno(outf));
561  fout.push(boost::iostreams::file_descriptor_sink(out_fd));
562  }
563  else
564  fout.push(boost::iostreams::null_sink());
565  table = ns->open_table(state.table_name);
566  mutator.reset(table->create_mutator(0, mutator_flags));
567  }
568 
569  HT_ON_SCOPE_EXIT(&close_file, out_fd);
570 
571 
572  bool is_delete;
573 
574  // init Fs client if not done yet
575  if(state.input_file_src == DFS_FILE && !fs_client)
576  fs_client = std::make_shared<FsBroker::Lib::Client>(conn_manager, Config::properties);
577 
578  LoadDataSourcePtr lds(LoadDataSourceFactory::create(fs_client, state.input_file,
579  state.input_file_src, state.header_file, state.header_file_src,
580  state.columns, state.timestamp_column, fs,
581  state.row_uniquify_chars, state.load_flags));
582 
583  cb.file_size = lds->get_source_size();
584  if (cb.file_size > std::numeric_limits<unsigned long>::max()) {
585  largefile_mode = true;
586  unsigned long adjusted_size = (unsigned long)(cb.file_size / 1048576LL);
587  consume_threshold = 1048576LL;
588  cb.on_update(adjusted_size);
589  }
590  else
591  cb.on_update(cb.file_size);
592 
593  if (!into_table) {
594  display_timestamps = lds->has_timestamps();
595  if (display_timestamps)
596  fout << "timestamp" << fs << "row" << fs << "column" << fs << "value\n";
597  else
598  fout << "row" << fs << "column" << fs << "value\n";
599  }
600 
601  KeySpec key;
602  ::uint8_t *value;
603  ::uint32_t value_len;
604  ::uint32_t consumed = 0;
605  LoadDataEscape row_escaper;
606  LoadDataEscape qualifier_escaper;
607  LoadDataEscape value_escaper;
608  const char *escaped_buf;
609  size_t escaped_len;
610 
611  if (fs != '\t') {
612  row_escaper.set_field_separator(fs);
613  qualifier_escaper.set_field_separator(fs);
614  value_escaper.set_field_separator(fs);
615  }
616 
617  try {
618 
619  while (lds->next(&key, &value, &value_len, &is_delete, &consumed)) {
620 
621  ++cb.total_cells;
622  cb.total_values_size += value_len;
623  cb.total_keys_size += key.row_len;
624 
625  if (state.escape) {
626  row_escaper.unescape((const char *)key.row, (size_t)key.row_len,
627  &escaped_buf, &escaped_len);
628  key.row = escaped_buf;
629  key.row_len = escaped_len;
630  qualifier_escaper.unescape(key.column_qualifier,
631  (size_t)key.column_qualifier_len, &escaped_buf, &escaped_len);
632  key.column_qualifier = escaped_buf;
633  key.column_qualifier_len = escaped_len;
634  value_escaper.unescape((const char *)value,
635  (size_t)value_len, &escaped_buf, &escaped_len);
636  }
637  else {
638  escaped_buf = (const char *)value;
639  escaped_len = (size_t)value_len;
640  }
641 
642  if (into_table) {
643  try {
644  bool skip = false;
645  if (ignore_unknown_columns) {
646  SchemaPtr schema = table->schema();
647  if (!schema->get_column_family(key.column_family))
648  skip = true;
649  }
650  if (!skip) {
651  if (is_delete)
652  mutator->set_delete(key);
653  else
654  mutator->set(key, escaped_buf, escaped_len);
655  }
656  }
657  catch (Exception &e) {
658  do {
659  mutator->show_failed(e);
660  } while (!mutator->retry());
661  }
662  }
663  else {
664  if (display_timestamps)
665  fout << key.timestamp << fs << key.row << fs
666  << key.column_family << fs << escaped_buf << "\n";
667  else
668  fout << key.row << fs << key.column_family << fs
669  << escaped_buf << "\n";
670  }
671 
672  if (cb.normal_mode && state.input_file_src != STDIN) {
673  if (largefile_mode == true) {
674  running_total += consumed;
675  if (running_total >= consume_threshold) {
676  consumed = 1 + (unsigned long)((running_total - consume_threshold)
677  / 1048576LL);
678  consume_threshold += (::uint64_t)consumed * 1048576LL;
679  cb.on_progress(consumed);
680  }
681  }
682  else
683  cb.on_progress(consumed);
684  }
685  }
686  }
687  catch (Exception &e) {
688  HT_THROW2F(e.code(), e, "line number %lld", (Lld)lds->get_current_lineno());
689  }
690 
691  fout.strict_sync();
692 
693  cb.on_finish(mutator);
694  return 0;
695 }
696 
697 int
698 cmd_insert(NamespacePtr &ns, ParserState &state, HqlInterpreter::Callback &cb) {
699  if (!ns)
700  HT_THROW(Error::BAD_NAMESPACE, "Null namespace");
701  TablePtr table;
702  const Cells &cells = state.inserts.get();
703 
704  table = ns->open_table(state.table_name);
705  TableMutatorPtr mutator( table->create_mutator() );
706 
707  try {
708  mutator->set_cells(cells);
709  }
710  catch (Exception &e) {
711  do {
712  mutator->show_failed(e);
713  } while (!mutator->retry());
714 
715  if (mutator->get_last_error())
716  HT_THROW(mutator->get_last_error(),
717  Error::get_text(mutator->get_last_error()));
718  }
719  if (cb.normal_mode) {
720  cb.total_cells = cells.size();
721 
722  for (const auto &cell : cells) {
724  ? (strlen(cell.column_qualifier) + 1) : 0;
725  cb.total_values_size += cell.value_len;
726  }
727  }
728 
729  // flush the mutator
730  cb.on_finish(mutator);
731 
732  // report errors during mutator->flush
733  if (mutator->get_last_error())
734  HT_THROW(mutator->get_last_error(),
735  Error::get_text(mutator->get_last_error()));
736  return 0;
737 }
738 
739 int
740 cmd_delete(NamespacePtr &ns, ParserState &state, HqlInterpreter::Callback &cb) {
741  if (!ns)
742  HT_THROW(Error::BAD_NAMESPACE, "Null namespace");
743  TablePtr table;
744  KeySpec key;
745  char *column_qualifier;
746 
747  table = ns->open_table(state.table_name);
748  TableMutatorPtr mutator(table->create_mutator());
749 
750  key.row = state.delete_row.c_str();
751  key.row_len = state.delete_row.length();
752 
753  if (state.delete_version_time) {
755  key.timestamp = state.delete_version_time;
756  }
757  else if (state.delete_time) {
758  key.flag = FLAG_DELETE_CELL;
759  key.timestamp = state.delete_time;
760  }
761  else
762  key.timestamp = AUTO_ASSIGN;
763 
764  if (state.delete_all_columns) {
765  try {
766  key.flag = FLAG_DELETE_ROW;
767  mutator->set_delete(key);
768  }
769  catch (Exception &e) {
770  mutator->show_failed(e);
771  return 2;
772  }
773  }
774  else {
775  for (const auto &col : state.delete_columns) {
776  ++cb.total_cells;
777 
778  key.column_family = col.c_str();
779  if ((column_qualifier = (char*)strchr(col.c_str(), ':')) != 0) {
780  *column_qualifier++ = 0;
781  key.column_qualifier = column_qualifier;
782  key.column_qualifier_len = strlen(column_qualifier);
783  if (key.flag == FLAG_INSERT)
784  key.flag = FLAG_DELETE_CELL;
785  }
786  else {
787  key.column_qualifier = 0;
788  key.column_qualifier_len = 0;
789  if (key.flag == FLAG_INSERT)
791  }
792  try {
793  mutator->set_delete(key);
794  }
795  catch (Exception &e) {
796  mutator->show_failed(e);
797  return 2;
798  }
799  }
800  }
801 
802  cb.on_finish(mutator);
803  return 0;
804 }
805 
806 int
807 cmd_get_listing(NamespacePtr &ns, ParserState &state,
809  if (!ns)
810  HT_THROW(Error::BAD_NAMESPACE, "Null namespace");
811  std::vector<NamespaceListing> listing;
812  ns->get_listing(false, listing);
813  for (const auto &entry : listing) {
814  if (entry.is_namespace && !state.tables_only)
815  cb.on_return(entry.name + "\t(namespace)");
816  else if (!entry.is_namespace)
817  cb.on_return(entry.name);
818  }
819  cb.on_finish();
820  return 0;
821 }
822 
823 int
824 cmd_drop_table(NamespacePtr &ns, ParserState &state,
826  if (!ns)
827  HT_THROW(Error::BAD_NAMESPACE, "Null namespace");
828 
829  ns->drop_table(state.table_name, state.if_exists);
830  cb.on_finish();
831  return 0;
832 }
833 
834 int cmd_balance(Client *client, ParserState &state,
836  Lib::Master::ClientPtr master = client->get_master_client();
837 
838  master->balance(state.balance_plan);
839 
840  cb.on_finish();
841  return 0;
842 }
843 
844 int cmd_stop(Client *client, ParserState &state, HqlInterpreter::Callback &cb) {
845  Lib::Master::ClientPtr master = client->get_master_client();
846  master->stop(state.rs_name);
847  cb.on_finish();
848  return 0;
849 }
850 
851 int cmd_set(Client *client, ParserState &state, HqlInterpreter::Callback &cb) {
852  Lib::Master::ClientPtr master = client->get_master_client();
853  master->set_state(state.variable_specs);
854  cb.on_finish();
855  return 0;
856 }
857 
858 int cmd_status(Client *client, ParserState &state, HqlInterpreter::Callback &cb) {
859  Status status;
860  Timer timer(10000, true);
861  int error {};
862 
863  try {
864  error = client->get_hyperspace_session()->status(status, &timer);
865  }
866  catch (Exception &e) {
867  cb.on_return(Hypertable::format("Hypertable CRITICAL - %s - %s",
868  Error::get_text(e.code()), e.what()));
869  return 2;
870  }
871 
872  if (error != Error::OK) {
873  cb.on_return(Hypertable::format("Hypertable CRITICAL - %s",
874  Error::get_text(error)));
875  return 2;
876  }
877 
878  if (status.get() != Status::Code::OK) {
879  Status::Code code;
880  string text;
881  status.get(&code, text);
882  cb.on_return(Hypertable::format("Hypertable %s - %s",
883  Status::code_to_string(code), text.c_str()));
884  return static_cast<int>(code);
885  }
886 
887  Lib::Master::ClientPtr master = client->get_master_client();
888  master->system_status(status, &timer);
889 
890  if (status.get() != Status::Code::OK) {
891  Status::Code code;
892  string text;
893  status.get(&code, text);
894  cb.on_return(Hypertable::format("Hypertable %s - %s",
895  Status::code_to_string(code), text.c_str()));
896  return static_cast<int>(code);
897  }
898 
899  cb.on_return("Hypertable OK");
900  return 0;
901 }
902 
903 int
904 cmd_rebuild_indices(Client *client, NamespacePtr &ns, ParserState &state,
906  if (!ns)
907  HT_THROW(Error::BAD_NAMESPACE, "Null namespace");
908 
909  if (state.table_name.empty())
910  HT_THROW(Error::HQL_PARSE_ERROR, "Empty table name");
911 
912  NamespacePtr working_ns = ns;
913  string table_basename = state.table_name;
914  size_t lastslash = state.table_name.find_last_of('/');
915  if (lastslash != string::npos) {
916  table_basename = state.table_name.substr(lastslash+1);
917  if (state.table_name[0] == '/') {
918  if (lastslash == 0)
919  working_ns = client->open_namespace("/");
920  else
921  working_ns =
922  client->open_namespace(state.table_name.substr(0, lastslash));
923  }
924  else
925  working_ns = client->open_namespace(state.table_name.substr(0, lastslash),
926  ns.get());
927  }
928 
929  int8_t parts = state.flags ? static_cast<int8_t>(state.flags) : TableParts::ALL;
930  TableParts table_parts(parts);
931 
932  working_ns->rebuild_indices(table_basename, table_parts);
933 
934  cb.on_finish();
935  return 0;
936 }
937 
938 
939 int cmd_shutdown_master(Client *client, HqlInterpreter::Callback &cb) {
940  client->shutdown();
941  cb.on_finish();
942  return 0;
943 }
944 
945 int cmd_close(Client *client, HqlInterpreter::Callback &cb) {
946  client->close();
947  cb.on_finish();
948  return 0;
949 }
950 
951 } // local namespace
952 
953 
954 HqlInterpreter::HqlInterpreter(Client *client, ConnectionManagerPtr &conn_manager,
955  bool immutable_namespace) : m_client(client), m_mutator_flags(0),
956  m_conn_manager(conn_manager), m_fs_client(0), m_immutable_namespace(immutable_namespace) {
957  if (Config::properties->get_bool("Hypertable.HqlInterpreter.Mutator.NoLogSync"))
959 
960 }
961 
962 void HqlInterpreter::set_namespace(const string &ns) {
964  HT_THROW(Error::BAD_NAMESPACE, (String)"Attempting to modify immutable namespace " +
965  m_namespace->get_name() + " to " + ns);
967 }
968 
969 int HqlInterpreter::execute(const string &line, Callback &cb) {
970  ParserState state(m_namespace);
971  string stripped_line = line;
972 
973  boost::trim(stripped_line);
974 
975  Hql::Parser parser(state);
976  parse_info<> info = parse(stripped_line.c_str(), parser, space_p);
977 
978  if (info.full) {
979  cb.on_parsed(state);
980 
981  switch (state.command) {
983  return cmd_show_create_table(m_namespace, state, cb);
984  case COMMAND_HELP:
985  return cmd_help(state, cb);
987  return cmd_exists_table(m_namespace, state, cb);
989  return cmd_create_table(m_namespace, state, cb);
991  return cmd_describe_table(m_namespace, state, cb);
992  case COMMAND_SELECT:
993  return cmd_select(m_namespace, m_conn_manager, m_fs_client,
994  state, cb);
995  case COMMAND_LOAD_DATA:
996  return cmd_load_data(m_namespace, m_mutator_flags,
997  m_conn_manager, m_fs_client, state, cb);
998  case COMMAND_INSERT:
999  return cmd_insert(m_namespace, state, cb);
1000  case COMMAND_DELETE:
1001  return cmd_delete(m_namespace, state, cb);
1002  case COMMAND_GET_LISTING:
1003  return cmd_get_listing(m_namespace, state, cb);
1004  case COMMAND_ALTER_TABLE:
1005  return cmd_alter_table(m_namespace, state, cb);
1006  case COMMAND_COMPACT:
1007  return cmd_compact(m_namespace, state, cb);
1008  case COMMAND_DROP_TABLE:
1009  return cmd_drop_table(m_namespace, state, cb);
1010  case COMMAND_RENAME_TABLE:
1011  return cmd_rename_table(m_namespace, state, cb);
1012  case COMMAND_DUMP_TABLE:
1013  return cmd_dump_table(m_namespace, m_conn_manager, m_fs_client,
1014  state, cb);
1015  case COMMAND_CLOSE:
1016  return cmd_close(m_client, cb);
1018  return cmd_shutdown_master(m_client, cb);
1020  return cmd_create_namespace(m_client, m_namespace, state, cb);
1021  case COMMAND_USE_NAMESPACE:
1022  return cmd_use_namespace(m_client, m_namespace,
1023  m_immutable_namespace, state, cb);
1025  return cmd_drop_namespace(m_client, m_namespace, state, cb);
1026  case COMMAND_BALANCE:
1027  return cmd_balance(m_client, state, cb);
1028  case COMMAND_STOP:
1029  return cmd_stop(m_client, state, cb);
1030  case COMMAND_SET:
1031  return cmd_set(m_client, state, cb);
1032  case COMMAND_STATUS:
1033  return cmd_status(m_client, state, cb);
1035  return cmd_rebuild_indices(m_client, m_namespace, state, cb);
1036 
1037  default:
1038  HT_THROW(Error::HQL_PARSE_ERROR, String("unsupported command: ") + stripped_line);
1039  }
1040  }
1041  else
1042  HT_THROW(Error::HQL_PARSE_ERROR, String("parse error at: ") + info.stop + " (" + stripped_line + ")");
1043  return 0;
1044 }
int64_t timestamp
Definition: KeySpec.h:130
#define HT_THROW2F(_code_, _ex_, _fmt_,...)
Definition: Error.h:494
void drop_namespace(const std::string &name, Namespace *base=NULL, bool if_exists=false)
Removes a namespace.
Definition: Client.cc:150
virtual void on_finish(TableMutatorPtr &mutator)
Called when the interpreter is finished. Note: the mutator pointer may be NULL in case of things like LOAD DATA...
std::vector< Cell, CellAlloc > Cells
Definition: Cells.h:37
ScanSpec & get()
Returns the built ScanSpec object.
Definition: ScanSpec.h:566
NamespacePtr open_namespace(const std::string &name, Namespace *base=NULL)
Opens a Namespace.
Definition: Client.cc:106
Holds Nagios-style program status information.
Definition: Status.h:42
PropertiesPtr properties
This singleton map stores all options.
Definition: Config.cc:47
static const uint32_t FLAG_DELETE_ROW
Definition: KeySpec.h:40
std::string String
A String is simply a typedef to std::string.
Definition: String.h:44
ConnectionManagerPtr m_conn_manager
Declarations for Status.
The Stopwatch measures elapsed time.
void set_field_separator(char fs)
String format(const char *fmt,...)
Returns a String using printf-like format facilities. Vanilla snprintf is about 1.5x faster than this...
Definition: String.cc:37
static const uint32_t FLAG_INSERT
Definition: KeySpec.h:47
bool ignore_unknown_cfs(int flags)
Definition: LoadDataFlags.h:43
const char * column_qualifier
Definition: KeySpec.h:128
const char * column_qualifier
Definition: Cell.h:68
static const uint32_t FLAG_DELETE_CELL
Definition: KeySpec.h:42
STL namespace.
bool escape(const char *in_buf, size_t in_len, const char **out_bufp, size_t *out_lenp)
#define HT_ON_SCOPE_EXIT(...)
Definition: ScopeGuard.h:301
size_t column_qualifier_len
Definition: KeySpec.h:129
const void * row
Definition: KeySpec.h:125
std::shared_ptr< TableScanner > TableScannerPtr
Smart pointer to TableScanner.
Definition: TableScanner.h:124
Represents a set of table parts (sub-tables).
Definition: TableParts.h:47
Code
Enumeration for status codes.
Definition: Status.h:47
static const uint32_t FLAG_DELETE_COLUMN_FAMILY
Definition: KeySpec.h:41
Declarations for Schema.
virtual void on_update(size_t total)
Called when interpreter is ready to update.
void get(Cells &cells)
Definition: Cells.h:97
ScanSpecBuilder builder
Definition: HqlParser.h:266
virtual void on_parsed(Hql::ParserState &)
Called when the hql string is parsed successfully.
bool no_log(int flags)
Definition: LoadDataFlags.h:51
std::shared_ptr< Namespace > NamespacePtr
Shared smart pointer to Namespace.
Definition: Namespace.h:333
#define HT_ASSERT(_e_)
Definition: Logger.h:396
std::shared_ptr< TableMutator > TableMutatorPtr
Smart pointer to TableMutator.
Definition: TableMutator.h:257
virtual void on_progress(size_t amount)
Called when interpreter updates progress for long running queries.
File system utility functions.
const char * get_text(int error)
Returns a descriptive error message.
Definition: Error.cc:330
bool status(ContextPtr &context, Timer &timer, Status &status)
Runs a status check on the master.
Definition: Utility.cc:408
std::vector< String > columns
Definition: HqlParser.h:313
uint64_t revision
Definition: Cell.h:70
std::shared_ptr< Client > ClientPtr
Smart pointer to Client.
Definition: Client.h:233
Compatibility Macros for C/C++.
const char * row_key
Definition: Cell.h:66
T get(const String &name)
Retrieves a configuration value.
Definition: Config.h:82
virtual void on_scan(TableScannerPtr &)
Called when interpreter is ready to scan.
virtual void on_dump(TableDumper &)
Called when interpreter is ready to dump.
Hypertable definitions
void set_namespace(const std::string &ns)
long long int Lld
Shortcut for printf formats.
Definition: String.h:53
Callback interface/base class for execute.
Hyperspace::SessionPtr & get_hyperspace_session()
Definition: Client.cc:171
std::shared_ptr< Client > ClientPtr
Smart pointer to Client.
Definition: Client.h:201
virtual void on_return(const std::string &)
Called when the interpreter returns a string result. May be called multiple times for a list of string results.
const char * column_family
Definition: Cell.h:67
void create_namespace(const std::string &name, Namespace *base=NULL, bool create_intermediate=false, bool if_not_exists=false)
Creates a namespace.
Definition: Client.cc:86
void get(Code *code, std::string &text) const
Gets status code and text.
Definition: Status.h:111
A timer class to keep timeout states across AsyncComm related calls.
Definition: Timer.h:44
This is a generic exception class for Hypertable.
Definition: Error.h:314
A String class based on std::string.
std::vector< String > delete_columns
Definition: HqlParser.h:339
uint32_t value_len
Definition: Cell.h:72
std::shared_ptr< Schema > SchemaPtr
Smart pointer to Schema.
Definition: Schema.h:465
static const int64_t AUTO_ASSIGN
Definition: KeySpec.h:38
uint8_t flag
Definition: Cell.h:73
Configuration settings.
std::shared_ptr< TableDumper > TableDumperPtr
Smart pointer to TableDumper.
Definition: TableDumper.h:68
Declarations for HqlHelpText.
std::shared_ptr< ConnectionManager > ConnectionManagerPtr
Smart pointer to ConnectionManager.
FsBroker::Lib::ClientPtr m_fs_client
Encapsulates decomposed key and value.
Definition: Cell.h:32
bool unescape(const char *in_buf, size_t in_len, const char **out_bufp, size_t *out_lenp)
const char * column_family
Definition: KeySpec.h:127
Error codes, Exception handling, error logging.
#define HT_THROW(_code_, _msg_)
Definition: Error.h:478
static const uint32_t FLAG_DELETE_CELL_VERSION
Definition: KeySpec.h:43
int execute(const std::string &str, Callback &)
The main interface for the interpreter.
std::shared_ptr< LoadDataSource > LoadDataSourcePtr
Smart pointer to LoadDataSource.
std::shared_ptr< Table > TablePtr
Definition: Table.h:53
const uint8_t * value
Definition: Cell.h:71
Lib::Master::ClientPtr get_master_client()
Definition: Client.cc:176
const char * code_to_string(int var_code)
Converts variable code to variable string.
int code() const
Returns the error code.
Definition: Error.h:391
std::vector< SystemVariable::Spec > variable_specs
Definition: HqlParser.h:345
Executes user-defined functions when leaving the current scope.
int64_t timestamp
Definition: Cell.h:69