0.9.8.10
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
ApplicationQueue.h
Go to the documentation of this file.
1 /* -*- c++ -*-
2  * Copyright (C) 2007-2015 Hypertable, Inc.
3  *
4  * This file is part of Hypertable.
5  *
6  * Hypertable is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; either version 3
9  * of the License, or any later version.
10  *
11  * Hypertable is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19  * 02110-1301, USA.
20  */
21 
28 #ifndef Hypertable_AyncComm_ApplicationQueue_h
29 #define Hypertable_AyncComm_ApplicationQueue_h
30 
33 
34 #include <Common/Logger.h>
35 #include <Common/StringExt.h>
36 #include <Common/Thread.h>
37 
38 #include <cassert>
39 #include <chrono>
40 #include <condition_variable>
41 #include <list>
42 #include <map>
43 #include <memory>
44 #include <mutex>
45 #include <unordered_map>
46 #include <vector>
47 
48 namespace Hypertable {
49 
90 
/// Tracks group execution state.
///
/// A GroupState is created by add() for the first queued request that carries
/// a given non-zero group ID and is deleted once its outstanding count drops
/// to zero.
class GroupState {
public:
  /// Starts with group ID 0, not running, and one outstanding request
  /// (the request for which the state is being created).
  GroupState() : group_id(0), running(false), outstanding(1) { return; }

  /// ID of the group; also the key under which the state is stored in the
  /// group state map.
  uint64_t group_id;

  /// true if a request from this group is being executed
  bool running;

  /// Number of outstanding (uncompleted) requests in queue for this group.
  int outstanding;
};
104 
107  typedef std::unordered_map<uint64_t, GroupState *> GroupStateMap;
108 
111  class RequestRec {
112  public:
113  RequestRec(ApplicationHandler *arh) : handler(arh), group_state(0) { return; }
114  ~RequestRec() { delete handler; }
117  };
118 
121  typedef std::list<RequestRec *> RequestQueue;
122 
// Application queue state shared among worker threads.
// NOTE(review): this listing does not show the class header, the beginning of
// the constructor's initializer list, or the declarations of mutex,
// threads_available and threads_total that other code in this file reads
// through m_state -- confirm against the original header.
126  public:
128  paused(false) { }
129 
// Request queue that receives handlers for which is_urgent() is false.
131  RequestQueue queue;
132 
// Urgent request queue.
134  RequestQueue urgent_queue;
135 
// Group ID to group state map.
137  GroupStateMap group_state_map;
138 
141 
// Condition variable to signal pending handlers.
143  std::condition_variable cond;
144 
// Condition variable used to signal quiesced queue.
146  std::condition_variable quiesce_cond;
147 
150 
153 
// Flag indicating if shutdown is in progress.
155  bool shutdown;
156 
// Flag indicating if queue has been paused.
158  bool paused;
159  };
160 
163  class Worker {
164 
165  public:
166  Worker(ApplicationQueueState &qstate, bool one_shot=false)
167  : m_state(qstate), m_one_shot(one_shot) { return; }
168 
171  void operator()() {
172  RequestRec *rec = 0;
173  RequestQueue::iterator iter;
174 
175  while (true) {
176  {
177  std::unique_lock<std::mutex> lock(m_state.mutex);
178 
180  while ((m_state.paused || m_state.queue.empty()) &&
181  m_state.urgent_queue.empty()) {
182  if (m_state.shutdown) {
184  return;
185  }
187  m_state.quiesce_cond.notify_all();
188  m_state.cond.wait(lock);
189  }
190 
191  if (m_state.shutdown) {
193  return;
194  }
195 
196  rec = 0;
197 
198  iter = m_state.urgent_queue.begin();
199  while (iter != m_state.urgent_queue.end()) {
200  rec = (*iter);
201  if (rec->group_state == 0 || !rec->group_state->running) {
202  if (rec->group_state)
203  rec->group_state->running = true;
204  m_state.urgent_queue.erase(iter);
205  break;
206  }
207  if (!rec->handler || rec->handler->is_expired()) {
208  iter = m_state.urgent_queue.erase(iter);
209  remove_expired(rec);
210  }
211  rec = 0;
212  iter++;
213  }
214 
215  if (rec == 0 && !m_state.paused) {
216  iter = m_state.queue.begin();
217  while (iter != m_state.queue.end()) {
218  rec = (*iter);
219  if (rec->group_state == 0 || !rec->group_state->running) {
220  if (rec->group_state)
221  rec->group_state->running = true;
222  m_state.queue.erase(iter);
223  break;
224  }
225  if (!rec->handler || rec->handler->is_expired()) {
226  iter = m_state.queue.erase(iter);
227  remove_expired(rec);
228  }
229  rec = 0;
230  iter++;
231  }
232  }
233 
234  if (rec == 0 && !m_one_shot) {
235  if (m_state.shutdown) {
237  return;
238  }
239  m_state.cond.wait(lock);
240  if (m_state.shutdown) {
242  return;
243  }
244  }
245 
247  }
248 
249  if (rec) {
250  if (rec->handler)
251  rec->handler->run();
252  remove(rec);
253  if (m_one_shot)
254  return;
255  }
256  else if (m_one_shot)
257  return;
258  }
259 
260  HT_INFO("thread exit");
261  }
262 
263  private:
264 
272  void remove(RequestRec *rec) {
273  if (rec->group_state) {
274  std::lock_guard<std::mutex> ulock(m_state.mutex);
275  rec->group_state->running = false;
276  rec->group_state->outstanding--;
277  if (rec->group_state->outstanding == 0) {
278  m_state.group_state_map.erase(rec->group_state->group_id);
279  delete rec->group_state;
280  }
281  }
282  delete rec;
283  }
284 
293  if (rec->group_state) {
294  rec->group_state->outstanding--;
295  if (rec->group_state->outstanding == 0) {
297  delete rec->group_state;
298  }
299  }
300  delete rec;
301  }
302 
305 
308  };
309 
312 
315 
// Vector of thread IDs; filled by the worker-count constructor with the IDs
// of the threads it creates.
317  std::vector<Thread::id> m_thread_ids;
318 
// Flag indicating if threads have joined after a shutdown.
320  bool joined;
321 
326 
327  public:
328 
331  ApplicationQueue() : joined(true) {}
332 
341  ApplicationQueue(int worker_count, bool dynamic_threads=true)
342  : joined(false), m_dynamic_threads(dynamic_threads) {
343  m_state.threads_total = worker_count;
344  Worker Worker(m_state);
345  assert (worker_count > 0);
346  for (int i=0; i<worker_count; ++i) {
347  m_thread_ids.push_back(m_threads.create_thread(Worker)->get_id());
348  }
349  //threads
350  }
351 
354  virtual ~ApplicationQueue() {
355  if (!joined) {
356  shutdown();
357  join();
358  }
359  }
360 
365  std::vector<Thread::id> get_thread_ids() const {
366  return m_thread_ids;
367  }
368 
374  void shutdown() {
375  m_state.shutdown = true;
376  m_state.cond.notify_all();
377  }
378 
386  bool wait_for_idle(std::chrono::time_point<std::chrono::steady_clock> deadline,
387  int reserve_threads=0) {
388  std::unique_lock<std::mutex> lock(m_state.mutex);
389  return m_state.quiesce_cond.wait_until(lock, deadline,
390  [this, reserve_threads](){ return m_state.threads_available >= (m_state.threads_total-reserve_threads); });
391  }
392 
397  void join() {
398  if (!joined) {
399  m_threads.join_all();
400  joined = true;
401  }
402  }
403 
406  void start() {
407  std::lock_guard<std::mutex> lock(m_state.mutex);
408  m_state.paused = false;
409  m_state.cond.notify_all();
410  }
411 
416  void stop() {
417  std::lock_guard<std::mutex> lock(m_state.mutex);
418  m_state.paused = true;
419  }
420 
428  virtual void add(ApplicationHandler *app_handler) {
429  GroupStateMap::iterator uiter;
430  uint64_t group_id = app_handler->get_group_id();
431  RequestRec *rec = new RequestRec(app_handler);
432  rec->group_state = 0;
433 
434  HT_ASSERT(app_handler);
435 
436  if (group_id != 0) {
437  std::lock_guard<std::mutex> ulock(m_state.mutex);
438  if ((uiter = m_state.group_state_map.find(group_id))
439  != m_state.group_state_map.end()) {
440  rec->group_state = (*uiter).second;
441  rec->group_state->outstanding++;
442  }
443  else {
444  rec->group_state = new GroupState();
445  rec->group_state->group_id = group_id;
446  m_state.group_state_map[group_id] = rec->group_state;
447  }
448  }
449 
450  {
451  std::lock_guard<std::mutex> lock(m_state.mutex);
452  if (app_handler->is_urgent()) {
453  m_state.urgent_queue.push_back(rec);
454  if (m_dynamic_threads && m_state.threads_available == 0) {
455  Worker worker(m_state, true);
456  Thread t(worker);
457  }
458  }
459  else
460  m_state.queue.push_back(rec);
461  m_state.cond.notify_one();
462  }
463  }
464 
473  virtual void add_unlocked(ApplicationHandler *app_handler) {
474  add(app_handler);
475  }
476 
481  size_t backlog() {
482  std::lock_guard<std::mutex> lock(m_state.mutex);
483  return m_state.queue.size() + m_state.urgent_queue.size();
484  }
485  };
486 
488  typedef std::shared_ptr<ApplicationQueue> ApplicationQueuePtr;
490 }
491 
492 #endif // Hypertable_AyncComm_ApplicationQueue_h
virtual void add(ApplicationHandler *app_handler)
Adds a request (application request handler) to the application queue.
static std::mutex mutex
Definition: Logger.cc:43
std::condition_variable quiesce_cond
Condition variable used to signal quiesced queue.
void start()
Starts application queue.
Declarations of ApplicationHandler.
GroupState * group_state
Pointer to GroupState to which request belongs.
virtual void add_unlocked(ApplicationHandler *app_handler)
Adds a request (application request handler) to the application queue.
ThreadGroup m_threads
Boost thread group for managing threads.
ApplicationQueueState & m_state
Shared application queue state object.
int outstanding
Number of outstanding (uncompleted) requests in queue for this group.
std::condition_variable cond
Condition variable to signal pending handlers.
std::vector< Thread::id > get_thread_ids() const
Returns all the thread IDs for this threadgroup.
#define HT_INFO(msg)
Definition: Logger.h:271
std::mutex mutex
Mutex for serializing concurrent access
void stop()
Stops (pauses) application queue, preventing non-urgent requests from being executed.
RequestQueue urgent_queue
Urgent request queue.
uint64_t get_group_id()
Returns the group ID that this handler belongs to.
Worker(ApplicationQueueState &qstate, bool one_shot=false)
#define HT_ASSERT(_e_)
Definition: Logger.h:396
void join()
Waits for a shutdown to complete.
boost::thread_group ThreadGroup
Definition: Thread.h:46
Logging routines and macros.
ApplicationHandler * handler
Pointer to ApplicationHandler.
ApplicationQueue(int worker_count, bool dynamic_threads=true)
Constructor initialized with worker thread count.
void remove_expired(RequestRec *rec)
Removes and deletes an expired request.
GroupStateMap group_state_map
Group ID to group state map.
Importing boost::thread and boost::thread_group into the Hypertable namespace.
ApplicationQueue()
Default constructor used by derived classes only.
bool wait_for_idle(std::chrono::time_point< std::chrono::steady_clock > deadline, int reserve_threads=0)
Wait for queue to become idle (with timeout).
Application queue state shared among worker threads.
bool m_one_shot
Set to true if thread should exit after executing request.
bool is_expired()
Returns true if request has expired.
Application queue worker thread function (functor)
Hypertable definitions
std::list< RequestRec * > RequestQueue
Individual request queue.
bool paused
Flag indicating if queue has been paused.
virtual void run()=0
Carries out the request.
size_t backlog()
Returns the request backlog, which is the number of requests waiting on the request queues.
bool running
true if a request from this group is being executed
std::unordered_map< uint64_t, GroupState * > GroupStateMap
Hash map of thread group ID to GroupState.
ApplicationQueueState m_state
Application queue state object.
bool shutdown
Flag indicating if shutdown is in progress.
void shutdown()
Shuts down the application queue.
Tracks group execution state.
Base class for application handlers.
virtual ~ApplicationQueue()
Destructor.
bool joined
Flag indicating if threads have joined after a shutdown.
boost::thread Thread
Definition: Thread.h:45
Declarations for ApplicationQueueInterface.
bool m_dynamic_threads
Set to true if queue is configured to allow dynamic thread creation.
std::vector< Thread::id > m_thread_ids
Vector of thread IDs.
std::shared_ptr< ApplicationQueue > ApplicationQueuePtr
Shared smart pointer to ApplicationQueue object.
String extensions and helpers: sets, maps, append operators etc.
void operator()()
Thread run method.
Abstract interface for application queue.
bool is_urgent()
Returns true if request is urgent.