0.9.8.10
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
ClientBufferedReaderHandler.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2007-2015 Hypertable, Inc.
3  *
4  * This file is part of Hypertable.
5  *
6  * Hypertable is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; either version 3
9  * of the License, or any later version.
10  *
11  * Hypertable is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19  * 02110-1301, USA.
20  */
21 
22 #include <Common/Compat.h>
23 
25 #include "Client.h"
26 
27 #include <AsyncComm/Protocol.h>
28 
29 #include <Common/Error.h>
30 
31 using namespace Hypertable;
32 using namespace Hypertable::FsBroker::Lib;
33 using namespace std;
34 
36  Client *client, uint32_t fd, uint32_t buf_size,
37  uint32_t outstanding, uint64_t start_offset, uint64_t end_offset) :
38  m_client(client), m_fd(fd), m_read_size(buf_size), m_eof(false),
39  m_error(Error::OK) {
40 
41  m_max_outstanding = outstanding;
42  m_end_offset = end_offset;
43  m_outstanding_offset = start_offset;
44  m_actual_offset = start_offset;
45 
49  if (start_offset > 0) {
50  try { m_client->seek(m_fd, start_offset); }
51  catch (...) {
52  m_eof = true;
53  throw;
54  }
55  }
56 
57  {
58  lock_guard<mutex> lock(m_mutex);
59  uint32_t toread;
60 
63  if ((toread = (uint32_t)(m_end_offset - m_outstanding_offset)) == 0)
64  break;
65  }
66  else
67  toread = m_read_size;
68 
69  try { m_client->read(m_fd, toread, this); }
70  catch (...) {
71  m_eof = true;
72  throw;
73  }
74  m_outstanding_offset += toread;
75  }
76  m_ptr = m_end_ptr = 0;
77  }
78 }
79 
80 
81 
83  try {
84  unique_lock<mutex> lock(m_mutex);
85  m_eof = true;
86 
87  m_cond.wait(lock, [this](){ return m_outstanding == 0; });
88  }
89  catch (...) {
90  HT_ERROR("synchronization error");
91  }
92 }
93 
94 
95 
100  lock_guard<mutex> lock(m_mutex);
101 
102  m_outstanding--;
103 
104  if (event->type == Event::MESSAGE) {
105  if ((m_error = (int)Protocol::response_code(event)) != Error::OK) {
107  HT_ERRORF("FS read error (amount=%u, fd=%d) : %s",
108  m_read_size, m_fd, m_error_msg.c_str());
109  m_eof = true;
110  m_cond.notify_all();
111  return;
112  }
113  m_queue.push(event);
114 
115  {
116  uint32_t amount;
117  uint64_t offset;
118  const void *data;
119  m_client->decode_response_read(event, &data, &offset, &amount);
120  m_actual_offset += amount;
121  if (amount < m_read_size)
122  m_eof = true;
123  }
124  }
125  else if (event->type == Event::ERROR) {
126  m_error_msg = event->to_str();
127  HT_ERRORF("%s", m_error_msg.c_str());
128  m_error = event->error;
129  m_eof = true;
130  }
131  else {
132  m_error_msg = event->to_str();
133  HT_ERRORF("%s", m_error_msg.c_str());
135  m_eof = true;
136  }
137 
138  m_cond.notify_all();
139 }
140 
141 
142 
143 size_t
144 ClientBufferedReaderHandler::read(void *buf, size_t len) {
145  unique_lock<mutex> lock(m_mutex);
146  uint8_t *ptr = (uint8_t *)buf;
147  long nleft = len;
148  long available, nread;
149 
150  while (true) {
151 
152  m_cond.wait(lock, [this](){ return !m_queue.empty() || m_eof; });
153 
154  if (m_error != Error::OK)
156 
157  if (m_queue.empty())
158  HT_THROW(Error::FSBROKER_EOF, "short read (empty queue)");
159 
160  if (m_ptr == 0) {
161  uint64_t offset;
162  uint32_t amount;
163  EventPtr &event = m_queue.front();
164  m_client->decode_response_read(event, (const void **)&m_ptr, &offset, &amount);
165  m_end_ptr = m_ptr + amount;
166  }
167 
168  available = m_end_ptr - m_ptr;
169 
170  if (available >= nleft) {
171  memcpy(ptr, m_ptr, nleft);
172  nread = len;
173  m_ptr += nleft;
174  if ((m_end_ptr - m_ptr) == 0) {
175  m_queue.pop();
176  m_ptr = 0;
177  read_ahead();
178  }
179  break;
180  }
181  else if (available == 0) {
182  if (m_eof && m_queue.size() == 1) {
183  m_queue.pop();
184  m_ptr = m_end_ptr = 0;
185  nread = len - nleft;
186  break;
187  }
188  }
189 
190  memcpy(ptr, m_ptr, available);
191  ptr += available;
192  nleft -= available;
193  m_queue.pop();
194  m_ptr = 0;
195  read_ahead();
196  }
197 
198  return nread;
199 }
200 
201 
202 
207  uint32_t n = m_max_outstanding - (m_outstanding + m_queue.size());
208  uint32_t toread;
209 
211 
212  if (m_eof)
213  return;
214 
215  for (uint32_t i=0; i<n; i++) {
217  if ((toread = (uint32_t)(m_end_offset - m_outstanding_offset)) == 0)
218  break;
219  }
220  else
221  toread = m_read_size;
222 
223  try { m_client->read(m_fd, toread, this); }
224  catch(...) {
225  m_eof = true;
226  throw;
227  }
228  m_outstanding++;
229  m_outstanding_offset += toread;
230  }
231 }
232 
233 
static int32_t response_code(const Event *event)
Returns the response code from an event generated in response to a request message.
Definition: Protocol.cc:39
static String string_format_message(const Event *event)
Returns the error message decoded from a standard error MESSAGE generated in response to a request message...
Definition: Protocol.cc:51
std::shared_ptr< Event > EventPtr
Smart pointer to Event.
Definition: Event.h:228
STL namespace.
Error event
Definition: Event.h:64
#define HT_ASSERT(_e_)
Definition: Logger.h:396
Compatibility Macros for C/C++.
ClientBufferedReaderHandler(Client *client, uint32_t fd, uint32_t buf_size, uint32_t outstanding, uint64_t start_offset, uint64_t end_offset)
void read(int32_t fd, size_t amount, DispatchHandler *handler) override
Definition: Client.cc:321
Hypertable definitions
#define HT_ERROR(msg)
Definition: Logger.h:299
Declarations for Protocol.
Request/response message event.
Definition: Event.h:63
virtual void handle(EventPtr &event)
Callback method.
Proxy class for FS broker.
Definition: Client.h:58
void seek(int32_t fd, uint64_t offset, DispatchHandler *handler) override
Definition: Client.cc:488
#define HT_ERRORF(msg,...)
Definition: Logger.h:300
File system broker framework and client library.
Definition: Broker.h:44
Error codes, Exception handling, error logging.
#define HT_THROW(_code_, _msg_)
Definition: Error.h:478
Declarations for Client.
void decode_response_read(EventPtr &event, const void **buffer, uint64_t *offset, uint32_t *length) override
Decodes the response from a read request.
Definition: Client.cc:378