0.9.8.10
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
OperationProcessor.h
Go to the documentation of this file.
1 /* -*- c++ -*-
2  * Copyright (C) 2007-2015 Hypertable, Inc.
3  *
4  * This file is part of Hypertable.
5  *
6  * Hypertable is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; either version 3
9  * of the License, or any later version.
10  *
11  * Hypertable is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19  * 02110-1301, USA.
20  */
21 
29 #ifndef Hypertable_Master_OperationProcessor_h
30 #define Hypertable_Master_OperationProcessor_h
31 
35 
36 #include <Common/Properties.h>
37 #include <Common/StringExt.h>
38 #include <Common/Thread.h>
39 
40 #include <boost/graph/adjacency_list.hpp>
41 #include <boost/graph/properties.hpp>
42 #include <boost/thread/condition.hpp>
43 
44 #include <chrono>
45 #include <condition_variable>
46 #include <fstream>
47 #include <list>
48 #include <map>
49 #include <memory>
50 #include <mutex>
51 #include <unordered_map>
52 
53 namespace Hypertable {
54 
62  public:
63  OperationProcessor(ContextPtr &context, size_t thread_count);
65  void add_operation(OperationPtr operation);
66  void add_operations(std::vector<OperationPtr> &operations);
67  OperationPtr remove_operation(int64_t hash_code);
68  void shutdown();
69  void join();
70  void wait_for_empty();
71  void wait_for_idle();
72  bool wait_for_idle(std::chrono::milliseconds max_wait);
73  void graphviz_output(String &output);
74  size_t size();
75  bool empty();
76  void wake_up();
77  void unblock(const String &name);
78  void activate(const String &name);
79  void state_description(String &output);
80 
81  private:
82 
83  void add_operation_internal(OperationPtr &operation);
84 
85  struct operation_t {
86  typedef boost::vertex_property_tag kind;
87  };
88 
90  typedef boost::vertex_property_tag kind;
91  };
92 
93  struct label_t {
94  typedef boost::vertex_property_tag kind;
95  };
96 
97  struct busy_t {
98  typedef boost::vertex_property_tag kind;
99  };
100 
101  struct permanent_t {
102  typedef boost::edge_property_tag kind;
103  };
104 
105  typedef boost::adjacency_list<
106  boost::listS, boost::listS, boost::bidirectionalS,
107  boost::property<boost::vertex_index_t, std::size_t,
108  boost::property<operation_t, OperationPtr,
109  boost::property<execution_time_t, int,
110  boost::property<label_t, String,
111  boost::property<busy_t, bool> > > > >,
112  boost::property<permanent_t, bool> >
114 
115  typedef boost::graph_traits<OperationGraph> GraphTraits;
116 
117  typedef GraphTraits::vertex_descriptor Vertex;
118  typedef GraphTraits::edge_descriptor Edge;
119 
120  typedef std::set<Vertex> VertexSet;
121 
122  struct vertex_info {
123  vertex_info(Vertex v, bool t) : vertex(v), taken(t) { }
124  vertex_info(Vertex v) : vertex(v), taken(false) { }
125  Vertex vertex;
126  bool taken;
127  };
128  typedef std::list<vertex_info> ExecutionList;
129 
130  typedef std::multimap<const String, Vertex> DependencyIndex;
131 
132  void add_dependencies(Vertex v, OperationPtr &operation);
133  void add_exclusivity(Vertex v, const String &name);
134  void add_dependency(Vertex v, const String &name);
135  void add_obstruction(Vertex v, const String &name);
136  void purge_from_dependency_index(Vertex v);
137  void purge_from_exclusivity_index(Vertex v);
138  void purge_from_obstruction_index(Vertex v);
139  void add_edge(Vertex v, Vertex u);
140  void add_edge_permanent(Vertex v, Vertex u);
141 
148  void retire_operation(Vertex v, OperationPtr &operation);
149 
154  void update_operation(Vertex v, OperationPtr &operation);
155 
160  void recompute_order();
161 
167  bool load_current();
168 
169  typedef std::set<OperationPtr> PerpetualSet;
170 
172  public:
174  OperationVertex(OperationPtr &op, Vertex &v) :
175  operation(op), vertex(v) { }
176  OperationPtr operation;
177  Vertex vertex;
178  };
179 
181  public:
182  ThreadContext(ContextPtr &mctx);
183  ~ThreadContext();
185  std::condition_variable cond;
186  std::condition_variable idle_cond;
190  VertexSet current_active;
191  std::unordered_map<int64_t, OperationVertex> operation_hash;
194  std::set<int64_t> op_ids;
195  ExecutionList current;
196  ExecutionList::iterator current_iter;
197  ExecutionList execution_order;
198  ExecutionList::iterator execution_order_iter;
199  DependencyIndex exclusivity_index;
200  DependencyIndex dependency_index;
201  DependencyIndex obstruction_index;
202  PerpetualSet perpetual_ops;
203  size_t busy_count;
205  bool shutdown;
206  bool paused;
207  VertexSet live;
209  boost::property_map<OperationGraph, execution_time_t>::type exec_time;
210  boost::property_map<OperationGraph, operation_t>::type ops;
211  boost::property_map<OperationGraph, label_t>::type label;
212  boost::property_map<OperationGraph, busy_t>::type busy;
213  boost::property_map<OperationGraph, permanent_t>::type permanent;
214  };
215 
216  struct not_permanent {
217  not_permanent(ThreadContext &context) : m_context(context) { }
218  bool operator()(const Edge &e) const {
219  return !m_context.permanent[e];
220  }
222  };
223 
224  struct ltvertexinfo {
225  ltvertexinfo(ThreadContext &context) : m_context(context) { }
226  inline bool operator() (const vertex_info &vi1, const vertex_info &vi2) {
228  return vi1.taken && !vi2.taken;
229  return m_context.exec_time[vi1.vertex] < m_context.exec_time[vi2.vertex];
230  }
232  };
233 
234  class Worker {
235  public:
236  Worker(ThreadContext &context) : m_context(context) { return; }
237  void operator()();
238  private:
240  };
241 
244  std::unique_ptr<std::ofstream> m_graphviz_out;
245  };
246 
249 }
250 
251 #endif // Hypertable_Master_OperationProcessor_h
std::set< String > StringSet
STL Set managing Strings.
Definition: StringExt.h:42
static std::mutex mutex
Definition: Logger.cc:43
void update_operation(Vertex v, OperationPtr &operation)
Updates dependency relationship of an operation.
std::string String
A String is simply a typedef to std::string.
Definition: String.h:44
std::unordered_map< int64_t, OperationVertex > operation_hash
void graphviz_output(String &output)
Declarations for Operation.
Program options handling.
void activate(const String &name)
void add_obstruction(Vertex v, const String &name)
void add_exclusivity(Vertex v, const String &name)
void recompute_order()
Recomputes operation execution order.
void unblock(const String &name)
boost::property_map< OperationGraph, operation_t >::type ops
void state_description(String &output)
std::multimap< const String, Vertex > DependencyIndex
std::shared_ptr< Context > ContextPtr
Smart pointer to Context.
Definition: Context.h:265
void retire_operation(Vertex v, OperationPtr &operation)
Retires (remove) an operation.
boost::property_map< OperationGraph, execution_time_t >::type exec_time
void add_edge(Vertex v, Vertex u)
boost::property_map< OperationGraph, label_t >::type label
boost::property_map< OperationGraph, busy_t >::type busy
GraphTraits::vertex_descriptor Vertex
std::list< vertex_info > ExecutionList
boost::thread_group ThreadGroup
Definition: Thread.h:46
boost::property_map< OperationGraph, permanent_t >::type permanent
void add_edge_permanent(Vertex v, Vertex u)
std::unique_ptr< std::ofstream > m_graphviz_out
Importing boost::thread and boost::thread_group into the Hypertable namespace.
Manages the sending of operation results back to clients.
void add_operation(OperationPtr operation)
boost::adjacency_list< boost::listS, boost::listS, boost::bidirectionalS, boost::property< boost::vertex_index_t, std::size_t, boost::property< operation_t, OperationPtr, boost::property< execution_time_t, int, boost::property< label_t, String, boost::property< busy_t, bool > > > > >, boost::property< permanent_t, bool > > OperationGraph
Hypertable definitions
void add_operations(std::vector< OperationPtr > &operations)
void add_dependencies(Vertex v, OperationPtr &operation)
OperationPtr remove_operation(int64_t hash_code)
std::set< OperationPtr > PerpetualSet
OperationProcessor(ContextPtr &context, size_t thread_count)
void add_dependency(Vertex v, const String &name)
Runs a set of operaions with dependency relationships.
void add_operation_internal(OperationPtr &operation)
bool load_current()
Loads m_context.current list with operations to be executed.
boost::graph_traits< OperationGraph > GraphTraits
std::shared_ptr< Operation > OperationPtr
Smart pointer to Operation.
Definition: Operation.h:609
String extensions and helpers: sets, maps, append operators etc.
Declarations for Context.
bool operator()(const vertex_info &vi1, const vertex_info &vi2)
Declarations for ResponseManager.
GraphTraits::edge_descriptor Edge