0.9.8.10
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
Master.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2007-2015 Hypertable, Inc.
3  *
4  * This file is part of Hypertable.
5  *
6  * Hypertable is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; either version 3
9  * of the License, or any later version.
10  *
11  * Hypertable is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19  * 02110-1301, USA.
20  */
21 
22 #include <Common/Compat.h>
23 
24 #include "Config.h"
25 #include "Event.h"
26 #include "Notification.h"
27 #include "Master.h"
28 #include "Session.h"
29 #include "SessionData.h"
30 
31 #include <Common/Thread.h>
32 #include <Common/Error.h>
33 #include <Common/Path.h>
34 #include <Common/FileUtils.h>
35 #include <Common/StringExt.h>
36 #include <Common/Random.h>
37 #include <Common/SystemInfo.h>
38 
39 #include <boost/algorithm/string.hpp>
40 #include <boost/tokenizer.hpp>
41 
42 #include <algorithm>
43 #include <cstring>
44 #include <sstream>
45 
46 extern "C" {
47 #include <dirent.h>
48 #include <fcntl.h>
49 #include <sys/file.h>
50 #include <sys/time.h>
51 #include <sys/types.h>
52 #include <sys/stat.h>
53 #if defined(__FreeBSD__)
54 #include <sys/extattr.h>
55 #else
56 #include <sys/xattr.h>
57 #endif
58 #include <unistd.h>
59 }
60 
61 using namespace Hypertable;
62 using namespace Hypertable::Config;
63 using namespace Hyperspace;
64 using namespace std;
65 
66 #define HT_BDBTXN_BEGIN(parent_txn) \
67  do { \
68  BDbTxn txn;\
69  std::stringstream txn_str;\
70  HT_ASSERT(is_master());\
71  m_bdb_fs->start_transaction(txn); \
72  try
73 
74 #define HT_BDBTXN_END_CB(_cb_) \
75  catch (Exception &e) { \
76  if (e.code() == Error::HYPERSPACE_BERKELEYDB_DEADLOCK) { \
77  txn_str << txn; \
78  HT_INFOF("Berkeley DB deadlock encountered in txn %s", txn_str.str().c_str()); \
79  txn.abort(); \
80  this_thread::sleep_for(Random::duration_millis(3000)); \
81  continue; \
82  }\
83  else if (e.code() == Error::HYPERSPACE_BERKELEYDB_REP_HANDLE_DEAD) { \
84  txn_str << txn; \
85  HT_INFOF("Berkeley DB rep handle dead deadlock encountered in txn %s", txn_str.str().c_str()); \
86  txn.abort(); \
87  continue; \
88  }\
89  else if (e.code() == Error::HYPERSPACE_BERKELEYDB_ERROR) \
90  HT_ERROR_OUT << e << HT_END; \
91  else \
92  HT_ERRORF("%s - %s", Error::get_text(e.code()), e.what()); \
93  txn.abort(); \
94  _cb_->error(e.code(), e.what()); \
95  return; \
96  } \
97  HT_DEBUG_OUT << "end txn " << txn << HT_END; \
98  break; \
99  } while (true)
100 
101 #define HT_BDBTXN_END(...) \
102  catch (Exception &e) { \
103  if (e.code() == Error::HYPERSPACE_BERKELEYDB_DEADLOCK) {\
104  txn_str << txn; \
105  HT_INFOF("Berkeley DB deadlock encountered in txn %s", txn_str.str().c_str()); \
106  txn.abort(); \
107  this_thread::sleep_for(Random::duration_millis(3000)); \
108  continue; \
109  }\
110  else if (e.code() == Error::HYPERSPACE_BERKELEYDB_REP_HANDLE_DEAD) { \
111  txn_str << txn; \
112  HT_INFOF("Berkeley DB rep handle dead deadlock encountered in txn %s", txn_str.str().c_str()); \
113  txn.abort(); \
114  continue; \
115  }\
116  else if (e.code() == Error::HYPERSPACE_BERKELEYDB_ERROR) \
117  HT_ERROR_OUT << e << HT_END; \
118  else \
119  HT_ERRORF("%s - %s", Error::get_text(e.code()), e.what()); \
120  txn.abort(); \
121  return __VA_ARGS__; \
122  } \
123  HT_DEBUG_OUT << "end txn " << txn << HT_END; \
124  break; \
125  } while (true)
126 
127 /*
128  * Sets up the m_base_dir variable to be the absolute path of the root of the
129  * Hyperspace directory (with no trailing slash); Locks this root directory to
130  * prevent concurrent masters and then reads/increments/writes the 32-bit
131  * integer attribute 'generation'; Creates the server Keepalive handler.
132  */
134  ServerKeepaliveHandlerPtr &keepalive_handler,
135  ApplicationQueuePtr &app_queue_ptr)
136  : m_verbose(false), m_next_handle_number(1), m_next_session_id(1),
137  m_maintenance_outstanding(false),
138  m_shutdown(false), m_bdb_fs(0) {
139 
140  m_verbose = props->get_bool("verbose");
141  m_lease_interval = props->get_i32("Hyperspace.Lease.Interval");
142  m_keep_alive_interval = props->get_i32("Hyperspace.KeepAlive.Interval");
143  m_maintenance_interval = props->get_i32("Hyperspace.Maintenance.Interval");
144 
145  Path base_dir(props->get_str("Hyperspace.Replica.Dir"));
146 
147  if (!base_dir.is_complete()) {
148  Path data_dir = props->get_str("Hypertable.DataDirectory");
149  base_dir = data_dir / base_dir;
150  }
151 
152  m_base_dir = base_dir.string();
153 
154  HT_INFOF("BerkeleyDB base directory = '%s'", m_base_dir.c_str());
155  m_lock_file = m_base_dir + "/lock";
156 
157  // Make sure base directory exists, create if it doesn't
158  if (!FileUtils::exists(m_base_dir.c_str())) {
159  HT_INFOF("Base directory '%s' does not exist, creating...",
160  m_base_dir.c_str());
161  if (!FileUtils::mkdirs(m_base_dir.c_str())) {
162  HT_ERRORF("Unable to create base directory %s - %s",
163  m_base_dir.c_str(), strerror(errno));
164  exit(EXIT_FAILURE);
165  }
166  }
167 
168  if (!FileUtils::exists(m_lock_file.c_str())) {
169  HT_INFOF("Lock file '%s' does not exist, creating...",
170  m_lock_file.c_str());
171  if ((m_lock_fd = ::open(m_lock_file.c_str(), O_RDWR|O_CREAT|O_TRUNC, 0644)) < 0) {
172  HT_ERRORF("Unable to create lock file '%s' - %s", m_lock_file.c_str(), strerror(errno));
173  exit(EXIT_FAILURE);
174  }
175  }
176  else {
177  if ((m_lock_fd = ::open(m_lock_file.c_str(), O_RDWR)) < 0) {
178  HT_ERRORF("Unable to open lock file '%s' - %s", m_lock_file.c_str(), strerror(errno));
179  exit(EXIT_FAILURE);
180  }
181  }
182 
183  /*
184  * Lock the base directory to prevent concurrent masters
185  */
186 #if defined(__sun__)
187  struct flock fl;
188 
189  memset(&fl, 0, sizeof fl);
190 
191  fl.l_type = F_WRLCK;
192  fl.l_whence = SEEK_SET;
193  fl.l_start = 0;
194  fl.l_len = 0;
195  fl.l_pid = getpid();
196 
197  if (fcntl(m_lock_fd, F_SETLKW, &fl) == -1) {
198  if (errno == EWOULDBLOCK) {
199  HT_ERRORF("Lock file '%s' is locked by another process.",
200  m_lock_file.c_str());
201  }
202  else {
203  HT_ERRORF("Unable to lock file '%s' - %s",
204  m_lock_file.c_str(), strerror(errno));
205  }
206  exit(EXIT_FAILURE);
207  }
208 #else
209  if (flock(m_lock_fd, LOCK_EX | LOCK_NB) != 0) {
210  if (errno == EWOULDBLOCK) {
211  HT_ERRORF("Lock file '%s' is locked by another process.",
212  m_lock_file.c_str());
213  }
214  else {
215  HT_ERRORF("Unable to lock file '%s' - %s",
216  m_lock_file.c_str(), strerror(errno));
217  }
218  exit(EXIT_FAILURE);
219  }
220 #endif
221 
222  app_queue_ptr = make_shared<ApplicationQueue>(get_i32("workers"), false);
223  vector<Thread::id> thread_ids = app_queue_ptr->get_thread_ids();
224  thread_ids.push_back(ThisThread::get_id());
225 
226 
227  m_bdb_fs = new BerkeleyDbFilesystem(props, m_base_dir, thread_ids);
228  Event::set_bdb_fs(m_bdb_fs);
229 
230  /*
231  * Load and increment generation number
232  */
233  if (is_master())
235 
236  uint16_t port = props->get_i16("Hyperspace.Replica.Port");
237  InetAddr::initialize(&m_local_addr, INADDR_ANY, port);
238 
239  m_metrics_handler = std::make_shared<MetricsHandler>(props);
240  m_metrics_handler->start_collecting();
241 
242  m_last_tick = std::chrono::steady_clock::now();
243 
244  m_keepalive_handler_ptr = make_shared<ServerKeepaliveHandler>(conn_mgr->get_comm(), this, app_queue_ptr);
245  m_keepalive_handler_ptr->start();
246  keepalive_handler = m_keepalive_handler_ptr;
247 }
248 
249 
251  m_metrics_handler->stop_collecting();
252  delete m_bdb_fs;
254 }
255 
256 /*
257  * create_session does the following:
258  * > Lock the session expiry map
259  * > get and incr next session id from BDB
260  * > create new session in BDB
261  * > insert session id & expiry time into session expiry map
262  */
263 uint64_t Hyperspace::Master::create_session(struct sockaddr_in &addr) {
264  lock_guard<mutex> lock(m_session_map_mutex);
265 
266  SessionDataPtr session_data;
267  uint64_t session_id = 0;
268  String addr_str = InetAddr::format(addr);
269  HT_INFOF("Create session for %s", addr_str.c_str());
270 
271  HT_BDBTXN_BEGIN() {
272  // DB updates
273  session_id = m_bdb_fs->get_next_id_i64(txn, SESSION, true);
274  m_bdb_fs->create_session(txn, session_id, addr_str);
275  // in mem updates
276  session_data = make_shared<SessionData>(addr, m_lease_interval, session_id);
277  m_session_map[session_id] = session_data;
278  m_session_heap.push_back(session_data);
279 
280  txn.commit();
281  HT_INFOF("created session %llu", (Llu)session_id);
282  }
283  HT_BDBTXN_END(0);
284 
285  return session_id;
286 }
287 
288 /*
289  *
290  */
291 bool Hyperspace::Master::get_session(uint64_t session_id, SessionDataPtr &session_data) {
292  lock_guard<mutex> lock(m_session_map_mutex);
293  SessionMap::iterator iter = m_session_map.find(session_id);
294  if (iter == m_session_map.end())
295  return false;
296  session_data = (*iter).second;
297  return true;
298 }
299 
300 /*
301  * destroy_session does the following:
302  * > Lock the session expiry map and erase the session data object from it
303  * > Set the expiry time to now
304  */
305 void Hyperspace::Master::destroy_session(uint64_t session_id) {
306  lock_guard<mutex> lock(m_session_map_mutex);
307  SessionDataPtr session_data;
308  SessionMap::iterator iter = m_session_map.find(session_id);
309  if (iter == m_session_map.end())
310  return;
311  session_data = (*iter).second;
312  m_session_map.erase(session_id);
313  session_data->expire();
314  // force it to top of expiration heap
315  session_data->set_expire_time_now();
316  HT_INFOF("destroyed session %llu(%s)",
317  (Llu)session_id, session_data->get_name());
318 }
319 
320 /*
321  *
322  */
323 void Hyperspace::Master::initialize_session(uint64_t session_id, const String &name) {
324  SessionDataPtr session_data;
325  {
326  lock_guard<mutex> lock(m_session_map_mutex);
327  SessionMap::iterator iter = m_session_map.find(session_id);
328  if (iter == m_session_map.end()) {
329  HT_ERRORF("Unable to initialize session %llu (%s)", (Llu)session_id, name.c_str());
330  return;
331  }
332  session_data = (*iter).second;
333  }
334 
335  // set session name in BDB and mem
336  HT_BDBTXN_BEGIN() {
337  m_bdb_fs->set_session_name(txn, session_id, name);
338  txn.commit();
339  session_data->set_name(name);
340  }
341  HT_BDBTXN_END(BOOST_PP_EMPTY());
342 
343  HT_INFOF("Initialized session %llu (%s)", (Llu)session_id, name.c_str());
344 }
345 
346 /*
347  * renew_session_lease does the following:
348  * > Lock the session expiry map
349  * > If session lease can't be renewed
350  * > Do BDB txn to mark session as expired
351  * > Mark in mem session data as expired
352  * > (Don't delete session completely as handles etc need to be cleaned up)
353  */
354 int Hyperspace::Master::renew_session_lease(uint64_t session_id) {
355  lock_guard<mutex> lock(m_session_map_mutex);
356  bool renewed = false;
357  bool commited = false;
358  SessionDataPtr session_data;
359 
360  SessionMap::iterator iter = m_session_map.find(session_id);
361  if (iter == m_session_map.end())
363 
364  session_data = iter->second;
365  renewed = session_data->renew_lease();
366 
367  if (!renewed) {
368  // if renew failed then delete from BDB
369  HT_BDBTXN_BEGIN() {
370  m_bdb_fs->expire_session(txn, session_id);
371  txn.commit();
372  commited = true;
373  }
375 
376  // Do this outside BDB txn since delete event notifications might cause a BDB txn too
377  if (commited)
378  session_data->expire();
379 
380  // in mem session data will be cleaned up from map & heap in remove expired sessions
382  }
383 
384  return Error::OK;
385 }
386 
387 /*
388  * next_expired_session does the following:
389  * > Lock the session map mutex
390  * > Remake the session heap with the session with earliest expiry time on top
391  * > If top of heap session is expired
392  * > Pop it from the heap, delete it from the session map
393  * > return true with session data parameter pointing to this session
394  * > else return false
395  */
396 bool
398  std::chrono::steady_clock::time_point now) {
399  lock_guard<mutex> lock(m_session_map_mutex);
400  struct LtSessionData ascending;
401 
402  if (!m_session_heap.empty()) {
403  std::make_heap(m_session_heap.begin(), m_session_heap.end(), ascending);
404  session_data = m_session_heap.front();
405  if (session_data->is_expired(now) || m_shutdown) {
406  m_session_heap.erase(m_session_heap.begin());
407  m_session_map.erase(session_data->get_id());
408  return true;
409  }
410  }
411  session_data = 0;
412  return false;
413 }
414 
415 
416 /*
417  * remove_expired_sessions does the following:
418  * > extend session expiry in case of suspension
419  * > mark all expired sessions & get their open handles
420  * > destroy all expired & open handles
421  * > delete expired sessions in BDB
422  */
424  SessionDataPtr session_data;
425  int error;
426  String errmsg;
427  std::vector<uint64_t> handles;
428  std::vector<uint64_t> expired_sessions;
429 
430  auto now = std::chrono::steady_clock::now();
431 
432  // mark expired sessions
433  while (next_expired_session(session_data, now)) {
434  bool commited = false;
435  if (m_verbose)
436  HT_INFOF("Expiring session %llu name=%s", (Llu)session_data->get_id(),
437  session_data->get_name());
438  commited = false;
439  // expire session_data in mem and in BDB
440  HT_BDBTXN_BEGIN() {
441  m_bdb_fs->get_session_handles(txn, session_data->get_id(), handles);
442  m_bdb_fs->expire_session(txn, session_data->get_id());
443  txn.commit();
444  commited = true;
445  expired_sessions.push_back(session_data->get_id());
446  }
447  HT_BDBTXN_END(BOOST_PP_EMPTY());
448  // keep this outside the BDB txn since expiring the session may deliver event notifications that start a BDB txn too
449  if (commited)
450  session_data->expire();
451  }
452 
453  // delete handles open by expired sessions
454  for (auto handle : handles) {
455  if (m_verbose)
456  HT_INFOF("Destroying handle %llu", (Llu)handle);
457  if (!destroy_handle(handle, error, errmsg, false))
458  HT_INFOF("Problem destroying handle - %s (%s)",
459  Error::get_text(error), errmsg.c_str());
460  }
461 
462  // delete expired sessions from BDB
463  if (expired_sessions.size() > 0) {
464  HT_BDBTXN_BEGIN() {
465  for (auto expired_session : expired_sessions) {
466  m_bdb_fs->delete_session(txn, expired_session);
467  }
468  txn.commit();
469  }
470  HT_BDBTXN_END(BOOST_PP_EMPTY());
471  }
472 }
473 
474 /*
475  * Creates a directory with absolute path 'name'.
476  *
477  * Does the following:
478  * > Find parent node
479  * > Start BerkeleyDB txn to verify (and thus prevent modifications to) parent node,
480  * create new dir and
481  * > Send out CHILD_NODE_ADDED notifications
482  */
483 void
484 Hyperspace::Master::mkdir(ResponseCallback *cb, uint64_t session_id, const char *name, const std::vector<Attribute>& init_attrs) {
485  bool commited = false;
486  m_metrics_handler->request_increment();
487  CommandContext ctx("mkdir", session_id);
488  HT_BDBTXN_BEGIN() {
489  commited = false;
490  ctx.reset(&txn);
491  mkdir(ctx, name);
492  if (init_attrs.size() && !ctx.aborted)
493  attr_set(ctx, 0, name, init_attrs);
494  if (ctx.aborted)
495  txn.abort();
496  else {
497  txn.commit();
498  commited = true;
499  }
500  }
501  HT_BDBTXN_END_CB(cb);
502 
503  // check for errors
504  if (ctx.aborted) {
505  cb->error(ctx.error, ctx.error_msg);
506  if (ctx.error == Error::HYPERSPACE_FILE_EXISTS) { // info should be sufficient
507  HT_INFOF("%s - %s", Error::get_text(ctx.error), ctx.error_msg.c_str());
508  }
509  else {
510  HT_ERROR_OUT << Error::get_text(ctx.error) << " - " << ctx.error_msg << HT_END;
511  }
512  return;
513  }
514 
515  // Deliver notifications if needed
516  if (commited)
518 
519  if ((ctx.error = cb->response_ok()) != Error::OK)
520  HT_ERRORF("Problem sending back response - %s", Error::get_text(ctx.error));
521 }
522 
523 void
524 Hyperspace::Master::mkdirs(ResponseCallback *cb, uint64_t session_id, const char *name, const std::vector<Attribute>& init_attrs) {
525  std::vector<EventContext> evts;
526  bool commited = false;
527  m_metrics_handler->request_increment();
528  CommandContext ctx("mkdirs", session_id);
529  HT_BDBTXN_BEGIN() {
530  commited = false;
531  ctx.reset(&txn);
532  bool file_exists;
533  exists(ctx, name, file_exists);
534  if (!ctx.aborted && !file_exists) {
535  typedef boost::tokenizer<boost::char_separator<char> > tokenizer;
536  boost::char_separator<char> sep("/");
537  std::vector<String> name_components;
538  String path(name);
539  tokenizer tokens(path, sep);
540  for (tokenizer::iterator tok_iter = tokens.begin();
541  tok_iter != tokens.end(); ++tok_iter)
542  name_components.push_back(*tok_iter);
543 
544  path.clear();
545  for (size_t i=0; i<name_components.size(); i++) {
546  path += String("/") + name_components[i];
547  mkdir(ctx, path.c_str());
548  if (ctx.aborted && ctx.error != Error::HYPERSPACE_FILE_EXISTS)
549  break;
550  if (init_attrs.size() && !ctx.aborted &&
551  i == name_components.size() - 1)
552  attr_set(ctx, 0, name, init_attrs);
553  ctx.reset_error();
554  }
555  }
556 
557  if (ctx.aborted)
558  txn.abort();
559  else {
560  txn.commit();
561  commited = true;
562  }
563  }
564  HT_BDBTXN_END_CB(cb);
565 
566  // check for errors
567  if (ctx.aborted) {
568  cb->error(ctx.error, ctx.error_msg);
569  if (ctx.error == Error::HYPERSPACE_FILE_EXISTS) { // info should be sufficient
570  HT_INFOF("%s - %s", Error::get_text(ctx.error), ctx.error_msg.c_str());
571  }
572  else {
573  HT_ERROR_OUT << Error::get_text(ctx.error) << " - " << ctx.error_msg << HT_END;
574  }
575  return;
576  }
577 
578  // Deliver notifications if needed
579  if (commited)
581 
582  if ((ctx.error = cb->response_ok()) != Error::OK)
583  HT_ERRORF("Problem sending back response - %s", Error::get_text(ctx.error));
584 }
585 
586 /*
587  * Unlink
588  *
589  * Does the following:
590  *
591  * > Get the parent node
592  * > Start BDB txn and validate (and so lock) the parent node
593  * > Validate node to be deleted
594  * > If there are no handles open for this node, create and persist node removed notifications
595  * > Delete node from BDB fs and delete node data from BDB
596  * > End BDB txn
597  * > Deliver notifications
598  */
599 void
600 Hyperspace::Master::unlink(ResponseCallback *cb, uint64_t session_id, const char *name) {
601  bool commited = false;
602  m_metrics_handler->request_increment();
603  CommandContext ctx("unlink", session_id);
604  HT_BDBTXN_BEGIN() {
605  commited = false;
606  ctx.reset(&txn);
607  unlink(ctx, name);
608  if (ctx.aborted)
609  txn.abort();
610  else {
611  txn.commit();
612  commited = true;
613  }
614  }
615  HT_BDBTXN_END_CB(cb);
616 
617  // check for errors
618  if (ctx.aborted) {
619  HT_ERROR_OUT << Error::get_text(ctx.error) << " - " << ctx.error_msg << HT_END;
620  cb->error(ctx.error, ctx.error_msg);
621  return;
622  }
623 
624  // Deliver notifications if needed
625  if (commited)
627 
628  if ((ctx.error = cb->response_ok()) != Error::OK)
629  HT_ERRORF("Problem sending back response - %s", Error::get_text(ctx.error));
630 }
631 
632 
633 /*
634  * Open
635  *
636  */
637 void
638 Hyperspace::Master::open(ResponseCallbackOpen *cb, uint64_t session_id, const char *name,
639  uint32_t flags, uint32_t event_mask, std::vector<Attribute> &init_attrs) {
640 
641  bool commited = false;
642  uint64_t handle = 0;
643  bool created = false;
644  uint64_t lock_generation = 0;
645  m_metrics_handler->request_increment();
646  CommandContext ctx("open", session_id);
647  HT_BDBTXN_BEGIN() {
648  commited = false;
649  handle = 0;
650  created = false;
651  lock_generation = 0;
652  ctx.reset(&txn);
653  open(ctx, name, flags, event_mask, init_attrs, handle, created, lock_generation);
654  if (ctx.aborted)
655  txn.abort();
656  else {
657  txn.commit();
658  commited = true;
659  }
660  }
661  HT_BDBTXN_END_CB(cb);
662 
663  // check for errors
664  if (ctx.aborted) {
665  HT_ERROR_OUT << Error::get_text(ctx.error) << " - " << ctx.error_msg << HT_END;
666  cb->error(ctx.error, ctx.error_msg);
667  return;
668  }
669 
670  // Deliver notifications if needed
671  if (commited)
673 
674  if (m_verbose)
675  HT_INFOF("exitting open(session_id=%llu, session_name = %s, fname=%s, flags=0x%x, event_mask=0x%x)",
676  (Llu)ctx.session_id, ctx.session_data->get_name(), name, flags, event_mask);
677 
678  if ((ctx.error = cb->response(handle, created, lock_generation)) != Error::OK)
679  HT_ERRORF("Problem sending back response - %s", Error::get_text(ctx.error));
680 }
681 
682 
683 /*
684  * Close
685  */
686 void Hyperspace::Master::close(ResponseCallback *cb, uint64_t session_id, uint64_t handle) {
687  m_metrics_handler->request_increment();
688  CommandContext ctx("close", session_id);
689  HT_BDBTXN_BEGIN() {
690  ctx.reset(&txn);
691  close(ctx, handle);
692  if (ctx.aborted)
693  txn.abort();
694  else
695  txn.commit();
696  }
697  HT_BDBTXN_END_CB(cb);
698 
699  // check for errors
700  if (ctx.aborted) {
701  HT_ERROR_OUT << Error::get_text(ctx.error) << " - " << ctx.error_msg << HT_END;
702  cb->error(ctx.error, ctx.error_msg);
703  return;
704  }
705 
706  // if handle was open then destroy it (release lock if any, grant next
707  // pending lock, delete ephemeral etc.)
708  if (!destroy_handle(handle, ctx.error, ctx.error_msg)) {
709  cb->error(ctx.error, ctx.error_msg);
710  return;
711  }
712 
713  if ((ctx.error = cb->response_ok()) != Error::OK) {
714  HT_ERRORF("Problem sending back response - %s", Error::get_text(ctx.error));
715  }
716 }
717 
718 void Hyperspace::Master::close(CommandContext &ctx, uint64_t handle)
719 {
720  if (!ctx.session_data) {
721  if (!get_session(ctx.session_id, ctx.session_data)) {
723  return;
724  }
725  }
726 
727  if (m_verbose)
728  HT_INFOF("close(session=%llu(%s), handle=%llu)", (Llu)ctx.session_id,
729  ctx.session_data->get_name(), (Llu)handle);
730 
731  HT_ASSERT(ctx.txn);
732  BDbTxn &txn = *ctx.txn;
733 
734  if (!m_bdb_fs->session_exists(txn, ctx.session_id)) {
736  format("Session %llu (%s) does not exist",
737  (Llu)ctx.session_id, ctx.session_data->get_name()));
738  return;
739  }
740 
741  m_bdb_fs->delete_session_handle(txn, ctx.session_id, handle);
742 }
743 
744 /*
745  * attr_set does the following:
746  *
747  * > Make sure session is valid
748  * > Make sure handle is valid
749  * > Start BDB txn
750  * > Lock node data
751  * > Set attribute in BDB
752  * > Persist ATTR_SET event notifications
753  * > End BDB txn
754  * > Deliver ATTR_SET notifications
755  * > Send response
756  */
757 void
758 Hyperspace::Master::attr_set(ResponseCallback *cb, uint64_t session_id, uint64_t handle,
759  const char *name, uint32_t oflags, const std::vector<Attribute> &attrs) {
760 
761  bool commited = false;
762  uint64_t opened_handle = 0;
763  m_metrics_handler->request_increment();
764  CommandContext ctx("attrset", session_id);
765 
766  HT_ASSERT((name && *name) || handle);
767 
768  HT_BDBTXN_BEGIN() {
769  commited = false;
770  opened_handle = 0;
771  ctx.reset(&txn);
772 
773  if (handle != 0)
774  attr_set(ctx, handle, name, attrs);
775  else {
776  bool created;
777  uint64_t lock_generation;
778  std::vector<Attribute> none;
779  open(ctx, name, oflags, 0, none, opened_handle, created, lock_generation);
780  if (!ctx.aborted) {
781  attr_set(ctx, opened_handle, 0, attrs);
782  close(ctx, opened_handle);
783  }
784  }
785  if (ctx.aborted)
786  txn.abort();
787  else {
788  txn.commit();
789  commited = true;
790  }
791  }
792  HT_BDBTXN_END_CB(cb);
793 
794  // check for errors
795  if (ctx.aborted) {
796  HT_ERROR_OUT << Error::get_text(ctx.error) << " - " << ctx.error_msg << HT_END;
797  cb->error(ctx.error, ctx.error_msg);
798  return;
799  }
800 
801  // deliver notifications
802  if (commited)
804 
805  // if handle was open then destroy it (release lock if any, grant next
806  // pending lock, delete ephemeral etc.)
807  if (opened_handle) {
808  if (!destroy_handle(opened_handle, ctx.error, ctx.error_msg)) {
809  cb->error(ctx.error, ctx.error_msg);
810  return;
811  }
812  }
813 
814  if ((ctx.error = cb->response_ok()) != Error::OK)
815  HT_ERRORF("Problem sending back response - %s", Error::get_text(ctx.error));
816 }
817 
818 /*
819  * attr_get does the following:
820  *
821  * > Make sure session is valid
822  * > Make sure handle is valid
823  * > Start BDB txn
824  * > Lock node data
825  * > Get attribute in BDB
826  * > End BDB txn
827  * > Send attr value back in response
828  *
829  */
830 void
832  uint64_t handle, const char *name,
833  const std::vector<String> &attrs) {
834 
835  std::vector<DynamicBufferPtr> dbufs;
836  dbufs.reserve(attrs.size());
837  m_metrics_handler->request_increment();
838  CommandContext ctx("attrget", session_id);
839  HT_BDBTXN_BEGIN() {
840  ctx.reset(&txn);
841  dbufs.clear();
842  if (attrs.size() == 1) { // if only one attr it might return HYPERSPACE_ATTR_NOT_FOUND
843  dbufs.push_back(make_shared<DynamicBuffer>());
844  attr_get(ctx, handle, name, attrs.front().c_str(), *dbufs.back());
845  }
846  else
847  attr_get(ctx, handle, name, attrs, dbufs);
848 
849  if (ctx.aborted)
850  txn.abort();
851  else
852  txn.commit();
853  }
854  HT_BDBTXN_END_CB(cb);
855 
856  if (ctx.aborted) {
859  HT_DEBUG_OUT << Error::get_text(ctx.error) << " - " << ctx.error_msg << HT_END;
860  else
861  HT_ERROR_OUT << Error::get_text(ctx.error) << " - " << ctx.error_msg << HT_END;
862  cb->error(ctx.error, ctx.error_msg);
863  return;
864  }
865 
866  if ((ctx.error = cb->response(dbufs)) != Error::OK)
867  HT_ERRORF("Problem sending back response - %s", Error::get_text(ctx.error));
868 }
869 
870 /*
871  * attr_incr does the following:
872  *
873  * > Make sure session is valid
874  * > Make sure handle is valid
875  * > Start BDB txn
876  * > Lock node data
877  * > atomically increment attribute in BDB
878  * > End BDB txn
879  * > Send previous attr value back in response
880  *
881  */
882 void
884  uint64_t handle, const char *name, const char* attr) {
885 
886  uint64_t attr_val;
887  m_metrics_handler->request_increment();
888  CommandContext ctx("attrincr", session_id);
889  HT_BDBTXN_BEGIN() {
890  ctx.reset(&txn);
891  attr_incr(ctx, handle, name, attr, attr_val);
892  if (ctx.aborted)
893  txn.abort();
894  else
895  txn.commit();
896  }
897  HT_BDBTXN_END_CB(cb);
898 
899  if (ctx.aborted) {
900  HT_ERROR_OUT << Error::get_text(ctx.error) << " - " << ctx.error_msg << HT_END;
901  cb->error(ctx.error, ctx.error_msg);
902  return;
903  }
904 
905  if ((ctx.error = cb->response(attr_val)) != Error::OK)
906  HT_ERRORF("Problem sending back response - %s", Error::get_text(ctx.error));
907 }
908 
909 /*
910  * attr_del does the following:
911  *
912  * > Make sure session is valid
913  * > Make sure handle is valid
914  * > Start BDB txn
915  * > Lock node data
916  * > Delete attribute in BDB
917  * > Deliver ATTR_DEL event notifications
918  * > End BDB txn
919  *
920  */
921 void
922 Hyperspace::Master::attr_del(ResponseCallback *cb, uint64_t session_id, uint64_t handle,
923  const char *name) {
924  bool commited = false;
925  m_metrics_handler->request_increment();
926  CommandContext ctx("attrdel", session_id);
927  HT_BDBTXN_BEGIN() {
928  commited = false;
929  ctx.reset(&txn);
930  attr_del(ctx, handle, name);
931  if (ctx.aborted)
932  txn.abort();
933  else {
934  txn.commit();
935  commited = true;
936  }
937  }
938  HT_BDBTXN_END_CB(cb);
939 
940  // check for errors
941  if (ctx.aborted) {
942  HT_ERROR_OUT << Error::get_text(ctx.error) << " - " << ctx.error_msg << HT_END;
943  cb->error(ctx.error, ctx.error_msg);
944  return;
945  }
946 
947  // deliver notifications
948  if (commited)
950 
951  if ((ctx.error = cb->response_ok()) != Error::OK)
952  HT_ERRORF("Problem sending back response - %s", Error::get_text(ctx.error));
953 }
954 
955 void
956 Hyperspace::Master::attr_exists(ResponseCallbackAttrExists *cb, uint64_t session_id, uint64_t handle,
957  const char *name, const char *attr)
958 {
959  bool exists = false;
960  m_metrics_handler->request_increment();
961  CommandContext ctx("attrexists", session_id);
962  HT_BDBTXN_BEGIN() {
963  ctx.reset(&txn);
964  attr_exists(ctx, handle, name, attr, exists);
965  if (ctx.aborted)
966  txn.abort();
967  else
968  txn.commit();
969  }
970  HT_BDBTXN_END_CB(cb);
971 
972  if (ctx.aborted) {
973  HT_ERROR_OUT << Error::get_text(ctx.error) << " - " << ctx.error_msg << HT_END;
974  cb->error(ctx.error, ctx.error_msg);
975  return;
976  }
977 
978  if ((ctx.error = cb->response(exists)) != Error::OK)
979  HT_ERRORF("Problem sending back response - %s", Error::get_text(ctx.error));
980 }
981 
982 void
983 Hyperspace::Master::attr_list(ResponseCallbackAttrList *cb, uint64_t session_id, uint64_t handle)
984 {
985  std::vector<String> attributes;
986  m_metrics_handler->request_increment();
987  CommandContext ctx("attrlist", session_id);
988  HT_BDBTXN_BEGIN() {
989  ctx.reset(&txn);
990  attr_list(ctx, handle, attributes);
991  if (ctx.aborted)
992  txn.abort();
993  else
994  txn.commit();
995  }
996  HT_BDBTXN_END_CB(cb);
997 
998  if (ctx.aborted) {
999  HT_ERROR_OUT << Error::get_text(ctx.error) << " - " << ctx.error_msg << HT_END;
1000  cb->error(ctx.error, ctx.error_msg);
1001  return;
1002  }
1003 
1004  if ((ctx.error = cb->response(attributes)) != Error::OK)
1005  HT_ERRORF("Problem sending back response - %s", Error::get_text(ctx.error));
1006 }
1007 
1008 /*
1009  * exists does the following:
1010  *
1011  * > Start BDB txn
1012  * > Check if file exists in BDB
1013  * > End BDB txn
1014  *
1015  */
1016 void
1018  const char *name) {
1019 
1020  bool file_exists = false;
1021  m_metrics_handler->request_increment();
1022  CommandContext ctx("exists", session_id);
1023  HT_BDBTXN_BEGIN() {
1024  ctx.reset(&txn);
1025  exists(ctx, name, file_exists);
1026  if (ctx.aborted)
1027  txn.abort();
1028  else
1029  txn.commit();
1030  }
1031  HT_BDBTXN_END_CB(cb);
1032 
1033  if (ctx.aborted) {
1034  HT_ERROR_OUT << Error::get_text(ctx.error) << " - " << ctx.error_msg << HT_END;
1035  cb->error(ctx.error, ctx.error_msg);
1036  return;
1037  }
1038 
1039  if ((ctx.error = cb->response(file_exists)) != Error::OK)
1040  HT_ERRORF("Problem sending back response - %s", Error::get_text(ctx.error));
1041 }
1042 
1043 /*
1044  * read_dir does the following:
1045  *
1046  * > Make sure session is valid
1047  * > Start BDB txn
1048  * > Make sure handle is valid
1049  * > Read dir data from BDB
1050  * > End BDB txn
1051  *
1052  */
1053 void
1055  uint64_t handle) {
1056  std::vector<DirEntry> listing;
1057  m_metrics_handler->request_increment();
1058  CommandContext ctx("readdir", session_id);
1059  HT_BDBTXN_BEGIN() {
1060  ctx.reset(&txn);
1061  readdir(ctx, handle, listing);
1062  if (ctx.aborted)
1063  txn.abort();
1064  else
1065  txn.commit();
1066  }
1067  HT_BDBTXN_END_CB(cb);
1068 
1069  if (ctx.aborted) {
1070  HT_ERROR_OUT << Error::get_text(ctx.error) << " - " << ctx.error_msg << HT_END;
1071  cb->error(ctx.error, ctx.error_msg);
1072  return;
1073  }
1074 
1075  if ((ctx.error = cb->response(listing)) != Error::OK)
1076  HT_ERRORF("Problem sending back response - %s", Error::get_text(ctx.error));
1077 }
1078 
1079 /*
1080  * readdir_attr does the following:
1081  *
1082  * > Make sure session is valid
1083  * > Start BDB txn
1084  * > Make sure handle is valid
1085  * > Read dir data from BDB for entries in this dir that have the attr name set
1086  * > End BDB txn
1087  *
1088  */
1089 void
1091  uint64_t handle, const char *name, const char *attr, bool include_sub_entries) {
1092  std::vector<DirEntryAttr> listing;
1093  m_metrics_handler->request_increment();
1094  CommandContext ctx("readdirattr", session_id);
1095  HT_BDBTXN_BEGIN() {
1096  ctx.reset(&txn);
1097  readdir_attr(ctx, handle, name, attr, include_sub_entries, listing);
1098  if (ctx.aborted)
1099  txn.abort();
1100  else
1101  txn.commit();
1102  }
1103  HT_BDBTXN_END_CB(cb);
1104 
1105  if (ctx.aborted) {
1107  HT_DEBUG_OUT << Error::get_text(ctx.error) << " - " << ctx.error_msg << HT_END;
1108  else
1109  HT_ERROR_OUT << Error::get_text(ctx.error) << " - " << ctx.error_msg << HT_END;
1110  cb->error(ctx.error, ctx.error_msg);
1111  return;
1112  }
1113 
1114  if ((ctx.error = cb->response(listing)) != Error::OK)
1115  HT_ERRORF("Problem sending back response - %s", Error::get_text(ctx.error));
1116 }
1117 
1118 /*
1119  * readpath_attr does the following:
1120  *
1121  * > Make sure session is valid
1122  * > Start BDB txn
1123  * > Make sure handle is valid
1124  * > Read the value of the specified attr for all path components in this file's name
1125  * > End BDB txn
1126  *
1127  */
1128 void
1130  uint64_t handle, const char *name, const char *attr) {
1131  std::vector<DirEntryAttr> listing;
1132  m_metrics_handler->request_increment();
1133  CommandContext ctx("readpathattr", session_id);
1134  HT_BDBTXN_BEGIN() {
1135  ctx.reset(&txn);
1136  readpath_attr(ctx, handle, name, attr, listing);
1137  if (ctx.aborted)
1138  txn.abort();
1139  else
1140  txn.commit();
1141  }
1142  HT_BDBTXN_END_CB(cb);
1143 
1144  if (ctx.aborted) {
1146  HT_DEBUG_OUT << Error::get_text(ctx.error) << " - " << ctx.error_msg << HT_END;
1147  else
1148  HT_ERROR_OUT << Error::get_text(ctx.error) << " - " << ctx.error_msg << HT_END;
1149  cb->error(ctx.error, ctx.error_msg);
1150  return;
1151  }
1152 
1153  if ((ctx.error = cb->response(listing)) != Error::OK)
1154  HT_ERRORF("Problem sending back response - %s", Error::get_text(ctx.error));
1155 }
1156 
1157 /*
1158  * shutdown
1159  */
1160 void Hyperspace::Master::shutdown(ResponseCallback *cb, uint64_t session_id) {
1161  if (m_verbose)
1162  HT_INFOF("shutdown(session=%llu", (Llu)session_id);
1163 
1164  // destroy session
1165  destroy_session(session_id);
1166 
1167  // destroy dangling sessions...
1168  {
1169  lock_guard<mutex> lock(m_session_map_mutex);
1170  m_shutdown = true;
1171  SessionDataPtr session_data;
1172  while (m_session_map.size()) {
1173  SessionMap::iterator iter = m_session_map.begin();
1174  session_data = (*iter).second;
1175  m_session_map.erase(iter);
1176  session_data->expire();
1177  // force it to top of expiration heap
1178  session_data->set_expire_time_now();
1179  HT_INFOF("destroyed dangling session %llu(%s)",
1180  (Llu)session_data->get_id(), session_data->get_name());
1181  }
1182  }
1183 
1184  //...and remove
1186 
1187  int error;
1188  if ((error = cb->response_ok()) != Error::OK)
1189  HT_ERRORF("Problem sending back response - %s", Error::get_text(error));
1190 
1191  m_keepalive_handler_ptr->shutdown();
1192 }
1193 
1195  HT_INFO("status");
1196  cb->response(m_status);
1197 }
1198 
1199 /*
1200  * lock
1201  */
1202 void
1203 Hyperspace::Master::lock(ResponseCallbackLock *cb, uint64_t session_id, uint64_t handle,
1204  uint32_t mode, bool try_lock) {
1205  SessionDataPtr session_data;
1206  bool notify = true;
1207  uint32_t open_flags, cur_lock_mode;
1208  String node;
1209  uint64_t lock_generation = 0;
1210  uint64_t event_id;
1211  HyperspaceEventPtr lock_acquired_event;
1212  NotificationMap lock_acquired_notifications;
1213  bool persisted_notifications = false;
1214  bool aborted=false, commited=false;
1215  int lock_status = 0;
1216  int error = Error::OK;
1217  String error_msg;
1218 
1219  if (!get_session(session_id, session_data)) {
1221  return;
1222  }
1223 
1224  if (m_verbose) {
1225  HT_INFOF("lock(session=%llu(%s), handle=%llu, mode=0x%x, try_lock=%d)",
1226  (Llu)session_id, session_data->get_name(), (Llu)handle, mode, try_lock);
1227  }
1228 
1229  HT_BDBTXN_BEGIN() {
1230  // (re) initialize vars
1231  aborted = false; commited = false; persisted_notifications = false;
1232  lock_status=0;
1233 
1234  // make sure session is still valid
1235  if (!m_bdb_fs->session_exists(txn, session_id)) {
1237  error_msg = format("session: %lld", (Lld)session_id);
1238  aborted = true;
1239  goto txn_commit;
1240  }
1241 
1242  if (!m_bdb_fs->handle_exists(txn, handle)) {
1243  aborted = true;
1244  goto txn_commit;
1245  }
1246 
1247  open_flags = m_bdb_fs->get_handle_open_flags(txn, handle);
1248  if (!(open_flags & OPEN_FLAG_LOCK)) {
1250  error_msg = "handle not open for locking";
1251  aborted = true;
1252  goto txn_commit;
1253  }
1254 
1255  if (!(open_flags & OPEN_FLAG_WRITE)) {
1257  error_msg = "handle not open for writing";
1258  aborted = true;
1259  goto txn_commit;
1260  }
1261 
1262  m_bdb_fs->get_handle_node(txn, handle, node);
1263  cur_lock_mode = m_bdb_fs->get_node_cur_lock_mode(txn, node);
1264  if (cur_lock_mode == LOCK_MODE_EXCLUSIVE) {
1265  if (try_lock)
1266  lock_status = LOCK_STATUS_BUSY;
1267  else {
1268  // don't abort transaction since we need to persist pending lock req
1269  LockRequest lock_request(handle, mode);
1270  m_bdb_fs->add_node_pending_lock_request(txn, node, lock_request);
1271  lock_status = LOCK_STATUS_PENDING;
1272  }
1273  goto txn_commit;
1274  }
1275  else if (cur_lock_mode == LOCK_MODE_SHARED) {
1276  if (mode == LOCK_MODE_EXCLUSIVE) {
1277  if (try_lock)
1278  lock_status = LOCK_STATUS_BUSY;
1279  else {
1280  // don't abort transaction since we need to persist pending lock req
1281  LockRequest lock_request(handle, mode);
1282  m_bdb_fs->add_node_pending_lock_request(txn, node, lock_request);
1283  lock_status = LOCK_STATUS_PENDING;
1284  }
1285  goto txn_commit;
1286  }
1287 
1288  HT_ASSERT(mode == LOCK_MODE_SHARED);
1289 
1290  if (m_bdb_fs->node_has_pending_lock_request(txn, node)) {
1291  if (try_lock)
1292  lock_status = LOCK_STATUS_BUSY;
1293  else {
1294  // don't abort transaction since we need to persist pending lock req
1295  LockRequest lock_request(handle, mode);
1296  m_bdb_fs->add_node_pending_lock_request(txn, node, lock_request);
1297  lock_status = LOCK_STATUS_PENDING;
1298  }
1299  goto txn_commit;
1300  }
1301  }
1302 
1303  // at this point we're OK to acquire the lock
1304  if (mode == LOCK_MODE_SHARED && m_bdb_fs->node_has_shared_lock_handles(txn, node))
1305  notify = false;
1306 
1307  lock_status = LOCK_STATUS_GRANTED;
1308  lock_generation = m_bdb_fs->incr_node_lock_generation(txn, node);
1309 
1310  m_bdb_fs->set_xattr_i64(txn, node, "lock.generation", lock_generation);
1311  m_bdb_fs->set_node_cur_lock_mode(txn, node, mode);
1312  lock_handle(txn, handle, mode, node);
1313 
1314  // create lock acquired event & persist event notifications
1315  if (notify) {
1316  event_id = m_bdb_fs->get_next_id_i64(txn, EVENT, true);
1318  mode);
1319  lock_acquired_event = make_shared<EventLockAcquired>(event_id, mode);
1321  lock_acquired_notifications)) {
1322  persist_event_notifications(txn, event_id, lock_acquired_notifications);
1323  persisted_notifications = true;
1324  }
1325  }
1326 
1327  // commit the txn
1328  txn_commit:
1329  if (aborted) {
1330  txn.abort();
1331  std::stringstream sout;
1332  sout << "lock txn=" << txn << " aborted " << " handle=" << handle << " node="
1333  << node << " mode=" << mode << " status=" << lock_status
1334  << " lock_generation=" << lock_generation;
1335  HT_INFOF("%s", sout.str().c_str());
1336  }
1337  else {
1338  txn.commit();
1339  commited = true;
1340  std::stringstream sout;
1341  sout << "lock txn=" << txn << " commited " << " handle=" << handle << " node="
1342  << node << " mode=" << mode << " status=" << lock_status
1343  << " lock_generation=" << lock_generation << " notification_count="
1344  << lock_acquired_notifications.size();
1345  HT_INFOF("%s", sout.str().c_str());
1346  }
1347  }
1348  HT_BDBTXN_END_CB(cb);
1349 
1350  // check for errors
1351  if (aborted) {
1352  cb->error(error, error_msg);
1353  return;
1354  }
1355 
1356  // send lock request response
1357  switch (lock_status) {
1358  case LOCK_STATUS_GRANTED:
1359  cb->response(lock_status, lock_generation);
1360  break;
1361  default:
1362  cb->response(lock_status);
1363  }
1364 
1365  // deliver lock acquired event notifications
1366  if (commited && persisted_notifications) {
1367  deliver_event_notifications(lock_acquired_event, lock_acquired_notifications);
1368  }
1369 
1370 }
1371 
1372 /*
1373  * Assumes node is locked and BDB txn has started.
1374  * lock_handle does the following:
1375  * > If node name string is null then fill in the node name
1376  * > Set exclusive handle to acquiring handle or add acquiring handle to set of shared handles
1377  * > Set handle data to locked
1378  */
1379 void Hyperspace::Master::lock_handle(BDbTxn &txn, uint64_t handle, uint32_t mode, String& node) {
1380 
1381  if (node == "")
1382  m_bdb_fs->get_handle_node(txn, handle, node);
1383 
1384  if (mode == LOCK_MODE_SHARED)
1385  m_bdb_fs->add_node_shared_lock_handle(txn, node, handle);
1386  else {
1387  HT_ASSERT(mode == LOCK_MODE_EXCLUSIVE);
1388  m_bdb_fs->set_node_exclusive_lock_handle(txn, node, handle);
1389  }
1390  m_bdb_fs->set_handle_locked(txn, handle, true);
1391 }
1392 
1393 /*
1394  * Assumes node is locked.
1395  * lock_handle does the following:
1396  * > Set exclusive handle to acquiring handle or add acquiring handle to set of shared handles
1397  * > Set handle data to locked
1398  */
1399 void Hyperspace::Master::lock_handle(BDbTxn &txn, uint64_t handle, uint32_t mode, const String& node) {
1400 
1401  HT_ASSERT(node != "");
1402  if (mode == LOCK_MODE_SHARED)
1403  m_bdb_fs->add_node_shared_lock_handle(txn, node, handle);
1404  else {
1405  HT_ASSERT(mode == LOCK_MODE_EXCLUSIVE);
1406  m_bdb_fs->set_node_exclusive_lock_handle(txn, node, handle);
1407  }
1408  m_bdb_fs->set_handle_locked(txn, handle, true);
1409 }
1410 
1411 
1412 /*
1413  * release
1414  */
1415 void
1416 Hyperspace::Master::release(ResponseCallback *cb, uint64_t session_id, uint64_t handle) {
1417  SessionDataPtr session_data;
1418  String node;
1419  int error = 0;
1420  String error_msg;
1421  NotificationMap lock_release_notifications, lock_granted_notifications,
1422  lock_acquired_notifications;
1423  HyperspaceEventPtr lock_release_event, lock_granted_event, lock_acquired_event;
1424  bool aborted = false, commited = false;
1425 
1426  if (!get_session(session_id, session_data)) {
1428  return;
1429  }
1430 
1431  if (m_verbose) {
1432  HT_INFOF("release(session=%llu(%s), handle=%llu)",
1433  (Llu)session_id, session_data->get_name(), (Llu)handle);
1434  }
1435 
1436  // txn 1: release lock
1437  HT_BDBTXN_BEGIN() {
1438 
1439  // make sure session is still valid
1440  if (!m_bdb_fs->session_exists(txn, session_id)) {
1442  error_msg = format("session: %lld", (Lld)session_id);
1443  aborted = true;
1444  goto txn_commit_1;
1445  }
1446 
1447  if (!m_bdb_fs->handle_exists(txn, handle)) {
1449  error_msg = format("handle=%lld", (Lld)handle);
1450  aborted = true;
1451  goto txn_commit_1;
1452  }
1453 
1454  m_bdb_fs->get_handle_node(txn, handle, node);
1455 
1456  release_lock(txn, handle, node, lock_release_event, lock_release_notifications);
1457 
1458  txn_commit_1:
1459  if (aborted)
1460  txn.abort();
1461  else {
1462  txn.commit();
1463  commited = true;
1464  }
1465  }
1466  HT_BDBTXN_END_CB(cb);
1467 
1468  // check for errors
1469  if (aborted) {
1470  HT_ERROR_OUT << Error::get_text(error) << " - " << error_msg << HT_END;
1471  cb->error(error, error_msg);
1472  return;
1473  }
1474 
1475  // deliver lock released notifications
1476  if (commited)
1477  deliver_event_notifications(lock_release_event, lock_release_notifications);
1478 
1479  // txn 2: grant pending lock(s)
1480  HT_BDBTXN_BEGIN() {
1481  grant_pending_lock_reqs(txn, node, lock_granted_event, lock_granted_notifications,
1482  lock_acquired_event, lock_acquired_notifications);
1483  txn.commit();
1484  }
1485  HT_BDBTXN_END_CB(cb);
1486 
1487  // deliver lock granted & acquired notifications
1488  deliver_event_notifications(lock_granted_event, lock_granted_notifications);
1489  deliver_event_notifications(lock_acquired_event, lock_acquired_notifications);
1490 
1491  cb->response_ok();
1492 }
1493 
1494 /*
1495  * release_lock: does the following
1496  * > if handle is not locked return
1497  * > if lock is exclusive
1498  * > set node exclusive lock handle to 0
1499  * > else
1500  * > delete this shared lock handle from node
1501  * > set handle to unlocked
1502  * > if node has no shared lock handles (at this point it has no exclusive lock handle)
1503  * > create a new LOCK_RELEASED event and persist in BDB
1504  * > get map of handles -> sessions to be notified of LOCK_RELEASE event on this node
1505  * > persist LOCK_RELEASE notifications
1506  * > set node to unlocked in BDB
1507  *
1508  */
1509 void Hyperspace::Master::release_lock(BDbTxn &txn, uint64_t handle, const String &node,
1510  HyperspaceEventPtr &release_event, NotificationMap &release_notifications) {
1511  vector<uint64_t> next_lock_handles;
1512  uint64_t exclusive_lock_handle=0;
1513 
1514  if (m_bdb_fs->handle_is_locked(txn, handle)) {
1515  exclusive_lock_handle = m_bdb_fs->get_node_exclusive_lock_handle(txn,node);
1516  if (exclusive_lock_handle != 0) {
1517  HT_ASSERT(handle == exclusive_lock_handle);
1519  }
1520  else {
1521  m_bdb_fs->delete_node_shared_lock_handle(txn, node, handle);
1522  }
1523  m_bdb_fs->set_handle_locked(txn, handle, false);
1524  }
1525  else
1526  return;
1527 
1528  // persist LOCK_RELEASED notifications if no more locks held on node
1529  if (!m_bdb_fs->node_has_shared_lock_handles(txn, node)) {
1530  HT_INFO("Persisting lock released notifications");
1531  uint64_t event_id = m_bdb_fs->get_next_id_i64(txn, EVENT, true);
1532  release_event = make_shared<EventLockReleased>(event_id);
1534  release_event->get_mask());
1535  if (m_bdb_fs->get_node_event_notification_map(txn, node, release_event->get_mask(),
1536  release_notifications)) {
1537  persist_event_notifications(txn, event_id, release_notifications);
1538  }
1539 
1540  m_bdb_fs->set_node_cur_lock_mode(txn, node, 0);
1541  HT_INFO("Finished persisting lock released notifications");
1542  }
1543 }
1544 
1545 /*
1546  * grant_pending_lock_reqs does the following
1547  *
1548  * > Check if the node has pending locks
1549  * > Get the first pending exclusive lock or set of next pending shared locks
1550  * > Grant lock to next pending lock(s)
1551  * > Persist lock granted notifications
1552  * > Persist lock acquired notifications
1553  */
1555  HyperspaceEventPtr &lock_granted_event, NotificationMap &lock_granted_notifications,
1556  HyperspaceEventPtr &lock_acquired_event, NotificationMap &lock_acquired_notifications) {
1557  vector<uint64_t> next_lock_handles;
1558  int next_mode = 0;
1559  LockRequest front_lock_req;
1560 
1561  if (m_bdb_fs->get_node_pending_lock_request(txn, node, front_lock_req)) {
1562  next_mode = front_lock_req.mode;
1563 
1564  if (next_mode == LOCK_MODE_EXCLUSIVE) {
1565  // get the pending exclusive lock request
1566  next_lock_handles.push_back(front_lock_req.handle);
1567  m_bdb_fs->delete_node_pending_lock_request(txn, node, front_lock_req.handle);
1568  }
1569  else {
1570  // gather up all the pending shared lock requests preceeding the next exclusive request
1571  HT_ASSERT(next_mode == LOCK_MODE_SHARED);
1572  LockRequest lockreq = front_lock_req;
1573  do {
1574  if (lockreq.mode != LOCK_MODE_SHARED)
1575  break;
1576  next_lock_handles.push_back(lockreq.handle);
1577  m_bdb_fs->delete_node_pending_lock_request(txn, node, lockreq.handle);
1578  } while (m_bdb_fs->get_node_pending_lock_request(txn, node, lockreq));
1579  }
1580 
1581  if (!next_lock_handles.empty()) {
1582  // we have at least 1 pending lock request
1583  // grant lock to next pending locks and persist lock granted notifications
1584  uint64_t lock_generation = m_bdb_fs->incr_node_lock_generation(txn, node);
1585  uint64_t event_id = m_bdb_fs->get_next_id_i64(txn, EVENT, true);
1586  uint64_t session;
1588  next_mode, lock_generation);
1589  m_bdb_fs->set_xattr_i64(txn, node, "lock.generation", lock_generation);
1590  m_bdb_fs->set_node_cur_lock_mode(txn, node, next_mode);
1591 
1592  lock_granted_event = make_shared<EventLockGranted>(event_id, next_mode, lock_generation);
1593 
1594  for (auto handle : next_lock_handles) {
1595  lock_handle(txn, handle, next_mode, node);
1596  session = m_bdb_fs->get_handle_session(txn, handle);
1597  lock_granted_notifications[handle] = session;
1598  }
1599  // persist lock granted notifications
1600  persist_event_notifications(txn, event_id, lock_granted_notifications);
1601 
1602  // create lock acquired event
1603  event_id = m_bdb_fs->get_next_id_i64(txn, EVENT, true);
1605  next_mode);
1606  lock_acquired_event = make_shared<EventLockAcquired>(event_id, next_mode);
1607  // persist lock acquired notifications
1609  lock_acquired_notifications))
1610  persist_event_notifications(txn, event_id, lock_acquired_notifications);
1611  }
1612  }
1613 }
1614 
1615 /*
1616  * Assumes it is in the middle of a BDB txn
1617  *
1618  * > Store the handles affected by an event in BerkeleyDB
1619  */
1620 void
1622  NotificationMap &handles_to_sessions)
1623 {
1624  if (handles_to_sessions.size() > 0) {
1625  vector<uint64_t> handles;
1626  for (NotificationMap::iterator iter = handles_to_sessions.begin();
1627  iter != handles_to_sessions.end(); iter++) {
1628  handles.push_back(iter->first);
1629  }
1630  m_bdb_fs->set_event_notification_handles(txn, event_id, handles);
1631  }
1632 }
1633 
1634 /*
1635  * Assumes it is in the middle of a BDB txn
1636  *
1637  * > Store the handle addected by an event in BerkeleyDB
1638  */
1639 void
1640 Hyperspace::Master::persist_event_notifications(BDbTxn &txn, uint64_t event_id, uint64_t handle)
1641 {
1642  vector<uint64_t> handles;
1643  handles.push_back(handle);
1644  m_bdb_fs->set_event_notification_handles(txn, event_id, handles);
1645 }
1646 
1647 /*
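1648  * deliver_event_notifications: for each handle whose session is still live, queue a Notification for event_ptr on that session and push it via the keepalive handler; if wait_for_notify is set, block in event_ptr->wait_for_notifications().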
1649  */
1650 void
1652  NotificationMap &handles_to_sessions, bool wait_for_notify)
1653 {
1654  SessionDataPtr session_data;
1655  uint64_t session_id;
1656  uint64_t handle_id;
1657  bool has_notifications = false;
1658  vector<uint64_t> sessions;
1659 
1660  for (NotificationMap::iterator iter = handles_to_sessions.begin();
1661  iter != handles_to_sessions.end(); iter++) {
1662  handle_id = iter->first;
1663  session_id = iter->second;
1664  if(get_session(session_id,session_data)) {
1665  session_data->add_notification(new Notification(handle_id, event_ptr ) );
1666  sessions.push_back(session_id);
1667  has_notifications = true;
1668  }
1669  }
1670 
1671  if (has_notifications) {
1672  String sessions_str;
1673 
1674  for (auto session_id : sessions) {
1675  m_keepalive_handler_ptr->deliver_event_notifications(session_id);
1676  sessions_str += String(" ") + session_id;
1677  }
1678 
1679  if (wait_for_notify)
1680  event_ptr->wait_for_notifications();
1681 
1682  if (m_verbose)
1683  HT_INFOF("exitting deliver_event_notifications for event_id=%llu mask=0x%x sessions=(%s )",
1684  (Llu)event_ptr->get_id(), (int)(Llu)event_ptr->get_mask(), sessions_str.c_str());
1685  }
1686  else {
1687  HT_DEBUG_OUT << "exitting deliver_event_notifications nothing to do"<< HT_END;
1688  }
1689 }
1690 
1691 /*
1692  *
1693  */
1694 bool
1695 Hyperspace::Master::find_parent_node(const String &normal_name,String &parent_name, String &child_name) {
1696  size_t last_slash = normal_name.rfind("/", normal_name.length());
1697 
1698  child_name.clear();
1699 
1700  if (last_slash > 0) {
1701  parent_name = normal_name.substr(0, last_slash);
1702  child_name.append(normal_name, last_slash + 1, normal_name.length() - last_slash - 1);
1703  return true;
1704  }
1705  else if (last_slash == 0) {
1706  parent_name = "/";
1707  child_name.append(normal_name, 1, normal_name.length() - 1);
1708  return true;
1709  }
1710 
1711  return false;
1712 }
1713 
1714 /*
1715  * destroy_handle does the following:
1716  * > txn 1: verify the handle exists and is not already being deleted,
1717  *   look up its node, remove the handle from the node, and release any
1718  *   lock it holds (persisting LOCK_RELEASED notifications if the node
1719  *   becomes unlocked)
1720  * > Deliver LOCK_RELEASED notifications
1721  * > txn 2: grant the lock to the next pending request(s), persisting
1722  *   LOCK_GRANTED and LOCK_ACQUIRED notifications
1723  * > Deliver LOCK_GRANTED and LOCK_ACQUIRED notifications
1724  * > txn 3: if no handles remain open on the node and it is ephemeral,
1725  *   persist CHILD_NODE_REMOVED notifications on the parent and delete
1726  *   the node from BDB
1727  * > Deliver CHILD_NODE_REMOVED notifications (if the node was removed)
1728  * > txn 4: delete the handle data from BDB
1729  *
1730  * Returns false without further work if the handle does not exist or is
1731  * already being deleted.
1732  */
1733 bool
1734 Hyperspace::Master::destroy_handle(uint64_t handle, int &error, String &errmsg,
1735  bool wait_for_notify) {
1736  bool has_refs = false;
1737  NotificationMap lock_release_notifications, lock_granted_notifications,
1738  lock_acquired_notifications, node_removed_notifications;
1739  HyperspaceEventPtr lock_release_event, lock_granted_event, lock_acquired_event,
1740  node_removed_event ;
1741  bool node_removed = false;
1742  String node;
1743  bool aborted = false;
1744 
1745  HT_DEBUG_OUT << "destroy_handle (handle=" << handle << ")" << HT_END;
1746 
1747  // txn 1: release lock
1748  HT_BDBTXN_BEGIN() {
1749  // Make sure handle is valid is not being deleted by someone else
1750  if (!m_bdb_fs->handle_exists(txn, handle) ||
1751  m_bdb_fs->get_handle_del_state(txn, handle) != HANDLE_NOT_DEL) {
1752  aborted = true;
1754  errmsg = format("Handle %lld already deleted or being deleted", (Lld)handle);
1755  goto txn_commit;
1756  }
1758  m_bdb_fs->get_handle_node(txn, handle, node);
1759  m_bdb_fs->delete_node_handle(txn, node, handle);
1760  release_lock(txn, handle, node, lock_release_event, lock_release_notifications);
1761 
1762  txn_commit:
1763  txn.commit();
1764  }
1765  HT_BDBTXN_END(false);
1766 
1767  if (aborted)
1768  return false;
1769 
1770  // deliver lock released notifications
1771  deliver_event_notifications(lock_release_event, lock_release_notifications,
1772  wait_for_notify);
1773 
1774  // txn 2: grant pending lock(s)
1775  HT_BDBTXN_BEGIN() {
1776  grant_pending_lock_reqs(txn, node, lock_granted_event, lock_granted_notifications,
1777  lock_acquired_event, lock_acquired_notifications);
1778  txn.commit();
1779  }
1780  HT_BDBTXN_END(false);
1781 
1782  // deliver lock granted & acquired notifications
1783  deliver_event_notifications(lock_granted_event, lock_granted_notifications, wait_for_notify);
1784  deliver_event_notifications(lock_acquired_event, lock_acquired_notifications,
1785  wait_for_notify);
1786 
1787  // txn 3: delete node if ephemeral and no one has it open
1788  HT_BDBTXN_BEGIN() {
1789  has_refs = m_bdb_fs->node_has_open_handles(txn, node);
1790  if (!has_refs && m_bdb_fs->node_is_ephemeral(txn, node)) {
1791  String parent_node, child_node;
1792 
1793  if (find_parent_node(node, parent_node, child_node)) {
1794  // persist child node removed notifications
1795  uint64_t event_id = m_bdb_fs->get_next_id_i64(txn, EVENT, true);
1796  m_bdb_fs->create_event(txn, EVENT_TYPE_NAMED, event_id,
1797  EVENT_MASK_CHILD_NODE_REMOVED, child_node);
1798  node_removed_event = make_shared<EventNamed>(event_id, EVENT_MASK_CHILD_NODE_REMOVED,
1799  child_node);
1800  if (m_bdb_fs->get_node_event_notification_map(txn, parent_node,
1801  EVENT_MASK_CHILD_NODE_REMOVED, node_removed_notifications)) {
1802  persist_event_notifications(txn, event_id, node_removed_notifications);
1803  }
1804  // unlink file and delete node data from BDB
1805  m_bdb_fs->unlink(txn, node);
1806  m_bdb_fs->delete_node(txn, node);
1807  node_removed = true;
1808  }
1809  }
1810  txn.commit();
1811  }
1812  HT_BDBTXN_END(false);
1813  // deliver node removed notifications
1814  if (node_removed) {
1815  deliver_event_notifications(node_removed_event, node_removed_notifications,
1816  wait_for_notify);
1817  }
1818 
1819  // txn 4: delete handle data from BDB
1820  HT_BDBTXN_BEGIN() {
1821  m_bdb_fs->delete_handle(txn, handle);
1822  txn.commit();
1823  }
1824  HT_BDBTXN_END(false);
1825 
1826  return true;
1827 }
1828 
1830  m_sleep_time = std::chrono::steady_clock::now();
1831  HT_INFO("Suspend detected.");
1832 }
1833 
1834 
1836 
1837  auto now = std::chrono::steady_clock::now();
1838 
1839  std::chrono::milliseconds lease_credit =
1840  std::chrono::duration_cast<std::chrono::milliseconds>(now - m_sleep_time) +
1841  std::chrono::milliseconds(m_lease_interval);
1842 
1843  // extend all leases
1844  {
1845  lock_guard<mutex> lock(m_session_map_mutex);
1846  if (!m_shutdown) {
1847  HT_INFOF("Resume detected, extending all session leases "
1848  "by %lu milliseconds.", (Lu)lease_credit.count());
1849  for (SessionMap::iterator iter = m_session_map.begin();
1850  iter != m_session_map.end(); iter++)
1851  (*iter).second->extend_lease(lease_credit);
1852  }
1853  }
1854 }
1855 
1857 
1858  {
1859  lock_guard<mutex> lock(m_maintenance_mutex);
1861  return;
1863  }
1864 
1866 
1867  {
1868  lock_guard<mutex> lock(m_maintenance_mutex);
1869  m_maintenance_outstanding = false;
1870  }
1871 
1872 }
1873 
1874 void Hyperspace::Master::mkdir(CommandContext &ctx, const char *name) {
1875  if (m_verbose) {
1876  HT_INFOF("%s(session_id=%llu, name=%s)", ctx.friendly_name, (Llu)ctx.session_id, name);
1877  }
1878 
1879  String parent_node, child_name;
1880  if (!find_parent_node(name, parent_node, child_name) || strlen(name)==0) {
1881  ctx.set_error(Error::HYPERSPACE_FILE_EXISTS, "directory '/' exists");
1882  return;
1883  }
1884 
1885  if (name[0] != '/' || name[strlen(name)-1] == '/') {
1886  ctx.set_error(Error::HYPERSPACE_BAD_PATHNAME, (String)"directory '" + name + "' bad");
1887  return;
1888  }
1889 
1890  if (!ctx.session_data) {
1891  if (!get_session(ctx.session_id, ctx.session_data)) {
1892  ctx.set_error(Error::HYPERSPACE_EXPIRED_SESSION, format("Session %llu", (Llu)ctx.session_id));
1893  return;
1894  }
1895  }
1896 
1897  HT_ASSERT(ctx.txn);
1898  BDbTxn &txn = *ctx.txn;
1899 
1900  // make sure parent node data is setup
1901  if (!validate_and_create_node_data(txn, parent_node)) {
1902  ctx.set_error(Error::HYPERSPACE_FILE_NOT_FOUND, (String)"' parent node: '" + parent_node + "'");
1903  return;
1904  }
1905 
1906  // make sure this node doesn't exist already
1907  if (m_bdb_fs->exists(txn, name)) {
1908  ctx.set_error(Error::HYPERSPACE_FILE_EXISTS, (String)"node: '" + name + "'");
1909  return;
1910  }
1911 
1912  // create event and persist notifications
1913  create_event(ctx, parent_node, EVENT_MASK_CHILD_NODE_ADDED, child_name);
1914 
1915  // create node
1916  m_bdb_fs->mkdir(txn, name);
1917 
1918  // create node data
1919  m_bdb_fs->create_node(txn, name, false, 1);
1920 }
1921 
1922 
1923 
1924 void Hyperspace::Master::open(CommandContext &ctx, const char *name,
1925  uint32_t flags, uint32_t event_mask,
1926  std::vector<Attribute> &init_attrs, uint64_t& handle,
1927  bool& created, uint64_t& lock_generation) {
1928 
1929  handle = 0;
1930  created = false;
1931  lock_generation = 0;
1932 
1933  String child_name, node = name, parent_node;
1934  bool lock_notify = false;
1935  uint32_t lock_mode = 0;
1936 
1937  HT_ASSERT(name[0] == '/');
1938 
1939  if (!ctx.session_data) {
1940  if (!get_session(ctx.session_id, ctx.session_data)) {
1941  ctx.set_error(Error::HYPERSPACE_EXPIRED_SESSION, format("Session %llu", (Llu)ctx.session_id));
1942  return;
1943  }
1944  }
1945 
1946  if (m_verbose) {
1947  HT_INFOF("open(session_id=%llu, session_name = %s, fname=%s, flags=0x%x, event_mask=0x%x)",
1948  (Llu)ctx.session_id,ctx. session_data->get_name(), name, flags, event_mask);
1949  }
1950 
1951  if (!find_parent_node(name, parent_node, child_name)) {
1953  return;
1954  }
1955 
1956  if (!init_attrs.empty() && !(flags & OPEN_FLAG_CREATE)) {
1957  ctx.set_error(Error::HYPERSPACE_CREATE_FAILED, "initial attributes can only be supplied on CREATE");
1958  return;
1959  }
1960 
1961  HT_ASSERT(ctx.txn);
1962  BDbTxn &txn = *ctx.txn;
1963 
1964  // make sure session is still valid
1965  if (!m_bdb_fs->session_exists(txn, ctx.session_id)) {
1966  ctx.set_error(Error::HYPERSPACE_EXPIRED_SESSION, format("Session %llu", (Llu)ctx.session_id));
1967  return;
1968  }
1969 
1970  // make sure parent node is valid and create node_data if needed
1971  if (!validate_and_create_node_data(txn, parent_node)) {
1972  ctx.set_error(Error::HYPERSPACE_FILE_NOT_FOUND, (String)" node: '" + node + "' parent node: '" + parent_node + "'");
1973  return;
1974  }
1975 
1976  bool existed = m_bdb_fs->exists(txn, name);
1977  if (existed) { // node exists in DB already
1978  // check flags
1979  if ((flags & OPEN_FLAG_CREATE) && (flags & OPEN_FLAG_EXCL)) {
1980  ctx.set_error(Error::HYPERSPACE_FILE_EXISTS, "mode=CREATE|EXCL");
1981  return;
1982  }
1983 
1984  if ((flags & OPEN_FLAG_TEMP)) {
1985  ctx.set_error(Error::HYPERSPACE_FILE_EXISTS, (String) "Unable to open TEMP file " + node + "because it already exists");
1986  return;
1987  }
1988 
1989  // create node data if it doesn't exist and set lock generation
1990  validate_and_create_node_data(txn, node);
1991 
1992  // check for lock mode conflicts
1994  uint32_t cur_lock_mode = m_bdb_fs->get_node_cur_lock_mode(txn, node);
1995  if ((flags & OPEN_FLAG_LOCK_SHARED) == OPEN_FLAG_LOCK_SHARED) {
1996  if (cur_lock_mode == LOCK_MODE_EXCLUSIVE) {
1998  return;
1999  }
2000  lock_mode = LOCK_MODE_SHARED;
2001  if (!m_bdb_fs->node_has_shared_lock_handles(txn, node))
2002  lock_notify = true;
2003  }
2004  else if ((flags & OPEN_FLAG_LOCK_EXCLUSIVE) == OPEN_FLAG_LOCK_EXCLUSIVE) {
2005  if (cur_lock_mode == LOCK_MODE_SHARED || cur_lock_mode == LOCK_MODE_EXCLUSIVE) {
2007  return;
2008  }
2009  lock_mode = LOCK_MODE_EXCLUSIVE;
2010  lock_notify = true;
2011  }
2012  }
2013  } // node exists in DB already
2014  else { // node doesn't exist in DB
2015  if (!(flags & OPEN_FLAG_CREATE)) {
2017  return;
2018  }
2019  // create new node
2020  lock_generation = 1;
2021  m_bdb_fs->create(txn, name, (flags & OPEN_FLAG_TEMP) > 0);
2022  m_bdb_fs->set_xattr_i64(txn, name, "lock.generation",
2023  lock_generation);
2024 
2025  // create a new node data object in hyperspace
2026  m_bdb_fs->create_node(txn, node, (flags & OPEN_FLAG_TEMP) > 0, lock_generation);
2027 
2028  // Set the initial attributes
2029  for (size_t i=0; i<init_attrs.size(); i++)
2030  m_bdb_fs->set_xattr(txn, name, init_attrs[i].name,
2031  init_attrs[i].value, init_attrs[i].value_len);
2032  created = true;
2033  } // node doesn't exist in DB
2034  handle = m_bdb_fs->get_next_id_i64(txn, HANDLE, true);
2035  m_bdb_fs->create_handle(txn, handle, node, flags, event_mask, ctx.session_id, false,
2036  HANDLE_NOT_DEL);
2037  m_bdb_fs->add_session_handle(txn, ctx.session_id, handle);
2038 
2039  // create node added event and persist notifications
2040  create_event(ctx, parent_node, EVENT_MASK_CHILD_NODE_ADDED, child_name);
2041 
2042  /*
2043  * If open flags LOCK_SHARED or LOCK_EXCLUSIVE, then obtain lock
2044  */
2045  if (lock_mode != 0) {
2046  lock_generation = m_bdb_fs->incr_node_lock_generation(txn, node);
2047  m_bdb_fs->set_xattr_i64(txn, name, "lock.generation",
2048  lock_generation);
2049 
2050  m_bdb_fs->set_node_cur_lock_mode(txn, node, lock_mode);
2051  lock_handle(txn, handle, lock_mode, node);
2052 
2053  // create and persist lock acquired event
2054  // deliver notification to handles to this same node
2055  if (lock_notify) {
2056  uint64_t lock_acquired_event_id = m_bdb_fs->get_next_id_i64(txn, EVENT, true);
2057 
2058  m_bdb_fs->create_event(txn, EVENT_TYPE_LOCK_ACQUIRED, lock_acquired_event_id,
2059  EVENT_MASK_LOCK_ACQUIRED, lock_mode);
2060 
2061  std::vector<EventContext>::iterator it = ctx.evts.insert(ctx.evts.end(),
2062  EventContext(make_shared<EventLockAcquired>(lock_acquired_event_id, lock_mode)));
2063 
2065  it->notifications)) {
2066  persist_event_notifications(txn, lock_acquired_event_id,
2067  it->notifications);
2068  it->persisted_notifications = true;
2069  }
2070  }
2071  }
2072 
2073  m_bdb_fs->add_node_handle(txn, node, handle);
2074 
2075  HT_INFOF("handle %llu created ('%s', session=%llu(%s), flags=0x%x, mask=0x%x)",
2076  (Llu)handle, node.c_str(), (Llu)ctx.session_id, ctx.session_data->get_name(),
2077  flags, event_mask);
2078 }
2079 
2080 void Hyperspace::Master::unlink(CommandContext &ctx, const char *name) {
2081  if (m_verbose) {
2082  HT_INFOF("%s(session_id=%llu, name=%s)", ctx.friendly_name, (Llu)ctx.session_id, name);
2083  }
2084 
2085  if (!strcmp(name, "/")) {
2087  "Cannot remove '/' directory");
2088  return;
2089  }
2090 
2091  if (!ctx.session_data) {
2092  if (!get_session(ctx.session_id, ctx.session_data)) {
2093  ctx.set_error(Error::HYPERSPACE_EXPIRED_SESSION, format("Session %llu", (Llu)ctx.session_id));
2094  return;
2095  }
2096  }
2097 
2098  String node=name;
2099  String child_name, parent_node;
2100  if (!find_parent_node(node, parent_node, child_name)) {
2102  return;
2103  }
2104 
2105  HT_ASSERT(ctx.txn);
2106  BDbTxn &txn = *ctx.txn;
2107 
2108  // make sure parent node data is setup
2109  if (!validate_and_create_node_data(txn, parent_node)) {
2110  ctx.set_error(Error::HYPERSPACE_FILE_NOT_FOUND, (String)" node: '" + node);
2111  return;
2112  }
2113 
2114  // make sure file & node data exist
2115  if (!validate_and_create_node_data(txn, node)) {
2117  return;
2118  }
2119 
2120  bool has_refs = m_bdb_fs->node_has_open_handles(txn, node);
2121  if (has_refs) {
2122  ctx.set_error(Error::HYPERSPACE_FILE_OPEN, "File is still open and referred to by some handle");
2123  return;
2124  }
2125 
2126  // Sanity check
2127  HT_ASSERT(name[0] == '/' && name[strlen(name)-1] != '/');
2128 
2129  // Create event and persist notifications
2130  create_event(ctx, parent_node, EVENT_MASK_CHILD_NODE_REMOVED, child_name);
2131 
2132  // Delete node
2133  m_bdb_fs->unlink(txn, name);
2134 
2135  // Delete node data
2136  m_bdb_fs->delete_node(txn, node);
2137 }
2138 
2140  const char *name, const std::vector<Attribute> &attrs) {
2141  HT_ASSERT(ctx.txn);
2142  BDbTxn &txn = *ctx.txn;
2143 
2144  std::string attr_names;
2145  size_t total_value_len = 0;
2146  if (m_verbose) {
2147  for (const auto &attr : attrs) {
2148  attr_names += attr.name;
2149  attr_names += ",";
2150  total_value_len += attr.value_len;
2151  }
2152  boost::trim_right_if(attr_names, boost::is_any_of(","));
2153  }
2154 
2155  String node;
2156  if (name && *name) {
2157  if (!get_named_node(ctx, name, attr_names.c_str(), node))
2158  return;
2159  }
2160  else
2161  if (!get_handle_node(ctx, handle, attr_names.c_str(), node))
2162  return;
2163 
2164  for (const auto &attr : attrs) {
2165  m_bdb_fs->set_xattr(txn, node, attr.name, attr.value, attr.value_len);
2166  // create event notification and persist
2167  create_event(ctx, node, EVENT_MASK_ATTR_SET, attr.name);
2168  }
2169 
2170  if (m_verbose) {
2171  HT_INFOF("exitting attrset(session=%llu(%s), handle=%llu, name=%s, value_len=%d)",
2172  (Llu)ctx.session_id, ctx.session_data->get_name(), (Llu)handle, attr_names.c_str(), (int)total_value_len);
2173  }
2174 }
2175 
2177  const char *name, const char *attr, DynamicBuffer &dbuf) {
2178  HT_ASSERT(ctx.txn);
2179  BDbTxn &txn = *ctx.txn;
2180 
2181  String node;
2182  if (name && *name) {
2183  if (!get_named_node(ctx, name, attr, node))
2184  return;
2185  }
2186  else
2187  if (!get_handle_node(ctx, handle, attr, node))
2188  return;
2189 
2190  if (!m_bdb_fs->get_xattr(txn, node, attr, dbuf)) {
2192  return;
2193  }
2194 }
2195 
2197  const char *name, const std::vector<String> &attrs,
2198  std::vector<DynamicBufferPtr> &dbufs) {
2199  dbufs.clear();
2200  HT_ASSERT(ctx.txn);
2201  BDbTxn &txn = *ctx.txn;
2202 
2203  String node;
2204  if (name && *name) {
2205  if (!get_named_node(ctx, name, 0, node))
2206  return;
2207  }
2208  else
2209  if (!get_handle_node(ctx, handle, 0, node))
2210  return;
2211 
2212  dbufs.reserve(attrs.size());
2213  for (const auto &attr : attrs) {
2214  dbufs.push_back(make_shared<DynamicBuffer>());
2215  if (!m_bdb_fs->get_xattr(txn, node, attr, *dbufs.back()))
2216  dbufs.back() = 0; // attr not found
2217  }
2218 }
2219 
2221  const char *name, const char* attr, uint64_t& attr_val) {
2222  HT_ASSERT(ctx.txn);
2223  BDbTxn &txn = *ctx.txn;
2224 
2225  String node;
2226  if (name && *name) {
2227  if (!get_named_node(ctx, name, attr, node))
2228  return;
2229  }
2230  else
2231  if (!get_handle_node(ctx, handle, attr, node))
2232  return;
2233 
2234  if (!m_bdb_fs->incr_attr(txn, node, attr, &attr_val)) {
2236  return;
2237  }
2238 }
2239 
2240 void Hyperspace::Master::attr_del(CommandContext &ctx, uint64_t handle, const char *name) {
2241  HT_ASSERT(ctx.txn);
2242  BDbTxn &txn = *ctx.txn;
2243 
2244  String node;
2245  if (!get_handle_node(ctx, handle, name, node))
2246  return;
2247  m_bdb_fs->del_xattr(txn, node, name);
2248 
2249  // create event notification and persist
2250  create_event(ctx, node, EVENT_MASK_ATTR_DEL, name);
2251 }
2252 
2254  const char *name, const char *attr, bool& exists) {
2255  exists = false;
2256 
2257  HT_ASSERT(ctx.txn);
2258  BDbTxn &txn = *ctx.txn;
2259 
2260  String node;
2261  if (name && *name) {
2262  if (!get_named_node(ctx, name, attr, node))
2263  return;
2264  }
2265  else
2266  if (!get_handle_node(ctx, handle, attr, node))
2267  return;
2268 
2269  if (m_bdb_fs->exists_xattr(txn, node, attr))
2270  exists = true;
2271 }
2272 
2273 void Hyperspace::Master::attr_list(CommandContext& ctx, uint64_t handle, std::vector<String>& attributes) {
2274  attributes.clear();
2275 
2276  HT_ASSERT(ctx.txn);
2277  BDbTxn &txn = *ctx.txn;
2278 
2279  String node;
2280  if (!get_handle_node(ctx, handle, 0, node))
2281  return;
2282 
2283  if (!m_bdb_fs->list_xattr(txn, node, attributes)) {
2285  format("handle=%lld node=%s", (Lld)handle, node.c_str()));
2286  return;
2287  }
2288 }
2289 
2290 void Hyperspace::Master::exists(CommandContext& ctx, const char *name, bool& file_exists) {
2291  file_exists = false;
2292 
2293  HT_ASSERT(ctx.txn);
2294  BDbTxn &txn = *ctx.txn;
2295 
2296  if (m_verbose)
2297  HT_INFOF("exists(session_id=%llu, name=%s)", (Llu)ctx.session_id, name);
2298 
2299  HT_ASSERT(name[0] == '/' && (name[1] == '\0' || name[strlen(name)-1] != '/'));
2300  file_exists = m_bdb_fs->exists(txn, name);
2301 
2302  if (m_verbose)
2303  HT_INFOF("exitting exists(session_id=%llu, name=%s)", (Llu)ctx.session_id, name);
2304 }
2305 
2306 void Hyperspace::Master::readdir(CommandContext& ctx, uint64_t handle, std::vector<DirEntry>& listing) {
2307  listing.clear();
2308 
2309  HT_ASSERT(ctx.txn);
2310  BDbTxn &txn = *ctx.txn;
2311 
2312  String node;
2313  if (!get_handle_node(ctx, handle, 0, node))
2314  return;
2315 
2316  m_bdb_fs->get_directory_listing(txn, node, listing);
2317 }
2318 
2319 void Hyperspace::Master::readdir_attr(CommandContext& ctx, uint64_t handle, const char *name, const char *attr,
2320  bool include_sub_entries, std::vector<DirEntryAttr>& listing) {
2321  listing.clear();
2322 
2323  HT_ASSERT(ctx.txn);
2324  BDbTxn &txn = *ctx.txn;
2325 
2326  String node;
2327  if (name && *name) {
2328  if (!get_named_node(ctx, name, attr, node))
2329  return;
2330  }
2331  else
2332  if (!get_handle_node(ctx, handle, attr, node))
2333  return;
2334 
2335  m_bdb_fs->get_directory_attr_listing(txn, node, attr, include_sub_entries, listing);
2336 }
2337 
2338 void Hyperspace::Master::readpath_attr(CommandContext& ctx, uint64_t handle, const char *name, const char *attr,
2339  std::vector<DirEntryAttr>& listing) {
2340  listing.clear();
2341 
2342  HT_ASSERT(ctx.txn);
2343  BDbTxn &txn = *ctx.txn;
2344 
2345  String node;
2346  bool node_is_dir;
2347  if (name && *name) {
2348  if (!get_named_node(ctx, name, attr, node, &node_is_dir))
2349  return;
2350  }
2351  else {
2352  if (!get_handle_node(ctx, handle, attr, node))
2353  return;
2354  m_bdb_fs->exists(txn, node, &node_is_dir);
2355  }
2356 
2357  size_t pos = 0;
2358  String path_component;
2359  DynamicBuffer attr_buf;
2360  DirEntryAttr entry;
2361 
2362  // iterate over all path components and get attribute value if present
2363  while (pos != string::npos) {
2364  pos = node.find('/', pos);
2365  entry.is_dir = true;
2366 
2367  if (pos == string::npos)
2368  entry.is_dir = node_is_dir;
2369  else
2370  pos++;
2371 
2372  path_component = node.substr(0, pos);
2373  entry.name = path_component;
2374 
2375  // insert entry to result list if it has the attribute
2376  if (m_bdb_fs->get_xattr(txn, path_component, attr, attr_buf)) {
2377  entry.attr = attr_buf;
2378  entry.has_attr = true;
2379  }
2380  else {
2381  entry.attr.free();
2382  entry.has_attr = false;
2383  }
2384  listing.push_back(entry);
2385  }
2386 }
2387 
2388 /*
2389  * Validates the session and returns the node for the handle specified
2390  */
2391 bool Hyperspace::Master::get_handle_node(CommandContext &ctx, uint64_t handle, const char* attr, String &node) {
2392  if (!ctx.session_data) {
2393  if (!get_session(ctx.session_id, ctx.session_data)) {
2394  ctx.set_error(Error::HYPERSPACE_EXPIRED_SESSION, format("Session %llu", (Llu)ctx.session_id));
2395  return false;
2396  }
2397  }
2398 
2399  if (m_verbose) {
2400  if (attr && *attr)
2401  HT_INFOF("%s(session=%llu(%s), handle=%llu, attr=%s)", ctx.friendly_name,
2402  (Llu)ctx.session_id, ctx.session_data->get_name(), (Llu)handle, attr);
2403  else
2404  HT_INFOF("%s(session=%llu(%s), handle=%llu)", ctx.friendly_name,
2405  (Llu)ctx.session_id, ctx.session_data->get_name(), (Llu)handle);
2406  }
2407 
2408  HT_ASSERT(ctx.txn);
2409  BDbTxn &txn = *ctx.txn;
2410 
2411  // make sure session is still valid
2412  if (!m_bdb_fs->session_exists(txn, ctx.session_id)) {
2413  ctx.set_error(Error::HYPERSPACE_EXPIRED_SESSION, format("Session %llu", (Llu)ctx.session_id));
2414  return false;
2415  }
2416 
2417  if (!m_bdb_fs->handle_exists(txn, handle)) {
2419  format("Session %llu, handle=%llu", (Llu)ctx.session_id, (Llu)handle));
2420  return false;
2421  }
2422 
2423  m_bdb_fs->get_handle_node(txn, handle, node);
2424  return true;
2425 }
2426 
2427 /*
2428  * Validates the session and returns the node for the name specified
2429  */
2430 bool Hyperspace::Master::get_named_node(CommandContext &ctx, const char *name, const char* attr, String &node, bool *is_dir) {
2431  if (!ctx.session_data) {
2432  if (!get_session(ctx.session_id, ctx.session_data)) {
2433  ctx.set_error(Error::HYPERSPACE_EXPIRED_SESSION, format("Session %llu", (Llu)ctx.session_id));
2434  return false;
2435  }
2436  }
2437 
2438  if (m_verbose) {
2439  if (attr && *attr)
2440  HT_INFOF("%s(session=%llu(%s), name=%s, attr=%s)", ctx.friendly_name,
2441  (Llu)ctx.session_id, ctx.session_data->get_name(), name, attr);
2442  else
2443  HT_INFOF("%s(session=%llu(%s), name=%s)", ctx.friendly_name,
2444  (Llu)ctx.session_id, ctx.session_data->get_name(), name);
2445  }
2446 
2447  HT_ASSERT(ctx.txn);
2448  BDbTxn &txn = *ctx.txn;
2449 
2450  node = name;
2451  String child_name, parent_node;
2452  if (!find_parent_node(node, parent_node, child_name)) {
2454  return false;
2455  }
2456  boost::trim_right_if(node, boost::is_any_of("/"));
2457 
2458  // make sure node exists
2459  if (!m_bdb_fs->exists(txn, node, is_dir)) {
2460  ctx.set_error(Error::HYPERSPACE_FILE_NOT_FOUND, (String)"node: '" + name + "'");
2461  return false;
2462  }
2463 
2464  // make sure session is still valid
2465  if (!m_bdb_fs->session_exists(txn, ctx.session_id)) {
2466  ctx.set_error(Error::HYPERSPACE_EXPIRED_SESSION, format("Session %llu", (Llu)ctx.session_id));
2467  return false;
2468  }
2469  return true;
2470 }
2471 
2472 /*
2473  */
2474 void Hyperspace::Master::create_event(CommandContext &ctx, const String &node, uint32_t event_mask, const String &name) {
2475  HT_ASSERT(ctx.txn);
2476  BDbTxn &txn = *ctx.txn;
2477 
2478  uint64_t event_id = m_bdb_fs->get_next_id_i64(txn, EVENT, true);
2479  m_bdb_fs->create_event(txn, EVENT_TYPE_NAMED, event_id, event_mask, name);
2480 
2481  std::vector<EventContext>::iterator it = ctx.evts.insert(ctx.evts.end(),
2482  EventContext(make_shared<EventNamed>(event_id, event_mask, name)));
2483 
2484  if (m_bdb_fs->get_node_event_notification_map(txn, node, event_mask,
2485  it->notifications)) {
2486  persist_event_notifications(txn, event_id, it->notifications);
2487  it->persisted_notifications = true;
2488  }
2489  else
2490  it->persisted_notifications = false;
2491 }
2492 
2493 /*
2494  */
2496  for (auto &evt : ctx.evts)
2497  if (evt.persisted_notifications)
2498  deliver_event_notifications(evt, wait_for_notify);
2499 }
2500 
2502  deliver_event_notifications(evt.event, evt.notifications, wait_for_notify);
2503 }
2504 
2505 /*
2506  */
2508 
2509  HT_BDBTXN_BEGIN() {
2510  if (!m_bdb_fs->get_xattr_i32(txn, "/hyperspace/metadata", "generation",
2511  &m_generation))
2512  m_generation = 0;
2513  m_generation++;
2514  m_bdb_fs->set_xattr_i32(txn, "/hyperspace/metadata", "generation",
2515  m_generation);
2516  txn.commit();
2517  }
2518  HT_BDBTXN_END(BOOST_PP_EMPTY());
2519 }
2520 
2521 /*
2522  * If node exists but corresponding node data doesn't then
2523  * create the node data
2524  */
2525 bool
2527 {
2528  // make sure node is exists
2529  if (!m_bdb_fs->exists(txn, node)) {
2530  return false;
2531  }
2532 
2533  // create node data for this node and set its lock generation
2534  if (!m_bdb_fs->node_exists(txn, node)) {
2535  uint64_t lock_generation;
2536  if (!m_bdb_fs->get_xattr_i64(txn, node, "lock.generation", &lock_generation)) {
2537  lock_generation = 1;
2538  m_bdb_fs->set_xattr_i64(txn, node, "lock.generation", lock_generation);
2539  }
2540  m_bdb_fs->create_node(txn, node, false, lock_generation);
2541  }
2542 
2543  return true;
2544 }
Session identifier.
uint32_t get_handle_del_state(BDbTxn &txn, uint64_t id)
#define HT_BDBTXN_END(...)
Definition: Master.cc:101
bool validate_and_create_node_data(BDbTxn &txn, const String &node)
Definition: Master.cc:2526
bool next_expired_session(SessionDataPtr &, std::chrono::steady_clock::time_point now)
Definition: Master.cc:397
void close(ResponseCallback *cb, uint64_t session_id, uint64_t handle)
Definition: Master.cc:686
uint64_t handle
Node handle ID.
Lock successfully granted.
Definition: LockSequencer.h:58
std::chrono::steady_clock::time_point m_last_tick
Definition: Master.h:347
void set_session_name(BDbTxn &txn, uint64_t id, const String &name)
void attr_set(ResponseCallback *cb, uint64_t session_id, uint64_t handle, const char *name, uint32_t oflags, const std::vector< Attribute > &attrs)
Definition: Master.cc:758
SessionMap m_session_map
Definition: Master.h:340
void add_node_pending_lock_request(BDbTxn &txn, const String &name, LockRequest &request)
Adds a lock request.
bool node_has_shared_lock_handles(BDbTxn &txn, const String &name)
bool is_dir
Definition: DirEntryAttr.h:66
std::string String
A String is simply a typedef to std::string.
Definition: String.h:44
Compatibility class for boost::filesystem::path.
void delete_node_shared_lock_handle(BDbTxn &txn, const String &name, uint64_t handle_id)
bool get_xattr_i32(BDbTxn &txn, const String &fname, const String &aname, uint32_t *valuep)
void release_lock(BDbTxn &txn, uint64_t handle, const String &node, HyperspaceEventPtr &release_event, NotificationMap &release_notifications)
Definition: Master.cc:1509
String format(const char *fmt,...)
Returns a String using printf like format facilities Vanilla snprintf is about 1.5x faster than this...
Definition: String.cc:37
void lock_handle(BDbTxn &txn, uint64_t handle, uint32_t mode, String &node)
Definition: Master.cc:1379
void status(ResponseCallbackStatus *cb)
Definition: Master.cc:1194
virtual int response_ok()
Sends a a simple success response back to the client which is just the 4-byte error code Error::OK...
StaticBuffer attr
Definition: DirEntryAttr.h:63
void get_handle_node(BDbTxn &txn, uint64_t id, String &node_name)
void add_node_handle(BDbTxn &txn, const String &name, uint64_t handle)
Error if create and file exists.
Definition: Session.h:79
Manages transaction state.
bool incr_attr(BDbTxn &txn, const String &fname, const String &aname, uint64_t *valuep)
void unlink(ResponseCallback *cb, uint64_t session_id, const char *name)
Definition: Master.cc:600
Exclusive lock attempt failed because another has it locked.
Definition: LockSequencer.h:60
struct sockaddr_in m_local_addr
Definition: Master.h:338
void create_handle(BDbTxn &txn, uint64_t id, String node_name, uint32_t open_flags, uint32_t event_mask, uint64_t session_id, bool locked, uint32_t del_state)
void create_node(BDbTxn &txn, const String &name, bool ephemeral=false, uint64_t lock_generation=0, uint32_t cur_lock_mode=0, uint64_t exclusive_handle=0)
long long unsigned int Llu
Shortcut for printf formats.
Definition: String.h:50
std::string m_base_dir
Definition: Master.h:331
bool m_maintenance_outstanding
Definition: Master.h:346
void open(ResponseCallbackOpen *cb, uint64_t session_id, const char *name, uint32_t flags, uint32_t event_mask, std::vector< Attribute > &init_attrs)
Definition: Master.cc:638
void create(BDbTxn &txn, const String &fname, bool temp)
bool get_session(uint64_t session_id, SessionDataPtr &session_data)
Definition: Master.cc:291
std::shared_ptr< SessionData > SessionDataPtr
Definition: SessionData.h:156
int response(uint32_t status, uint64_t lock_generation=0)
static bool exists(const String &fname)
Checks if a file or directory exists.
Definition: FileUtils.cc:420
SessionDataVec m_session_heap
Definition: Master.h:339
Lock in shared mode.
Definition: LockSequencer.h:49
#define HT_INFO(msg)
Definition: Logger.h:271
STL namespace.
void delete_session(BDbTxn &txn, uint64_t id)
static bool mkdirs(const String &dirname)
Creates a directory (with all parent directories, if required)
Definition: FileUtils.cc:366
void initialize_session(uint64_t session_id, const String &name)
Definition: Master.cc:323
bool node_is_ephemeral(BDbTxn &txn, const String &name)
std::shared_ptr< Event > HyperspaceEventPtr
Definition: Event.h:145
void set_node_cur_lock_mode(BDbTxn &txn, const String &name, uint32_t lock_mode)
std::vector< EventContext > evts
Definition: Master.h:217
int response(std::vector< DirEntry > &listing)
void attr_incr(ResponseCallbackAttrIncr *cb, uint64_t session_id, uint64_t handle, const char *name, const char *attr)
Definition: Master.cc:883
bool node_has_pending_lock_request(BDbTxn &txn, const String &name)
Atomically open and lock file shared, fail if can't.
Definition: Session.h:83
void add_node_shared_lock_handle(BDbTxn &txn, const String &name, uint64_t handle)
A dynamic, resizable and reference counted memory buffer.
Definition: DynamicBuffer.h:42
void attr_del(ResponseCallback *cb, uint64_t session_id, uint64_t handle, const char *name)
Definition: Master.cc:922
void remove_expired_sessions()
Definition: Master.cc:423
std::mutex m_session_map_mutex
Definition: Master.h:343
int response(std::vector< DirEntryAttr > &listing)
void get_session_handles(BDbTxn &txn, uint64_t id, std::vector< uint64_t > &handles)
Hyperspace definitions
int response(const vector< string > &attributes)
#define HT_ASSERT(_e_)
Definition: Logger.h:396
void destroy_session(uint64_t session_id)
Definition: Master.cc:305
Open file for locking.
Definition: Session.h:75
void grant_pending_lock_reqs(BDbTxn &txn, const String &node, HyperspaceEventPtr &lock_granted_event, NotificationMap &lock_granted_notifications, HyperspaceEventPtr &lock_acquired_event, NotificationMap &lock_acquired_notifications)
Definition: Master.cc:1554
void readdir(ResponseCallbackReaddir *cb, uint64_t session_id, uint64_t handle)
Definition: Master.cc:1054
Definition: DirEntryAttr.h:40
bool exists(BDbTxn &txn, String fname, bool *is_dir_p=0)
bool is_master()
Definition: Master.h:78
Compatibility class for boost::filesystem::path.
Definition: Path.h:45
void shutdown(ResponseCallback *cb, uint64_t session_id)
Definition: Master.cc:1160
File system utility functions.
void handle_wakeup()
Handle wakeup event (e.g.
Definition: Master.cc:1835
std::unordered_map< uint64_t, uint64_t > NotificationMap
Hash map from Node handle ID to Session ID.
Open file for writing.
Definition: Session.h:73
Master(ConnectionManagerPtr &, PropertiesPtr &, ServerKeepaliveHandlerPtr &, ApplicationQueuePtr &app_queue_ptr)
Definition: Master.cc:133
std::string name
Definition: DirEntryAttr.h:62
bool node_has_open_handles(BDbTxn &txn, const String &name)
std::string m_lock_file
Definition: Master.h:332
bool has_attr
Boolean value indicating whether or not this entry is a directory.
Definition: DirEntryAttr.h:65
const char * get_text(int error)
Returns a descriptive error message.
Definition: Error.cc:330
void do_checkpoint()
Checkpoints the BerkeleyDB database.
bool handle_is_locked(BDbTxn &txn, uint64_t id)
bool delete_session_handle(BDbTxn &txn, uint64_t id, uint64_t handle_id)
int response(const Hypertable::Status &status)
bool delete_node(BDbTxn &txn, const String &name)
NotificationMap notifications
Definition: Master.h:196
Sends back result of an attr_exists request.
std::shared_ptr< Properties > PropertiesPtr
Definition: Properties.h:447
bool get_node_event_notification_map(BDbTxn &txn, const String &name, uint32_t event_mask, NotificationMap &handles_to_sessions)
bool get_node_pending_lock_request(BDbTxn &txn, const String &name, LockRequest &front_req)
Check if a node has any pending lock requests from non-expired handles.
void readpath_attr(ResponseCallbackReadpathAttr *cb, uint64_t session_id, uint64_t handle, const char *name, const char *attr)
Definition: Master.cc:1129
uint64_t get_node_exclusive_lock_handle(BDbTxn &txn, const String &name)
void set_handle_locked(BDbTxn &txn, uint64_t id, bool locked)
uint32_t m_maintenance_interval
Definition: Master.h:330
Compatibility Macros for C/C++.
bool get_xattr(BDbTxn &txn, const String &fname, const String &aname, Hypertable::DynamicBuffer &vbuf)
void delete_node_handle(BDbTxn &txn, const String &name, uint64_t handle)
Status m_status
Program status tracker.
Definition: Master.h:358
std::shared_ptr< ServerKeepaliveHandler > ServerKeepaliveHandlerPtr
#define HT_BDBTXN_BEGIN(parent_txn)
Definition: Master.cc:66
void set_handle_del_state(BDbTxn &txn, uint64_t id, uint32_t del_state)
Sends back result of an attr_get request.
#define HT_END
Definition: Logger.h:220
Importing boost::thread and boost::thread_group into the Hypertable namespace.
#define HT_ERROR_OUT
Definition: Logger.h:301
void free()
Clears the data; if this object is owner of the data then the allocated buffer is delete[]d...
Definition: StaticBuffer.h:185
bool exists_xattr(BDbTxn &txn, const String &fname, const String &aname)
BerkeleyDbFilesystem * m_bdb_fs
Definition: Master.h:355
String format(int sep= ':') const
Returns a string with a dotted notation ("127.0.0.1:8080") including the port.
Definition: InetAddr.h:132
void create_event(BDbTxn &txn, uint32_t type, uint64_t id, uint32_t mask)
void mkdir(BDbTxn &txn, const String &name)
Hyperspace filesystem implementation on top of BerkeleyDB.
This class is used to generate and deliver standard responses back to a client.
void get_generation_number()
Definition: Master.cc:2507
void add_session_handle(BDbTxn &txn, uint64_t id, uint64_t handle_id)
void do_maintenance()
Definition: Master.cc:1856
uint32_t get_node_cur_lock_mode(BDbTxn &txn, const String &name)
Hypertable definitions
Lock exclusive mode.
Definition: LockSequencer.h:51
Handle identifier.
std::chrono::steady_clock::time_point m_sleep_time
Suspension time recorded by handle_sleep()
Definition: Master.h:350
uint32_t m_generation
Definition: Master.h:334
void set_xattr(BDbTxn &txn, const String &fname, const String &aname, const void *value, size_t value_len)
long long int Lld
Shortcut for printf formats.
Definition: String.h:53
void expire_session(BDbTxn &txn, uint64_t id)
static bool initialize(sockaddr_in *addr, const char *host, uint16_t port)
Initialize a sockaddr_in structure from host:port.
Definition: InetAddr.cc:68
bool handle_exists(BDbTxn &txn, uint64_t id)
void attr_exists(ResponseCallbackAttrExists *cb, uint64_t session_id, uint64_t handle, const char *name, const char *attr)
Definition: Master.cc:956
void deliver_event_notifications(CommandContext &ctx, bool wait_for_notify=true)
Definition: Master.cc:2495
HyperspaceEventPtr event
Definition: Master.h:195
void mkdir(ResponseCallback *cb, uint64_t session_id, const char *name, const std::vector< Attribute > &init_attrs)
Definition: Master.cc:484
void attr_list(ResponseCallbackAttrList *cb, uint64_t session_id, uint64_t handle)
Definition: Master.cc:983
void attr_get(ResponseCallbackAttrGet *cb, uint64_t session_id, uint64_t handle, const char *name, const std::vector< String > &attrs)
Definition: Master.cc:831
virtual int error(int error, const String &msg)
Sends a standard error response back to the client.
void delete_node_pending_lock_request(BDbTxn &txn, const String &name, uint64_t handle)
void lock(ResponseCallbackLock *cb, uint64_t session_id, uint64_t handle, uint32_t mode, bool try_lock)
Definition: Master.cc:1203
std::mutex m_maintenance_mutex
Definition: Master.h:345
bool find_parent_node(const std::string &normal_name, std::string &parent_name, std::string &child_name)
Definition: Master.cc:1695
Lock attempt pending (internal use only)
Definition: LockSequencer.h:62
#define HT_INFOF(msg,...)
Definition: Logger.h:272
uint64_t create_session(struct sockaddr_in &addr)
Definition: Master.cc:263
void readdir_attr(ResponseCallbackReaddirAttr *cb, uint64_t session_id, uint64_t handle, const char *name, const char *attr, bool include_sub_entries)
Definition: Master.cc:1090
int renew_session_lease(uint64_t session_id)
Definition: Master.cc:354
atomically open and lock file exclusive, fail if can't
Definition: Session.h:85
int response(uint64_t handle, bool created, uint64_t lock_generation)
Random number generator for int32, int64, double and ascii arrays.
uint64_t get_handle_session(BDbTxn &txn, uint64_t id)
void set_xattr_i32(BDbTxn &txn, const String &fname, const String &aname, uint32_t value)
bool get_named_node(CommandContext &ctx, const char *name, const char *attr, String &node, bool *is_dir=0)
Definition: Master.cc:2430
uint64_t get_next_id_i64(BDbTxn &txn, IdentifierType id_type, bool increment=false)
Encapsulates a lock request for a file node.
uint32_t m_lease_interval
Definition: Master.h:328
uint32_t m_keep_alive_interval
Definition: Master.h:329
bool list_xattr(BDbTxn &txn, const String &fname, std::vector< String > &anames)
uint32_t mode
Lock mode.
bool session_exists(BDbTxn &txn, uint64_t id)
void reset(BDbTxn *_txn)
Definition: Master.h:244
Used in conjunction with CREATE to create an ephemeral file.
Definition: Session.h:81
void create_event(CommandContext &ctx, const String &node, uint32_t event_mask, const String &name)
Definition: Master.cc:2474
long unsigned int Lu
Shortcut for printf formats.
Definition: String.h:47
void set_event_notification_handles(BDbTxn &txn, uint64_t id, const std::vector< uint64_t > &handles)
#define HT_ERRORF(msg,...)
Definition: Logger.h:300
void create_session(BDbTxn &txn, uint64_t id, const String &addr)
#define HT_BDBTXN_END_CB(_cb_)
Definition: Master.cc:74
Create file if it does not exist.
Definition: Session.h:77
int response(bool exists)
Sends back result of an attr_exists request.
Event identifier.
void set_error(int _error, const char *_error_msg, bool abort=true)
Definition: Master.h:226
bool get_xattr_i64(BDbTxn &txn, const String &fname, const String &aname, uint64_t *valuep)
void release(ResponseCallback *cb, uint64_t session_id, uint64_t handle)
Definition: Master.cc:1416
void unlink(BDbTxn &txn, const String &name)
ServerKeepaliveHandlerPtr m_keepalive_handler_ptr
Definition: Master.h:337
void delete_handle(BDbTxn &txn, uint64_t id)
int response(std::vector< DirEntryAttr > &listing)
std::shared_ptr< ConnectionManager > ConnectionManagerPtr
Smart pointer to ConnectionManager.
void del_xattr(BDbTxn &txn, const String &fname, const String &aname)
bool node_exists(BDbTxn &txn, const String &name)
std::shared_ptr< ApplicationQueue > ApplicationQueuePtr
Shared smart pointer to ApplicationQueue object.
MetricsHandlerPtr m_metrics_handler
Definition: Master.h:341
System information and statistics based on libsigar.
String extensions and helpers: sets, maps, append operators etc.
Error codes, Exception handling, error logging.
int response(const std::vector< Hypertable::DynamicBufferPtr > &buffers)
Sends back result of an attr_get request.
void get_directory_listing(BDbTxn &txn, String fname, std::vector< DirEntry > &listing)
void exists(ResponseCallbackExists *cb, uint64_t session_id, const char *name)
Definition: Master.cc:1017
uint32_t get_handle_open_flags(BDbTxn &txn, uint64_t id)
bool get_handle_node(CommandContext &ctx, uint64_t handle, const char *attr, String &node)
Definition: Master.cc:2391
void handle_sleep()
Handle sleep event (e.g.
Definition: Master.cc:1829
bool destroy_handle(uint64_t handle, int &error, String &errmsg, bool wait_for_notify=true)
Definition: Master.cc:1734
#define HT_DEBUG_OUT
Definition: Logger.h:261
void mkdirs(ResponseCallback *cb, uint64_t session_id, const char *name, const std::vector< Attribute > &init_attrs)
Definition: Master.cc:524
uint64_t incr_node_lock_generation(BDbTxn &txn, const String &name)
void get_directory_attr_listing(BDbTxn &txn, String fname, const String &aname, bool include_sub_entries, std::vector< DirEntryAttr > &listing)
void set_xattr_i64(BDbTxn &txn, const String &fname, const String &aname, uint64_t value)
void persist_event_notifications(BDbTxn &txn, uint64_t event_id, NotificationMap &handles_to_sessions)
Definition: Master.cc:1621
void set_node_exclusive_lock_handle(BDbTxn &txn, const String &name, uint64_t exclusive_lock_handle)