0.9.8.10
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
MaintenanceQueue.h
Go to the documentation of this file.
1 /* -*- c++ -*-
2  * Copyright (C) 2007-2015 Hypertable, Inc.
3  *
4  * This file is part of Hypertable.
5  *
6  * Hypertable is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; version 3 of the
9  * License, or any later version.
10  *
11  * Hypertable is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19  * 02110-1301, USA.
20  */
21 
25 
26 #ifndef Hypertable_RangeServer_MaintenanceQueue_h
27 #define Hypertable_RangeServer_MaintenanceQueue_h
28 
29 #include "MaintenanceTask.h"
31 
32 #include <AsyncComm/Clock.h>
33 
34 #include <Common/Error.h>
35 #include <Common/Logger.h>
36 #include <Common/Thread.h>
37 
38 #include <cassert>
39 #include <condition_variable>
40 #include <memory>
41 #include <mutex>
42 #include <queue>
43 #include <set>
44 
45 #define MAX_LEVELS 4
46 
47 namespace Hypertable {
48 
56 
57  static int ms_pause;
58  static std::condition_variable ms_cond;
59 
61  bool
62  operator()(const MaintenanceTask *sm1, const MaintenanceTask *sm2) const {
63  if (sm1->level != sm2->level)
64  return sm1->level > sm2->level;
65  if (sm1->priority != sm2->priority)
66  return sm1->priority > sm2->priority;
67  return sm1->start_time >= sm2->start_time;
68  }
69  };
70 
71  typedef std::priority_queue<MaintenanceTask *,
72  std::vector<MaintenanceTask *>, LtMaintenanceTask> TaskQueue;
73 
75  public:
77  memset(inflight_levels, 0, MAX_LEVELS*sizeof(int));
78  return;
79  }
80  TaskQueue queue;
82  std::condition_variable cond;
83  std::condition_variable empty_cond;
84  bool shutdown {};
85  std::set<Range *> ranges;
87  uint32_t inflight {};
88  int64_t generation {};
89  };
90 
91  class Worker {
92 
93  public:
94 
95  Worker(MaintenanceQueueState &state) : m_state(state) { return; }
96 
97  void operator()() {
98  MaintenanceTask *task = 0;
99 
100  while (true) {
101 
102  {
103  std::unique_lock<std::mutex> lock(m_state.mutex);
104  uint32_t inflight_level = lowest_inflight_level();
105 
106  auto now = std::chrono::steady_clock::now();
107 
108  // Block in the following circumstances:
109  // 1. Queue is empty
110  // 2. Level of task on front of queue is greater than (e.g. lower
111  // priority) the level of the tasks currently being executed
112  // 3. Start time of task on front of queue is in the future
113 
114  while (m_state.queue.empty() ||
115  (m_state.inflight && ((m_state.queue.top())->level > inflight_level)) ||
116  (m_state.queue.top())->start_time > now) {
117 
118  if (m_state.shutdown)
119  return;
120 
121  if (m_state.queue.empty() ||
122  (m_state.inflight && (m_state.queue.top())->level > inflight_level))
123  m_state.cond.wait(lock);
124  else
125  m_state.cond.wait_until(lock, (m_state.queue.top())->start_time);
126 
127  inflight_level = lowest_inflight_level();
128  now = std::chrono::steady_clock::now();
129  }
130 
131  task = m_state.queue.top();
132  m_state.queue.pop();
133 
134  HT_ASSERT(task->level < MAX_LEVELS);
135 
136  m_state.inflight++;
137  m_state.inflight_levels[task->level]++;
138  }
139 
140  try {
141 
142  // maybe pause
143  {
144  std::unique_lock<std::mutex> lock(m_state.mutex);
145  ms_cond.wait(lock, [](){ return !ms_pause; });
146  }
147 
148  if (m_state.shutdown)
149  return;
150 
151  task->execute();
152 
153  }
154  catch(Hypertable::Exception &e) {
156  dynamic_cast<MaintenanceTaskMemoryPurge *>(task) == 0) {
157  bool message_logged = false;
158  if (e.code() != Error::RANGESERVER_RANGE_BUSY) {
159  HT_ERROR_OUT << e << HT_END;
160  message_logged = true;
161  }
162  if (task->retry()) {
163  std::lock_guard<std::mutex> lock(m_state.mutex);
164  HT_INFOF("Maintenance Task '%s' aborted, will retry in %u "
165  "milliseconds ...", task->description().c_str(),
166  task->get_retry_delay());
167  task->start_time = std::chrono::steady_clock::now();
168  task->start_time += std::chrono::milliseconds(task->get_retry_delay());
169  m_state.queue.push(task);
171  m_state.inflight_levels[task->level]--;
172  m_state.inflight--;
173  m_state.cond.notify_one();
174  continue;
175  }
176  if (!message_logged)
177  HT_INFOF("Maintenance Task '%s' failed because range is busy, dropping task ...",
178  task->description().c_str());
179  }
180  }
181 
182  {
183  std::lock_guard<std::mutex> lock(m_state.mutex);
185  m_state.inflight_levels[task->level]--;
186  m_state.inflight--;
188  if (task->get_range())
189  m_state.ranges.erase(task->get_range());
190  if (m_state.queue.empty()) {
191  if (m_state.inflight == 0)
192  m_state.empty_cond.notify_all();
193  }
194  else if ((m_state.queue.top())->level > task->level)
195  m_state.cond.notify_all();
196  }
197 
198  delete task;
199  }
200  }
201 
202  private:
203 
211  for (uint32_t i=0; i<MAX_LEVELS; i++) {
212  if (m_state.inflight_levels[i] > 0)
213  return i;
214  }
215  return MAX_LEVELS;
216  }
217 
219  };
220 
224  bool joined;
225 
226  public:
227 
233  MaintenanceQueue(int worker_count) : m_worker_count(worker_count),
234  joined(false) {
235  Worker Worker(m_state);
236  assert (worker_count > 0);
237  for (int i=0; i<worker_count; ++i)
238  m_threads.create_thread(Worker);
239  //threads
240  }
241 
246  void shutdown() {
247  m_state.shutdown = true;
248  m_state.cond.notify_all();
249  }
250 
254  void join() {
255  if (!joined) {
256  m_threads.join_all();
257  joined = true;
258  }
259  }
260 
263  void stop() {
264  std::lock_guard<std::mutex> lock(m_state.mutex);
265  HT_INFO("Stopping maintenance queue");
266  ms_pause++;
267  }
268 
271  void start() {
272  std::lock_guard<std::mutex> lock(m_state.mutex);
273  HT_ASSERT(ms_pause > 0);
274  ms_pause--;
275  if (ms_pause == 0) {
276  ms_cond.notify_all();
277  HT_INFO("Starting maintenance queue");
278  }
279  }
280 
285  template<typename _Function>
286  void drop_range_tasks(_Function __f) {
287  std::lock_guard<std::mutex> lock(m_state.mutex);
288  TaskQueue filtered_queue;
289  MaintenanceTask *task = 0;
290  while (!m_state.queue.empty()) {
291  task = m_state.queue.top();
292  m_state.queue.pop();
293  if (task->get_range() && __f(task->get_range())) {
294  m_state.ranges.erase(task->get_range());
295  delete task;
296  m_state.generation++;
297  }
298  else
299  filtered_queue.push(task);
300  }
301  m_state.queue = filtered_queue;
302  }
303 
310  bool contains(Range *range) {
311  std::lock_guard<std::mutex> lock(m_state.mutex);
312  return m_state.ranges.count(range) > 0;
313  }
314 
319  void add(MaintenanceTask *task) {
320  std::lock_guard<std::mutex> lock(m_state.mutex);
321  m_state.queue.push(task);
322  if (task->get_range())
323  m_state.ranges.insert(task->get_range());
324  m_state.cond.notify_one();
325  }
326 
332  size_t size() {
333  std::lock_guard<std::mutex> lock(m_state.mutex);
334  return (size_t)m_state.queue.size() + (size_t)m_state.inflight;
335  }
336 
343  int64_t generation() {
344  std::lock_guard<std::mutex> lock(m_state.mutex);
345  return m_state.generation;
346  }
347 
352  bool full() {
353  std::lock_guard<std::mutex> lock(m_state.mutex);
354  return (m_state.queue.size() + (size_t)m_state.inflight) >=
355  (size_t)m_worker_count;
356  }
357 
361  bool empty() {
362  std::lock_guard<std::mutex> lock(m_state.mutex);
363  return m_state.queue.empty() && m_state.inflight == 0;
364  }
365 
368  void wait_for_empty() {
369  std::unique_lock<std::mutex> lock(m_state.mutex);
370  m_state.empty_cond.wait(lock, [this](){
371  return m_state.queue.empty() && m_state.inflight == 0; });
372  }
373 
379  std::unique_lock<std::mutex> lock(m_state.mutex);
380  return m_state.empty_cond.wait_until(lock, deadline, [this](){
381  return m_state.queue.empty() && m_state.inflight == 0; });
382  }
383 
387  int worker_count() { return m_worker_count; }
388 
389  };
390 
392  typedef std::shared_ptr<MaintenanceQueue> MaintenanceQueuePtr;
393 
396 } // namespace Hypertable
397 
398 #endif // Hypertable_RangeServer_MaintenanceQueue_h
static std::mutex mutex
Definition: Logger.cc:43
void stop()
Stops (suspends) queue processing.
static std::condition_variable ms_cond
std::priority_queue< MaintenanceTask *, std::vector< MaintenanceTask * >, LtMaintenanceTask > TaskQueue
chrono::time_point< fast_clock > time_point
Definition: fast_clock.h:42
void join()
Waits for a shutdown to complete.
#define MAX_LEVELS
#define HT_INFO(msg)
Definition: Logger.h:271
size_t size()
Returns the size of the queue.
MaintenanceQueueState m_state
uint32_t lowest_inflight_level()
Determine the lowest level of the tasks currently being executed.
Represents a table row range.
Definition: Range.h:69
#define HT_ASSERT(_e_)
Definition: Logger.h:396
Declaration of ClockT.
bool wait_for_empty(ClockT::time_point deadline)
Waits for queue to become empty with deadline.
std::shared_ptr< MaintenanceQueue > MaintenanceQueuePtr
Smart pointer to MaintenanceQueue.
virtual void execute()=0
void shutdown()
Shuts down the maintenance queue.
boost::thread_group ThreadGroup
Definition: Thread.h:46
Logging routines and macros.
#define HT_END
Definition: Logger.h:220
Importing boost::thread and boost::thread_group into the Hypertable namespace.
#define HT_ERROR_OUT
Definition: Logger.h:301
void wait_for_empty()
Waits for queue to become empty.
bool contains(Range *range)
Returns true if queue contains a maintenance task for range.
int64_t generation()
Returns queue generation number.
Hypertable definitions
bool full()
Returns true if any tasks are in queue or all worker threads are busy executing tasks.
std::chrono::time_point< std::chrono::steady_clock > start_time
#define HT_INFOF(msg,...)
Definition: Logger.h:272
Worker(MaintenanceQueueState &state)
This is a generic exception class for Hypertable.
Definition: Error.h:314
void start()
Starts queue processing.
bool empty()
Returns true if maintenance queue is empty.
void drop_range_tasks(_Function __f)
Drops range maintenance tasks from the queue.
Queue for periodic maintenance work.
int worker_count()
Returns the number of worker threads configured for the queue.
bool operator()(const MaintenanceTask *sm1, const MaintenanceTask *sm2) const
Error codes, Exception handling, error logging.
MaintenanceQueue(int worker_count)
Constructor to set up the maintenance queue.
int code() const
Returns the error code.
Definition: Error.h:391
void add(MaintenanceTask *task)
Adds a maintenance task to the queue.