0.9.8.10
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
QfsBroker.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2007-2015 Hypertable, Inc.
3  *
4  * This file is part of Hypertable.
5  *
6  * Hypertable is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; either version 3
9  * of the License, or any later version.
10  *
11  * Hypertable is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19  * 02110-1301, USA.
20  */
21 
22 #include <Common/Compat.h>
23 
24 #include "QfsBroker.h"
25 
26 #include <Common/Filesystem.h>
27 #include <Common/System.h>
28 
29 #include <kfs/KfsClient.h>
30 
31 #include <cerrno>
32 #include <cstring>
33 #include <cstdlib>
34 
35 extern "C" {
36 #include <fcntl.h>
37 #include <poll.h>
38 #include <sys/types.h>
39 #include <sys/uio.h>
40 #include <unistd.h>
41 }
42 
43 using namespace Hypertable;
44 using namespace Hypertable::FsBroker;
45 using namespace KFS;
46 
// Definition of the static atomic counter from which open() and create() draw
// broker-level file descriptors. It starts at 0 and is pre-incremented on use,
// so the first descriptor handed out is 1.
47 std::atomic<int> QfsBroker::ms_next_fd {0};
48 
50  HT_INFOF("close(%d) file: %s", fd, fname.c_str());
51  m_client->Close(fd);
52 }
53 
55  : m_host(cfg->get_str("host")),
56  m_port(cfg->get_i16("port")),
57  m_client(KFS::Connect(m_host, m_port)) {
58  m_metrics_handler = std::make_shared<MetricsHandler>(cfg, "qfs");
59  m_metrics_handler->start_collecting();
60 }
61 
63  m_metrics_handler->stop_collecting();
64  delete m_client;
65 }
66 
// Opens an existing file read-only through the QFS client. On success a new
// broker-level descriptor is allocated, the open file is registered in the
// open-file map together with the requesting client's address, and the
// descriptor is sent back through the callback. On failure the QFS error is
// logged and forwarded to report_error().
// NOTE(review): listing line 69 is absent from this extract, so the statement
// that opens the block closed at listing line 74 is not visible here
// (presumably a check on `flags` guarding the checksum verification) -
// confirm against the full source.
67 void QfsBroker::open(Response::Callback::Open *cb, const char *fname, uint32_t flags, uint32_t bufsz) {
68 
// A failed checksum verification is only logged as a warning; the open below
// is still attempted.
70  int status = m_client->VerifyDataChecksums(fname);
71  if (status < 0)
72  HT_WARNF("Checksum verification of %s failed - %s", fname,
73  KFS::ErrorCodeToStr(status).c_str());
74  }
75 
76  int qfs_fd = m_client->Open(fname, O_RDONLY);
77  if (qfs_fd < 0) {
78  HT_ERRORF("open(%s) failure (%d) - %s", fname, -qfs_fd, KFS::ErrorCodeToStr(qfs_fd).c_str());
79  report_error(cb, qfs_fd);
80  }
81  else {
// fd is the broker-level descriptor returned to the client; qfs_fd is the
// descriptor returned by the QFS client and is kept inside the map entry.
82  int fd = ++ms_next_fd;
83  HT_INFOF("open(%s) -> fd=%d, qfs_fd=%d", fname, fd, qfs_fd);
84  struct sockaddr_in addr;
85  cb->get_address(addr);
86  OpenFileDataQfsPtr fdata(new OpenFileDataQfs(fname, qfs_fd, m_client));
87  m_open_file_map.create(fd, addr, fdata);
// A zero bufsz leaves the client's I/O buffer size untouched.
88  if(bufsz)
89  m_client->SetIoBufferSize(qfs_fd, bufsz);
90  cb->response(fd);
91  }
92 
93 }
94 
// Handles a close request for broker descriptor fd and acknowledges it to the
// client; a failure to send the acknowledgement is logged.
// NOTE(review): listing line 97 is missing from this extract. Presumably it
// removes fd from the open-file map - confirm against the full source.
95 void QfsBroker::close(ResponseCallback *cb, uint32_t fd) {
96  HT_DEBUGF("close(%d)", fd);
98  int error;
99  if ((error = cb->response_ok()) != Error::OK)
100  HT_ERRORF("Problem sending response for close(fd=%d) - %s", (int)fd, Error::get_text(error));
101 }
102 
103 void QfsBroker::create(Response::Callback::Open *cb, const char *fname, uint32_t flags, int32_t bufsz, int16_t replication, int64_t blksz) {
104  int qfs_fd;
105  if(flags & Filesystem::OPEN_FLAG_OVERWRITE) {
106  qfs_fd = m_client->Open(fname, O_CREAT | O_TRUNC | O_RDWR);
107  }
108  else
109  qfs_fd = m_client->Open(fname, O_CREAT|O_WRONLY);
110 
111  if (qfs_fd < 0) {
112  HT_ERRORF("create(%s) failure (%d) - %s", fname, -qfs_fd, KFS::ErrorCodeToStr(qfs_fd).c_str());
113  report_error(cb, qfs_fd);
114  }
115  else {
116  int fd = ++ms_next_fd;
117  HT_INFOF("open(%s) -> fd=%d, qfs_fd=%d", fname, fd, qfs_fd);
118  struct sockaddr_in addr;
119  cb->get_address(addr);
120  OpenFileDataQfsPtr fdata(new OpenFileDataQfs(fname, qfs_fd, m_client));
121  m_open_file_map.create(fd, addr, fdata);
122  if(bufsz)
123  m_client->SetIoBufferSize(qfs_fd, bufsz);
124  cb->response(fd);
125  }
126 }
127 
128 void QfsBroker::seek(ResponseCallback *cb, uint32_t fd, uint64_t offset) {
129 
130  OpenFileDataQfsPtr fdata;
131  if (!m_open_file_map.get(fd, fdata)) {
132  cb->error(Error::FSBROKER_BAD_FILE_HANDLE, format("%d", (int)fd));
133  m_metrics_handler->increment_error_count();
134  return;
135  }
136 
137  chunkOff_t status = m_client->Seek(fdata->fd, offset);
138  if(status == (chunkOff_t)-1) {
139  HT_ERRORF("seek(fd=%d,qfsFd=%d,%lld) failure (%d) - %s", (int)fd,
140  (int)fdata->fd, (Lld)offset, (int)-status,
141  KFS::ErrorCodeToStr(status).c_str());
142  report_error(cb, status);
143  }
144  else
145  cb->response_ok();
146 }
147 
// Reads up to `amount` bytes from the current position of an open file and
// sends the data, together with the offset it was read from, to the callback.
// An unknown descriptor yields FSBROKER_BAD_FILE_HANDLE; a negative return
// from the QFS client's Read() is logged and forwarded to report_error().
// NOTE(review): listing line 162 is missing from this extract; listing line
// 163 is the tail of that missing statement - confirm against the full source.
148 void QfsBroker::read(Response::Callback::Read *cb, uint32_t fd, uint32_t amount) {
149 
150  OpenFileDataQfsPtr fdata;
151  if (!m_open_file_map.get(fd, fdata)) {
152  cb->error(Error::FSBROKER_BAD_FILE_HANDLE, format("%d", (int)fd));
153  m_metrics_handler->increment_error_count();
154  return;
155  }
156 
// Capture the position before reading so the response can report where the
// returned data starts.
157  uint64_t offset = m_client->Tell(fdata->fd);
158 
159  StaticBuffer buf((size_t)amount, (size_t)HT_DIRECT_IO_ALIGNMENT);
160  int len = m_client->Read(fdata->fd, reinterpret_cast<char*>(buf.base), amount);
161  if(len<0) {
163  KFS::ErrorCodeToStr(len));
164  HT_ERRORF("read(fd=%d,qfsFd=%d,%lld) failure (%d) - %s", (int)fd,
165  (int)fdata->fd, (Lld)amount, -len,
166  KFS::ErrorCodeToStr(len).c_str());
167  report_error(cb, len);
168  }
169  else {
// Shrink the buffer to the number of bytes actually read.
170  buf.size = (uint32_t)len;
171  m_metrics_handler->add_bytes_read(len);
172  cb->response(offset, buf);
173  }
174 }
175 
// Writes `amount` bytes from `data` at the current position of an open file
// and, when `flags` is FLUSH or SYNC, syncs the file afterwards. On success
// the callback receives the offset the write started at and the number of
// bytes written.
// NOTE(review): listing lines 188 and 200 are missing from this extract;
// listing lines 189 and 201 are the tails of those missing statements -
// confirm against the full source.
176 void QfsBroker::append(Response::Callback::Append *cb, uint32_t fd, uint32_t amount, const void *data, Filesystem::Flags flags) {
177 
178  OpenFileDataQfsPtr fdata;
179  if (!m_open_file_map.get(fd, fdata)) {
180  cb->error(Error::FSBROKER_BAD_FILE_HANDLE, format("%d", (int)fd));
181  m_metrics_handler->increment_error_count();
182  return;
183  }
184 
// Capture the position before writing so the response can report where the
// appended data starts.
185  uint64_t offset = m_client->Tell(fdata->fd);
186  ssize_t written = m_client->Write(fdata->fd, reinterpret_cast<const char*>(data), amount);
187  if(written < 0) {
189  KFS::ErrorCodeToStr(written));
190  HT_ERRORF("append(fd=%d,qfsFd=%d,%lld,flags=%d) failure (%d) - %s", (int)fd,
191  (int)fdata->fd, (Lld)amount, static_cast<int>(flags),
192  (int)-written, KFS::ErrorCodeToStr(written).c_str());
193  report_error(cb, written);
194  }
195  else {
196  if (flags == Filesystem::Flags::FLUSH || flags == Filesystem::Flags::SYNC) {
197  int64_t start_time = get_ts64();
198  int error = m_client->Sync(fdata->fd);
199  if (error) {
201  KFS::ErrorCodeToStr(error));
// NOTE(review): this branch handles a Sync failure, yet the message below
// formats `written` (a non-negative byte count at this point) rather than
// `error`, so the logged code and text do not describe the Sync failure.
202  HT_ERRORF("append(fd=%d,qfsFd=%d,%lld,flags=%d) failure (%d) - %s", (int)fd,
203  (int)fdata->fd, (Lld)amount, static_cast<int>(flags),
204  (int)-written, KFS::ErrorCodeToStr(written).c_str());
205  return report_error(cb, error);
206  }
// Sync latency is recorded only when the sync succeeded.
207  m_metrics_handler->add_sync(get_ts64() - start_time);
208  }
209  m_metrics_handler->add_bytes_written(written);
210  cb->response(offset, written);
211  }
212 }
213 
214 void QfsBroker::remove(ResponseCallback *cb, const char *fname) {
215  int status = m_client->Remove(fname);
216  if(status == 0)
217  cb->response_ok();
218  else {
219  HT_ERRORF("remove(%s) failure (%d) - %s", fname, -status, KFS::ErrorCodeToStr(status).c_str());
220  report_error(cb, status);
221  }
222 }
223 
224 void QfsBroker::length(Response::Callback::Length *cb, const char *fname, bool accurate)
225 {
226  KfsFileAttr result;
227  int err = m_client->Stat(fname, result);
228  if(err == 0)
229  cb->response(result.fileSize);
230  else {
231  HT_ERRORF("length(%s) failure (%d) - %s", fname, -err, KFS::ErrorCodeToStr(err).c_str());
232  report_error(cb, err);
233  }
234 }
235 
// Reads up to `amount` bytes at an explicit `offset` of an open file and sends
// the data and that offset to the callback. When `verify_checksum` is set the
// file's data checksums are verified first; a verification failure is only
// logged as a warning and the read is still attempted.
// NOTE(review): listing line 257 is missing from this extract; listing line
// 258 is the tail of that missing statement - confirm against the full source.
236 void QfsBroker::pread(Response::Callback::Read *cb, uint32_t fd, uint64_t offset,
237  uint32_t amount, bool verify_checksum) {
238 
239  OpenFileDataQfsPtr fdata;
240  if (!m_open_file_map.get(fd, fdata)) {
241  cb->error(Error::FSBROKER_BAD_FILE_HANDLE, format("%d", (int)fd));
242  m_metrics_handler->increment_error_count();
243  return;
244  }
245 
246  if (verify_checksum) {
// This `status` is local to the if-block; the read result below uses a
// separate variable of the same name.
247  int status = m_client->VerifyDataChecksums(fdata->fd);
248  if (status < 0)
249  HT_WARNF("Checksum verification of fd=%d (qfsFd=%d) failed - %s", fd,
250  fdata->fd, KFS::ErrorCodeToStr(status).c_str());
251  }
252 
253  StaticBuffer buf((size_t)amount, (size_t)HT_DIRECT_IO_ALIGNMENT);
254  ssize_t status = m_client->PRead(fdata->fd, offset,
255  reinterpret_cast<char*>(buf.base), amount);
256  if(status < 0) {
258  KFS::ErrorCodeToStr(status));
259  HT_ERRORF("pread(fd=%d,qfsFd=%d,%lld,%lld) failure (%d) - %s", (int)fd,
260  (int)fdata->fd, (Lld)offset, (Lld)amount, (int)-status,
261  KFS::ErrorCodeToStr(status).c_str());
262  report_error(cb, status);
263  }
264  else {
// Shrink the buffer to the number of bytes actually read.
265  buf.size = (uint32_t)status;
266  m_metrics_handler->add_bytes_read(buf.size);
267  cb->response(offset, buf);
268  }
269 }
270 
271 void QfsBroker::mkdirs(ResponseCallback *cb, const char *dname) {
272  int status = m_client->Mkdirs(dname);
273  if(status < 0) {
274  HT_ERRORF("mkdirs(%s) failure (%d) - %s", dname, -status, KFS::ErrorCodeToStr(status).c_str());
275  report_error(cb, status);
276  }
277  else
278  cb->response_ok();
279 }
280 
281 void QfsBroker::rmdir(ResponseCallback *cb, const char *dname) {
282  int status = m_client->Rmdirs(dname);
283  if(status < 0 && status != -ENOENT) {
284  HT_ERRORF("rmdir(%s) failure (%d) - %s", dname, -status, KFS::ErrorCodeToStr(status).c_str());
285  report_error(cb, status);
286  }
287  else
288  cb->response_ok();
289 }
290 
291 void QfsBroker::flush(ResponseCallback *cb, uint32_t fd) {
292  this->sync(cb, fd);
293 }
294 
// Syncs an open file through the QFS client and acknowledges the result to
// the callback. An unknown descriptor yields FSBROKER_BAD_FILE_HANDLE; a
// negative return from Sync() is logged and forwarded to report_error().
// NOTE(review): listing line 308 is missing from this extract; listing line
// 309 is the tail of that missing statement - confirm against the full source.
295 void QfsBroker::sync(ResponseCallback *cb, uint32_t fd) {
296 
297  OpenFileDataQfsPtr fdata;
298  if (!m_open_file_map.get(fd, fdata)) {
299  cb->error(Error::FSBROKER_BAD_FILE_HANDLE, format("%d", (int)fd));
300  m_metrics_handler->increment_error_count();
301  return;
302  }
303 
304  int64_t start_time = get_ts64();
305  int status = m_client->Sync(fdata->fd);
// The sync duration is recorded before the status is checked, i.e. for
// failed syncs as well as successful ones.
306  m_metrics_handler->add_sync(get_ts64() - start_time);
307  if(status < 0) {
309  KFS::ErrorCodeToStr(status));
310  HT_ERRORF("sync(fd=%d,qfsFd=%d) failure (%d) - %s", (int)fd,
311  (int)fdata->fd, -status,
312  KFS::ErrorCodeToStr(status).c_str());
313  report_error(cb, status);
314  }
315  else
316  cb->response_ok();
317 }
318 
319 void QfsBroker::readdir(Response::Callback::Readdir *cb, const char *dname) {
320  std::vector<KfsFileAttr> result;
321  std::vector<Filesystem::Dirent> listing;
322  int err = m_client->ReaddirPlus(dname, result);
323 
324  Filesystem::Dirent entry;
325  for (std::vector<KfsFileAttr>::iterator it = result.begin();
326  it != result.end(); ++it) {
327  if(it->filename != "." && it->filename != "..") {
328  entry.name.clear();
329  entry.name.append(it->filename);
330  entry.is_dir = it->isDirectory;
331  entry.length = (uint64_t)it->fileSize;
332  entry.last_modification_time = it->mtime.tv_sec;
333  listing.push_back(entry);
334  }
335  }
336 
337  if(err == 0)
338  cb->response(listing);
339  else {
340  HT_ERRORF("readdir(%s) failure (%d) - %s", dname, -err, KFS::ErrorCodeToStr(err).c_str());
341  report_error(cb,err);
342  }
343 }
344 
345 void QfsBroker::exists(Response::Callback::Exists *cb, const char *fname) {
346  cb->response(m_client->Exists(fname));
347 }
348 
349 void QfsBroker::rename(ResponseCallback *cb, const char *src, const char *dst) {
350  int err = m_client->Rename(src, dst);
351  if(err == 0)
352  cb->response_ok();
353  else {
354  HT_ERRORF("rename(%s,%s) failure (%d) - %s", src, dst, -err, KFS::ErrorCodeToStr(err).c_str());
355  report_error(cb, err);
356  }
357 }
358 
359 void QfsBroker::debug(ResponseCallback *cb, int32_t command, StaticBuffer &serialized_parameters) {
360  cb->error(Error::NOT_IMPLEMENTED, format("Unsupported debug command - %d",
361  command));
362 }
363 
366 }
367 
370  cb->response_ok();
371 }
372 
374  m_metrics_handler->increment_error_count();
375  string errors = KFS::ErrorCodeToStr(error);
376  switch(-error) {
377  case ENOTDIR:
378  case ENAMETOOLONG:
379  case ENOENT:
380  cb->error(Error::FSBROKER_BAD_FILENAME, errors);
381  break;
382 
383  case EACCES:
384  case EPERM:
386  break;
387 
388  case EBADF:
390  break;
391 
392  case EINVAL:
394  break;
395 
396  default:
397  cb->error(Error::FSBROKER_IO_ERROR, errors);
398  break;
399  }
400 
401 #ifndef NDEBUG
402  std::clog << "ERROR " << errors << std::endl;
403 #endif //NDEBUG
404 }
A memory buffer of static size.
Definition: StaticBuffer.h:45
Retrieves system information (hardware, installation directory, etc)
int response(bool exists)
Sends response parameters back to client.
Definition: Exists.cc:40
#define HT_WARNF(msg,...)
Definition: Logger.h:290
void get_address(struct sockaddr_in &addr)
Gets the remote address of the requesting client.
virtual void open(Response::Callback::Open *cb, const char *fname, uint32_t flags, uint32_t bufsz)
Open a file and pass the fd to the callback on success.
Definition: QfsBroker.cc:67
virtual void sync(ResponseCallback *cb, uint32_t fd)
Sync out data that has been written.
Definition: QfsBroker.cc:295
void report_error(ResponseCallback *cb, int error)
Definition: QfsBroker.cc:373
Application handler for append function.
Definition: Append.h:45
Abstract base class for a filesystem.
bool get(int fd, OpenFileDataPtr &fdata)
Definition: OpenFileMap.h:58
String format(const char *fmt,...)
Returns a String using printf-like format facilities. Vanilla snprintf is about 1.5x faster than this...
Definition: String.cc:37
virtual void exists(Response::Callback::Exists *cb, const char *fname)
Check for the existence of a file.
Definition: QfsBroker.cc:345
virtual int response_ok()
Sends a simple success response back to the client, which is just the 4-byte error code Error::OK...
int response(int32_t fd)
Sends response parameters back to client.
Definition: Open.cc:40
Flags
Enumeration type for append flags.
Definition: Filesystem.h:76
Application handler for exists function.
Definition: Exists.h:45
std::vector< String > errors
File system broker definitions.
Definition: CephBroker.h:38
virtual void shutdown(ResponseCallback *cb)
Gracefully shut down the broker, closing open files.
Definition: QfsBroker.cc:368
virtual void debug(ResponseCallback *cb, int32_t command, StaticBuffer &serialized_parameters)
Debug command.
Definition: QfsBroker.cc:359
time_t last_modification_time
Last modification time.
Definition: Filesystem.h:100
virtual void status(Response::Callback::Status *cb)
Check status of FSBroker.
Definition: QfsBroker.cc:364
virtual void read(Response::Callback::Read *cb, uint32_t fd, uint32_t amount)
Read data from an open file.
Definition: QfsBroker.cc:148
void remove_all(struct sockaddr_in &addr)
Definition: OpenFileMap.h:86
uint64_t length
Length of file.
Definition: Filesystem.h:98
Status & get()
Gets status information.
Definition: StatusManager.h:73
bool is_dir
Flag indicating if entry is a directory.
Definition: Filesystem.h:102
Application handler for length function.
Definition: Length.h:45
void set_write_status(Status::Code code, const std::string &text)
Sets write status.
MetricsHandlerPtr m_metrics_handler
Metrics collection handler.
Definition: QfsBroker.h:103
virtual void rename(ResponseCallback *cb, const char *src, const char *dst)
Rename a file from src to dst.
Definition: QfsBroker.cc:349
virtual void pread(Response::Callback::Read *cb, uint32_t fd, uint64_t offset, uint32_t amount, bool verify_checksum)
Read from file at position.
Definition: QfsBroker.cc:236
const char * get_text(int error)
Returns a descriptive error message.
Definition: Error.cc:330
Definition: main.cc:53
virtual void rmdir(ResponseCallback *cb, const char *dname)
Remove a directory.
Definition: QfsBroker.cc:281
bool remove(int fd, OpenFileDataPtr &fdata)
Definition: OpenFileMap.h:68
virtual void seek(ResponseCallback *cb, uint32_t fd, uint64_t offset)
Seek open file.
Definition: QfsBroker.cc:128
void set_read_status(Status::Code code, const std::string &text)
Sets read status.
std::shared_ptr< Properties > PropertiesPtr
Definition: Properties.h:447
Compatibility Macros for C/C++.
virtual void append(Response::Callback::Append *, uint32_t fd, uint32_t amount, const void *data, Filesystem::Flags flags)
Append data to open file.
Definition: QfsBroker.cc:176
int response(uint64_t offset, StaticBuffer &buffer)
Sends response parameters back to client.
Definition: Read.cc:40
static std::atomic< int > ms_next_fd
Atomic counter for file descriptor assignment.
Definition: QfsBroker.h:100
virtual void mkdirs(ResponseCallback *cb, const char *dname)
Make a directory hierarchy. If the parent dirs are not present, they are also created.
Definition: QfsBroker.cc:271
OpenFileMap m_open_file_map
A map of open files.
Definition: Broker.h:237
This class is used to generate and deliver standard responses back to a client.
int response(std::vector< Filesystem::Dirent > &listing)
Sends response parameters back to client.
Definition: Readdir.cc:41
Application handler for readdir function.
Definition: Readdir.h:48
Hypertable definitions
virtual void readdir(Response::Callback::Readdir *cb, const char *dname)
Read a directory's contents.
Definition: QfsBroker.cc:319
#define HT_DEBUGF(msg,...)
Definition: Logger.h:260
long long int Lld
Shortcut for printf formats.
Definition: String.h:53
Application handler for open function.
Definition: Open.h:45
Application handler for read function.
Definition: Read.h:47
virtual int error(int error, const String &msg)
Sends a standard error response back to the client.
virtual void remove(ResponseCallback *cb, const char *fname)
Remove a file or directory.
Definition: QfsBroker.cc:214
#define HT_INFOF(msg,...)
Definition: Logger.h:272
String name
File or directory name.
Definition: Filesystem.h:96
Application handler for status function.
Definition: Status.h:50
int response(Hypertable::Status &status)
Sends response parameters back to client.
Definition: Status.cc:40
virtual void length(Response::Callback::Length *cb, const char *fname, bool accurate=true)
Get length of file.
Definition: QfsBroker.cc:224
virtual void create(Response::Callback::Open *cb, const char *fname, uint32_t flags, int32_t bufsz, int16_t replication, int64_t blksz)
Open a file, and create it if it doesn't exist, optionally overwriting the contents.
Definition: QfsBroker.cc:103
#define HT_ERRORF(msg,...)
Definition: Logger.h:300
int response(uint64_t offset, uint32_t amount)
Sends response parameters back to client.
Definition: Append.cc:40
void create(int fd, struct sockaddr_in &addr, OpenFileDataPtr &fdata)
Definition: OpenFileMap.h:52
virtual void close(ResponseCallback *cb, uint32_t fd)
Close open file.
Definition: QfsBroker.cc:95
KFS::KfsClient *const m_client
Definition: QfsBroker.h:111
int response(uint64_t length)
Sends response parameters back to client.
Definition: Length.cc:40
virtual void flush(ResponseCallback *cb, uint32_t fd)
Flush data that has been written.
Definition: QfsBroker.cc:291
StatusManager m_status_manager
Server status manager.
Definition: QfsBroker.h:106
int64_t get_ts64()
Returns the current time in nanoseconds as a 64bit number.
Definition: Time.cc:40
#define HT_DIRECT_IO_ALIGNMENT
Definition: Filesystem.h:49
QfsBroker(PropertiesPtr &cfg)
Definition: QfsBroker.cc:54