0.9.8.10
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
Future.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2007-2015 Hypertable, Inc.
3  *
4  * This file is part of Hypertable.
5  *
6  * Hypertable is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; version 3 of the
9  * License, or any later version.
10  *
11  * Hypertable is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19  * 02110-1301, USA.
20  */
21 
22 #include <Common/Compat.h>
23 
24 #include "Future.h"
25 #include "TableScannerAsync.h"
26 #include "TableMutatorAsync.h"
27 
28 #include <Common/Time.h>
29 
30 #include <chrono>
31 
32 using namespace Hypertable;
33 using namespace std;
34 
35 bool Future::get(ResultPtr &result) {
36  unique_lock<mutex> lock(m_outstanding_mutex);
37  size_t mem_result=0;
38 
39  while (true) {
40 
41  // wait till we have results to serve
42  m_outstanding_cond.wait(lock, [this](){
43  return !_is_empty() || _is_done() || _is_cancelled(); });
44 
45  if (_is_cancelled())
46  return false;
47  if (_is_empty() && _is_done())
48  return false;
49  result = m_queue.front();
50  mem_result = result->memory_used();
51  m_queue.pop_front();
52  m_memory_used -= mem_result;
53  HT_ASSERT(m_memory_used >= 0);
54  // wake a thread blocked on queue space
55  m_outstanding_cond.notify_one();
56 
57  if (result->is_error())
58  break;
59  else if (result->is_scan()) {
60  TableScannerAsync *scanner = result->get_scanner();
61  // ignore result if scanner has been cancelled
62  if (!scanner || !scanner->is_cancelled()) // scanner must be alive
63  break;
64  }
65  else if (result->is_update()) {
66  TableMutatorAsync *mutator = result->get_mutator();
67  // check if alive mutator has been cancelled
68  if (!mutator || m_mutator_map.find((uint64_t)mutator) == m_mutator_map.end() ||
69  !mutator->is_cancelled())
70  break;
71  }
72  }
73  return true;
74 }
75 
76 bool Future::get(ResultPtr &result, uint32_t timeout_ms, bool &timed_out) {
77 
78  if (timeout_ms == 0)
79  return get(result);
80 
81  {
82  unique_lock<mutex> lock(m_outstanding_mutex);
83 
84  timed_out = false;
85 
86  size_t mem_result=0;
87 
88  auto wait_time = chrono::system_clock::now() + chrono::milliseconds(timeout_ms);
89 
90  while (true) {
91  // wait till we have results to serve
92  while(_is_empty() && !_is_done() && !_is_cancelled()) {
93  timed_out = m_outstanding_cond.wait_until(lock, wait_time) == std::cv_status::timeout;
94  if (timed_out)
95  return _is_done();
96  }
97  if (_is_cancelled())
98  return false;
99  if (_is_empty() && _is_done())
100  return false;
101  result = m_queue.front();
102  mem_result = result->memory_used();
103  m_queue.pop_front();
104  m_memory_used -= mem_result;
105  HT_ASSERT(m_memory_used >= 0);
106  // wake a thread blocked on queue space
107  m_outstanding_cond.notify_one();
108 
109  if (result->is_error())
110  break;
111  if (result->is_scan()) {
112  TableScannerAsync *scanner = result->get_scanner();
113  // ignore result if scanner has been cancelled
114  if (!scanner || !scanner->is_cancelled()) // scanner must be alive
115  break;
116  }
117  else if (result->is_update()) {
118  TableMutatorAsync *mutator = result->get_mutator();
119  // check if alive mutator has been cancelled
120  if (!mutator || m_mutator_map.find((uint64_t)mutator) == m_mutator_map.end() ||
121  !mutator->is_cancelled())
122  break;
123  }
124  }
125  // wake a thread blocked on queue space
126  m_outstanding_cond.notify_one();
127  return true;
128  }
129 }
130 
132  ResultPtr result = make_shared<Result>(scanner, cells);
133  enqueue(result);
134 }
135 
136 void Future::enqueue(ResultPtr &result) {
137  unique_lock<mutex> lock(m_outstanding_mutex);
138  size_t mem_result = result->memory_used();
139 
140  m_outstanding_cond.wait(lock, [this](){
141  return has_remaining_capacity() || _is_cancelled(); });
142 
143  if (!_is_cancelled()) {
144  m_queue.push_back(result);
145  m_memory_used += mem_result;
146  }
147  m_outstanding_cond.notify_one();
148 }
149 
150 void Future::scan_error(TableScannerAsync *scanner, int error, const string &error_msg,
151  bool eos) {
152  ResultPtr result = make_shared<Result>(scanner, error, error_msg);
153  enqueue(result);
154 }
155 
157  ResultPtr result = make_shared<Result>(mutator);
158  enqueue(result);
159 }
160 
161 void Future::update_error(TableMutatorAsync *mutator, int error, FailedMutations &failures) {
162  ResultPtr result = make_shared<Result>(mutator, error, failures);
163  enqueue(result);
164 }
165 
167  lock_guard<mutex> lock(m_outstanding_mutex);
168  m_cancelled = true;
169  ScannerMap::iterator s_it = m_scanner_map.begin();
170  while (s_it != m_scanner_map.end()) {
171  s_it->second->cancel();
172  s_it++;
173  }
174  MutatorMap::iterator m_it = m_mutator_map.begin();
175  while (m_it != m_mutator_map.end()) {
176  m_it->second->cancel();
177  m_it++;
178  }
179  m_queue.clear();
180  m_memory_used = 0;
181 
182  m_outstanding_cond.notify_all();
183 }
184 
186  lock_guard<mutex> lock(m_outstanding_mutex);
187  uint64_t addr = (uint64_t) mutator;
188  MutatorMap::iterator it = m_mutator_map.find(addr);
189  if (m_cancelled)
191  "Attempt to register mutator with cancelled future %lld",
192  (Lld)reinterpret_cast<int64_t>(this));
193  HT_ASSERT(it == m_mutator_map.end());
194  m_mutator_map[addr] = mutator;
195 }
196 
198  lock_guard<mutex> lock(m_outstanding_mutex);
199  uint64_t addr = (uint64_t) mutator;
200  MutatorMap::iterator it = m_mutator_map.find(addr);
201  HT_ASSERT(it != m_mutator_map.end());
202  m_mutator_map.erase(it);
203 }
205  lock_guard<mutex> lock(m_outstanding_mutex);
206  uint64_t addr = (uint64_t) scanner;
207  ScannerMap::iterator it = m_scanner_map.find(addr);
208  if (m_cancelled)
210  "Attempt to register scanner with cancelled future %lld",
211  (Lld)reinterpret_cast<int64_t>(this));
212  HT_ASSERT(it == m_scanner_map.end());
213  m_scanner_map[addr] = scanner;
214 }
215 
217  lock_guard<mutex> lock(m_outstanding_mutex);
218  uint64_t addr = (uint64_t) scanner;
219  ScannerMap::iterator it = m_scanner_map.find(addr);
220  HT_ASSERT(it != m_scanner_map.end());
221  m_scanner_map.erase(it);
222 }
void update_ok(TableMutatorAsync *mutator)
Callback method for successful update.
Definition: Future.cc:156
void update_error(TableMutatorAsync *mutator, int error, FailedMutations &failures)
Callback method for update errors.
Definition: Future.cc:161
Asynchronous table scanner.
STL namespace.
std::shared_ptr< Result > ResultPtr
Smart pointer to Result.
Definition: Result.h:72
void deregister_mutator(TableMutatorAsync *scanner)
Hook for derived classes which want to keep track of scanners/mutators.
Definition: Future.cc:197
#define HT_ASSERT(_e_)
Definition: Logger.h:396
Provides the ability to mutate a table in the form of adding and deleting rows and cells...
std::shared_ptr< ScanCells > ScanCellsPtr
Smart pointer to ScanCells.
Definition: ScanCells.h:143
void cancel()
Cancels outstanding scanners/mutators.
Definition: Future.cc:166
void scan_error(TableScannerAsync *scanner, int error, const std::string &error_msg, bool eos)
Callback method for scan errors.
Definition: Future.cc:150
Compatibility Macros for C/C++.
void deregister_scanner(TableScannerAsync *scanner)
Hook for derived classes which want to keep track of scanners/mutators.
Definition: Future.cc:216
bool get(ResultPtr &result)
This call blocks till there is a result available unless async ops have completed.
Definition: Future.cc:35
void enqueue(ResultPtr &result)
Definition: Future.cc:136
Time related declarations.
Hypertable definitions
long long int Lld
Shortcut for printf formats.
Definition: String.h:53
void register_mutator(TableMutatorAsync *scanner)
Hook for derived classes which want to keep track of scanners/mutators.
Definition: Future.cc:185
#define HT_THROWF(_code_, _fmt_,...)
Definition: Error.h:490
void scan_ok(TableScannerAsync *scanner, ScanCellsPtr &cells)
Callback method for successful scan.
Definition: Future.cc:131
void register_scanner(TableScannerAsync *scanner)
Hook for derived classes which want to keep track of scanners/mutators.
Definition: Future.cc:204
std::vector< FailedMutation > FailedMutations
Definition: Cells.h:39