0.9.8.10
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
CephBroker.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2007-2015 Hypertable, Inc.
3  *
4  * This file is part of Hypertable.
5  *
6  * Hypertable is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; either version 3
9  * of the License, or any later version.
10  *
11  * Hypertable is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with Hypertable. If not, see <http://www.gnu.org/licenses/>
18  */
19 
20 #include <Common/Compat.h>
21 
22 #include "CephBroker.h"
23 
24 #include <Common/Filesystem.h>
25 #include <Common/FileUtils.h>
26 #include <Common/System.h>
27 
28 #include <cerrno>
29 #include <chrono>
30 #include <string>
31 #include <thread>
32 
33 extern "C" {
34 #include <fcntl.h>
35 #include <sys/types.h>
36 #include <sys/uio.h>
37 #include <unistd.h>
38 }
39 
40 using namespace Hypertable;
41 using namespace Hypertable::FsBroker;
42 using namespace std;
43 
// Source of broker-side file handles: open() and create() each take
// ++ms_next_fd as the handle they report back through their callback.
44 atomic<int> CephBroker::ms_next_fd {0};
45 
47  m_verbose = cfg->get_bool("Hypertable.Verbose");
48  m_root_dir = "";
49  //construct an arguments array
50  const char *argv[10];
51  int argc = 0;
52  argv[argc++] = "cephBroker";
53  argv[argc++] = "-m";
54  argv[argc++] = (cfg->get_str("CephBroker.MonAddr").c_str());
55  /*
56  // For Ceph debugging, uncomment these lines
57  argv[argc++] = "--debug_client";
58  argv[argc++] = "0";
59  argv[argc++] = "--debug_ms";
60  argv[argc++] = "0";
61  argv[argc++] = "--lockdep";
62  argv[argc++] = "0"; */
63 
64  HT_INFO("Calling ceph_initialize");
65  ceph_initialize(argc, argv);
66  HT_INFO("Calling ceph_mount");
67  ceph_mount();
68  HT_INFO("Returning from constructor");
69 }
70 
72  ceph_deinitialize();
73 }
74 
75 void CephBroker::open(Response::Callback::Open *cb, const char *fname,
76  uint32_t flags, uint32_t bufsz) {
77  int fd, ceph_fd;
78  String abspath;
79  HT_DEBUGF("open file='%s' bufsz=%d", fname, bufsz);
80 
81  make_abs_path(fname, abspath);
82 
83  fd = ++ms_next_fd;
84 
85  if ((ceph_fd = ceph_open(abspath.c_str(), O_RDONLY)) < 0) {
86  report_error(cb, -ceph_fd);
87  return;
88  }
89  HT_INFOF("open (%s) fd=%d ceph_fd=%d", fname, fd, ceph_fd);
90 
91  {
92  struct sockaddr_in addr;
93  OpenFileDataCephPtr fdata (new OpenFileDataCeph(abspath, ceph_fd, O_RDONLY));
94 
95  cb->get_address(addr);
96 
97  m_open_file_map.create(fd, addr, fdata);
98 
99  cb->response(fd);
100  }
101 }
102 
// Opens fname for writing (creating it if needed), registers it in the
// open-file map under a freshly drawn handle and sends that handle to cb.
// The parent directory chain is created first with ceph_mkdirs. bufsz,
// replication and blksz are only logged by this function.
// NOTE(review): the listing line that should carry the `if` condition choosing
// between the O_TRUNC and O_APPEND flag sets (listing line 115) is absent from
// this rendering, leaving a dangling `else` - recover it from the real source.
103 void CephBroker::create(Response::Callback::Open *cb, const char *fname, uint32_t flags,
104  int32_t bufsz, int16_t replication, int64_t blksz){
105  int fd, ceph_fd;
106  int oflags;
107  String abspath;
108 
109  make_abs_path(fname, abspath);
110  HT_DEBUGF("create file='%s' flags=%u bufsz=%d replication=%d blksz=%lld",
111  fname, flags, bufsz, (int)replication, (Lld)blksz);
112 
// The handle is drawn before any Ceph call, so a failure below still consumes it.
113  fd = ++ms_next_fd;
114 
116  oflags = O_WRONLY | O_CREAT | O_TRUNC;
117  else
118  oflags = O_WRONLY | O_CREAT | O_APPEND;
119 
120  //make sure the directories in the path exist
// Everything before the last '/' of the absolute path is taken as the parent.
121  String directory = abspath.substr(0, abspath.rfind('/'));
122  int r;
123  HT_INFOF("Calling mkdirs on %s", directory.c_str());
// -EEXIST is tolerated: an already existing directory is not an error here.
124  if((r=ceph_mkdirs(directory.c_str(), 0644)) < 0 && r!=-EEXIST) {
125  HT_ERRORF("create failed on mkdirs: dname='%s' - %d", directory.c_str(), -r);
126  report_error(cb, -r);
127  return;
128  }
129 
130  //create file
131  if ((ceph_fd = ceph_open(abspath.c_str(), oflags, 0644)) < 0) {
132  HT_ERRORF("open failed: file=%s - %s", abspath.c_str(), strerror(-ceph_fd));
// NOTE(review): the mkdirs path above passes the negated code (-r) to
// report_error, but here the negative ceph_fd is passed unchanged - looks
// like it should be -ceph_fd; confirm.
133  report_error(cb, ceph_fd);
134  return;
135  }
136 
137  HT_INFOF("create %s = %d", fname, ceph_fd);
138 
139  {
140  struct sockaddr_in addr;
// NOTE(review): open() stores the absolute path in OpenFileDataCeph, whereas
// this stores the raw fname - confirm which one is intended.
141  OpenFileDataCephPtr fdata (new OpenFileDataCeph(fname, ceph_fd, O_WRONLY));
142 
143  cb->get_address(addr);
144 
145  m_open_file_map.create(fd, addr, fdata);
146 
147  cb->response(fd);
148  }
149 }
150 
151 void CephBroker::close(ResponseCallback *cb, uint32_t fd) {
152  if (m_verbose) {
153  HT_INFOF("close fd=%d", fd);
154  }
155  OpenFileDataCephPtr fdata;
156  m_open_file_map.get(fd, fdata);
157  m_open_file_map.remove(fd);
158  cb->response_ok();
159 }
160 
// Reads up to `amount` bytes from the current position of the file open under
// broker handle `fd`, then replies through cb with the position the read
// started at and the bytes obtained.
161 void CephBroker::read(Response::Callback::Read *cb, uint32_t fd, uint32_t amount) {
162  OpenFileDataCephPtr fdata;
163  ssize_t nread;
164  uint64_t offset;
// The destination buffer is allocated before the handle is validated.
165  StaticBuffer buf(new uint8_t [amount], amount);
166 
167  HT_DEBUGF("read fd=%d amount = %d", fd, amount);
168 
// Unknown handle: log and bail out.
// NOTE(review): listing line 172 is absent from this rendering; errbuf is
// filled in but no visible statement uses it or replies to cb - recover the
// missing line from the real source.
169  if (!m_open_file_map.get(fd, fdata)) {
170  char errbuf[32];
171  sprintf(errbuf, "%d", fd);
173  HT_ERRORF("bad file handle: %d", fd);
174  return;
175  }
176 
// A zero-distance SEEK_CUR seek is used to learn the current file position.
// NOTE(review): offset is declared uint64_t, so this `< 0` test can never be
// true and the error branch is unreachable; -offset and the value given to
// report_error would not be errno codes either - confirm and use a signed
// temporary.
177  if ((offset = ceph_lseek(fdata->fd, 0, SEEK_CUR)) < 0) {
178  HT_ERRORF("lseek failed: fd=%d ceph_fd=%d offset=0 SEEK_CUR - %s", fd, fdata->fd, strerror(-offset));
179  report_error(cb, offset);
180  return;
181  }
182 
183  if ((nread = ceph_read(fdata->fd, (char *)buf.base, amount)) < 0 ) {
184  HT_ERRORF("read failed: fd=%d ceph_fd=%d amount=%d", fd, fdata->fd, amount);
185  report_error(cb, -nread);
186  return;
187  }
188 
// Shrink the reported size to what was actually read.
189  buf.size = nread;
190  cb->response(offset, buf);
191 }
192 
194  uint32_t amount, const void *data, Filesystem::Flags flags)
195 {
196  OpenFileDataCephPtr fdata;
197  ssize_t nwritten;
198  uint64_t offset;
199 
200  HT_DEBUG_OUT << "append fd="<< fd <<" amount="<< amount <<" data='"
201  << format_bytes(20, data, amount) << " flags="
202  << static_cast<uint8_t>(flags) << HT_END;
203 
204  if (!m_open_file_map.get(fd, fdata)) {
205  char errbuf[32];
206  sprintf(errbuf, "%d", fd);
208  return;
209  }
210 
211  if ((offset = (uint64_t)ceph_lseek(fdata->fd, 0, SEEK_CUR)) < 0) {
212  HT_ERRORF("lseek failed: fd=%d ceph_fd=%d offset=0 SEEK_CUR - %s", fd, fdata->fd,
213  strerror(-offset));
214  report_error(cb, offset);
215  return;
216  }
217 
218  if ((nwritten = ceph_write(fdata->fd, (const char *)data, amount)) < 0) {
219  HT_ERRORF("write failed: fd=%d ceph_fd=%d amount=%d - %s", fd, fdata->fd, amount,
220  strerror(-nwritten));
221  report_error(cb, -nwritten);
222  return;
223  }
224 
225  int r;
226  if ((flags == Filesystem::Flags::FLUSH || flags == Filesystem::Flags::SYNC) &&
227  ((r = ceph_fsync(fdata->fd, true)) != 0)) {
228  HT_ERRORF("flush failed: fd=%d ceph_fd=%d - %s", fd, fdata->fd, strerror(errno));
229  report_error(cb, r);
230  return;
231  }
232 
233  cb->response(offset, nwritten);
234 }
235 
// Moves the position of the file open under broker handle `fd` to the
// absolute `offset` (SEEK_SET) and acknowledges through cb.
236 void CephBroker::seek(ResponseCallback *cb, uint32_t fd, uint64_t offset) {
237  OpenFileDataCephPtr fdata;
238 
239  HT_DEBUGF("seek fd=%lu offset=%llu", (Lu)fd, (Llu)offset);
240 
// Unknown handle.
// NOTE(review): listing line 244 is absent from this rendering; errbuf is
// filled in but no visible statement uses it or replies to cb - recover the
// missing line from the real source.
241  if (!m_open_file_map.get(fd, fdata)) {
242  char errbuf[32];
243  sprintf(errbuf, "%d", fd);
245  return;
246  }
247  int r;
// NOTE(review): the lseek result is cast to uint64_t and then narrowed into
// the int r, so a large resulting offset may end up looking negative - confirm.
248  if ((r = (uint64_t)ceph_lseek(fdata->fd, offset, SEEK_SET)) < 0) {
249  HT_ERRORF("lseek failed: fd=%d ceph_fd=%d offset=%llu - %s", fd, fdata->fd,
250  (Llu)offset, strerror(-r));
// NOTE(review): this hands report_error the requested seek offset, not the
// error code; the log line above uses -r - looks like -r was intended.
251  report_error(cb, offset);
252  return;
253  }
254 
255  cb->response_ok();
256 }
257 
258 void CephBroker::remove(ResponseCallback *cb, const char *fname) {
259  String abspath;
260 
261  HT_DEBUGF("remove file='%s'", fname);
262 
263  make_abs_path(fname, abspath);
264 
265  int r;
266  if ((r = ceph_unlink(abspath.c_str())) < 0) {
267  HT_ERRORF("unlink failed: file='%s' - %s", abspath.c_str(), strerror(-r));
268  report_error(cb, r);
269  return;
270  }
271  cb->response_ok();
272 }
273 
274 void CephBroker::length(Response::Callback::Length *cb, const char *fname,
275  bool accurate) {
276  int r;
277  struct stat statbuf;
278 
279  (void)accurate;
280 
281  HT_DEBUGF("length file='%s'", fname);
282 
283  if ((r = ceph_lstat(fname, &statbuf)) < 0) {
284  String abspath;
285  make_abs_path(fname, abspath);
286  HT_ERRORF("length (stat) failed: file='%s' - %s", abspath.c_str(), strerror(-r));
287  report_error(cb,- r);
288  return;
289  }
290  cb->response(statbuf.st_size);
291 }
292 
// Positional read: reads up to `amount` bytes starting at `offset` from the
// file open under broker handle `fd` and replies through cb with that offset
// and the bytes obtained. The trailing unnamed bool parameter is ignored.
293 void CephBroker::pread(Response::Callback::Read *cb, uint32_t fd, uint64_t offset,
294  uint32_t amount, bool) {
295  OpenFileDataCephPtr fdata;
296  ssize_t nread;
// The destination buffer is allocated before the handle is validated.
297  StaticBuffer buf(new uint8_t [amount], amount);
298 
299  HT_DEBUGF("pread fd=%d offset=%llu amount=%d", fd, (Llu)offset, amount);
300 
// Unknown handle.
// NOTE(review): listing line 304 is absent from this rendering; errbuf is
// filled in but no visible statement uses it or replies to cb - recover the
// missing line from the real source.
301  if (!m_open_file_map.get(fd, fdata)) {
302  char errbuf[32];
303  sprintf(errbuf, "%d", fd);
305  return;
306  }
307 
// The offset is passed to ceph_read as a fourth argument here, unlike read().
308  if ((nread = ceph_read(fdata->fd, (char *)buf.base, amount, offset)) < 0) {
309  HT_ERRORF("pread failed: fd=%d ceph_fd=%d amount=%d offset=%llu - %s", fd, fdata->fd,
310  amount, (Llu)offset, strerror(-nread));
// NOTE(review): read() passes -nread to report_error, this passes the
// negative nread unchanged - looks like it should be negated; confirm.
311  report_error(cb, nread);
312  return;
313  }
314 
// Shrink the reported size to what was actually read.
315  buf.size = nread;
316 
317  cb->response(offset, buf);
318 }
319 
320 void CephBroker::mkdirs(ResponseCallback *cb, const char *dname) {
321  String absdir;
322 
323  HT_DEBUGF("mkdirs dir='%s'", dname);
324 
325  make_abs_path(dname, absdir);
326  int r;
327  if((r=ceph_mkdirs(absdir.c_str(), 0644)) < 0 && r!=-EEXIST) {
328  HT_ERRORF("mkdirs failed: dname='%s' - %d", absdir.c_str(), -r);
329  report_error(cb, -r);
330  return;
331  }
332  cb->response_ok();
333 }
334 
335 void CephBroker::rmdir(ResponseCallback *cb, const char *dname) {
336  String absdir;
337  int r;
338 
339  make_abs_path(dname, absdir);
340  if((r = rmdir_recursive(absdir.c_str())) < 0) {
341  HT_ERRORF("failed to remove dir %s, got error %d", absdir.c_str(), r);
342  report_error(cb, -r);
343  return;
344  }
345  cb->response_ok();
346 }
347 
349  DIR *dirp;
350  struct dirent de;
351  struct stat st;
352  int r;
353  if ((r = ceph_opendir(directory, &dirp) < 0))
354  return r; //failed to open
355  while ((r = ceph_readdirplus_r(dirp, &de, &st, 0)) > 0) {
356  String new_dir = de.d_name;
357  if(!(new_dir.compare(".")==0 || new_dir.compare("..")==0)) {
358  new_dir = directory;
359  new_dir += '/';
360  new_dir += de.d_name;
361  if (S_ISDIR(st.st_mode)) { //it's a dir, clear it out...
362  if((r=rmdir_recursive(new_dir.c_str())) < 0) return r;
363  } else { //delete this file
364  if((r=ceph_unlink(new_dir.c_str())) < 0) return r;
365  }
366  }
367  }
368  if (r < 0) return r; //we got an error
369  if ((r = ceph_closedir(dirp)) < 0) return r;
370  return ceph_rmdir(directory);
371 }
372 
373 void CephBroker::flush(ResponseCallback *cb, uint32_t fd) {
374  this->sync(cb, fd);
375 }
376 
// Calls ceph_fsync on the file open under broker handle `fd` and acknowledges
// through cb, or reports the negated ceph_fsync result as an error.
377 void CephBroker::sync(ResponseCallback *cb, uint32_t fd) {
378  OpenFileDataCephPtr fdata;
379 
380  HT_DEBUGF("sync fd=%d", fd);
381 
// Unknown handle.
// NOTE(review): listing line 385 is absent from this rendering; errbuf is
// filled in but no visible statement uses it or replies to cb - recover the
// missing line from the real source.
382  if (!m_open_file_map.get(fd, fdata)) {
383  char errbuf[32];
384  sprintf(errbuf, "%d", fd);
386  return;
387  }
388 
389  int r;
// Any non-zero result is treated as failure (not only negative ones).
390  if ((r = ceph_fsync(fdata->fd, true)) != 0) {
391  HT_ERRORF("sync failed: fd=%d ceph_fd=%d - %s", fd, fdata->fd, strerror(-r));
392  report_error(cb, -r);
393  return;
394  }
395 
396  cb->response_ok();
397 }
398 
400  cb->response(m_status);
401 }
402 
404  m_open_file_map.remove_all();
405  cb->response_ok();
406  this_thread::sleep_for(chrono::milliseconds(2000));
407 }
408 
409 void CephBroker::readdir(Response::Callback::Readdir *cb, const char *dname) {
410  std::vector<Filesystem::Dirent> listing;
411  String absdir;
412 
413  HT_DEBUGF("Readdir dir='%s'", dname);
414 
415  //get from ceph in a buffer
416  make_abs_path(dname, absdir);
417 
418  DIR *dirp;
419  ceph_opendir(absdir.c_str(), &dirp);
420  int r;
421  int buflen = 100; //good default?
422  char *buf = new char[buflen];
423  int bufpos;
424  while (1) {
425  r = ceph_getdnames(dirp, buf, buflen);
426  if (r==-ERANGE) { //expand the buffer
427  delete buf;
428  buflen *= 2;
429  buf = new char[buflen];
430  continue;
431  }
432  if (r<=0) break;
433 
434  //if we make it here, we got at least one name, maybe more
435  bufpos = 0;
436  Filesystem::Dirent entry;
437  while (bufpos<r) {//make new strings and add them to listing
438  entry.name = String(buf+bufpos);
439  if (entry.name.compare(".") && entry.name.compare(".."))
440  listing.push_back(entry);
441  bufpos+=entry.name.size()+1;
442  }
443  }
444  delete buf;
445  ceph_closedir(dirp);
446 
447  if (r < 0) report_error(cb, -r); //Ceph shouldn't return r<0 on getdnames
448  //(except for ERANGE) so if it happens this is bad
449  cb->response(listing);
450 }
451 
452 
453 void CephBroker::exists(Response::Callback::Exists *cb, const char *fname) {
454  String abspath;
455  struct stat statbuf;
456 
457  HT_DEBUGF("exists file='%s'", fname);
458  make_abs_path(fname, abspath);
459  cb->response(ceph_lstat(abspath.c_str(), &statbuf) == 0);
460 }
461 
462 void CephBroker::rename(ResponseCallback *cb, const char *src, const char *dst) {
463  String src_abs;
464  String dest_abs;
465  int r;
466 
467  make_abs_path(src, src_abs);
468  make_abs_path(dst, dest_abs);
469  if ((r = ceph_rename(src_abs.c_str(), dest_abs.c_str())) <0 ) {
470  report_error(cb, r);
471  return;
472  }
473  cb->response_ok();
474 }
475 
476 void CephBroker::debug(ResponseCallback *cb, int32_t command,
477  StaticBuffer &serialized_parameters) {
478  HT_ERROR("debug commands not implemented!");
479  cb->error(Error::NOT_IMPLEMENTED, format("Debug commands not supported"));
480 }
481 
483  char errbuf[128];
484  errbuf[0] = 0;
485 
486  strerror_r(error, errbuf, 128);
487 
488  cb->error(Error::FSBROKER_IO_ERROR, errbuf);
489 }
490 
491 
A memory buffer of static size.
Definition: StaticBuffer.h:45
Retrieves system information (hardware, installation directory, etc)
int response(bool exists)
Sends response parameters back to client.
Definition: Exists.cc:40
virtual void create(Response::Callback::Open *cb, const char *fname, uint32_t flags, int32_t bufsz, int16_t replication, int64_t blksz)
Open a file, and create it if it doesn't exist, optionally overwriting the contents.
Definition: CephBroker.cc:103
void get_address(struct sockaddr_in &addr)
Gets the remote address of the requesting client.
Application handler for append function.
Definition: Append.h:45
Abstract base class for a filesystem.
std::string String
A String is simply a typedef to std::string.
Definition: String.h:44
String format(const char *fmt,...)
Returns a String using printf like format facilities Vanilla snprintf is about 1.5x faster than this...
Definition: String.cc:37
virtual int response_ok()
Sends a simple success response back to the client which is just the 4-byte error code Error::OK...
int response(int32_t fd)
Sends response parameters back to client.
Definition: Open.cc:40
Flags
Enumeration type for append flags.
Definition: Filesystem.h:76
long long unsigned int Llu
Shortcut for printf formats.
Definition: String.h:50
Application handler for exists function.
Definition: Exists.h:45
File system broker definitions.
Definition: CephBroker.h:38
static atomic< int > ms_next_fd
Definition: CephBroker.h:91
#define HT_INFO(msg)
Definition: Logger.h:271
STL namespace.
virtual void length(Response::Callback::Length *cb, const char *fname, bool accurate=true)
Get length of file.
Definition: CephBroker.cc:274
virtual void flush(ResponseCallback *cb, uint32_t fd)
Flush data that has been written.
Definition: CephBroker.cc:373
int rmdir_recursive(const char *directory)
Definition: CephBroker.cc:348
String format_bytes(size_t n, const void *buf, size_t len, const char *trailer)
Return first n bytes of buffer with an optional trailer if the size of the buffer exceeds n...
Definition: String.cc:103
virtual void report_error(ResponseCallback *cb, int error)
Definition: CephBroker.cc:482
virtual void sync(ResponseCallback *cb, uint32_t fd)
Sync out data that has been written.
Definition: CephBroker.cc:377
Application handler for length function.
Definition: Length.h:45
Directory container class.
Definition: directory.h:343
File system utility functions.
virtual void pread(Response::Callback::Read *cb, uint32_t fd, uint64_t offset, uint32_t amount, bool verify_checksum)
Read from file at position.
Definition: CephBroker.cc:293
virtual void readdir(Response::Callback::Readdir *cb, const char *dname)
Read a directory's contents.
Definition: CephBroker.cc:409
virtual void append(Response::Callback::Append *cb, uint32_t fd, uint32_t amount, const void *data, Filesystem::Flags flags)
Append data to open file.
Definition: CephBroker.cc:193
virtual void open(Response::Callback::Open *cb, const char *fname, uint32_t flags, uint32_t bufsz)
Open a file and pass the fd to the callback on success.
Definition: CephBroker.cc:75
virtual void status(Response::Callback::Status *cb)
Check status of FSBroker.
Definition: CephBroker.cc:399
std::shared_ptr< Properties > PropertiesPtr
Definition: Properties.h:447
Compatibility Macros for C/C++.
#define HT_END
Definition: Logger.h:220
int response(uint64_t offset, StaticBuffer &buffer)
Sends response parameters back to client.
Definition: Read.cc:40
virtual void debug(ResponseCallback *, int32_t command, StaticBuffer &serialized_parameters)
Debug command.
Definition: CephBroker.cc:476
CephBroker(PropertiesPtr &cfg)
Definition: CephBroker.cc:46
virtual void exists(Response::Callback::Exists *cb, const char *fname)
Check for the existence of a file.
Definition: CephBroker.cc:453
This class is used to generate and deliver standard responses back to a client.
int response(std::vector< Filesystem::Dirent > &listing)
Sends response parameters back to client.
Definition: Readdir.cc:41
Application handler for readdir function.
Definition: Readdir.h:48
virtual void mkdirs(ResponseCallback *cb, const char *dname)
Make a directory hierarchy. If the parent dirs are not present, they are also created.
Definition: CephBroker.cc:320
Hypertable definitions
#define HT_DEBUGF(msg,...)
Definition: Logger.h:260
long long int Lld
Shortcut for printf formats.
Definition: String.h:53
Application handler for open function.
Definition: Open.h:45
virtual void seek(ResponseCallback *cb, uint32_t fd, uint64_t offset)
Seek open file.
Definition: CephBroker.cc:236
#define HT_ERROR(msg)
Definition: Logger.h:299
virtual void rename(ResponseCallback *cb, const char *src, const char *dst)
Rename a file from src to dst.
Definition: CephBroker.cc:462
Application handler for read function.
Definition: Read.h:47
virtual int error(int error, const String &msg)
Sends a standard error response back to the client.
virtual void read(Response::Callback::Read *cb, uint32_t fd, uint32_t amount)
Read data from an open file.
Definition: CephBroker.cc:161
#define HT_INFOF(msg,...)
Definition: Logger.h:272
String name
File or directory name.
Definition: Filesystem.h:96
virtual void rmdir(ResponseCallback *cb, const char *dname)
Remove a directory.
Definition: CephBroker.cc:335
virtual void close(ResponseCallback *cb, uint32_t fd)
Close open file.
Definition: CephBroker.cc:151
Application handler for status function.
Definition: Status.h:50
int response(Hypertable::Status &status)
Sends response parameters back to client.
Definition: Status.cc:40
long unsigned int Lu
Shortcut for printf formats.
Definition: String.h:47
#define HT_ERRORF(msg,...)
Definition: Logger.h:300
int response(uint64_t offset, uint32_t amount)
Sends response parameters back to client.
Definition: Append.cc:40
virtual void shutdown(ResponseCallback *cb)
Gracefully shut down broker, closing open files.
Definition: CephBroker.cc:403
int response(uint64_t length)
Sends response parameters back to client.
Definition: Length.cc:40
#define HT_DEBUG_OUT
Definition: Logger.h:261
virtual void remove(ResponseCallback *cb, const char *fname)
Remove a file or directory.
Definition: CephBroker.cc:258