0.9.8.10
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
ht_load_generator.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2007-2015 Hypertable, Inc.
3  *
4  * This file is part of Hypertable.
5  *
6  * Hypertable is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; either version 3
9  * of the License, or any later version.
10  *
11  * Hypertable is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19  * 02110-1301, USA.
20  */
21 
22 #include <Common/Compat.h>
23 
24 #include "LoadClient.h"
25 #include "LoadThread.h"
26 #include "QueryThread.h"
27 #include "ParallelLoad.h"
28 
29 #include <Hypertable/Lib/Client.h>
31 #include <Hypertable/Lib/Config.h>
32 #include <Hypertable/Lib/Cells.h>
33 
34 #include <Common/Stopwatch.h>
35 #include <Common/String.h>
36 #include <Common/Init.h>
37 #include <Common/Usage.h>
38 
39 #include <boost/algorithm/string.hpp>
40 #include <boost/progress.hpp>
41 #include <boost/shared_array.hpp>
42 #include <boost/thread/thread.hpp>
43 
44 #include <chrono>
45 #include <cmath>
46 #include <cstdio>
47 #include <cstdlib>
48 #include <fstream>
49 #include <iostream>
50 #include <thread>
51 
52 using namespace Hypertable;
53 using namespace Hypertable::Config;
54 using namespace std;
55 
// File-local definitions: the usage banner and the config policy that
// registers this program's command-line options.
56 namespace {
57 
// Banner text handed to cmdline_desc() by AppPolicy::init_options() below.
58  const char *usage =
59  "\n"
60  "Usage: ht_generate_load [options] <type>\n\n"
61  "Description:\n"
62  " This program is used to generate load on a Hypertable\n"
63  " cluster. The <type> argument indicates the type of load\n"
64  " to generate ('query' or 'update').\n\n"
65  "Options";
66 
// Config policy for this tool. init_options() declares the visible
// command-line options, aliases some of them to property names, and
// registers the hidden positional "type" argument.
67  struct AppPolicy : Config::Policy {
68  static void init_options() {
// Visible options; defaults given here are what get_*() returns in main()
// when the option is absent from the command line.
70  cmdline_desc(usage).add_options()
71  ("help-config", "Show help message for config properties")
72  ("table", str()->default_value("LoadTest"), "Name of table to query/update")
73  ("delete-percentage", i32(),
74  "When generating update workload make this percentage deletes")
75  ("max-bytes", i64(), "Amount of data to generate, measured by number "
76  "of key and value bytes produced")
77  ("max-keys", i64(), "Maximum number of keys to generate for query load")
78  ("parallel", i32()->default_value(0),
79  "Spawn threads to execute requests in parallel")
80  ("query-delay", i32(), "Delay milliseconds between each query")
81  ("query-mode", str(),
82  "Whether to query 'index' or 'qualifier' index")
83  ("sample-file", str(),
84  "Output file to hold request latencies, one per line")
85  ("seed", i32()->default_value(1), "Pseudo-random number generator seed")
86  ("row-seed", i32()->default_value(1), "Pseudo-random number generator seed")
87  ("spec-file", str(),
88  "File containing the DataGenerator specification")
89  ("stdout", boo()->zero_tokens()->default_value(false),
90  "Display generated data to stdout instead of sending load to cluster")
91  ("verbose,v", boo()->zero_tokens()->default_value(false),
92  "Show more verbose output")
93  ("flush", boo()->zero_tokens()->default_value(false), "Flush after each update")
94  ("no-log-sync", boo()->zero_tokens()->default_value(false), "Don't sync rangeserver commit logs on autoflush")
95  ("no-log", "Don't write to the commit log")
96  ("flush-interval", i64()->default_value(0),
97  "Amount of data after which to mutator buffers are flushed "
98  "and commit log is synced. Only used if no-log-sync flag is on")
99  ("shared-mutator-flush-interval", i64()->default_value(0),
100  "Created a shared mutator using this value as the flush interval")
101  ("thrift", boo()->zero_tokens()->default_value(false),
102  "Generate load via Thrift interface instead of C++ client library")
103  ("version", "Show version information and exit")
104  ("overwrite-delete-flag", str(), "Force delete flag (DELETE_ROW, DELETE_CELL, DELETE_COLUMN_FAMILY)")
105  ;
// Alias each command-line option to the property name used for the same
// setting (the DataGenerator.* / rowkey.seed names).
106  alias("delete-percentage", "DataGenerator.DeletePercentage");
107  alias("max-bytes", "DataGenerator.MaxBytes");
108  alias("max-keys", "DataGenerator.MaxKeys");
109  alias("seed", "DataGenerator.Seed");
110  alias("row-seed", "rowkey.seed");
// The load type is a hidden option registered with the positional
// description, so it is given as a bare argument rather than a switch.
111  cmdline_hidden_desc().add_options()
112  ("type", str(), "Type (update or query).");
113  cmdline_positional_desc().add("type", 1);
114  }
115  };
116 }
117 
118 
119 typedef Meta::list<AppPolicy, DataGeneratorPolicy, DefaultCommPolicy> Policies;
120 
121 void
122 generate_update_load(PropertiesPtr &props, String &tablename, bool flush,
123  ::uint32_t mutator_flags, ::uint64_t flush_interval,
124  ::uint64_t shared_mutator_flush_interval, bool to_stdout,
125  String &sample_fname, ::int32_t delete_pct, bool thrift);
126 
127 void
129  ::int32_t parallel, bool flush, ::uint32_t mutator_flags,
130  ::uint64_t flush_interval,
131  ::uint64_t shared_mutator_flush_interval,
132  ::int32_t delete_pct, bool thrift);
133 
134 void generate_query_load(PropertiesPtr &props, String &tablename,
135  bool to_stdout, ::int32_t delay, String &sample_fname, bool thrift);
136 
137 void generate_query_load_parallel(PropertiesPtr &props, String &tablename,
138  int32_t parallel);
139 
140 double std_dev(::uint64_t nn, double sum, double sq_sum);
141 
142 void parse_command_line(int argc, char **argv, PropertiesPtr &props);
143 
144 
145 int main(int argc, char **argv) {
146  String table, load_type, spec_file, sample_fname;
147  PropertiesPtr generator_props = make_shared<Properties>();
148  bool flush, to_stdout, thrift;
149  ::uint64_t flush_interval=0;
150  ::uint64_t shared_mutator_flush_interval=0;
151  ::int32_t query_delay = 0;
152  ::int32_t delete_pct = 0;
153  ::int32_t parallel = 0;
154  ::uint32_t mutator_flags = 0;
155 
156  try {
157  init_with_policies<Policies>(argc, argv);
158 
159  if (!has("type")) {
160  std::cout << cmdline_desc() << std::flush;
161  quick_exit(EXIT_SUCCESS);
162  }
163 
164  load_type = get_str("type");
165 
166  table = get_str("table");
167 
168  parallel = get_i32("parallel");
169 
170  sample_fname = has("sample-file") ? get_str("sample-file") : "";
171 
172  if (has("query-delay"))
173  query_delay = get_i32("query-delay");
174 
175  flush = get_bool("flush");
176  if (has("no-log"))
177  mutator_flags |= TableMutator::FLAG_NO_LOG;
178  else if (get_bool("no-log-sync"))
179  mutator_flags |= TableMutator::FLAG_NO_LOG_SYNC;
180  to_stdout = get_bool("stdout");
181  if (mutator_flags & TableMutator::FLAG_NO_LOG_SYNC)
182  flush_interval = get_i64("flush-interval");
183  shared_mutator_flush_interval = get_i64("shared-mutator-flush-interval");
184  thrift = get_bool("thrift");
185 
186  if (has("spec-file")) {
187  spec_file = get_str("spec-file");
188  if (FileUtils::exists(spec_file))
189  generator_props->load(spec_file, cmdline_hidden_desc(), true);
190  else
191  HT_THROW(Error::FILE_NOT_FOUND, spec_file);
192  }
193 
194  parse_command_line(argc, argv, generator_props);
195 
196  if (generator_props->has("DataGenerator.MaxBytes") &&
197  generator_props->has("DataGenerator.MaxKeys")) {
198  HT_ERROR("Only one of 'DataGenerator.MaxBytes' or 'DataGenerator.MaxKeys' may be specified");
199  quick_exit(EXIT_FAILURE);
200  }
201 
202  if (generator_props->has("DataGenerator.DeletePercentage"))
203  delete_pct = generator_props->get_i32("DataGenerator.DeletePercentage");
204 
205  if (load_type == "update" && parallel > 0)
206  generate_update_load_parallel(generator_props, table, parallel, flush,
207  mutator_flags, flush_interval,
208  shared_mutator_flush_interval, delete_pct,
209  thrift);
210  else if (load_type == "update")
211  generate_update_load(generator_props, table, flush, mutator_flags,
212  flush_interval, shared_mutator_flush_interval,
213  to_stdout, sample_fname, delete_pct, thrift);
214  else if (load_type == "query") {
215  if (!generator_props->has("DataGenerator.MaxKeys")
216  && !generator_props->has("max-keys")) {
217  HT_ERROR("'DataGenerator.MaxKeys' or --max-keys must be specified for "
218  "load type 'query'");
219  quick_exit(EXIT_FAILURE);
220  }
221  if (parallel > 0) {
222  if (to_stdout) {
223  HT_FATAL("--stdout switch not supported for parallel queries");
224  quick_exit(EXIT_FAILURE);
225  }
226  if (sample_fname != "") {
227  HT_FATAL("--sample-file not supported for parallel queries");
228  quick_exit(EXIT_FAILURE);
229  }
230  if (has("query-mode")) {
231  HT_FATAL("--query-mode not supported for parallel queries");
232  quick_exit(EXIT_FAILURE);
233  }
234  if (thrift) {
235  HT_FATAL("thrift mode not supported for parallel queries");
236  quick_exit(EXIT_FAILURE);
237  }
238 
239  generate_query_load_parallel(generator_props, table, parallel);
240  }
241  else
242  generate_query_load(generator_props, table, to_stdout, query_delay,
243  sample_fname, thrift);
244  }
245  else {
246  std::cout << cmdline_desc() << std::flush;
247  quick_exit(EXIT_FAILURE);
248  }
249  }
250  catch (Exception &e) {
251  HT_ERROR_OUT << e << HT_END;
252  exit(EXIT_FAILURE);
253  }
254 
255  fflush(stdout);
256  quick_exit(EXIT_SUCCESS); // don't bother with static objects
257 }
258 
259 
260 void parse_command_line(int argc, char **argv, PropertiesPtr &props) {
261  const char *ptr;
262  String key, value;
263  props->parse_args(argc, argv, cmdline_desc(), 0, 0, true);
264  for (int i=1; i<argc; i++) {
265  if (argv[i][0] == '-') {
266  ptr = strchr(argv[i], '=');
267  if (ptr) {
268  key = String(argv[i], ptr-argv[i]);
269  trim_if(key, is_any_of("-"));
270  value = String(ptr+1);
271  trim_if(value, is_any_of("'\""));
272  if (key == "delete-percentage") {
273  props->set(key, boost::any( atoi(value.c_str()) ));
274  props->set("DataGenerator.DeletePercentage", boost::any( atoi(value.c_str()) ));
275  }
276  else if (key == ("max-bytes")) {
277  props->set(key, boost::any( strtoll(value.c_str(), 0, 0) ));
278  props->set("DataGenerator.MaxBytes", boost::any( strtoll(value.c_str(), 0, 0) ));
279  }
280  else if (key == ("max-keys")) {
281  props->set(key, boost::any( strtoll(value.c_str(), 0, 0) ));
282  props->set("DataGenerator.MaxKeys", boost::any( strtoll(value.c_str(), 0, 0) ));
283  }
284  else if (key == "seed") {
285  props->set(key, boost::any( atoi(value.c_str()) ));
286  props->set("DataGenerator.Seed", boost::any( atoi(value.c_str()) ));
287  }
288  else if (key == "row-seed") {
289  props->set(key, boost::any( atoi(value.c_str()) ));
290  props->set("rowkey.seed", boost::any( atoi(value.c_str()) ));
291  }
292  else
293  props->set(key, boost::any(value));
294  }
295  else {
296  key = String(argv[i]);
297  trim_if(key, is_any_of("-"));
298  if (!props->has(key))
299  props->set(key, boost::any( true ));
300  }
301  }
302  }
303 }
304 
305 
/*
 * Generates update load from the calling thread.
 *
 * Walks a DataGenerator built from props.  With to_stdout set, each generated
 * cell is printed as a tab-separated line and nothing is sent anywhere.
 * Otherwise each cell is sent through a LoadClient mutator on tablename,
 * either as an insert or (with probability delete_pct percent) as a delete,
 * and a summary of counts, throughput and latency is printed at the end.
 *
 * props                          generator properties; also checked for
 *                                "DataGenerator.MaxKeys"
 * tablename                      table the mutator is created on
 * flush                          flush after every request and time it
 * mutator_flags                  passed to create_mutator()
 * flush_interval                 when flush is off and this is > 0, flush
 *                                once more than this many bytes are pending
 * shared_mutator_flush_interval  passed to create_mutator()
 * to_stdout                      print generated data instead of loading
 * sample_fname                   if non-empty, per-request latencies are
 *                                written here instead of being aggregated
 * delete_pct                     percentage of requests turned into deletes
 * thrift                         passed to the LoadClient constructor
 *
 * On a Hypertable Exception the error is logged and the process exits.
 */
306 void
307 generate_update_load(PropertiesPtr &props, String &tablename, bool flush,
308  ::uint32_t mutator_flags, ::uint64_t flush_interval,
309  ::uint64_t shared_mutator_flush_interval, bool to_stdout,
310  String &sample_fname, ::int32_t delete_pct, bool thrift)
311 {
312  double cum_latency=0, cum_sq_latency=0, latency=0;
313  double min_latency=10000000, max_latency=0;
314  ::uint64_t total_cells=0;
315  Cells cells;
316  clock_t start_clocks=0, stop_clocks=0;
// Divisor that converts clock() ticks to microseconds.
317  double clocks_per_usec = (double)CLOCKS_PER_SEC / 1000000.0;
318  bool output_samples = false;
319  ofstream sample_file;
320  DataGenerator dg(props);
321  ::uint64_t unflushed_data=0;
322  ::uint64_t total_bytes = 0;
323  ::uint64_t consume_threshold = 0;
324 
// Dump mode: print each generated cell and return without contacting the
// cluster. A missing or empty qualifier prints just the column family.
325  if (to_stdout) {
326  cout << "#row\tcolumn\tvalue\n";
327  for (DataGenerator::iterator iter = dg.begin(); iter != dg.end(); iter++) {
328  if ((*iter).column_qualifier == 0 || *(*iter).column_qualifier == 0) {
329  if (delete_pct != 0 && (::random() % 100) < delete_pct)
330  cout << (*iter).row_key << "\t" << (*iter).column_family << "\t\tDELETE_CELL\n";
331  else
332  cout << (*iter).row_key << "\t" << (*iter).column_family
333  << "\t" << (const char *)(*iter).value << "\n";
334  }
335  else {
336  if (delete_pct != 0 && (::random() % 100) < delete_pct)
337  cout << (*iter).row_key << "\t" << (*iter).column_family << ":"
338  << (*iter).column_qualifier << "\t\tDELETE_CELL\n";
339  else
340  cout << (*iter).row_key << "\t" << (*iter).column_family << ":"
341  << (*iter).column_qualifier << "\t" << (const char *)(*iter).value << "\n";
342  }
343  }
344  cout << std::flush;
345  return;
346  }
347 
// A sample file switches latency reporting from aggregate statistics to
// one value per line.
348  if (sample_fname != "") {
349  sample_file.open(sample_fname.c_str());
350  output_samples = true;
351  }
352 
353  Stopwatch stopwatch;
354 
355  try {
356  LoadClientPtr load_client_ptr;
357  String config_file = get_str("config");
358  bool key_limit = props->has("DataGenerator.MaxKeys");
359  bool largefile_mode = false;
360  ::uint32_t adjusted_bytes = 0;
361 
// The progress total is held in 32 bits. When the byte limit does not fit,
// progress is tracked in units of 1048576 bytes instead of single bytes.
362  if (dg.get_max_bytes() > std::numeric_limits< ::uint32_t >::max()) {
363  largefile_mode = true;
364  adjusted_bytes = (uint32_t)(dg.get_max_bytes() / 1048576LL);
365  consume_threshold = 1048576LL;
366  }
367  else
368  adjusted_bytes = dg.get_max_bytes();
369 
370  boost::progress_display progress_meter(key_limit ? dg.get_max_keys() : adjusted_bytes);
371 
372  if (config_file != "")
373  load_client_ptr = make_shared<LoadClient>(config_file, thrift);
374  else
375  load_client_ptr = make_shared<LoadClient>(thrift);
376 
377  load_client_ptr->create_mutator(tablename, mutator_flags,
378  shared_mutator_flush_interval);
379 
// (unsigned)-1 means "no override"; otherwise the named flag replaces the
// delete flag chosen per cell further down.
380  unsigned delete_flag = (unsigned)-1;
381  if (has("overwrite-delete-flag")) {
382  String delete_flag_str = get_str("overwrite-delete-flag");
383  if (delete_flag_str == "DELETE_ROW")
384  delete_flag = FLAG_DELETE_ROW;
385  else if (delete_flag_str == "DELETE_CELL")
386  delete_flag = FLAG_DELETE_CELL;
387  else if (delete_flag_str == "DELETE_COLUMN_FAMILY")
388  delete_flag = FLAG_DELETE_COLUMN_FAMILY;
389  else if (delete_flag_str != "")
390  HT_FATAL("unknown delete flag");
391  }
392 
// total_bytes is advanced in the loop's increment expression, so inside the
// body it does not yet include the current cell.
393  for (DataGenerator::iterator iter = dg.begin(); iter != dg.end(); total_bytes+=iter.last_data_size(),++iter) {
394 
395  if (delete_pct != 0 && (::random() % 100) < delete_pct) {
// Build a delete key from the generated cell. The flag starts as a row
// delete and becomes a cell delete when a non-empty qualifier is present.
396  KeySpec key;
397  key.flag = FLAG_DELETE_ROW;
398  key.row = (*iter).row_key;
399  key.row_len = strlen((const char *)key.row);
400  key.column_family = (*iter).column_family;
// NOTE(review): as written, the qualifier is only copied when a column
// family is set, and no branch here selects a column-family delete --
// confirm against the upstream source.
401  if (key.column_family != 0)
403  key.column_qualifier = (*iter).column_qualifier;
404  if (key.column_qualifier != 0) {
405  key.column_qualifier_len = strlen(key.column_qualifier);
406  if (key.column_qualifier_len != 0)
407  key.flag = FLAG_DELETE_CELL;
408  }
409  key.timestamp = (*iter).timestamp;
410  key.revision = (*iter).revision;
411 
// Apply the --overwrite-delete-flag override, if any. A forced row delete
// also clears the column fields.
412  if (delete_flag == FLAG_DELETE_ROW) {
413  key.column_family = 0;
414  key.column_qualifier = 0;
415  key.column_qualifier_len = 0;
416  key.flag = delete_flag;
417  }
418  else if (delete_flag == FLAG_DELETE_COLUMN_FAMILY) {
419  key.flag = delete_flag;
420  }
421  else if (delete_flag == FLAG_DELETE_CELL) {
422  key.flag = delete_flag;
423  }
424 
425  if (flush)
426  start_clocks = clock();
427  load_client_ptr->set_delete(key);
428  }
429  else {
430  // do update
431  cells.clear();
432  cells.push_back(*iter);
433  if (flush)
434  start_clocks = clock();
435  load_client_ptr->set_cells(cells);
436  }
437 
// With --flush each request is flushed immediately and its latency is
// either written to the sample file or folded into the running statistics.
// NOTE(review): clock() reports processor time, so time spent blocked on
// the server may not be counted -- confirm this is the intended measure.
438  if (flush) {
439  load_client_ptr->flush();
440  stop_clocks = clock();
// Handles the tick counter wrapping around between the two readings.
441  if (stop_clocks < start_clocks)
442  latency = ((std::numeric_limits<clock_t>::max() - start_clocks) + stop_clocks) / clocks_per_usec;
443  else
444  latency = (stop_clocks-start_clocks) / clocks_per_usec;
445  if (output_samples)
446  sample_file << (unsigned long)latency << "\n";
447  else {
448  cum_latency += latency;
449  cum_sq_latency += pow(latency,2);
450  if (latency < min_latency)
451  min_latency = latency;
452  if (latency > max_latency)
453  max_latency = latency;
454  }
455  }
456  else if (flush_interval>0) {
457  // if flush interval was specified then keep track of how much data is currently
458  // not flushed and call flush once the flush interval limit is reached
459  unflushed_data += iter.last_data_size();
460  if (unflushed_data > flush_interval) {
461  load_client_ptr->flush();
462  unflushed_data = 0;
463  }
464 
465  }
466 
// Advance the progress meter: one tick per key when a key limit is set,
// otherwise by bytes (or by whole 1048576-byte units in large-file mode).
467  ++total_cells;
468  if (key_limit)
469  progress_meter += 1;
470  else {
471  if (largefile_mode == true) {
472  if (total_bytes >= consume_threshold) {
473  uint32_t consumed = 1 + (uint32_t)((total_bytes - consume_threshold) / 1048576LL);
474  progress_meter += consumed;
475  consume_threshold += (::uint64_t)consumed * 1048576LL;
476  }
477  }
478  else
479  progress_meter += iter.last_data_size();
480  }
481  }
482 
// Final flush after the last generated cell.
483  load_client_ptr->flush();
484 
485  }
486  catch (Exception &e) {
487  HT_ERROR_OUT << e << HT_END;
488  exit(EXIT_FAILURE);
489  }
490 
491 
492  stopwatch.stop();
493 
494  printf("\n");
495  printf("\n");
496  printf(" Elapsed time: %.2f s\n", stopwatch.elapsed());
497  printf("Total cells inserted: %llu\n", (Llu) total_cells);
498  printf("Throughput (cells/s): %.2f\n", (double)total_cells/stopwatch.elapsed());
499  printf("Total bytes inserted: %llu\n", (Llu)total_bytes);
500  printf("Throughput (bytes/s): %.2f\n", (double)total_bytes/stopwatch.elapsed());
501 
// Aggregate latency figures exist only when flushing per request and not
// writing samples to a file.
// NOTE(review): the average divides by total_cells with no guard for a run
// that generated nothing.
502  if (flush && !output_samples) {
503  printf(" Latency min (usec): %llu\n", (Llu)min_latency);
504  printf(" Latency max (usec): %llu\n", (Llu)max_latency);
505  printf(" Latency avg (usec): %llu\n", (Llu)((double)cum_latency/total_cells));
506  printf("Latency stddev (usec): %llu\n", (Llu)std_dev(total_cells, cum_latency, cum_sq_latency));
507  }
508  printf("\n");
509  fflush(stdout);
510 
511  if (output_samples)
512  sample_file.close();
513 }
514 
515 void
517  ::int32_t parallel, bool flush, ::uint32_t mutator_flags,
518  ::uint64_t flush_interval,
519  ::uint64_t shared_mutator_flush_interval,
520  ::int32_t delete_pct, bool thrift)
521 {
522  double cum_latency=0, cum_sq_latency=0;
523  double min_latency=0, max_latency=0;
524  ::uint64_t total_cells=0;
525  ::uint64_t total_bytes=0;
526  Cells cells;
527  ofstream sample_file;
528  DataGenerator dg(props);
529  std::vector<ParallelStateRec> load_vector(parallel);
530  ::uint32_t next = 0;
531  ::uint64_t consume_threshold = 0;
532  ::uint64_t consume_total = 0;
533  boost::thread_group threads;
534 
535  Stopwatch stopwatch;
536 
537  try {
538  ClientPtr client;
539  NamespacePtr ht_namespace;
540  TablePtr table;
541  LoadClientPtr load_client_ptr;
542  String config_file = get_str("config");
543  bool key_limit = props->has("DataGenerator.MaxKeys");
544  bool largefile_mode = false;
545  uint32_t adjusted_bytes = 0;
546  LoadRec *lrec;
547 
548  client = make_shared<Hypertable::Client>(config_file);
549  ht_namespace = client->open_namespace("/");
550  table = ht_namespace->open_table(tablename);
551 
552  for (::int32_t i=0; i<parallel; i++)
553  threads.create_thread(LoadThread(table, mutator_flags,
554  shared_mutator_flush_interval,
555  load_vector[i]));
556 
557  if (dg.get_max_bytes() > std::numeric_limits<long>::max()) {
558  largefile_mode = true;
559  adjusted_bytes = (uint32_t)(dg.get_max_bytes() / 1048576LL);
560  consume_threshold = 1048576LL;
561  }
562  else
563  adjusted_bytes = dg.get_max_bytes();
564 
565  boost::progress_display progress_meter(key_limit ?
566  dg.get_max_keys() : adjusted_bytes);
567 
568  for (DataGenerator::iterator iter = dg.begin(); iter != dg.end();
569  total_bytes+=iter.last_data_size(), ++iter) {
570  if (delete_pct != 0 && (::random() % 100) < delete_pct)
571  lrec = new LoadRec(*iter, true);
572  else
573  lrec = new LoadRec(*iter);
574  lrec->amount = iter.last_data_size();
575 
576  {
577  std::lock_guard<std::mutex> lock(load_vector[next].mutex);
578  load_vector[next].requests.push_back(lrec);
579  // Delete garbage, update progress meter
580  while (!load_vector[next].garbage.empty()) {
581  LoadRec *garbage = load_vector[next].garbage.front();
582  if (key_limit)
583  progress_meter += 1;
584  else {
585  if (largefile_mode == true) {
586  consume_total += garbage->amount;
587  if (consume_total >= consume_threshold) {
588  uint32_t consumed = 1 + (uint32_t)((consume_total
589  - consume_threshold) / 1048576LL);
590  progress_meter += consumed;
591  consume_threshold += (::uint64_t)consumed * 1048576LL;
592  }
593  }
594  else
595  progress_meter += garbage->amount;
596  }
597  delete garbage;
598  load_vector[next].garbage.pop_front();
599  }
600  load_vector[next].cond.notify_all();
601  }
602  next = (next+1) % parallel;
603 
604  ++total_cells;
605  }
606 
607  for (::int32_t i=0; i<parallel; i++) {
608  std::lock_guard<std::mutex> lock(load_vector[i].mutex);
609  load_vector[i].finished = true;
610  load_vector[i].cond.notify_all();
611  }
612 
613  threads.join_all();
614 
615  min_latency = load_vector[0].min_latency;
616  for (::int32_t i=0; i<parallel; i++) {
617  cum_latency += load_vector[i].cum_latency;
618  cum_sq_latency += load_vector[i].cum_sq_latency;
619  if (load_vector[i].min_latency < min_latency)
620  min_latency = load_vector[i].min_latency;
621  if (load_vector[i].max_latency > max_latency)
622  max_latency = load_vector[i].max_latency;
623  }
624  }
625  catch (Exception &e) {
626  HT_ERROR_OUT << e << HT_END;
627  exit(EXIT_FAILURE);
628  }
629 
630  stopwatch.stop();
631 
632  printf("\n");
633  printf("\n");
634  printf(" Elapsed time: %.2f s\n", stopwatch.elapsed());
635  printf("Total cells inserted: %llu\n", (Llu) total_cells);
636  printf("Throughput (cells/s): %.2f\n", (double)total_cells/stopwatch.elapsed());
637  printf("Total bytes inserted: %llu\n", (Llu)total_bytes);
638  printf("Throughput (bytes/s): %.2f\n", (double)total_bytes/stopwatch.elapsed());
639  if (true) {
640  //if (flush) {
641  printf(" Latency min (usec): %llu\n", (Llu)min_latency);
642  printf(" Latency max (usec): %llu\n", (Llu)max_latency);
643  printf(" Latency avg (usec): %llu\n", (Llu)((double)cum_latency/total_cells));
644  printf("Latency stddev (usec): %llu\n", (Llu)std_dev(total_cells, cum_latency, cum_sq_latency));
645  }
646 
647  printf("\n");
648  fflush(stdout);
649 
650 }
651 
/// Query modes selected by the "query-mode" option in generate_query_load().
/// DEFAULT must stay zero: the mode is also tested for being non-zero.
enum {
  DEFAULT = 0,    ///< no query-mode given: look up by row and column family
  INDEX = 1,      ///< query-mode "index"
  QUALIFIER = 2   ///< query-mode "qualifier"
};
657 
/*
 * Generates query load from the calling thread.
 *
 * Walks a DataGenerator built from props.  With to_stdout set, the generated
 * keys are printed and nothing is queried.  Otherwise one scanner is created
 * per generated cell through a LoadClient, all returned cells are read, and
 * the request latency is either written to the sample file or folded into
 * the summary printed at the end.
 *
 * props         generator properties
 * tablename     table to scan
 * to_stdout     print the generated keys instead of querying
 * delay         milliseconds to sleep before each query (0 = none)
 * sample_fname  if non-empty, per-query latencies are written here
 * thrift        passed to the LoadClient constructor
 *
 * Throws CONFIG_BAD_VALUE for an unrecognized "query-mode"; any Exception
 * raised while querying is logged and the process exits.
 */
658 void generate_query_load(PropertiesPtr &props, String &tablename,
659  bool to_stdout, ::int32_t delay, String &sample_fname, bool thrift)
660 {
661  double cum_latency=0, cum_sq_latency=0, latency=0;
662  double min_latency=10000000, max_latency=0;
663  ::int64_t total_cells=0;
664  ::int64_t total_bytes=0;
665  Cells cells;
666  clock_t start_clocks, stop_clocks;
// Divisor that converts clock() ticks to microseconds.
667  double clocks_per_usec = (double)CLOCKS_PER_SEC / 1000000.0;
668  bool output_samples = false;
669  ofstream sample_file;
670 
// Translate the optional "query-mode" setting; absent means DEFAULT.
671  int query_mode = DEFAULT;
672  if (has("query-mode")) {
673  String qm = get_str("query-mode");
674  if (qm == "index")
675  query_mode = INDEX;
676  else if (qm == "qualifier")
677  query_mode = QUALIFIER;
678  else
679  HT_THROW(Error::CONFIG_BAD_VALUE, "invalid query-mode parameter");
680  }
681 
// The second constructor argument is true only in DEFAULT mode.
// NOTE(review): its meaning is defined by DataGenerator, not visible here.
682  DataGenerator dg(props, query_mode ? false : true);
683 
// Dump mode: print what would be queried and return. QUALIFIER mode has no
// dump format and prints a placeholder line per cell.
684  if (to_stdout) {
685  for (DataGenerator::iterator iter = dg.begin(); iter != dg.end(); iter++) {
686  if (query_mode == DEFAULT) {
687  if (*(*iter).column_qualifier == 0)
688  cout << (*iter).row_key << "\t" << (*iter).column_family << "\n";
689  else
690  cout << (*iter).row_key << "\t" << (*iter).column_family << ":"
691  << (*iter).column_qualifier << "\n";
692  }
693  else if (query_mode == INDEX) {
694  cout << (*iter).row_key << "\t" << (*iter).column_family;
695  if (*(*iter).column_qualifier != 0)
696  cout << ":" << (*iter).column_qualifier;
697  cout << "\t" << (const char *)(*iter).value << "\n";
698  }
699  else
700  cout << "not implemented!\n";
701  }
702  cout << flush;
703  return;
704  }
705 
// A sample file switches latency reporting from aggregate statistics to
// one value per line.
706  if (sample_fname != "") {
707  sample_file.open(sample_fname.c_str());
708  output_samples = true;
709  }
710 
711  Stopwatch stopwatch;
712 
713  try {
714  LoadClientPtr load_client_ptr;
715  ScanSpecBuilder scan_spec;
716  Cell cell;
717  String config_file = get_str("config");
718  boost::progress_display progress_meter(dg.get_max_keys());
719  uint64_t last_bytes = 0;
720 
721  if (config_file != "")
722  load_client_ptr = make_shared<LoadClient>(config_file, thrift);
723  else
724  load_client_ptr = make_shared<LoadClient>(thrift);
725 
726  for (DataGenerator::iterator iter = dg.begin(); iter != dg.end(); iter++) {
727 
728  if (delay)
729  std::this_thread::sleep_for(std::chrono::milliseconds(delay));
730 
// Build the scan for this cell: a column predicate in the INDEX and
// QUALIFIER modes, a row plus column-family lookup otherwise.
731  scan_spec.clear();
732  if (query_mode == INDEX) {
733  scan_spec.add_column_predicate((*iter).column_family,
734  (*iter).column_qualifier ? (*iter).column_qualifier : "",
736  (const char *)(*iter).value);
737  }
738  else if (query_mode == QUALIFIER) {
739  scan_spec.add_column_predicate((*iter).column_family,
740  (*iter).column_qualifier ? (*iter).column_qualifier : "",
742  }
743  else {
744  scan_spec.add_column((*iter).column_family);
745  scan_spec.add_row((*iter).row_key);
746  }
747 
// NOTE(review): clock() reports processor time, so time spent blocked on
// the server may not be counted -- confirm this is the intended measure.
748  start_clocks = clock();
749 
750  load_client_ptr->create_scanner(tablename, scan_spec.get());
751  last_bytes = load_client_ptr->get_all_cells();
752  total_bytes += last_bytes;
753  load_client_ptr->close_scanner();
754 
755  stop_clocks = clock();
// Handles the tick counter wrapping around between the two readings.
756  if (stop_clocks < start_clocks)
757  latency = ((std::numeric_limits<clock_t>::max() - start_clocks)
758  + stop_clocks) / clocks_per_usec;
759  else
760  latency = (stop_clocks-start_clocks) / clocks_per_usec;
761  if (output_samples)
762  sample_file << (unsigned long)latency << "\n";
763  else {
764  cum_latency += latency;
765  cum_sq_latency += pow(latency,2);
766  if (latency < min_latency)
767  min_latency = latency;
768  if (latency > max_latency)
769  max_latency = latency;
770  }
771 
// total_cells counts queries that returned at least one byte, not
// individual cells.
772  if (last_bytes)
773  ++total_cells;
774  progress_meter += 1;
775  }
776  }
777  catch (Exception &e) {
778  HT_ERROR_OUT << e << HT_END;
779  exit(EXIT_FAILURE);
780  }
781 
782  stopwatch.stop();
783 
784  printf("\n");
785  printf("\n");
786  printf(" Elapsed time: %.2f s\n", stopwatch.elapsed());
787  printf("Total cells returned: %llu\n", (Llu) total_cells);
788  printf("Throughput (cells/s): %.2f\n", (double)total_cells/stopwatch.elapsed());
789  printf("Total bytes returned: %llu\n", (Llu)total_bytes);
790  printf("Throughput (bytes/s): %.2f\n", (double)total_bytes/stopwatch.elapsed());
791 
// NOTE(review): latency is accumulated for every query, but the average and
// standard deviation divide by total_cells, which skips empty results and
// may be zero -- confirm the intended divisor.
792  if (!output_samples) {
793  printf(" Latency min (usec): %llu\n", (Llu)min_latency);
794  printf(" Latency max (usec): %llu\n", (Llu)max_latency);
795  printf(" Latency avg (usec): %llu\n", (Llu)((double)cum_latency/total_cells));
796  printf("Latency stddev (usec): %llu\n", (Llu)std_dev(total_cells, cum_latency, cum_sq_latency));
797  }
798  printf("\n");
799  fflush(stdout);
800 
801  if (output_samples)
802  sample_file.close();
803 }
804 
806  int32_t parallel)
807 {
808  double cum_latency = 0, cum_sq_latency = 0, elapsed_time = 0;
809  double min_latency = 10000000, max_latency = 0;
810  ::int64_t total_cells = 0;
811  ::int64_t total_bytes = 0;
812 
813  int64_t max_keys = props->has("DataGenerator.MaxKeys")
814  ? props->get_i64("DataGenerator.MaxKeys")
815  : props->get_i64("max-keys");
816  boost::progress_display progress(max_keys * parallel);
817 
818  String config_file = get_str("config");
819  ClientPtr client = make_shared<Hypertable::Client>(config_file);
820  NamespacePtr ht_namespace = client->open_namespace("/");
821  TablePtr table = ht_namespace->open_table(tablename);
822 
823  boost::thread_group threads;
824  std::vector<ParallelStateRec> load_vector(parallel);
825  for (::int32_t i = 0; i < parallel; i++)
826  threads.create_thread(QueryThread(props, table, &progress, load_vector[i]));
827 
828  // wait for the threads to finish
829  threads.join_all();
830 
831  // accumulate all the gathered metrics
832  min_latency = load_vector[0].min_latency;
833  for (::int32_t i = 0; i < parallel; i++) {
834  cum_latency += load_vector[i].cum_latency;
835  cum_sq_latency += load_vector[i].cum_sq_latency;
836  total_cells += load_vector[i].total_cells;
837  total_bytes += load_vector[i].total_bytes;
838  elapsed_time = load_vector[i].elapsed_time;
839  if (load_vector[i].min_latency < min_latency)
840  min_latency = load_vector[i].min_latency;
841  if (load_vector[i].max_latency > max_latency)
842  max_latency = load_vector[i].max_latency;
843  }
844 
845  printf("\n");
846  printf("\n");
847  printf(" Elapsed time: %.2f s\n", elapsed_time);
848  printf("Total cells returned: %llu\n", (Llu) total_cells);
849  printf("Throughput (cells/s): %.2f\n", (double)total_cells / elapsed_time);
850  printf("Total bytes returned: %llu\n", (Llu)total_bytes);
851  printf("Throughput (bytes/s): %.2f\n", (double)total_bytes / elapsed_time);
852 
853  printf(" Latency min (usec): %llu\n", (Llu)min_latency);
854  printf(" Latency max (usec): %llu\n", (Llu)max_latency);
855  printf(" Latency avg (usec): %llu\n", (Llu)((double)cum_latency
856  / total_cells));
857  printf("Latency stddev (usec): %llu\n", (Llu)std_dev(total_cells,
858  cum_latency, cum_sq_latency));
859  printf("\n");
860  fflush(stdout);
861 
862 }
863 
864 
/**
 * Computes the population standard deviation from running totals.
 *
 * @param nn      Number of samples
 * @param sum     Sum of the samples
 * @param sq_sum  Sum of the squared samples
 * @return sqrt(sq_sum/nn - (sum/nn)^2), or 0 when @a nn is zero
 */
double std_dev(::uint64_t nn, double sum, double sq_sum)
{
  // No samples: avoid 0/0, which would yield NaN.
  if (nn == 0)
    return 0.0;

  const double count = static_cast<double>(nn);
  const double mean = sum / count;
  const double variance = (sq_sum / count) - (mean * mean);

  // Rounding can push the variance of (near-)identical samples slightly
  // below zero; treat that as zero rather than taking sqrt of a negative.
  return variance > 0.0 ? std::sqrt(variance) : 0.0;
}
int64_t timestamp
Definition: KeySpec.h:130
static std::mutex mutex
Definition: Logger.cc:43
Interface and base of config policy.
Definition: Config.h:149
double elapsed()
Returns the elapsed time.
Definition: Stopwatch.h:72
std::vector< Cell, CellAlloc > Cells
Definition: Cells.h:37
ScanSpec & get()
Returns the built ScanSpec object.
Definition: ScanSpec.h:566
int main(int argc, char **argv)
void clear()
Clears the state.
Definition: ScanSpec.h:555
static const uint32_t FLAG_DELETE_ROW
Definition: KeySpec.h:40
std::string String
A String is simply a typedef to std::string.
Definition: String.h:44
void generate_query_load_parallel(PropertiesPtr &props, String &tablename, int32_t parallel)
The Stopwatch measures elapsed time.
Helper class for printing usage banners on the command line.
long long unsigned int Llu
Shortcut for printf formats.
Definition: String.h:50
Po::typed_value< String > * str(String *v=0)
Definition: Properties.h:166
const char * column_qualifier
Definition: KeySpec.h:128
static bool exists(const String &fname)
Checks if a file or directory exists.
Definition: FileUtils.cc:420
void add_row(const string &str)
Adds a row to be returned in the scan.
Definition: ScanSpec.h:441
static const uint32_t FLAG_DELETE_CELL
Definition: KeySpec.h:42
int64_t revision
Definition: KeySpec.h:131
STL namespace.
#define HT_FATAL(msg)
Definition: Logger.h:339
size_t column_qualifier_len
Definition: KeySpec.h:129
const void * row
Definition: KeySpec.h:125
static const uint32_t FLAG_DELETE_COLUMN_FAMILY
Definition: KeySpec.h:41
Desc & cmdline_desc(const char *usage)
A macro which defines global functions like get_bool(), get_str(), get_i16() etc. ...
Definition: Config.cc:72
bool has(const String &name)
Check existence of a configuration value.
Definition: Config.h:57
std::shared_ptr< Namespace > NamespacePtr
Shared smart pointer to Namespace.
Definition: Namespace.h:333
std::shared_ptr< Client > ClientPtr
Definition: Client.h:156
void generate_query_load(PropertiesPtr &props, String &tablename, bool to_stdout,::int32_t delay, String &sample_fname, bool thrift)
Po::typed_value< int64_t > * i64(int64_t *v=0)
Definition: Properties.h:182
Po::typed_value< int32_t > * i32(int32_t *v=0)
Definition: Properties.h:178
std::shared_ptr< Properties > PropertiesPtr
Definition: Properties.h:447
Compatibility Macros for C/C++.
void parse_command_line(int argc, char **argv, PropertiesPtr &props)
Po::typed_value< bool > * boo(bool *v=0)
Definition: Properties.h:162
Initialization helper for applications.
#define HT_END
Definition: Logger.h:220
Helper class for building a ScanSpec.
Definition: ScanSpec.h:318
void generate_update_load_parallel(PropertiesPtr &props, String &tablename,::int32_t parallel, bool flush,::uint32_t mutator_flags,::uint64_t flush_interval,::uint64_t shared_mutator_flush_interval,::int32_t delete_pct, bool thrift)
#define HT_ERROR_OUT
Definition: Logger.h:301
bool allow_unregistered_options(bool choice)
Toggle allow unregistered options.
Definition: Config.cc:654
Hypertable definitions
double std_dev(::uint64_t nn, double sum, double sq_sum)
Meta::list< AppPolicy, DataGeneratorPolicy, DefaultCommPolicy > Policies
void add_column(const string &str)
Adds a column family to be returned by the scan.
Definition: ScanSpec.h:408
Provides an STL-style iterator on DataGenerator objects.
void generate_update_load(PropertiesPtr &props, String &tablename, bool flush,::uint32_t mutator_flags,::uint64_t flush_interval,::uint64_t shared_mutator_flush_interval, bool to_stdout, String &sample_fname,::int32_t delete_pct, bool thrift)
#define HT_ERROR(msg)
Definition: Logger.h:299
std::shared_ptr< LoadClient > LoadClientPtr
Definition: LoadClient.h:92
The Stopwatch class measures elapsed time between instantiation (or a call to start) and a call to st...
Definition: Stopwatch.h:40
This is a generic exception class for Hypertable.
Definition: Error.h:314
A String class based on std::string.
void add_column_predicate(const string &column_family, const char *column_qualifier, uint32_t operation, const char *value, uint32_t value_len=0)
Adds a column predicate to the scan.
Definition: ScanSpec.h:426
void alias(const String &cmdline_opt, const String &file_opt, bool overwrite)
Setup command line option alias for config file option.
Definition: Config.cc:607
Desc & cmdline_hidden_desc()
Get the command line hidden options description (for positional options)
Definition: Config.cc:81
Encapsulates decomposed key and value.
Definition: Cell.h:32
PositionalDesc & cmdline_positional_desc()
Get the command line positional options description.
Definition: Config.cc:90
const char * column_family
Definition: KeySpec.h:127
#define HT_THROW(_code_, _msg_)
Definition: Error.h:478
std::shared_ptr< Table > TablePtr
Definition: Table.h:53
void stop()
Stops the Stopwatch.
Definition: Stopwatch.h:58
unsigned long amount
Definition: ParallelLoad.h:89