0.9.8.10
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
OperationProcessor.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2007-2015 Hypertable, Inc.
3  *
4  * This file is part of Hypertable.
5  *
6  * Hypertable is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; either version 3
9  * of the License, or any later version.
10  *
11  * Hypertable is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19  * 02110-1301, USA.
20  */
21 
29 #include <Common/Compat.h>
30 #include "OperationProcessor.h"
31 
33 
34 #include <Common/Path.h>
35 #include <Common/StringExt.h>
36 
37 #include <boost/graph/topological_sort.hpp>
38 #include <boost/graph/graphviz.hpp>
39 
40 #include <chrono>
41 #include <iomanip>
42 #include <sstream>
43 #include <thread>
44 #include <unordered_map>
45 
46 using namespace Hypertable;
47 using namespace boost;
48 using namespace std;
49 
51  : master_context(mctx), current_blocked(0), busy_count(0),
52  need_order_recompute(false), shutdown(false), paused(false) {
53  current_iter = current.end();
55 }
56 
58 }
59 
60 
// Constructs the operation processor.
//
// Initializes m_context from the supplied context, optionally opens the
// Graphviz stream file, and creates the worker threads.
//
// @param context       Master context; its props are read here and it is
//                      passed on to m_context
// @param thread_count  Number of worker threads to create
61 OperationProcessor::OperationProcessor(ContextPtr &context, size_t thread_count)
62  : m_context(context) {
63 
  // When the Hypertable.Master.RecordGraphvizStream property is true, open
  // <Hypertable.DataDirectory>/run/graphviz-stream in append mode.  Other
  // code in this file writes the graph to m_graphviz_out whenever it is set.
64  if (context->props->get_bool("Hypertable.Master.RecordGraphvizStream")) {
65  Path data_dir = Path(context->props->get_str("Hypertable.DataDirectory"));
66  string run_dir = (data_dir /= "/run").string();
67  string filename = run_dir + "/graphviz-stream";
68  m_graphviz_out = make_unique<std::ofstream>(filename.c_str(), ofstream::out|ofstream::app);
69  }
70 
  // Store a back-pointer to this processor in the shared context so that
  // code holding only the context (e.g. the worker) can call m_context.op.
72  m_context.op = this;
  // Create thread_count threads, handing the same Worker object to each
  // create_thread() call.
73  Worker worker(m_context);
74  for (size_t i=0; i<thread_count; ++i)
75  m_threads.create_thread(worker);
76 }
77 
79  shutdown();
80  join();
81 }
82 
83 
85  std::lock_guard<std::mutex> lock(m_context.mutex);
86 
87  //HT_INFOF("Adding operation %s", operation->label().c_str());
88 
89  // Make sure this operation hasn't already been added
90  if (m_context.op_ids.count(operation->id()) > 0)
91  return;
92 
93  if (!operation->is_complete() || operation->is_perpetual()) {
94  add_operation_internal(operation);
97  m_context.cond.notify_all();
98  }
99  else if (operation->get_remove_approval_mask() == 0)
100  m_context.master_context->response_manager->add_operation(operation);
101 
102 }
103 
// Adds a batch of operations while holding m_context.mutex.
//
// For each operation:
//   - it is skipped if its id is already present in m_context.op_ids;
//   - if it is not complete, or is perpetual, it is added via
//     add_operation_internal();
//   - otherwise (complete and not perpetual) it is handed to the response
//     manager, but only when its remove approval mask is 0.  A complete
//     operation with a non-zero mask is neither added nor forwarded here.
//
// If at least one operation was added, all waiters on m_context.cond are
// notified.
//
// @param operations  Operations to add
104 void OperationProcessor::add_operations(std::vector<OperationPtr> &operations) {
105  std::lock_guard<std::mutex> lock(m_context.mutex);
  // Tracks whether any operation was actually added, so that waiters are
  // only notified when there is new work.
106  bool added = false;
107 
108  for (auto & operation : operations) {
109 
110  //HT_INFOF("Adding operation %s", operations[i]->label().c_str());
111 
112  // Make sure this operation hasn't already been added
113  if (m_context.op_ids.count(operation->id()) > 0)
114  continue;
115 
116  if (!operation->is_complete() || operation->is_perpetual()) {
117  add_operation_internal(operation);
118  added = true;
119  }
  // Complete, non-perpetual operation that needs no removal approvals:
  // pass it straight to the response manager.
120  else if (operation->get_remove_approval_mask() == 0)
121  m_context.master_context->response_manager->add_operation(operation);
122  }
123 
124  if (added) {
  // NOTE(review): listing lines 125-126 are not present in this extract.
127  m_context.cond.notify_all();
128  }
129 }
130 
132 
133  if (operation->exclusive()) {
134  if (m_context.exclusive_ops.count(operation->name()) > 0)
136  "Dropping %s because another one is outstanding",
137  operation->name().c_str());
138  m_context.exclusive_ops.insert(operation->name());
139  }
140 
141  Vertex v = add_vertex(m_context.graph);
142  put(m_context.label, v, operation->graphviz_label());
143  put(m_context.exec_time, v, 0);
144  put(m_context.ops, v, operation);
145  put(m_context.busy, v, false);
146  m_context.live.insert(v);
147  m_context.operation_hash[operation->hash_code()]=OperationVertex(operation,v);
148  add_dependencies(v, operation);
149  HT_ASSERT(m_context.op_ids.insert(operation->id()).second);
150 }
151 
153  std::lock_guard<std::mutex> lock(m_context.mutex);
154  OperationPtr operation;
155  Vertex vertex;
156 
157  // std::unordered_map<int64_t, OperationVertex>::iterator iter =
158  auto iter = m_context.operation_hash.find(hash_code);
159 
160  if (iter == m_context.operation_hash.end() ||
161  m_context.busy[iter->second.vertex])
162  return 0;
163 
164  operation = iter->second.operation;
165  vertex = iter->second.vertex;
166 
167  retire_operation(vertex, operation);
168 
169  return operation;
170 }
171 
173  std::lock_guard<std::mutex> lock(m_context.mutex);
174  m_context.shutdown = true;
175  m_context.cond.notify_all();
176 }
177 
179  m_threads.join_all();
180 }
181 
183  std::unique_lock<std::mutex> lock(m_context.mutex);
184  m_context.idle_cond.wait(lock, [this](){
185  return num_vertices(m_context.graph) == 0; });
186 }
187 
189  std::unique_lock<std::mutex> lock(m_context.mutex);
190  while (m_context.busy_count > 0 ||
193  m_context.idle_cond.wait(lock);
194 }
195 
// Waits on m_context.idle_cond, with a timeout, while the loop condition
// below holds.
//
// NOTE(review): the remainder of the loop condition (listing lines 199-200)
// is not present in this extract; only the busy_count > 0 term is visible.
//
// max_wait is applied to each individual wait_for() call, not to the call
// as a whole: if the wait is woken before the timeout and the loop
// condition still holds, a fresh max_wait interval starts.
//
// @param max_wait  Maximum time to block in a single wait on idle_cond
// @return false if a wait on idle_cond timed out, true once the loop
//         condition no longer holds
196 bool OperationProcessor::wait_for_idle(std::chrono::milliseconds max_wait) {
197  std::unique_lock<std::mutex> lock(m_context.mutex);
198  while (m_context.busy_count > 0 ||
201  if (m_context.idle_cond.wait_for(lock, max_wait) == std::cv_status::timeout)
202  return false;
203  }
204  return true;
205 }
206 
208  return num_vertices(m_context.graph);
209 }
210 
212  return num_vertices(m_context.graph) == 0;
213 }
214 
216  std::lock_guard<std::mutex> lock(m_context.mutex);
218  m_context.cond.notify_all();
219 }
220 
222  std::lock_guard<std::mutex> lock(m_context.mutex);
223  std::pair<DependencyIndex::iterator, DependencyIndex::iterator> bound;
224  bool unblocked_something = false;
225 
226  for (bound = m_context.obstruction_index.equal_range(name);
227  bound.first != bound.second; ++bound.first)
228  if (m_context.ops[bound.first->second]->unblock())
229  unblocked_something = true;
230 
231  for (bound = m_context.exclusivity_index.equal_range(name);
232  bound.first != bound.second; ++bound.first)
233  if (m_context.ops[bound.first->second]->unblock())
234  unblocked_something = true;
235 
236  if (unblocked_something) {
239  m_context.cond.notify_all();
240  }
241 
242 }
243 
245  std::lock_guard<std::mutex> lock(m_context.mutex);
246 
247  if (!m_context.perpetual_ops.empty()) {
248  DependencySet names;
249  PerpetualSet::iterator iter = m_context.perpetual_ops.begin();
250  OperationPtr operation;
251  while (iter != m_context.perpetual_ops.end()) {
252  (*iter)->obstructions(names);
253 #if 0
254  {
255  String str;
256  for (const auto &tag : names)
257  str += tag + " ";
258  HT_INFOF("Activating %s with obstructions %s", (*iter)->label().c_str(), str.c_str());
259  }
260 #endif
261  if (names.count(name) > 0) {
262  PerpetualSet::iterator rm_iter = iter;
263  operation = *iter++;
264  m_context.perpetual_ops.erase(rm_iter);
265  operation->set_state(OperationState::INITIAL);
266  add_operation_internal(operation);
269  m_context.cond.notify_all();
270  }
271  else
272  ++iter;
273  }
274  }
275 }
276 
281  Vertex vertex;
282  OperationPtr operation;
283  bool current_needs_loading = true;
284 
285  try {
286 
287  while (true) {
288  {
289  std::unique_lock<std::mutex> lock(m_context.mutex);
290 
291  if (m_context.shutdown)
292  return;
293 
294  while (m_context.current_iter == m_context.current.end()) {
295 
298  current_needs_loading = true;
299  }
300  else
301  current_needs_loading =
303 
304  if (current_needs_loading &&
306  if (m_context.op->load_current()) {
307  if (m_context.shutdown)
308  return;
309  continue;
310  }
311  }
312 
313  if (m_context.busy_count == 0)
314  m_context.idle_cond.notify_all();
315 
316  if (m_context.shutdown)
317  return;
318 
319  m_context.cond.wait(lock);
320 
321  if (m_context.shutdown)
322  return;
323  }
324 
325  vertex = m_context.current_iter->vertex;
327  operation = m_context.ops[vertex];
328  m_context.busy[vertex] = true;
330  }
331 
332  try {
333 
334  operation->pre_run();
335  if (!operation->is_blocked())
336  operation->execute();
337  operation->post_run();
338 
339  {
340  std::lock_guard<std::mutex> lock(m_context.mutex);
341  m_context.busy[vertex] = false;
343  m_context.current_active.erase(vertex);
344  if (operation->is_complete())
345  m_context.op->retire_operation(vertex, operation);
346  else if (operation->is_blocked())
348  else
349  m_context.op->update_operation(vertex, operation);
350  }
351  }
352  catch (Exception &e) {
353  std::lock_guard<std::mutex> lock(m_context.mutex);
354  m_context.busy[vertex] = false;
356  m_context.current_active.erase(vertex);
357  if (e.code() == Error::INDUCED_FAILURE) {
358  m_context.shutdown = true;
359  m_context.cond.notify_all();
360  m_context.master_context->mml_writer->close();
361  break;
362  }
363  HT_ERROR_OUT << e << HT_END;
364  std::this_thread::sleep_for(std::chrono::milliseconds(5000));
366  }
367 
368  }
369  }
370  catch (std::exception &e) {
371  std::cout << e.what() << std::endl;
372  }
373 }
374 
375 
377  DependencySet names;
378 
379  operation->exclusivities(names);
380  for (DependencySet::iterator iter = names.begin(); iter != names.end(); ++iter)
381  add_exclusivity(v, *iter);
382 
383  operation->dependencies(names);
384  for (DependencySet::iterator iter = names.begin(); iter != names.end(); ++iter)
385  add_dependency(v, *iter);
386 
387  operation->obstructions(names);
388  for (DependencySet::iterator iter = names.begin(); iter != names.end(); ++iter) {
389  add_obstruction(v, *iter);
390  }
391 
392 }
393 
394 
395 
397  GraphTraits::in_edge_iterator in_i, in_end;
398  DependencySet names;
399  Vertex src;
400  std::pair<DependencyIndex::iterator, DependencyIndex::iterator> bound;
401 
402  // Check obstructions index and add link (v -> obstruction)
403  for (bound = m_context.obstruction_index.equal_range(name);
404  bound.first != bound.second; ++bound.first)
405  add_edge(v, bound.first->second);
406 
407  // Check dependency index and add link (dependency -> v)
408  for (bound = m_context.dependency_index.equal_range(name);
409  bound.first != bound.second; ++bound.first)
410  add_edge(bound.first->second, v);
411 
412  // Return now if this exclusivity already exists
413  for (bound = m_context.exclusivity_index.equal_range(name);
414  bound.first != bound.second; ++bound.first) {
415  if (v == bound.first->second)
416  return;
417  }
418 
419  for (DependencyIndex::iterator iter = m_context.exclusivity_index.lower_bound(name);
420  iter != m_context.exclusivity_index.end() && iter->first == name; ++iter) {
421  tie(in_i, in_end) = in_edges(iter->second, m_context.graph);
422  for (; in_i != in_end; ++in_i) {
423  src = source(*in_i, m_context.graph);
424  m_context.ops[src]->exclusivities(names);
425  if (names.find(name) != names.end())
426  break;
427  }
428  if (in_i == in_end) {
429  HT_ASSERT(v != iter->second);
430  add_edge_permanent(v, iter->second);
431  break;
432  }
433  }
434 
435  m_context.exclusivity_index.insert(DependencyIndex::value_type(name, v));
436 }
437 
438 
440  std::pair<DependencyIndex::iterator, DependencyIndex::iterator> bound;
441 
442  // Return immediately if dependency already exists
443  for (bound = m_context.dependency_index.equal_range(name);
444  bound.first != bound.second; ++bound.first) {
445  if (bound.first->second == v)
446  return;
447  }
448 
449  // Check exclusivity index and add link (v -> exclusivity)
450  for (bound = m_context.exclusivity_index.equal_range(name);
451  bound.first != bound.second; ++bound.first)
452  add_edge(v, bound.first->second);
453 
454  // Add perpetual operations if necessary
455  if (!m_context.perpetual_ops.empty()) {
456  DependencySet names;
457  PerpetualSet::iterator iter = m_context.perpetual_ops.begin();
458  OperationPtr operation;
459  while (iter != m_context.perpetual_ops.end()) {
460  (*iter)->obstructions(names);
461  if (names.count(name) > 0) {
462  PerpetualSet::iterator rm_iter = iter;
463  operation = *iter++;
464  m_context.perpetual_ops.erase(rm_iter);
465  operation->set_state(OperationState::INITIAL);
466  add_operation_internal(operation);
467  }
468  else
469  ++iter;
470  }
471  }
472 
473  // Check obstruction index and add link (v -> obstruction)
474  for (bound = m_context.obstruction_index.equal_range(name);
475  bound.first != bound.second; ++bound.first)
476  add_edge(v, bound.first->second);
477 
478  m_context.dependency_index.insert(DependencyIndex::value_type(name, v));
479 }
480 
481 
483  std::pair<DependencyIndex::iterator, DependencyIndex::iterator> bound;
484 
485  // Return immediately if obstruction already exists
486  for (bound = m_context.obstruction_index.equal_range(name);
487  bound.first != bound.second; ++bound.first) {
488  if (bound.first->second == v)
489  return;
490  }
491 
492  // Check exclusivity index and add link (exclusivity -> v)
493  for (bound = m_context.exclusivity_index.equal_range(name);
494  bound.first != bound.second; ++bound.first)
495  add_edge(bound.first->second, v);
496 
497  // Check dependency index and add link (dependency -> v)
498  for (bound = m_context.dependency_index.equal_range(name);
499  bound.first != bound.second; ++bound.first)
500  add_edge(bound.first->second, v);
501 
502  m_context.obstruction_index.insert(DependencyIndex::value_type(name, v));
503 }
504 
505 
507  DependencyIndex::iterator iter, del_iter;
508 
509  // Purge from dependency index
510  iter = m_context.dependency_index.begin();
511  while (iter != m_context.dependency_index.end()) {
512  if (iter->second == v) {
513  del_iter = iter++;
514  m_context.dependency_index.erase(del_iter);
515  }
516  else
517  ++iter;
518  }
519 }
520 
521 
523  DependencyIndex::iterator iter, del_iter;
524 
525  // Purge from exclusivity index
526  iter = m_context.exclusivity_index.begin();
527  while (iter != m_context.exclusivity_index.end()) {
528  if (iter->second == v) {
529  del_iter = iter++;
530  m_context.exclusivity_index.erase(del_iter);
531  }
532  else
533  ++iter;
534  }
535 }
536 
537 
539  DependencyIndex::iterator iter, del_iter;
540 
541  // Purge from obstruction index
542  iter = m_context.obstruction_index.begin();
543  while (iter != m_context.obstruction_index.end()) {
544  if (iter->second == v) {
545  del_iter = iter++;
546  m_context.obstruction_index.erase(del_iter);
547  }
548  else
549  ++iter;
550  }
551 }
552 
553 
555  std::pair<Edge, bool> ep = ::add_edge(v, u, m_context.graph);
556  HT_ASSERT(ep.second);
557  put(m_context.permanent, ep.first, false);
558 }
559 
561  std::pair<Edge, bool> ep = ::add_edge(v, u, m_context.graph);
562  HT_ASSERT(ep.second);
563  put(m_context.permanent, ep.first, true);
564 }
565 
567  std::lock_guard<std::mutex> lock(m_context.mutex);
568  std::ostringstream oss;
569  write_graphviz(oss, m_context.graph, make_label_writer(m_context.label));
570  output = oss.str();
571 }
572 
574  std::lock_guard<std::mutex> lock(m_context.mutex);
575  std::ostringstream oss;
576  DependencySet names;
577 
578  oss << "Num vertices = " << num_vertices(m_context.graph) << "\n";
579  oss << "Busy count = " << m_context.busy_count << "\n";
580  oss << "Active set size = " << m_context.current_active.size() << "\n";
581  oss << "Need order recompute = " << (m_context.need_order_recompute ? "true\n" : "false\n");
582  oss << "Shutdown = " << (m_context.shutdown ? "true\n" : "false\n");
583  oss << "Paused = " << (m_context.paused ? "true\n" : "false\n");
584  oss << "\n";
585 
586  std::pair<GraphTraits::vertex_iterator, GraphTraits::vertex_iterator> vp;
587  std::set<Vertex> vset;
588  size_t i = 0;
589  bool first;
590  for (vp = vertices(m_context.graph); vp.first != vp.second; ++vp.first) {
591  vset.insert(*vp.first);
592  oss << i << ": " << m_context.ops[*vp.first]->label() << "\n";
593  oss << " active: " << ((m_context.current_active.count(*vp.first) > 0) ? "true\n" : "false\n");
594  oss << " live: " << ((m_context.live.count(*vp.first) > 0) ? "true\n" : "false\n");
595  oss << " exclusive: " << (m_context.ops[*vp.first]->exclusive() ? "true\n" : "false\n");
596  oss << " perpetual: " << (m_context.ops[*vp.first]->is_perpetual() ? "true\n" : "false\n");
597  oss << " blocked: " << (m_context.ops[*vp.first]->is_blocked() ? "true\n" : "false\n");
598  oss << " state: " << OperationState::get_text(m_context.ops[*vp.first]->get_state()) << "\n";
599  oss << " dependencies: (";
600  first = true;
601  names.clear();
602  m_context.ops[*vp.first]->dependencies(names);
603  for (const auto &str : names) {
604  if (!first)
605  oss << ",";
606  oss << str;
607  first = false;
608  }
609  oss << ")\n";
610  oss << " obstructions: (";
611  first = true;
612  names.clear();
613  m_context.ops[*vp.first]->obstructions(names);
614  for (const auto &str : names) {
615  if (!first)
616  oss << ",";
617  oss << str;
618  first = false;
619  }
620  oss << ")\n";
621  oss << " exclusivities: (";
622  first = true;
623  names.clear();
624  m_context.ops[*vp.first]->exclusivities(names);
625  for (const auto &str : names) {
626  if (!first)
627  oss << ",";
628  oss << str;
629  first = false;
630  }
631  oss << ")\n";
632  oss << "\n";
633  }
634 
635  oss << "Current:\n";
636  for (ExecutionList::iterator iter = m_context.current.begin();
637  iter != m_context.current.end(); ++iter) {
638  if (iter == m_context.current_iter)
639  oss << "*";
640  if (vset.count(iter->vertex))
641  oss << m_context.ops[iter->vertex]->label() << "\n";
642  else
643  oss << "[retired]\n" ;
644  }
646  oss << "*\n";
647  oss << "\n";
648 
649  oss << "Execution order:\n";
650  for (ExecutionList::iterator iter = m_context.execution_order.begin();
651  iter != m_context.execution_order.end(); ++iter) {
652  if (iter == m_context.execution_order_iter)
653  oss << "*";
654  if (vset.count(iter->vertex)) {
655  oss << m_context.ops[iter->vertex]->label() << " (time=";
656  oss << m_context.exec_time[iter->vertex] << ")\n";
657  }
658  else
659  oss << "[retired]\n" ;
660  }
662  oss << "*\n";
663  oss << "\n";
664 
665  oss << "Graphviz:\n";
666  write_graphviz(oss, m_context.graph, make_label_writer(m_context.label));
667  output = oss.str();
668 }
669 
670 
675  if (in_degree(v, m_context.graph) > 0)
677  clear_vertex(v, m_context.graph);
678  remove_vertex(v, m_context.graph);
679  m_context.live.erase(v);
680  m_context.operation_hash.erase(operation->hash_code());
681  if (operation->exclusive())
682  m_context.exclusive_ops.erase(operation->name());
683  //HT_INFOF("Retiring op %p vertex %p", operation.get(), v);
684 
685  if (m_context.master_context->metrics_handler)
686  m_context.master_context->metrics_handler->operation_increment();
687 
688  if (operation->is_perpetual())
689  m_context.perpetual_ops.insert(operation);
690  else {
691  if (operation->get_remove_approval_mask() == 0 &&
692  operation->get_state() == OperationState::COMPLETE)
693  m_context.master_context->response_manager->add_operation(operation);
694  }
695  m_context.op_ids.erase(operation->id());
696 }
697 
698 
701 
704 
705  remove_in_edge_if(v, np, m_context.graph);
706  remove_out_edge_if(v, np, m_context.graph);
707 
708  m_context.op->add_dependencies(v, operation);
709 
710  // Add sub-operations
711  std::vector<OperationPtr> sub_ops;
712  operation->fetch_sub_operations(sub_ops);
713  for (auto op : sub_ops) {
714  if (m_context.op_ids.count(op->id()) == 0 && !op->is_complete()) {
716  }
717  }
718 
721  m_context.cond.notify_all();
722 }
723 
724 
726 
727  // re-assign vertex indexes
728  property_map<OperationGraph, vertex_index_t>::type index = get(vertex_index, m_context.graph);
729  int i=0;
730  std::pair<GraphTraits::vertex_iterator, GraphTraits::vertex_iterator> vp;
731  for (vp = vertices(m_context.graph); vp.first != vp.second; ++vp.first)
732  put(index, *vp.first, i++);
733 
734  if (m_graphviz_out) {
735  write_graphviz(*m_graphviz_out, m_context.graph, make_label_writer(m_context.label));
736  *m_graphviz_out << flush;
737  }
738 
739  m_context.execution_order.clear();
740  try {
741  topological_sort(m_context.graph, std::back_inserter(m_context.execution_order));
742  }
743  catch (std::invalid_argument &e) {
744  write_graphviz(std::cout, m_context.graph, make_label_writer(m_context.label));
745  }
746 
747  ExecutionList::iterator iter;
748 
749  for (iter = m_context.execution_order.begin(); iter != m_context.execution_order.end(); ++iter) {
750  HT_ASSERT(m_context.live.count(iter->vertex) > 0);
751  //HT_ASSERT(!m_context.ops[*iter]->is_complete());
752  m_context.exec_time[iter->vertex] = 0;
753  }
754 
755  for (iter = m_context.execution_order.begin(); iter != m_context.execution_order.end(); ++iter) {
756  // Walk through the out_edges and calculate the maximum time.
757  if (out_degree (iter->vertex, m_context.graph) > 0) {
758  OperationGraph::out_edge_iterator j, j_end;
759  int maxdist=0;
760  for (boost::tie(j, j_end) = out_edges(iter->vertex, m_context.graph); j != j_end; ++j)
761  maxdist=(std::max)(m_context.exec_time[target(*j, m_context.graph)], maxdist);
762  m_context.exec_time[iter->vertex] = maxdist+1;
763  }
764  }
765 
766  // Sort execution order
767  std::vector<struct vertex_info> vvec;
768  for (iter = m_context.execution_order.begin(); iter != m_context.execution_order.end(); ++iter) {
769  iter->taken = m_context.busy[iter->vertex];
770  vvec.push_back(*iter);
771  }
772 
773  std::set<Vertex> vset;
774 
775  struct ltvertexinfo ltvi(m_context);
776  std::sort(vvec.begin(), vvec.end(), ltvi);
777  m_context.execution_order.clear();
778  for (size_t i=0; i<vvec.size(); i++) {
779  HT_ASSERT(vset.count(vvec[i].vertex) == 0);
780  vset.insert(vvec[i].vertex);
781  m_context.execution_order.push_back(vvec[i]);
782  }
783 
784  m_context.execution_order_iter = m_context.execution_order.begin();
785 
786  m_context.need_order_recompute = false;
787  if (m_context.execution_order.empty())
788  m_context.idle_cond.notify_all();
789 
790 }
791 
792 
794  size_t blocked = 0;
795  size_t retired = 0;
796 
797  m_context.current.clear();
798  m_context.current_active.clear();
800  for (int time_slot = m_context.exec_time[m_context.execution_order_iter->vertex];
803  // If live, add it to the current set
804  if (m_context.live.count(m_context.execution_order_iter->vertex) > 0) {
807  if (!m_context.execution_order_iter->taken &&
808  m_context.ops[m_context.execution_order_iter->vertex]->is_blocked())
809  blocked++;
810  }
811  else
812  retired++;
813  }
814 
815  //HT_INFOF("current size = %lu, blocked = %lu", m_context.current.size(), blocked);
816 
818  while (m_context.current_iter != m_context.current.end() && m_context.current_iter->taken)
820 
821  if (m_context.current_iter != m_context.current.end() ||
822  blocked != m_context.current.size()) {
823  m_context.cond.notify_all();
824  return true;
825  }
826  return false;
827 }
828 
static String filename
Definition: Config.cc:48
Boost library.
Definition: Properties.cc:39
void update_operation(Vertex v, OperationPtr &operation)
Updates dependency relationship of an operation.
std::string String
A String is simply a typedef to std::string.
Definition: String.h:44
Compatibility class for boost::filesystem::path.
std::unordered_map< int64_t, OperationVertex > operation_hash
void graphviz_output(String &output)
Declarations for OperationProcessor.
Po::typed_value< String > * str(String *v=0)
Definition: Properties.h:166
void activate(const String &name)
void add_obstruction(Vertex v, const String &name)
STL namespace.
void add_exclusivity(Vertex v, const String &name)
void recompute_order()
Recomputes operation execution order.
void unblock(const String &name)
boost::property_map< OperationGraph, operation_t >::type ops
const char * get_text(int32_t state)
Definition: Operation.cc:609
void state_description(String &output)
std::shared_ptr< Context > ContextPtr
Smart pointer to Context.
Definition: Context.h:265
void retire_operation(Vertex v, OperationPtr &operation)
Retires (removes) an operation.
#define HT_ASSERT(_e_)
Definition: Logger.h:396
boost::property_map< OperationGraph, execution_time_t >::type exec_time
void add_edge(Vertex v, Vertex u)
Compatibility class for boost::filesystem::path.
Definition: Path.h:45
boost::property_map< OperationGraph, label_t >::type label
boost::property_map< OperationGraph, busy_t >::type busy
GraphTraits::vertex_descriptor Vertex
boost::property_map< OperationGraph, permanent_t >::type permanent
Compatibility Macros for C/C++.
#define HT_END
Definition: Logger.h:220
void add_edge_permanent(Vertex v, Vertex u)
std::unique_ptr< std::ofstream > m_graphviz_out
#define HT_ERROR_OUT
Definition: Logger.h:301
void add_operation(OperationPtr operation)
Hypertable definitions
void add_operations(std::vector< OperationPtr > &operations)
void add_dependencies(Vertex v, OperationPtr &operation)
OperationPtr remove_operation(int64_t hash_code)
#define HT_INFOF(msg,...)
Definition: Logger.h:272
#define HT_THROWF(_code_, _fmt_,...)
Definition: Error.h:490
This is a generic exception class for Hypertable.
Definition: Error.h:314
OperationProcessor(ContextPtr &context, size_t thread_count)
void add_dependency(Vertex v, const String &name)
void add_operation_internal(OperationPtr &operation)
bool load_current()
Loads m_context.current list with operations to be executed.
std::set< String > DependencySet
Set of dependency strings.
Definition: Operation.h:107
std::shared_ptr< Operation > OperationPtr
Smart pointer to Operation.
Definition: Operation.h:609
String extensions and helpers: sets, maps, append operators etc.
int code() const
Returns the error code.
Definition: Error.h:391