0.9.8.10
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
TableScannerQueue.h
Go to the documentation of this file.
1 /* -*- c++ -*-
2  * Copyright (C) 2007-2015 Hypertable, Inc.
3  *
4  * This file is part of Hypertable.
5  *
6  * Hypertable is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; either version 3
9  * of the License, or any later version.
10  *
11  * Hypertable is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19  * 02110-1301, USA.
20  */
21 
22 #ifndef Hypertable_Lib_TableScannerQueue_h
23 #define Hypertable_Lib_TableScannerQueue_h
24 
25 #include "ScanCells.h"
26 
28 
29 #include <condition_variable>
30 #include <list>
31 #include <mutex>
32 
33 namespace Hypertable {
34 
41 
42  public:
43 
47 
49 
52  virtual void add(ApplicationHandler *app_handler) {
53  std::lock_guard<std::mutex> lock(m_mutex);
54  m_work_queue.push_back(app_handler);
55  m_cond.notify_one();
56  }
57 
58  virtual void add_unlocked(ApplicationHandler *app_handler) { }
59 
60  void next_result(ScanCellsPtr &cells, int *error, std::string &error_msg) {
61  ApplicationHandler *app_handler;
62  cells = 0;
63  *error = Error::OK;
64  while(true) {
65  {
66  std::unique_lock<std::mutex> lock(m_mutex);
67  if (m_error != Error::OK && !m_error_shown) {
68  *error = m_error;
69  error_msg = m_error_msg;
70  m_error_shown = true;
71  break;
72  }
73  else if (!m_cells_queue.empty()) {
74  cells = m_cells_queue.front();
75  m_cells_queue.pop_front();
76  break;
77  }
78  m_cond.wait(lock, [this](){
79  return !m_work_queue.empty() || !m_cells_queue.empty(); });
80  if (!m_work_queue.size())
81  continue;
82  app_handler = m_work_queue.front();
83  HT_ASSERT(app_handler);
84  m_work_queue.pop_front();
85  }
86  app_handler->run();
87  delete app_handler;
88  }
89  if (m_error != Error::OK) {
90  *error = m_error;
91  error_msg = m_error_msg;
92  cells = 0;
93  }
94  HT_ASSERT(cells != 0 || *error != Error::OK);
95  }
96 
97  void add_cells(ScanCellsPtr &cells) {
98  std::lock_guard<std::mutex> lock(m_mutex);
99  m_cells_queue.push_back(cells);
100  m_cond.notify_one();
101  }
102 
103  void set_error(int error, const std::string &error_msg) {
104  std::lock_guard<std::mutex> lock(m_mutex);
105  m_error = error;
106  m_error_msg = error_msg;
107  m_error_shown = false;
108  }
109 
110  private:
111 
112  typedef std::list<ApplicationHandler *> WorkQueue;
113  typedef std::list<ScanCellsPtr> CellsQueue;
115  std::condition_variable m_cond;
116  WorkQueue m_work_queue;
117  CellsQueue m_cells_queue;
118  std::string m_error_msg;
119  int m_error {};
120  bool m_error_shown {};
121  };
122 
124  typedef std::shared_ptr<TableScannerQueue> TableScannerQueuePtr;
125 
126 }
127 
128 #endif // Hypertable_Lib_TableScannerQueue_h
Provides application work queue and worker threads.
void next_result(ScanCellsPtr &cells, int *error, std::string &error_msg)
void set_error(int error, const std::string &error_msg)
static std::mutex mutex
Definition: Logger.cc:43
std::shared_ptr< TableScannerQueue > TableScannerQueuePtr
Shared smart pointer to TableScannerQueue.
std::list< ApplicationHandler * > WorkQueue
virtual void add(ApplicationHandler *app_handler)
Adds an application handler to queue.
#define HT_ASSERT(_e_)
Definition: Logger.h:396
std::shared_ptr< ScanCells > ScanCellsPtr
Smart pointer to ScanCells.
Definition: ScanCells.h:143
std::list< ScanCellsPtr > CellsQueue
TableScannerQueue()
Default constructor.
virtual void add_unlocked(ApplicationHandler *app_handler)
Adds an application handler to queue without locking.
Hypertable definitions
virtual void run()=0
Carries out the request.
void add_cells(ScanCellsPtr &cells)
Base class for application handlers.
Declarations for ApplicationQueueInterface.
std::condition_variable m_cond
Abstract interface for application queue.