0.9.8.10
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
RecoveryStepFuture.h
Go to the documentation of this file.
1 /* -*- c++ -*-
2  * Copyright (C) 2007-2015 Hypertable, Inc.
3  *
4  * This file is part of Hypertable.
5  *
6  * Hypertable is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; version 3 of the
9  * License, or any later version.
10  *
11  * Hypertable is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19  * 02110-1301, USA.
20  */
21 
22 #ifndef Hypertable_Master_RecoveryStepFuture_h
23 #define Hypertable_Master_RecoveryStepFuture_h
24 
25 #include <Common/Time.h>
26 #include <Common/Timer.h>
27 
28 #include <boost/thread/condition.hpp>
29 
30 #include <condition_variable>
31 #include <map>
32 #include <memory>
33 #include <mutex>
34 #include <utility>
35 #include <vector>
36 
37 namespace Hypertable {
38 
43  public:
44 
45  typedef std::map<String, std::pair<int32_t, String> > ErrorMapT;
46 
47  RecoveryStepFuture(const String &label, int plan_generation) :
48  m_label(label), m_plan_generation(plan_generation),
49  m_extend_timeout(false) { }
50 
51  void register_locations(StringSet &locations) {
52  std::lock_guard<std::mutex> lock(m_mutex);
53  for (auto &location : m_success)
54  locations.erase(location);
55  m_outstanding.clear();
56  m_outstanding.insert(locations.begin(), locations.end());
57  m_error_map.clear();
58  }
59 
60  void status(const String &location, int plan_generation) {
61  std::lock_guard<std::mutex> lock(m_mutex);
62  m_extend_timeout = true;
63  m_cond.notify_all();
64  }
65 
66  void success(const String &location, int plan_generation) {
67  std::lock_guard<std::mutex> lock(m_mutex);
68 
69  if (m_outstanding.empty()) {
70  m_cond.notify_all();
71  return;
72  }
73 
74  m_success.insert(location);
75 
76  m_outstanding.erase(location);
77 
78  // purge from error map
79  ErrorMapT::iterator iter = m_error_map.find(location);
80  if (iter != m_error_map.end())
81  m_error_map.erase(iter);
82 
83  if (m_outstanding.empty())
84  m_cond.notify_all();
85  }
86 
87  void failure(const String &location, int plan_generation,
88  int32_t error, const String &message) {
89  std::lock_guard<std::mutex> lock(m_mutex);
90 
91  if (plan_generation != m_plan_generation) {
92  HT_INFOF("Ignoring response from %s for recovery step %s because "
93  "response plan generation %d does not match registered (%d)",
94  location.c_str(), m_label.c_str(), plan_generation,
96  return;
97  }
98 
99  m_outstanding.erase(location);
100 
101  if (m_success.count(location) == 0)
102  m_error_map[location] = make_pair(error, message);
103 
104  if (m_outstanding.empty())
105  m_cond.notify_all();
106 
107  }
108 
109  bool wait_for_completion(uint32_t initial_timeout) {
110  std::unique_lock<std::mutex> lock(m_mutex);
111  ErrorMapT::iterator iter;
112 
113  auto expire_time = std::chrono::system_clock::now() +
114  std::chrono::milliseconds(initial_timeout);
115 
116  while (!m_outstanding.empty()) {
117 
118  if (m_cond.wait_until(lock, expire_time) == std::cv_status::timeout) {
119  if (!m_outstanding.empty()) {
120  for (auto &location : m_outstanding) {
121  iter = m_error_map.find(location);
122  if (iter == m_error_map.end())
123  m_error_map[location] = make_pair(Error::REQUEST_TIMEOUT, "");
124  }
125  m_outstanding.clear();
126  }
127  }
128  if (m_extend_timeout) {
129  expire_time = std::chrono::system_clock::now() +
130  std::chrono::milliseconds(initial_timeout);
131  m_extend_timeout = false;
132  }
133  }
134  return m_error_map.empty();
135  }
136 
137  void get_error_map(ErrorMapT &error_map) {
138  std::lock_guard<std::mutex> lock(m_mutex);
139  error_map = m_error_map;
140  }
141 
142  protected:
144  std::condition_variable m_cond;
148  ErrorMapT m_error_map;
151  };
152 
153  typedef std::shared_ptr<RecoveryStepFuture> RecoveryStepFuturePtr;
154 }
155 
156 #endif // Hypertable_Master_RecoveryStepFuture_h
std::set< String > StringSet
STL Set managing Strings.
Definition: StringExt.h:42
static std::mutex mutex
Definition: Logger.cc:43
RecoveryStepFuture(const String &label, int plan_generation)
std::string String
A String is simply a typedef to std::string.
Definition: String.h:44
void register_locations(StringSet &locations)
std::map< String, std::pair< int32_t, String > > ErrorMapT
void status(const String &location, int plan_generation)
std::shared_ptr< RecoveryStepFuture > RecoveryStepFuturePtr
A timer class to keep timeout states across AsyncComm related calls.
Tracks outstanding RangeServer recover requests.
Time related declarations.
Hypertable definitions
#define HT_INFOF(msg,...)
Definition: Logger.h:272
void success(const String &location, int plan_generation)
bool wait_for_completion(uint32_t initial_timeout)
std::condition_variable m_cond
void get_error_map(ErrorMapT &error_map)
void failure(const String &location, int plan_generation, int32_t error, const String &message)