0.9.8.10
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
BerkeleyDbFilesystem.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2007-2015 Hypertable, Inc.
3  *
4  * This file is part of Hypertable.
5  *
6  * Hypertable is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; either version 3
9  * of the License, or any later version.
10  *
11  * Hypertable is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19  * 02110-1301, USA.
20  */
21 
22 #include <Common/Compat.h>
23 
25 #include <Hyperspace/DbtManaged.h>
26 
27 #include <Common/Logger.h>
28 #include <Common/Path.h>
29 #include <Common/ScopeGuard.h>
30 #include <Common/Serialization.h>
31 #include <Common/String.h>
32 #include <Common/System.h>
33 #include <Common/SystemInfo.h>
34 #include <Common/Time.h>
35 
36 #include <boost/algorithm/string.hpp>
37 #include <boost/filesystem.hpp>
38 
39 #include <cctype>
40 #include <cstdlib>
41 #include <ostream>
42 #include <sstream>
43 #include <vector>
44 
45 using namespace boost::algorithm;
46 using namespace Hyperspace;
47 using namespace Hypertable;
48 using namespace Error;
49 using namespace StateDbKeys;
50 
/**
 * Debug-logs one attribute access: transaction, file name, attribute name,
 * the Berkeley DB key (printed as a C string) and the value.
 *
 * @param _txn_ transaction (streamed with operator<<)
 * @param _fn_  file name
 * @param _an_  attribute name
 * @param _k_   key object exposing get_data(); its data is printed as a
 *              NUL-terminated string
 * @param _v_   value (streamed with operator<<)
 */
#define HT_DEBUG_ATTR(_txn_, _fn_, _an_, _k_, _v_) \
  HT_DEBUG_OUT <<"txn="<< (_txn_) <<" fname='"<< (_fn_) <<"' attr='"<< (_an_) \
      <<"' key='"<< (char *)(_k_).get_data() <<"' value='"<< (_v_) <<"'" \
      << HT_END

/**
 * Variant of HT_DEBUG_ATTR for raw byte values: the attribute section is
 * printed only when an attribute name was supplied, and the value section
 * only when the value length @p _l_ is non-zero (formatted through
 * format_bytes(20, value, length)).
 *
 * Relies on the `_out_` stream that HT_DEBUG_OUT is expected to introduce
 * (NOTE(review): defined in the logging header, not visible here).
 *
 * @param _txn_ transaction (streamed with operator<<)
 * @param _fn_  file name
 * @param _an_  attribute name; may be empty
 * @param _k_   key object exposing get_data()
 * @param _v_   pointer to the value bytes
 * @param _l_   number of value bytes; 0 suppresses the value section
 */
#define HT_DEBUG_ATTR_(_txn_, _fn_, _an_, _k_, _v_, _l_) \
  HT_DEBUG_OUT <<"txn="<< (_txn_) <<" fname='"<< (_fn_); \
  if ((_an_) != String()) _out_ <<"' attr='"<< (_an_); \
  _out_ <<"' key='" << (char *)(_k_).get_data(); \
  if (_l_) _out_ <<"' value='"<< format_bytes(20, _v_, _l_); \
  _out_ <<"'"<< HT_END
62 
63 void close_db_cursor(Dbc **cursor) {
64  if (*cursor != 0) {
65  (*cursor)->close();
66  *cursor = 0;
67  }
68 }
69 
70 const char* BerkeleyDbFilesystem::ms_name_namespace_db = "namespace.db";
71 const char* BerkeleyDbFilesystem::ms_name_state_db = "state.db";
72 
73 BerkeleyDbFilesystem::BerkeleyDbFilesystem(PropertiesPtr &props,
74  const std::string &basedir,
75  const std::vector<Thread::id> &thread_ids,
76  bool force_recover)
77  : m_base_dir(basedir), m_env(0) {
78 
79  m_checkpoint_size_kb = props->get_i32("Hyperspace.Checkpoint.Size") / 1000;
80  m_max_unused_logs = props->get_i32("Hyperspace.LogGc.MaxUnusedLogs");
81  m_log_gc_interval = std::chrono::milliseconds(props->get_i32("Hyperspace.LogGc.Interval"));
82  m_last_log_gc_time = std::chrono::steady_clock::now();
83 
84  u_int32_t env_flags =
85  DB_CREATE | // If the environment does not exist, create it
86  DB_INIT_LOCK | // Initialize locking
87  DB_INIT_LOG | // Initialize logging
88  DB_INIT_MPOOL | // Initialize the cache
89  DB_INIT_TXN | // Initialize transactions
90  DB_INIT_REP | // Initialize replication
91  DB_RECOVER | // Do basic recovery
92  DB_THREAD;
93 
94  if (force_recover)
95  env_flags |= DB_RECOVER_FATAL;
96 
97  m_db_flags = DB_CREATE | DB_AUTO_COMMIT | DB_THREAD;
98 
99  /*
100  * Open Berkeley DB environment and namespace database
101  */
102  try {
103  int ret;
104  Dbt key, data;
105  DbtManaged keym, datam;
106  char numbuf[17];
107  String localhost = System::net_info().host_name;
109  HT_INFOF("localhost=%s localip=%s", localhost.c_str(), localip.c_str());
110 
111  m_env.set_lk_detect(DB_LOCK_DEFAULT);
112  m_env.set_app_private(&m_replication_info);
113  m_env.set_event_notify(db_event_callback);
114  m_env.set_msgcall(db_msg_callback);
115  m_env.set_errcall(db_err_callback);
116  m_env.set_verbose(DB_VERB_REPLICATION, 1);
117  m_env.open(m_base_dir.c_str(), env_flags, 0);
118  {
119  // Haven't implemented state recovery yet, so delete the old statedb and create the new one
120  String state_db = m_base_dir + "/" + ms_name_state_db;
121  HT_DEBUG_OUT << "Check for & remove for existing statedb=" << state_db << HT_END;
122  if (FileUtils::exists(state_db)) {
123  HT_INFO("Removing statedb");
124  Db old_state_db(&m_env, 0);
125  old_state_db.remove(ms_name_state_db, NULL, 0);
126  }
127  }
128 
129  /*
130  * Setup replication
131  * TODO: add condition to replication info object to keep track of whether local
132  * replica is a master or not. If it is the master then setup changes that the master
133  * environment needs to make.
134  */
135  if (props->has("Hyperspace.Replica.Host"))
136  m_replication_info.num_replicas = props->get_strs("Hyperspace.Replica.Host").size();
137 
139 
140  int replication_port = props->get_i16("Hyperspace.Replica.Replication.Port");
141  // just look at hostname
142  size_t localhost_len = localhost.find('.');
143  if (localhost_len == std::string::npos)
144  localhost_len = localhost.length();
145  localhost = localhost.substr(0, localhost_len);
146 
147  // set master leases
148  m_env.rep_set_config(DB_REP_CONF_LEASE, 1);
149  // send writes that are part of one transaction in a single network xfer
150  m_env.rep_set_config(DB_REP_CONF_BULK, 1);
151  // with only 2 replication sites, strict mode keeps the client from
152  // becoming master if the master fails
153  m_env.rep_set_config(DB_REPMGR_CONF_2SITE_STRICT, 1);
154  // a quorum of replicas must ack each txn
155  m_env.repmgr_set_ack_policy(DB_REPMGR_ACKS_QUORUM);
156 
157 #if !(DB_VERSION_MAJOR > 5 || (DB_VERSION_MAJOR == 5 && DB_VERSION_MINOR >= 2))
158  m_env.rep_set_nsites(m_replication_info.num_replicas);
159 #endif
160 
161  // BDB times are in microseconds
162  m_env.rep_set_timeout(DB_REP_HEARTBEAT_SEND, 30000000); //30s
163  m_env.rep_set_timeout(DB_REP_HEARTBEAT_MONITOR, 60000000); //60s
164  m_env.rep_set_timeout(DB_REP_ACK_TIMEOUT,
165  props->get_i32("Hyperspace.Replica.Replication.Timeout")*1000);
166  m_env.rep_set_timeout(DB_REP_CONNECTION_RETRY, 5000000); //5s
167  m_env.rep_set_timeout(DB_REP_LEASE_TIMEOUT,
168  props->get_i32("Hyperspace.Replica.Replication.Timeout")*1000);
169 
170  int priority = m_replication_info.num_replicas;
171  for (auto replica : props->get_strs("Hyperspace.Replica.Host")) {
172  bool is_ipv4 = InetAddr::is_ipv4(replica.c_str());
173  bool is_localhost=false;
174  Endpoint e;
175 
176  if (is_ipv4) {
177  e = InetAddr::parse_endpoint(replica, replication_port);
178  if (replica == localip)
179  is_localhost = true;
180  }
181  else {
182  size_t replica_len = replica.find('.');
183  if (replica_len == std::string::npos)
184  replica_len = replica.length();
185  replica = replica.substr(0, replica_len);
186  e = InetAddr::parse_endpoint(replica, replication_port);
187  if (boost::iequals(localhost, e.host))
188  is_localhost = true;
189  }
190  if (is_localhost) {
191  // make the local replica electable; its priority depends on its position in the replica list
192  m_env.rep_set_priority(priority);
193 
194 #if DB_VERSION_MAJOR > 5 || (DB_VERSION_MAJOR == 5 && DB_VERSION_MINOR >= 2)
195  DbSite *dbsite;
196  m_env.repmgr_site(e.host.c_str(), e.port, &dbsite, 0);
197  dbsite->set_config(DB_LOCAL_SITE, 1);
198  dbsite->set_config(DB_LEGACY, 1);
199  dbsite->close();
200 #else
201  m_env.repmgr_set_local_site(e.host.c_str(), e.port, 0);
202 #endif
204  HT_INFOF("Added local replication site %s priority=%d", e.host.c_str(), priority);
205  }
206  else {
207  int eid;
208 
209 #if DB_VERSION_MAJOR > 5 || (DB_VERSION_MAJOR == 5 && DB_VERSION_MINOR >= 2)
210  DbSite *dbsite;
211  m_env.repmgr_site(e.host.c_str(), e.port, &dbsite, 0);
212  dbsite->set_config(DB_BOOTSTRAP_HELPER, 1);
213  dbsite->set_config(DB_LEGACY, 1);
214  dbsite->get_eid(&eid);
215  dbsite->close();
216 #else
217  m_env.repmgr_add_remote_site(e.host.c_str(), e.port, &eid, 0);
218 #endif
220  HT_INFOF("Added remote replication site %s priority=%d", e.host.c_str(), priority);
221  }
222  --priority;
223  }
224  m_env.repmgr_start(3, DB_REP_ELECTION);
227  }
228  else {
230  }
231 
232 
233  // only master can initiate writes
234  if (is_master()) {
235  // do checkpoint
236  m_env.txn_checkpoint(0, 0, 0);
237 
238  // open handles
239  Db *handle_namespace_db = new Db(&m_env, 0);
240  Db *handle_state_db = new Db(&m_env, 0);
241 
242  try {
243  handle_namespace_db->open(NULL, ms_name_namespace_db, NULL, DB_BTREE, m_db_flags, 0);
244  }
245  catch(DbException &e) {
246  // retry if locker killed to resolve a deadlock
247  if (e.get_errno() == DB_LOCK_DEADLOCK)
248  handle_namespace_db->open(NULL, ms_name_namespace_db, NULL, DB_BTREE, m_db_flags, 0);
249  else
250  throw;
251  }
252 
253  handle_state_db->set_flags(DB_DUP|DB_REVSPLITOFF);
254  try {
255  handle_state_db->open(NULL, ms_name_state_db, NULL, DB_BTREE, m_db_flags, 0);
256  }
257  catch(DbException &e) {
258  // retry if locker killed to resolve a deadlock
259  if (e.get_errno() == DB_LOCK_DEADLOCK)
260  handle_state_db->open(NULL, ms_name_state_db, NULL, DB_BTREE, m_db_flags, 0);
261  else
262  throw;
263  }
264 
265  key.set_data((void *)"/");
266  key.set_size(2);
267 
268  data.set_flags(DB_DBT_REALLOC);
269  if ((ret = handle_namespace_db->get(NULL, &key, &data, 0)) == DB_NOTFOUND) {
270  data.set_data(0);
271  data.set_size(0);
272  ret = handle_namespace_db->put(NULL, &key, &data, 0);
273  key.set_data((void *)"/hyperspace/");
274  key.set_size(strlen("/hyperspace/")+1);
275  ret = handle_namespace_db->put(NULL, &key, &data, 0);
276  key.set_data((void *)"/hyperspace/metadata");
277  key.set_size(strlen("/hyperspace/metadata")+1);
278  ret = handle_namespace_db->put(NULL, &key, &data, 0);
279  }
280 
281  if (data.get_data() != 0)
282  free(data.get_data());
283 
284  // initialize statedb if reqd
285  key.set_data((void *)"/");
286  key.set_size(2);
287 
288  data.set_flags(DB_DBT_REALLOC);
289  data.set_data(0);
290  data.set_size(0);
291 
292  if((ret = handle_state_db->get(NULL, &key, &data, 0)) == DB_NOTFOUND) {
293  data.set_data(0);
294  data.set_size(0);
295  ret = handle_state_db->put(NULL, &key, &data, 0);
296  HT_ASSERT(ret == 0);
297  }
298  // init next ids in statedb
299  keym.set_str(NEXT_SESSION_ID);
300  if ((ret = handle_state_db->get(NULL, &keym, &datam, 0)) == DB_NOTFOUND) {
301  sprintf(numbuf, "%llu", (Llu)1);
302  datam.set_str(numbuf);
303  ret = handle_state_db->put(NULL, &keym, &datam, 0);
304  HT_ASSERT(ret==0);
305  }
306  keym.set_str(NEXT_HANDLE_ID);
307  if ( (ret = handle_state_db->get(NULL, &keym, &datam, 0)) == DB_NOTFOUND) {
308  sprintf(numbuf, "%llu", (Llu)1);
309  datam.set_str(numbuf);
310  ret = handle_state_db->put(NULL, &keym, &datam, 0);
311  HT_ASSERT(ret==0);
312  }
313  keym.set_str(NEXT_EVENT_ID);
314  if ((ret = handle_state_db->get(NULL, &keym, &datam, 0)) == DB_NOTFOUND) {
315  sprintf(numbuf, "%llu", (Llu)1);
316  datam.set_str(numbuf);
317  ret = handle_state_db->put(NULL, &keym, &datam, 0);
318  HT_ASSERT(ret==0);
319  }
320 
321  if (data.get_data() != 0)
322  free(data.get_data());
323 
324  //close handles
325  handle_state_db->close(0);
326  delete handle_state_db;
327  handle_state_db = 0;
328  handle_namespace_db->close(0);
329  delete handle_namespace_db;
330  handle_namespace_db = 0;
331  HT_INFO("Replication master init done");
332  }
333 
334  }
335  catch (DbException &e) {
336  unsigned count = 0;
337  DB_REPMGR_SITE *list = 0;
338  if (m_env.repmgr_site_list(&count, &list) == 0) {
339  String msg;
340  if (count)
341  msg = String("The following replicas are (dis)connected: ");
342  for (unsigned i = 0; i < count; i++) {
343  const char *status = "unknown";
344  switch (list[i].status) {
345  case DB_REPMGR_CONNECTED:
346  status = "connected";
347  break;
348  case DB_REPMGR_DISCONNECTED:
349  status = "disconnected";
350  break;
351  }
352  msg += String("(") + list[i].host + " status: " + status;
353  }
354  if (count)
355  HT_INFOF("%s", msg.c_str());
356  }
357  HT_FATALF("Error initializing Berkeley DB (dir=%s) - %s; please make sure "
358  "that the hyperspace replicas are correctly configured.",
359  m_base_dir.c_str(), e.what());
360  }
361 
362  // initialize per thread DB handles
363  init_db_handles(thread_ids);
364  HT_DEBUG_OUT <<"namespace initialized"<< HT_END;
365 }
366 
367 /*
368  */
370  /*
371  * Close Berkeley DB "namespace" database and environment
372  */
373  try {
374  HT_INFO("Closed DB handles for all threads ");
375  for (auto &val : m_thread_handle_map) {
376  (val.second)->close();
377  }
378  m_env.close(0);
379  }
380  catch(DbException &e) {
381  HT_ERRORF("Error closing Berkeley DB (dir=%s) - %s",
382  m_base_dir.c_str(), e.what());
383  }
384  HT_INFO("namespace closed");
385 }
386 
387 void BerkeleyDbFilesystem::db_msg_callback(const DbEnv *dbenv, const char *msg)
388 {
389  HT_DEBUG_OUT << "BDB MESSAGE:" << msg << HT_END;
390 }
391 
392 void BerkeleyDbFilesystem::db_err_callback(const DbEnv *dbenv, const char *errpfx,
393  const char *msg)
394 {
395  HT_INFOF("BDB ERROR: %s", msg);
396 }
397 
398 
399 void BerkeleyDbFilesystem::db_event_callback(DbEnv *dbenv, uint32_t which, void *info)
400 {
401  ReplicationInfo *replication_info = (ReplicationInfo*)dbenv->get_app_private();
402 
403  switch (which) {
404  case DB_EVENT_REP_CLIENT:
405  HT_INFO("Received DB_EVENT_REP_CLIENT event");
406  break;
407  case DB_EVENT_REP_MASTER:
408  HT_INFO("Received DB_EVENT_REP_MASTER event");
409  HT_INFOF("Local site elected master: %s", replication_info->localhost.c_str());
410  replication_info->is_master = true;
411  replication_info->finish_election();
412  break;
413  case DB_EVENT_REP_ELECTED:
414  HT_INFO("Received DB_EVENT_REP_ELECTED event ignore and wait for DB_EVENT_REP_MASTER");
415  break;
416  case DB_EVENT_REP_NEWMASTER:
417  HT_INFO("Received DB_EVENT_REP_NEWMASTER event");
418  // exit if we lost mastership
419  if (replication_info->is_master)
420  HT_FATAL("Local site lost mastership");
421  replication_info->master_eid = *((int*)info);
422  {
423  auto it = replication_info->replica_map.find(replication_info->master_eid);
424  HT_ASSERT (it != replication_info->replica_map.end());
425  HT_INFOF("New master elected: %s", it->second.c_str());
426 
427  if (replication_info->election_finished())
428  HT_FATAL("New master elected after initial master election.");
429 
430  replication_info->finish_election();
431  }
432  break;
433  case DB_EVENT_REP_PERM_FAILED:
434  if (replication_info->is_master)
435  HT_FATAL("Replication failed. Master did not receive enough acks.");
436  break;
437  case DB_EVENT_PANIC:
438  HT_FATAL("Received DB_EVENT_PANIC event");
439  break;
440  case DB_EVENT_REP_STARTUPDONE:
441  HT_INFO("Received DB_EVENT_REP_STARTUPDONE event");
442  break;
443  case DB_EVENT_WRITE_FAILED:
444  HT_INFO("Received DB_EVENT_WROTE_FAILED event");
445  break;
446 
447 #if DB_VERSION_MAJOR > 5 || (DB_VERSION_MAJOR == 5 && DB_VERSION_MINOR >= 2)
448  case DB_EVENT_REP_CONNECT_BROKEN:
449  HT_INFO("Received DB_EVENT_REP_CONNECT_BROKEN event");
450  break;
451  case DB_EVENT_REP_CONNECT_ESTD:
452  HT_INFO("Received DB_EVENT_REP_CONNECT_ESTD event");
453  break;
454  case DB_EVENT_REP_CONNECT_TRY_FAILED:
455  HT_INFO("Received DB_EVENT_REP_CONNECT_TRY_FAILED event");
456  break;
457  case DB_EVENT_REP_DUPMASTER:
458  HT_INFO("Received DB_EVENT_REP_DUPMASTER event");
459  break;
460  case DB_EVENT_REP_ELECTION_FAILED:
461  HT_INFO("Received DB_EVENT_REP_ELECTION_FAILED event");
462  break;
463  case DB_EVENT_REP_INIT_DONE:
464  HT_INFO("Received DB_EVENT_REP_INIT_DONE event");
465  break;
466  case DB_EVENT_REP_JOIN_FAILURE:
467  HT_INFO("Received DB_EVENT_REP_JOIN_FAILURE event");
468  break;
469  case DB_EVENT_REP_LOCAL_SITE_REMOVED:
470  HT_INFO("Received DB_EVENT_REP_LOCAL_SITE_REMOVED event");
471  break;
472  case DB_EVENT_REP_MASTER_FAILURE:
473  HT_INFO("Received DB_EVENT_REP_MASTER_FAILURE event");
474  break;
475  case DB_EVENT_REP_SITE_ADDED:
476  HT_INFO("Received DB_EVENT_REP_SITE_ADDED event");
477  break;
478  case DB_EVENT_REP_SITE_REMOVED:
479  HT_INFO("Received DB_EVENT_REP_SITE_REMOVED event");
480  break;
481 #endif
482 
483  default:
484  HT_INFO("Received BerkeleyDB event ");
485  }
486 }
487 
488 void BerkeleyDbFilesystem::init_db_handles(const std::vector<Thread::id> &thread_ids) {
489 
490  BDbHandlesPtr db_handles;
491 
492  // Assign per thread handles but don't open them yet
493  std::stringstream tid_str;
494  for (auto thread_id : thread_ids) {
495  db_handles = std::make_shared<BDbHandles>();
496  m_thread_handle_map[thread_id] = db_handles;
497  tid_str << thread_id;
498  HT_INFOF("Created DB handles for thread: %s", tid_str.str().c_str());
499  tid_str.str(std::string());
500  }
501 
502 }
503 
504 
506  ThreadHandleMap::iterator it = m_thread_handle_map.find(ThisThread::get_id());
507  if (it == m_thread_handle_map.end())
508  HT_FATAL_OUT << "No thread handle found for thread " << ThisThread::get_id() << HT_END;
509 
510  // Open per thread handles if not already open
511  if (!it->second->open) {
512  it->second->handle_namespace_db = new Db(&m_env, 0);
513  it->second->handle_state_db = new Db(&m_env, 0);
514  it->second->handle_namespace_db->open(NULL, ms_name_namespace_db,
515  NULL, DB_BTREE, m_db_flags, 0);
516  it->second->handle_state_db->set_flags(DB_DUP|DB_REVSPLITOFF);
517  it->second->handle_state_db->open(NULL, ms_name_state_db, NULL,
518  DB_BTREE, m_db_flags, 0);
519  it->second->open=true;
520  }
521  return it->second;
522 
523 }
524 
526 
527  // do checkpoint; don't bother to check if this is the master
528  // since it's just ignored by slaves
529  HT_DEBUG_OUT << "Do checkpoint if log > " << m_checkpoint_size_kb << "KB" << HT_END;
530  int ret;
531  try {
532  ret = m_env.txn_checkpoint(m_checkpoint_size_kb, 0, 0);
533  if (ret != 0) {
534  HT_FATAL_OUT << "Unable to do checkpoint got ret=" << ret << HT_END;
535  }
536  }
537  catch (DbException &e) {
538  HT_FATAL_OUT << "Error checkpointing BerkeleyDb: " << e.what() << HT_END;
539  }
540 
541  auto now = std::chrono::steady_clock::now();
542  auto time_elapsed = now - m_last_log_gc_time;
543 
544  if (time_elapsed > m_log_gc_interval) {
545  m_last_log_gc_time = now;
546 
547  // delete all but the last max_unused_logs files
548  char **unused_logs, **log;
549  uint32_t unused_logs_count=0;
550  unused_logs = log = NULL;
551 
552  try {
553  ret = m_env.log_archive(&unused_logs, DB_ARCH_ABS);
554  if (ret != 0) {
555  HT_FATAL_OUT << "Unable to get list of unused BerkeleyDB log files, got ret=" << ret
556  << HT_END;
557  }
558  }
559  catch (DbException &e) {
560  HT_FATAL_OUT<< "Error getting list of unused BerkeleyDb log files: "
561  << e.what() << HT_END;
562  }
563 
564 
565  if (unused_logs != NULL) {
566  for (log = unused_logs; *log != NULL; ++log)
567  unused_logs_count++;
568 
569  for (log = unused_logs; *log != NULL && unused_logs_count > m_max_unused_logs;
570  ++log, --unused_logs_count) {
571  // delete log file
572  Path file(*log);
573  boost::filesystem::remove(file);
574  HT_INFOF("Deleted unused BerkeleyDb log %s", *log);
575  }
576  free(unused_logs);
577  }
578  }
579 
580 }
581 
583 
584  // begin transaction
585  try {
586  HT_ASSERT(txn.handle_namespace_db == 0 && txn.handle_state_db == 0);
587 
588  // open db handles
589  BDbHandlesPtr db_handles = get_db_handles();
590 
591  // Use handles for this thread
592  txn.handle_namespace_db = db_handles->handle_namespace_db;
593  txn.handle_state_db = db_handles->handle_state_db;
594 
595  // open txn
596  m_env.txn_begin(NULL, &txn.db_txn, 0);
597  }
598  catch (DbException &e) {
599  // issue 915: a failure of txn_begin is possible if BDB ran out of
600  // locker entries
602  }
603  HT_DEBUG_OUT <<"txn="<< txn << HT_END;
604 
605  return;
606 }
607 
608 
609 /*
610  */
611 bool
613  const String &aname, uint32_t *valuep) {
614  int ret;
615  Dbt key;
616  DbtManaged data;
617  String keystr = fname;
618 
619  build_attr_key(txn, keystr, aname, key);
620 
621  try {
622  if ((ret = txn.handle_namespace_db->get(txn.db_txn, &key, &data, 0)) == 0) {
623  *valuep = strtoll((const char *)data.get_data(), 0, 0);
624  HT_DEBUG_ATTR(txn, fname, aname, key, *valuep);
625  return true;
626  }
627  }
628  catch (DbException &e) {
629  if (e.get_errno() == DB_LOCK_DEADLOCK)
631  else if (e.get_errno() == DB_REP_HANDLE_DEAD)
633  HT_ERRORF("Berkeley DB error: %s", e.what());
635  }
636 
637  return false;
638 }
639 
640 
641 
642 /*
643  */
644 void
646  const String &aname, uint32_t value) {
647  int ret;
648  Dbt key, data;
649  char numbuf[16];
650  String keystr = fname;
651 
652  build_attr_key(txn, keystr, aname, key);
653 
654  sprintf(numbuf, "%u", value);
655 
656  data.set_data(numbuf);
657  data.set_size(strlen(numbuf)+1);
658 
659  try {
660  ret = txn.handle_namespace_db->put(txn.db_txn, &key, &data, 0);
661  HT_DEBUG_ATTR(txn, fname, aname, key, value);
662  }
663  catch (DbException &e) {
664  if (e.get_errno() == DB_LOCK_DEADLOCK)
666  else if (e.get_errno() == DB_REP_HANDLE_DEAD)
668  HT_ERRORF("Berkeley DB error: %s", e.what());
670  }
671 
672  HT_ASSERT(ret == 0);
673 }
674 
675 
676 /*
677  */
678 bool
680  const String &aname, uint64_t *valuep) {
681  int ret;
682  Dbt key;
683  DbtManaged data;
684  String keystr = fname;
685 
686  build_attr_key(txn, keystr, aname, key);
687 
688  try {
689  if ((ret = txn.handle_namespace_db->get(txn.db_txn, &key, &data, 0)) == 0) {
690  *valuep = strtoll((const char *)data.get_data(), 0, 0);
691  HT_DEBUG_ATTR(txn, fname, aname, key, *valuep);
692  return true;
693  }
694  }
695  catch (DbException &e) {
696  if (e.get_errno() == DB_LOCK_DEADLOCK)
698  else if (e.get_errno() == DB_REP_HANDLE_DEAD)
700  HT_ERRORF("Berkeley DB error: %s", e.what());
702  }
703 
704  return false;
705 }
706 
707 
708 
709 /*
710  */
711 void
713  const String &aname, uint64_t value) {
714  int ret;
715  Dbt key, data;
716  char numbuf[24];
717  String keystr = fname;
718 
719  build_attr_key(txn, keystr, aname, key);
720 
721  sprintf(numbuf, "%llu", (Llu)value);
722 
723  data.set_data(numbuf);
724  data.set_size(strlen(numbuf)+1);
725 
726  try {
727  ret = txn.handle_namespace_db->put(txn.db_txn, &key, &data, 0);
728  HT_DEBUG_ATTR(txn, fname, aname, key, value);
729  }
730  catch (DbException &e) {
731  if (e.get_errno() == DB_LOCK_DEADLOCK)
733  else if (e.get_errno() == DB_REP_HANDLE_DEAD)
735  HT_ERRORF("Berkeley DB error: %s", e.what());
737  }
738 
739  HT_ASSERT(ret == 0);
740 }
741 
742 /*
743  */
/**
 * Reads the decimal counter stored under attribute `aname` of `fname`,
 * returns the value it had in `*valuep`, and writes back that value plus one
 * within the same transaction.
 *
 * NOTE(review): this listing has gaps in its line numbering (759, 768, 777,
 * 795, 797, 799); the statements that belonged to the dangling `if`s below
 * were lost in extraction — presumably the throw statements; confirm
 * against the real source.
 *
 * @param txn    transaction supplying the namespace DB handle and DB txn
 * @param fname  file name the attribute belongs to
 * @param aname  attribute name
 * @param valuep receives the counter value read (before the increment)
 * @return true if the attribute was read and the incremented value stored;
 *         false on the visible fall-through path (get or put returned
 *         non-zero)
 */
744 bool
745 BerkeleyDbFilesystem::incr_attr(BDbTxn &txn, const String &fname, const String &aname,
746  uint64_t *valuep) {
747  int ret;
748  Dbt key, data;
749  String keystr = fname;
750  char numbuf[24];
751  uint64_t new_value;
752
753  build_attr_key(txn, keystr, aname, key);
754
755  try {
  // NOTE(review): with DB_DBT_REALLOC Berkeley DB allocates the returned
  // buffer; no free() of it is visible before set_data(numbuf) below
  // replaces the pointer — possible leak, confirm.
756  data.set_flags(DB_DBT_REALLOC);
757  if ((ret = txn.handle_namespace_db->get(txn.db_txn, &key, &data, 0)) == 0) {
  // Stored value must leave room for the terminating NUL in numbuf[24]
758  if (data.get_size() >= 24)
760  "incr attribute '%s' exceeds 24 characters", aname.c_str());
761
762  memcpy(numbuf, (const char *)data.get_data(), data.get_size());
763  numbuf[data.get_size()] = 0; // NUL-terminate
764
765  // Sanity check value
766  for (const char *ptr=numbuf; *ptr; ptr++) {
767  if (!::isdigit(*ptr))
769  "incr attribute '%s' contains invalid characters: %s",
770  aname.c_str(), numbuf);
771  }
772
  // Base 0: a digits-only string with a leading '0' is parsed as octal
773  *valuep = strtoull(numbuf, 0, 0);
774
775  // Sanity check value
  // NOTE(review): errno is not cleared before strtoull(), so a stale EINVAL
  // from an earlier call could be observed here — confirm.
776  if (*valuep == 0 && errno == EINVAL)
778  "incr attr '%s' invalid: '%s', cannot convert to integer",
779  aname.c_str(), numbuf);
780
781  HT_DEBUG_ATTR(txn, fname, aname, key, *valuep);
782  new_value = *valuep + 1;
783  sprintf(numbuf, "%llu", (Llu)new_value);
784  data.set_data(numbuf);
  // Stored without the trailing NUL (size == strlen); the read path above
  // re-terminates the buffer itself
785  data.set_size(strlen(numbuf));
786
787  if ((ret = txn.handle_namespace_db->put(txn.db_txn, &key, &data, 0)) == 0) {
788  HT_DEBUG_ATTR(txn, fname, aname, key, new_value);
789  return true;
790  }
791  }
792  }
793  catch (DbException &e) {
  // Handler bodies for the two errno cases are missing from this listing
794  if (e.get_errno() == DB_LOCK_DEADLOCK)
796  else if (e.get_errno() == DB_REP_HANDLE_DEAD)
798  HT_ERRORF("Berkeley DB error: %s", e.what());
800  }
801
802  return false;
803 }
804 /*
805  */
806 void
808  const String &aname, const void *value, size_t value_len) {
809  int ret;
810  Dbt key, data;
811  String keystr = fname;
812 
813  build_attr_key(txn, keystr, aname, key);
814  data.set_data((void *)value);
815  data.set_size(value_len);
816 
817  try {
818  HT_DEBUG_ATTR_(txn, fname, aname, key, value, value_len);
819  ret = txn.handle_namespace_db->put(txn.db_txn, &key, &data, 0);
820  }
821  catch (DbException &e) {
822  if (e.get_errno() == DB_LOCK_DEADLOCK)
824  else if (e.get_errno() == DB_REP_HANDLE_DEAD)
826  HT_ERRORF("Berkeley DB error: %s", e.what());
828  }
829 
830  HT_ASSERT(ret == 0);
831 }
832 
833 
834 /*
835  */
836 bool
838  const String &aname, DynamicBuffer &vbuf) {
839  int ret;
840  Dbt key;
841  DbtManaged data;
842  String keystr = fname;
843 
844  build_attr_key(txn, keystr, aname, key);
845 
846  HT_DEBUG_OUT << "get_xattr txn="<< txn <<", fname=" << fname << ", attr='" << aname << HT_END;
847 
848  try {
849  if ((ret = txn.handle_namespace_db->get(txn.db_txn, &key, &data, 0)) == 0) {
850  vbuf.reserve(data.get_size());
851  memcpy(vbuf.base, (uint8_t *)data.get_data(), data.get_size());
852  vbuf.ptr += data.get_size();
853  HT_DEBUG_ATTR_(txn, fname, aname, key, vbuf.base, data.get_size());
854  return true;
855  }
856  }
857  catch (DbException &e) {
858  if (e.get_errno() == DB_LOCK_DEADLOCK)
860  else if (e.get_errno() == DB_REP_HANDLE_DEAD)
862  HT_ERRORF("Berkeley DB error: %s", e.what());
864  }
865 
866  return false;
867 }
868 
869 /*
870  */
/**
 * Tests whether attribute `aname` of file `fname` has an entry in the
 * namespace database.
 *
 * NOTE(review): listing line numbers 891, 893 and 895 are absent; the
 * statements of the catch handler's branches were lost in extraction.
 *
 * @param txn   transaction supplying the namespace DB handle and DB txn
 * @param fname file name the attribute belongs to
 * @param aname attribute name
 * @return true if Db::exists() returned 0 for the attribute key, false if
 *         it returned DB_NOTFOUND
 */
871 bool
872 BerkeleyDbFilesystem::exists_xattr(BDbTxn &txn, const String &fname, const String &aname)
873 {
874  int ret;
875  Dbt key;
876  String keystr = fname;
877  bool exists = false;
878
  // Build the attribute's database key from the file and attribute names
879  build_attr_key(txn, keystr, aname, key);
880
881  try {
882  ret = txn.handle_namespace_db->exists(txn.db_txn, &key, 0);
  // Any result other than "found" / "not found" is unexpected
883  HT_EXPECT(ret == 0 || ret == DB_NOTFOUND, HYPERSPACE_BERKELEYDB_ERROR);
884
885  if (ret == 0) {
886  exists = true;
887  }
888  }
889  catch (DbException &e) {
  // Handler bodies for the two errno cases are missing from this listing
890  if (e.get_errno() == DB_LOCK_DEADLOCK)
892  else if (e.get_errno() == DB_REP_HANDLE_DEAD)
894  HT_ERRORF("Berkeley DB error: %s", e.what());
896  }
897
898  return exists;
899 }
900 
901 
902 void
904  const String &aname) {
905  int ret;
906  Dbt key;
907  String keystr = fname;
908 
909  build_attr_key(txn, keystr, aname, key);
910 
911  try {
912  if ((ret = txn.handle_namespace_db->del(txn.db_txn, &key, 0)) == DB_NOTFOUND)
914  HT_DEBUG_ATTR_(txn, fname, aname, key, "", 0);
915  }
916  catch (DbException &e) {
917  if (e.get_errno() == DB_LOCK_DEADLOCK)
919  else if (e.get_errno() == DB_REP_HANDLE_DEAD)
921  HT_ERRORF("Berkeley DB error: %s", e.what());
923  }
924 }
925 
926 
927 /*
928  */
/**
 * Creates a directory entry for `name` in the namespace database.
 * Directory keys are stored with a trailing '/' and an empty value.
 *
 * NOTE(review): listing line numbers 944, 960, 975, 977 and 979 are absent;
 * the statements guarded by the two existence checks and by the catch
 * handler's branches were lost in extraction (presumably throws — confirm).
 *
 * @param txn  transaction supplying the namespace DB handle and DB txn
 * @param name path of the directory to create, with or without trailing '/'
 */
929 void BerkeleyDbFilesystem::mkdir(BDbTxn &txn, const String &name) {
930  int ret;
931  Dbt key;
932  DbtManaged data;
  // Parent = everything up to and including the last '/' that is not the
  // final character (the search starts at length()-1... the last index).
  // NOTE(review): if `name` contains no '/', rfind returns npos and
  // lastslash+1 wraps to 0, giving an empty parent — confirm callers
  // always pass absolute paths.
933  size_t lastslash = name.rfind('/', name.length()-1);
934  String dirname = name.substr(0, lastslash+1);
935
936  try {
937  /*
938  * Make sure parent directory exists
939  */
940  key.set_data((void *)dirname.c_str());
  // Keys include the terminating NUL
941  key.set_size(dirname.length()+1);
942
943  if ((ret = txn.handle_namespace_db->get(txn.db_txn, &key, &data, 0)) == DB_NOTFOUND)
945
946  // formulate directory name
947  dirname = name;
948  if (dirname[dirname.length()-1] != '/')
949  dirname += "/";
950
951  HT_DEBUG_OUT <<"dirname='"<< dirname <<"'"<< HT_END;
952
953  /*
954  * Make sure directory does not already exists
955  */
956  key.set_data((void *)dirname.c_str());
957  key.set_size(dirname.length()+1);
958
959  if ((ret = txn.handle_namespace_db->get(txn.db_txn, &key, &data, 0)) != DB_NOTFOUND)
961
962  /*
963  * Create directory entry
964  */
965  key.set_data((void *)dirname.c_str());
966  key.set_size(dirname.length()+1);
967
  // Drop whatever the lookups above left in `data` before storing
968  data.clear();
969
  // The put() result is stored in `ret` but not checked afterwards
970  ret = txn.handle_namespace_db->put(txn.db_txn, &key, &data, 0);
971
972  }
973  catch (DbException &e) {
  // Handler bodies for the two errno cases are missing from this listing
974  if (e.get_errno() == DB_LOCK_DEADLOCK)
976  else if (e.get_errno() == DB_REP_HANDLE_DEAD)
978  HT_ERRORF("Berkeley DB error: %s", e.what());
980  }
981 }
982 
983 
/**
 * Removes the namespace entry `name` together with its attribute keys.
 * A cursor scan starting at `name` collects every key to delete; the
 * collected keys are then deleted one by one in the same transaction.
 *
 * NOTE(review): listing line numbers 1007, 1029, 1041, 1043 and 1045 are
 * absent; the statements they held (inside the "directory has children"
 * branch, after the empty-list check and in the catch handler) were lost in
 * extraction — presumably throws; confirm.
 *
 * @param txn  transaction supplying the namespace DB handle and DB txn
 * @param name path of the file or directory to remove
 */
984 void BerkeleyDbFilesystem::unlink(BDbTxn &txn, const String &name) {
985  std::vector<String> delkeys;
986  DbtManaged keym, datam;
987  Dbt key;
988  Dbc *cursorp = 0;
  // Ensure the cursor is closed on every exit path
989  HT_ON_SCOPE_EXIT(&close_db_cursor, &cursorp);
990  bool looks_like_dir = false;
991  bool looks_like_file = false;
992  String str;
993
994  try {
995  txn.handle_namespace_db->cursor(txn.db_txn, &cursorp, 0);
996
997  keym.set_str(name);
998
  // Position on the first key >= name.
  // NOTE(review): the first key returned is not checked with starts_with()
  // (only subsequent keys are, in the loop condition), so a key that merely
  // sorts after `name` could be classified below — confirm callers
  // guarantee that `name` exists.
999  if (cursorp->get(&keym, &datam, DB_SET_RANGE) != DB_NOTFOUND) {
1000  do {
1001  str = keym.get_str();
1002
1003  if (str.length() > name.length()) {
  // "<name>/..." : the entry is a directory key or something beneath it
1004  if (str[name.length()] == '/') {
  // Something other than an attribute follows "<name>/"
1005  if (str.length() > name.length() + 1
1006  && str[name.length() + 1] != NODE_ATTR_DELIM) {
1008  }
1009  looks_like_dir = true;
1010  delkeys.push_back(keym.get_str());
1011  }
  // "<name><delim>..." : an attribute key of a file entry
1012  else if (str[name.length()] == NODE_ATTR_DELIM) {
1013  looks_like_file = true;
1014  delkeys.push_back(keym.get_str());
1015  }
1016  }
1017  else {
  // Key is no longer than `name`: treated as the file entry itself
1018  delkeys.push_back(keym.get_str());
1019  looks_like_file = true;
1020  }
1021
1022  } while (cursorp->get(&keym, &datam, DB_NEXT) != DB_NOTFOUND &&
1023  starts_with(keym.get_str(), name.c_str()));
1024  }
1025
  // An entry cannot be classified as both a directory and a file
1026  HT_ASSERT(!(looks_like_dir && looks_like_file));
1027
1028  if (delkeys.empty())
1030
1031  for (size_t i=0; i<delkeys.size(); i++) {
1032  key.set_data((void *)delkeys[i].c_str());
  // Keys include the terminating NUL
1033  key.set_size(delkeys[i].length()+1);
1034  HT_ASSERT(txn.handle_namespace_db->del(txn.db_txn, &key, 0) != DB_NOTFOUND);
1035  HT_DEBUG_ATTR_(txn, name, "", key, "", 0);
1036  }
1037  }
1038  catch (DbException &e) {
1039  HT_ERRORF("Berkeley DB error: %s", e.what());
  // Handler bodies for the three branches are missing from this listing
1040  if (e.get_errno() == DB_LOCK_DEADLOCK)
1042  else if (e.get_errno() == DB_REP_HANDLE_DEAD)
1044  else
1046  }
1047 }
1048 
1049 
/**
 * Tests whether `fname` exists in the namespace database, either as a file
 * key (`fname`) or as a directory key (`fname` + "/").
 *
 * NOTE(review): listing line numbers 1078, 1080 and 1082 are absent; the
 * statements of the catch handler's branches were lost in extraction.
 *
 * @param txn      transaction supplying the namespace DB handle and DB txn
 * @param fname    path to look up; taken by value because "/" is appended
 *                 locally for the directory lookup
 * @param is_dir_p optional out-parameter; set to true only when the entry
 *                 was found via the directory key, false otherwise
 * @return false if both lookups returned DB_NOTFOUND, true otherwise
 */
1050 bool
1051 BerkeleyDbFilesystem::exists(BDbTxn &txn, String fname, bool *is_dir_p) {
1052  int ret;
1053  Dbt key;
1054
1055  if (is_dir_p)
1056  *is_dir_p = false;
1057
1058  key.set_data((void *)fname.c_str());
  // Keys include the terminating NUL
1059  key.set_size(fname.length()+1);
1060
1061  try {
  // First try the plain (file) key; only DB_NOTFOUND triggers the
  // directory lookup — any other return value counts as "exists"
1062  if ((ret = txn.handle_namespace_db->exists(txn.db_txn, &key, 0)) == DB_NOTFOUND) {
1063  fname += "/";
1064  key.set_data((void *)fname.c_str());
1065  key.set_size(fname.length()+1);
1066
1067  if ((ret = txn.handle_namespace_db->exists(txn.db_txn, &key, 0)) == DB_NOTFOUND) {
1068  HT_DEBUG_OUT <<"'"<< fname <<"' does NOT exist."<< HT_END;
1069  return false;
1070  }
1071  if (is_dir_p)
1072  *is_dir_p = true;
1073  }
1074  }
1075  catch (DbException &e) {
1076  HT_ERRORF("Berkeley DB error: %s", e.what());
  // Handler bodies for the three branches are missing from this listing
1077  if (e.get_errno() == DB_LOCK_DEADLOCK)
1079  else if (e.get_errno() == DB_REP_HANDLE_DEAD)
1081  else
1083  }
1084
1085  HT_DEBUG_OUT <<"'"<< fname <<"' exists."<< HT_END;
1086  return true;
1087 }
1088 
1089 
1090 /*
1091  *
1092  */
/**
 * Creates a file entry `fname` in the namespace database and, when `temp`
 * is set, an additional "<fname><NODE_ATTR_DELIM>temp" marker key.
 *
 * NOTE(review): listing line numbers 1103, 1106, 1115, 1132, 1134 and 1136
 * are absent; the statements guarded by the three precondition checks and
 * by the catch handler's branches were lost in extraction (presumably
 * throws — confirm).
 *
 * @param txn   transaction supplying the namespace DB handle and DB txn
 * @param fname path of the file to create
 * @param temp  whether to also store the "temp" marker key for the file
 */
1093 void
1094 BerkeleyDbFilesystem::create(BDbTxn &txn, const String &fname, bool temp) {
1095  int ret;
1096  Dbt key;
1097  DbtManaged data;
1098
1099  HT_DEBUG_OUT <<"txn="<< txn <<" fname='"<< fname <<"' temp="<< temp << HT_END;
1100
1101  try {
  // Precondition: the name must not already exist
1102  if (exists(txn, fname, 0))
1104
  // Precondition: a file name must not end with '/'
1105  if (ends_with(fname, "/"))
1107
  // Parent directory key = everything up to and including the last '/'
1108  size_t lastslash = fname.rfind('/', fname.length() - 1);
1109  String parent_dir = fname.substr(0, lastslash + 1);
1110
1111  key.set_data((void *)parent_dir.c_str());
1112  key.set_size(parent_dir.length()+1);
1113
  // Precondition: the parent directory must exist
1114  if ((ret = txn.handle_namespace_db->get(txn.db_txn, &key, &data, 0)) == DB_NOTFOUND)
1116
1117  key.set_data((void *)fname.c_str());
1118  key.set_size(fname.length()+1);
1119
  // NOTE(review): `data` still holds whatever the parent lookup returned and
  // is reused as the new entry's value; the put() result is not checked.
1120  ret = txn.handle_namespace_db->put(txn.db_txn, &key, &data, 0);
1121
1122  if (temp) {
1123  String temp_key = fname + NODE_ATTR_DELIM +"temp";
1124  key.set_data((void *)temp_key.c_str());
1125  key.set_size(temp_key.length()+1);
1126  ret = txn.handle_namespace_db->put(txn.db_txn, &key, &data, 0);
1127  }
1128  }
1129  catch (DbException &e) {
1130  HT_ERRORF("Berkeley DB error: %s", e.what());
  // Handler bodies for the three branches are missing from this listing
1131  if (e.get_errno() == DB_LOCK_DEADLOCK)
1133  else if (e.get_errno() == DB_REP_HANDLE_DEAD)
1135  else
1137  }
1138 }
1139 
1140 
/*
 * Appends to `listing` one DirEntry per immediate child of directory fname.
 * NOTE(review): the line carrying the function name and leading parameters
 * (listing line 1142) is missing here; the body uses `txn` and a mutable
 * `fname` - presumably this is get_directory_listing, confirm.
 *
 * The namespace database is scanned with a cursor, starting at the first
 * key >= fname (with a trailing "/" ensured) and continuing while keys
 * still begin with that prefix. For every key longer than the prefix whose
 * next character is not NODE_ATTR_DELIM, the prefix is stripped and:
 *   - if the remainder contains '/', the part before it is a directory;
 *   - else if it contains NODE_ATTR_DELIM, the part before it is a file;
 *   - else the whole remainder is a file name.
 * An entry is appended only when its name differs from the previously
 * appended one, so runs of keys belonging to one child yield one entry.
 * A DbException is dispatched on its errno and logged; the per-branch
 * statements are not visible in this listing.
 */
1141 void
1143  std::vector<DirEntry> &listing) {
1144  DbtManaged keym, datam;
1145  Dbt key;
1146  Dbc *cursorp = 0;
1147  HT_ON_SCOPE_EXIT(&close_db_cursor, &cursorp); // presumably closes the cursor when the scope ends
1148  String str, last_str;
1149  DirEntry entry;
1150  size_t offset;
1151 
1152  try {
1153  txn.handle_namespace_db->cursor(txn.db_txn, &cursorp, 0);
1154 
1155  if (!ends_with(fname, "/"))
1156  fname += "/";
1157 
1158  HT_DEBUG_OUT <<"txn="<< txn <<" dir='"<< fname <<"'"<< HT_END;
1159  keym.set_str(fname);
1160 
1161  if (cursorp->get(&keym, &datam, DB_SET_RANGE) != DB_NOTFOUND) {
1162 
1163  if (!starts_with(keym.get_str(), fname.c_str())) { // body (listing line 1164) not shown
1165  }
1166 
1167  do {
1168  str = keym.get_str();
1169 
1170  if (str.length() > fname.length()) {
1171  if (str[fname.length()] != NODE_ATTR_DELIM ) { // skip attributes of the directory itself
1172  str = str.substr(fname.length());
1173 
1174  if ((offset = str.find('/')) != String::npos) {
1175  entry.name = str.substr(0, offset);
1176 
1177  if (entry.name != last_str) {
1178  entry.is_dir = true;
1179  listing.push_back(entry);
1180  last_str = entry.name;
1181  }
1182  }
1183  else {
1184  if ((offset = str.find(NODE_ATTR_DELIM)) != String::npos) {
1185  entry.name = str.substr(0, offset);
1186 
1187  if (entry.name != last_str) {
1188  entry.is_dir = false;
1189  listing.push_back(entry);
1190  last_str = entry.name;
1191  }
1192  }
1193  else {
1194  entry.name = str;
1195 
1196  if (entry.name != last_str) {
1197  entry.is_dir = false;
1198  listing.push_back(entry);
1199  last_str = entry.name;
1200  }
1201  }
1202  }
1203  }
1204  }
1205  } while (cursorp->get(&keym, &datam, DB_NEXT) != DB_NOTFOUND &&
1206  starts_with(keym.get_str(), fname.c_str()));
1207 
1208  }
1209  }
1210  catch (DbException &e) {
1211  if (e.get_errno() == DB_LOCK_DEADLOCK)
1213  else if (e.get_errno() == DB_REP_HANDLE_DEAD)
1215  HT_ERRORF("Berkeley DB error: %s", e.what());
1217  }
1218 }
1219 
// Characters that end an entry name inside a namespace key: the attribute
// delimiter and '/'. The array is NUL-terminated so it can be passed as a
// C string to String::find_first_of.
1220 namespace {
1221  const char stop_chars[3] = { (char)BerkeleyDbFilesystem::NODE_ATTR_DELIM, '/', 0 };
1222 }
1223 
/*
 * Lists the children of directory fname together with the value of
 * attribute aname, optionally descending into sub-directories.
 * NOTE(review): the line carrying the function name and leading parameters
 * (listing line 1225) is missing here; the recursive call below matches
 * this parameter list, so the name is get_directory_attr_listing.
 *
 * The flat overload fills `listing` first. When include_sub_entries is
 * true, each directory entry is then expanded recursively into its own
 * sub_entries vector using the path fname + "/" + entry.name.
 */
1224 void
1226  const String &aname,
1227  bool include_sub_entries,
1228  std::vector<DirEntryAttr> &listing) {
1229  get_directory_attr_listing(txn, fname, aname, listing);
1230  if (include_sub_entries) {
1231  if (!ends_with(fname, "/"))
1232  fname += "/";
1233  for (auto &entry : listing)
1234  if (entry.is_dir)
1235  get_directory_attr_listing(txn, fname + entry.name, aname, true, entry.sub_entries);
1236  }
1237 }
1238 
/*
 * Appends to `listing` one DirEntryAttr per immediate child of directory
 * fname, recording whether the child is a directory and, if present, the
 * value of its attribute named aname.
 * NOTE(review): the line carrying the function name and leading parameters
 * (listing line 1240) is missing here; the four-argument call in the
 * recursive overload indicates this is get_directory_attr_listing.
 *
 * Keys beginning with fname (trailing "/" ensured) are scanned in cursor
 * order. After stripping the prefix, the entry name is the text up to the
 * first stop character ('/' or NODE_ATTR_DELIM). One `entry` is accumulated
 * across consecutive keys with the same name and is pushed when the name
 * changes, and once more after the loop for the final entry.
 * A DbException is dispatched on its errno and logged; the per-branch
 * statements are not visible in this listing.
 */
1239 void
1241  const String &aname,
1242  std::vector<DirEntryAttr> &listing) {
1243  DbtManaged keym, datam;
1244  Dbt key;
1245  Dbc *cursorp = 0;
1246  HT_ON_SCOPE_EXIT(&close_db_cursor, &cursorp);
1247  String entryname, last_entryname, str, attr;
1248  DirEntryAttr entry;
1249  size_t offset;
1250 
1251  try {
1252  txn.handle_namespace_db->cursor(txn.db_txn, &cursorp, 0);
1253 
1254  if (!ends_with(fname, "/"))
1255  fname += "/";
1256 
1257  HT_DEBUG_OUT <<"txn="<< txn <<" dir='"<< fname <<"'"<< HT_END;
1258  keym.set_str(fname);
1259 
1260  if (cursorp->get(&keym, &datam, DB_SET_RANGE) != DB_NOTFOUND) {
1261 
1262  if (!starts_with(keym.get_str(), fname.c_str())) { // body (listing line 1263) not shown
1264  }
1265 
1266  do {
1267 
1268  str = keym.get_str();
1269  if (str.length() > fname.length() && str[fname.length()] != NODE_ATTR_DELIM) {
1270  str = str.substr(fname.length());
1271  offset = str.find_first_of(stop_chars);
1272  entryname = (offset == String::npos) ? str : str.substr(0, offset);
1273  if (entryname != last_entryname) {
1274  if (last_entryname != "")
1275  listing.push_back(entry); // flush the previous, now complete, entry
1276  last_entryname = entryname;
1277  // start a fresh entry for the new name
1278  entry.name = entryname;
1279  entry.has_attr = false;
1280  entry.is_dir = false;
1281  entry.attr.free();
1282  }
1283  if (offset != String::npos) {
1284  if (str[offset] == '/') {
1285  entry.is_dir = true;
1286  if (str.length() > offset+2 && str[offset+1] == NODE_ATTR_DELIM) { // "<name>/<delim><attr>": attribute of a directory
1287  attr = str.substr(offset+2);
1288  // attribute matches the one we're looking for
1289  if (attr == aname) {
1290  DynamicBuffer buffer(datam.get_size());
1291  buffer.add_unchecked(datam.get_data(), datam.get_size());
1292  entry.attr = buffer;
1293  entry.has_attr = true;
1294  }
1295  }
1296  }
1297  else { // stop character is NODE_ATTR_DELIM: "<name><delim><attr>", attribute of a file
1298  attr = str.substr(offset+1);
1299  if (attr == aname) {
1300  DynamicBuffer buffer(datam.get_size());
1301  buffer.add_unchecked(datam.get_data(), datam.get_size());
1302  entry.attr = buffer;
1303  entry.has_attr = true;
1304  }
1305  }
1306  }
1307  }
1308 
1309  } while (cursorp->get(&keym, &datam, DB_NEXT) != DB_NOTFOUND &&
1310  starts_with(keym.get_str(), fname.c_str()));
1311 
1312  if (last_entryname != "")
1313  listing.push_back(entry); // flush the last accumulated entry
1314 
1315  }
1316  }
1317  catch (DbException &e) {
1318  if (e.get_errno() == DB_LOCK_DEADLOCK)
1320  else if (e.get_errno() == DB_REP_HANDLE_DEAD)
1322  HT_ERRORF("Berkeley DB error: %s", e.what());
1324  }
1325 }
1326 
/*
 * Appends every key of the namespace database, in cursor order, to `names`.
 * NOTE(review): the line carrying the function name and leading parameters
 * (listing line 1328) is missing here - presumably get_all_names, confirm.
 *
 * Iteration stops at the first cursor get that returns non-zero.
 * Both DbException and std::exception end in HT_FATALF. A
 * DB_REP_HANDLE_DEAD errno is tested first, but the statement it guards is
 * not visible in this listing.
 */
1327 void
1329  std::vector<String> &names) {
1330  DbtManaged keym, datam;
1331  Dbc *cursorp = 0;
1332  HT_ON_SCOPE_EXIT(&close_db_cursor, &cursorp);
1333  int ret;
1334 
1335  HT_DEBUG_OUT <<"txn="<< txn << HT_END;
1336 
1337  try {
1338  txn.handle_namespace_db->cursor(txn.db_txn, &cursorp, 0);
1339 
1340  while ((ret = cursorp->get(&keym, &datam, DB_NEXT)) == 0) {
1341  names.push_back(keym.get_str());
1342  }
1343  }
1344  catch(DbException &e) {
1345  if (e.get_errno() == DB_REP_HANDLE_DEAD)
1347 
1348  HT_FATALF("Berkeley DB error: %s", e.what());
1349  }
1350  catch(std::exception &e) {
1351  HT_FATALF("Berkeley DB error: %s", e.what());
1352  }
1353 
1354 }
1355 
/*
 * Turns the node name in keystr into the database key of its attribute
 * aname and points `key` at it.
 * NOTE(review): the line carrying the function name and leading parameters
 * (listing line 1357) is missing here - presumably build_attr_key, confirm.
 *
 * The resulting key is keystr [+ "/" if the node is a directory]
 * + NODE_ATTR_DELIM + aname, including the terminating NUL. keystr is
 * modified in place, and `key` refers to keystr's internal buffer rather
 * than a copy, so keystr must stay alive and unmodified while key is used.
 * The statement run when the node does not exist (listing line 1362) is
 * not visible in this listing.
 */
1356 void
1358  const String &aname, Dbt &key) {
1359  bool isdir;
1360 
1361  if (!exists(txn, keystr, &isdir))
1363 
1364  if (isdir)
1365  keystr += "/";
1366 
1367  keystr += NODE_ATTR_DELIM;
1368  keystr += aname;
1369 
1370  key.set_data((void *)keystr.c_str());
1371  key.set_size(keystr.length() + 1);
1372 }
1373 
/*
 * Appends to `anames` the names of all attributes stored for node fname.
 * NOTE(review): the line carrying the function name and leading parameters
 * (listing line 1375) is missing here - presumably list_xattr, confirm.
 *
 * The attribute key prefix is fname [+ "/" for a directory]
 * + NODE_ATTR_DELIM. Every key of the namespace database is visited from
 * the start of the cursor, and the suffix of each key that begins with the
 * prefix is collected.
 * NOTE(review): this is a full scan; a DB_SET_RANGE seek to the prefix may
 * have been possible - confirm whether that matters for callers.
 * Returns true on success. The exception handlers call HT_FATALF and then
 * return false. The statement run when the node does not exist (listing
 * line 1385) is not visible in this listing.
 */
1374 bool
1376  std::vector<String> &anames)
1377 {
1378  DbtManaged keym, datam;
1379  Dbc *cursorp = 0;
1380  HT_ON_SCOPE_EXIT(&close_db_cursor, &cursorp);
1381  int ret;
1382  bool isdir;
1383 
1384  if (!exists(txn, fname, &isdir))
1386 
1387  const String targetkey(fname + (isdir ? "/" : "") + NODE_ATTR_DELIM);
1388 
1389  HT_DEBUG_OUT << "txn=" << txn << HT_END;
1390  try {
1391  txn.handle_namespace_db->cursor(txn.db_txn, &cursorp, 0);
1392  while ((ret = cursorp->get(&keym, &datam, DB_NEXT)) == 0) {
1393  /* current key, to be matched against the attribute prefix */
1394  String currkey = keym.get_str();
1395 
1396  /* this whole key is shorter than our prefix - skip it */
1397  if (currkey.size() < targetkey.size())
1398  continue;
1399 
1400  size_t keysize = targetkey.size();
1401  if (!targetkey.compare(0, keysize, currkey, 0, keysize)) {
1402  /* strip everything before and including attribute
1403  delimiter to get the attribute name */
1404  String attribute(currkey.substr(keysize, currkey.size()));
1405  anames.push_back(attribute);
1406  }
1407  }
1408  } catch(DbException &e) {
1409  HT_FATALF("Berkeley DB error: %s", e.what());
1410  return false;
1411  } catch(std::exception &e) {
1412  HT_FATALF("Berkeley DB error: %s", e.what());
1413  return false;
1414  }
1415 
1416  return true;
1417 }
1418 
1419 /*
1420  * Records a new event in the state database.
      * Asserts that no event with this id exists, then stores the decimal id
      * as a data item under the EVENTS_STR key (cursor put, DB_KEYLAST), and
      * the decimal type and mask under the keys returned by
      * get_event_key(id, EVENT_TYPE) and get_event_key(id, EVENT_MASK).
      * Every put is asserted to return 0.
      * A DbException is dispatched on its errno and logged; the per-branch
      * statements are not visible in this listing.
1421  */
1422 void
1423 BerkeleyDbFilesystem::create_event(BDbTxn &txn, uint32_t type, uint64_t id,
1424  uint32_t mask)
1425 {
1426  int ret;
1427  DbtManaged keym, datam;
1428  String key_str;
1429  char numbuf[16]; // NOTE(review): "%llu" of a 64-bit id can need 21 bytes (20 digits + NUL) - confirm ids stay below 10^15 or enlarge
1430  Dbc *cursorp = 0;
1431  HT_ON_SCOPE_EXIT(&close_db_cursor, &cursorp);
1432 
1433  HT_DEBUG_OUT <<"create_event txn="<< txn <<" event type='"<< type <<"' id="
1434  << id << " mask=" << mask << HT_END;
1435  try {
1436 
1437  HT_ASSERT(!(event_exists(txn, id)));
1438 
1439  // Store id under "/EVENTS/"
1440  String events_dir = EVENTS_STR;
1441  keym.set_str(events_dir);
1442  sprintf(numbuf, "%llu", (Llu)id);
1443  datam.set_str(numbuf);
1444 
1445  txn.handle_state_db->cursor(txn.db_txn, &cursorp, 0);
1446  ret = cursorp->put(&keym, &datam, DB_KEYLAST);
1447  HT_ASSERT(ret == 0);
1448 
1449  // Store event type
1450  key_str = get_event_key(id, EVENT_TYPE);
1451  keym.set_str(key_str);
1452  sprintf(numbuf, "%lu", (Lu)type);
1453  datam.set_str(numbuf);
1454 
1455  ret = txn.handle_state_db->put(txn.db_txn, &keym, &datam, 0);
1456  HT_ASSERT(ret == 0);
1457  // Store event mask
1458  key_str = get_event_key(id, EVENT_MASK);
1459  keym.set_str(key_str);
1460  sprintf(numbuf, "%lu", (Lu)mask);
1461  datam.set_str(numbuf);
1462 
1463  ret = txn.handle_state_db->put(txn.db_txn, &keym, &datam, 0);
1464  HT_ASSERT(ret == 0);
1465  }
1466  catch (DbException &e) {
1467  if (e.get_errno() == DB_LOCK_DEADLOCK)
1469  else if (e.get_errno() == DB_REP_HANDLE_DEAD)
1471  HT_ERRORF("Berkeley DB error: %s", e.what());
1473  }
1474 }
1475 
1476 /*
1477  * Records a new named event: delegates to
      * create_event(txn, type, id, mask) and then stores `name` under the key
      * returned by get_event_key(id, EVENT_NAME). The put is asserted to
      * return 0.
      * A DbException is dispatched on its errno and logged; the per-branch
      * statements are not visible in this listing.
1478  */
1479 void
1480 BerkeleyDbFilesystem::create_event(BDbTxn &txn, uint32_t type, uint64_t id,
1481  uint32_t mask, const String &name)
1482 {
1483  int ret;
1484  String key_str;
1485  DbtManaged keym, datam;
1486  Dbc *cursorp = 0; // never opened in this overload; only handed to the scope guard
1487  HT_ON_SCOPE_EXIT(&close_db_cursor, &cursorp);
1488 
1489  try {
1490  create_event(txn, type, id, mask);
1491  // Store event name
1492  key_str = get_event_key(id, EVENT_NAME);
1493  keym.set_str(key_str);
1494  datam.set_str(name);
1495  ret = txn.handle_state_db->put(txn.db_txn, &keym, &datam, 0);
1496  HT_ASSERT(ret == 0);
1497  }
1498  catch (DbException &e) {
1499  if (e.get_errno() == DB_LOCK_DEADLOCK)
1501  else if (e.get_errno() == DB_REP_HANDLE_DEAD)
1503  HT_ERRORF("Berkeley DB error: %s", e.what());
1505  }
1506 }
1507 /*
1508  * Records a new event that carries a mode: delegates to
      * create_event(txn, type, id, mask) and then stores the decimal `mode`
      * under the key returned by get_event_key(id, EVENT_MODE). The put is
      * asserted to return 0.
      * A DbException is dispatched on its errno and logged; the per-branch
      * statements are not visible in this listing.
1509  */
1510 void
1511 BerkeleyDbFilesystem::create_event(BDbTxn &txn, uint32_t type, uint64_t id,
1512  uint32_t mask, uint32_t mode)
1513 {
1514  int ret;
1515  DbtManaged keym, datam;
1516  String key_str;
1517  char numbuf[16];
1518 
1519  try {
1520  create_event(txn, type, id, mask);
1521  // Store mode
1522  key_str = get_event_key(id, EVENT_MODE);
1523  keym.set_str(key_str);
1524  sprintf(numbuf, "%lu", (Lu)mode);
1525  datam.set_str(numbuf);
1526 
1527  ret = txn.handle_state_db->put(txn.db_txn, &keym, &datam, 0);
1528  HT_ASSERT(ret == 0);
1529  }
1530  catch (DbException &e) {
1531  if (e.get_errno() == DB_LOCK_DEADLOCK)
1533  else if (e.get_errno() == DB_REP_HANDLE_DEAD)
1535  HT_ERRORF("Berkeley DB error: %s", e.what());
1537  }
1538 }
1539 /*
1540  * Records a new event that carries a mode and a generation: delegates to
      * create_event(txn, type, id, mask, mode) and then stores the decimal
      * `generation` under the key returned by
      * get_event_key(id, EVENT_GENERATION). The put is asserted to return 0.
      * A DbException is dispatched on its errno and logged; the per-branch
      * statements are not visible in this listing.
1541  */
1542 void
1543 BerkeleyDbFilesystem::create_event(BDbTxn &txn, uint32_t type, uint64_t id,
1544  uint32_t mask, uint32_t mode,
1545  uint64_t generation)
1546 {
1547  int ret;
1548  DbtManaged keym, datam;
1549  String key_str;
1550  char numbuf[16]; // NOTE(review): "%llu" of a 64-bit generation can need 21 bytes (20 digits + NUL) - confirm range or enlarge
1551 
1552  try {
1553  create_event(txn, type, id, mask, mode);
1554  // Store generation
1555  key_str = get_event_key(id, EVENT_GENERATION);
1556  keym.set_str(key_str);
1557  sprintf(numbuf, "%llu", (Llu)generation);
1558  datam.set_str(numbuf);
1559 
1560  ret = txn.handle_state_db->put(txn.db_txn, &keym, &datam, 0);
1561  HT_ASSERT(ret == 0);
1562  }
1563  catch (DbException &e) {
1564  if (e.get_errno() == DB_LOCK_DEADLOCK)
1566  else if (e.get_errno() == DB_REP_HANDLE_DEAD)
1568  HT_ERRORF("Berkeley DB error: %s", e.what());
1570  }
1571 }
1572 
1573 /*
1574  * Stores the list of handle ids to be notified for event `id`.
      * NOTE(review): the line carrying the function name and leading
      * parameters (listing line 1577) is missing here; the debug output
      * names it set_event_notification_handles.
      *
      * The vector is serialized as a 32-bit count followed by one 64-bit
      * value per handle. The event is asserted to exist, and the buffer is
      * written with DB_NOOVERWRITE; the put is asserted to return 0, so an
      * already-present key trips the assertion. The statement that assigns
      * key_str (listing line 1601) is not visible in this listing.
      * A DbException is dispatched on its errno and logged; the per-branch
      * statements are not visible either.
1575  */
1576 void
1578  const std::vector<uint64_t> &handles)
1579 {
1580  int ret;
1581  DbtManaged keym;
1582  String key_str;
1583  // 4 bytes for #handles, 8 bytes/handle
1584  uint32_t serialized_len = handles.size() * (sizeof(uint64_t)) + sizeof(uint32_t);
1585  DynamicBuffer buf(serialized_len);
1586 
1587  HT_DEBUG_OUT <<"set_event_notification_handles txn="<< txn <<" event id="<< id
1588  << " num notification handles=" << handles.size() << HT_END;
1589 
1590  // Serialize handles vector
1591  Serialization::encode_i32(&buf.ptr, handles.size());
1592  for(uint32_t ii=0; ii< handles.size(); ++ii)
1593  Serialization::encode_i64(&buf.ptr, handles[ii]);
1594 
1595  Dbt data((void *)buf.base, serialized_len);
1596 
1597  try {
1598  HT_ASSERT(event_exists(txn, id));
1599 
1600  // Write event notification handles
1602  keym.set_str(key_str);
1603  // Write only if this key doesn't exist in the db
1604  ret = txn.handle_state_db->put(txn.db_txn, &keym, &data, DB_NOOVERWRITE);
1605  HT_ASSERT(ret == 0);
1606 
1607  }
1608  catch (DbException &e) {
1609  if (e.get_errno() == DB_LOCK_DEADLOCK)
1611  else if (e.get_errno() == DB_REP_HANDLE_DEAD)
1613  HT_ERRORF("Berkeley DB error: %s", e.what());
1615  }
1616 }
1617 
1618 /*
1619  * Removes everything stored for event `id` from the state database.
      * NOTE(review): the line carrying the function name and parameters
      * (listing line 1622) is missing here; the debug output names it
      * delete_event.
      *
      * Returns immediately when the event does not exist. Otherwise the id
      * entry under EVENTS_STR is deleted through a cursor, the type and mask
      * keys are deleted with the result asserted to be 0, and the name, mode,
      * generation and notification-handle keys are deleted if present (a
      * debug line is emitted when one is not found). The statement that
      * assigns key_str for the notification handles (listing line 1685) is
      * not visible in this listing.
      * A DbException is dispatched on its errno and logged; the per-branch
      * statements are not visible either.
1620  */
1621 void
1623 {
1624  int ret;
1625  DbtManaged keym, datam;
1626  String key_str;
1627  char numbuf[16]; // NOTE(review): "%llu" of a 64-bit id can need 21 bytes (20 digits + NUL) - confirm range or enlarge
1628  Dbc *cursorp = 0;
1629  HT_ON_SCOPE_EXIT(&close_db_cursor, &cursorp);
1630 
1631  HT_DEBUG_OUT <<"delete_event txn="<< txn <<" event id="<< id << HT_END;
1632  try {
1633  if (!event_exists(txn, id))
1634  return;
1635 
1636  // Delete id under "/EVENTS/"
1637  String events_dir = EVENTS_STR;
1638  keym.set_str(events_dir);
1639  sprintf(numbuf, "%llu", (Llu)id);
1640  datam.set_str(numbuf);
1641 
1642  txn.handle_state_db->cursor(txn.db_txn, &cursorp, 0);
1643  cursorp->get(&keym, &datam, DB_GET_BOTH); // NOTE(review): result not checked before the delete below
1644  cursorp->del(0);
1645 
1646  // Delete event type
1647  key_str = get_event_key(id, EVENT_TYPE);
1648  keym.set_str(key_str);
1649 
1650  ret = txn.handle_state_db->del(txn.db_txn, &keym, 0);
1651  HT_ASSERT(ret == 0);
1652 
1653  // Delete event mask
1654  key_str = get_event_key(id, EVENT_MASK);
1655  keym.set_str(key_str);
1656 
1657  ret = txn.handle_state_db->del(txn.db_txn, &keym, 0);
1658  HT_ASSERT(ret == 0);
1659 
1660  // Delete event name (optional field)
1661  key_str = get_event_key(id, EVENT_NAME);
1662  keym.set_str(key_str);
1663 
1664  if ((ret = txn.handle_state_db->del(txn.db_txn, &keym, 0)) == DB_NOTFOUND)
1665  HT_DEBUG_OUT <<"txn="<< txn <<" event key " << keym.get_str()
1666  << "not found in DB" << HT_END;
1667 
1668  // Delete event mode (optional field)
1669  key_str = get_event_key(id, EVENT_MODE);
1670  keym.set_str(key_str);
1671 
1672  if ((ret = txn.handle_state_db->del(txn.db_txn, &keym, 0)) == DB_NOTFOUND)
1673  HT_DEBUG_OUT <<"txn="<< txn <<" event key " << keym.get_str()
1674  << " not found in DB" << HT_END;
1675 
1676  // Delete event generation (optional field)
1677  key_str = get_event_key(id, EVENT_GENERATION);
1678  keym.set_str(key_str);
1679 
1680  if ((ret = txn.handle_state_db->del(txn.db_txn, &keym, 0)) == DB_NOTFOUND)
1681  HT_DEBUG_OUT <<"txn="<< txn <<" event key " << keym.get_str()
1682  << " not found in DB" << HT_END;
1683 
1684  // Delete event notification handles (optional field)
1686  keym.set_str(key_str);
1687 
1688  if ((ret = txn.handle_state_db->del(txn.db_txn, &keym, 0)) == DB_NOTFOUND)
1689  HT_DEBUG_OUT <<"txn="<< txn <<" event key " << keym.get_str()
1690  << " not found in DB" << HT_END;
1691  }
1692  catch (DbException &e) {
1693  if (e.get_errno() == DB_LOCK_DEADLOCK)
1695  else if (e.get_errno() == DB_REP_HANDLE_DEAD)
1697  HT_ERRORF("Berkeley DB error: %s", e.what());
1699  }
1700 }
1701 
1702 /*
1703  * Reports whether event `id` is recorded in the state database.
      * NOTE(review): the line carrying the function name and parameters
      * (listing line 1706) is missing here; the debug output names it
      * event_exists.
      *
      * A cursor get with DB_GET_BOTH looks for the pair (EVENTS_STR, decimal
      * id). The result is false only when that get returns DB_NOTFOUND.
      * A DbException is dispatched on its errno and logged; the per-branch
      * statements are not visible in this listing.
1704  */
1705 bool
1707 {
1708  DbtManaged keym, datam;
1709  Dbc *cursorp = 0;
1710  HT_ON_SCOPE_EXIT(&close_db_cursor, &cursorp);
1711  bool exists = true;
1712  char numbuf[16]; // NOTE(review): "%llu" of a 64-bit id can need 21 bytes (20 digits + NUL) - confirm range or enlarge
1713 
1714  HT_DEBUG_OUT <<"event_exists txn="<< txn << "event id=" << id << HT_END;
1715 
1716  try {
1717  txn.handle_state_db->cursor(txn.db_txn, &cursorp, 0);
1718 
1719  // Check for id under "/EVENTS/"
1720  String events_dir = EVENTS_STR;
1721  keym.set_str(events_dir);
1722  sprintf(numbuf, "%llu", (Llu)id);
1723  datam.set_str(numbuf);
1724 
1725  if(cursorp->get(&keym, &datam, DB_GET_BOTH) == DB_NOTFOUND) {
1726  exists = false;
1727  }
1728  }
1729  catch (DbException &e) {
1730  if (e.get_errno() == DB_LOCK_DEADLOCK)
1732  else if (e.get_errno() == DB_REP_HANDLE_DEAD)
1734  HT_ERRORF("Berkeley DB error: %s", e.what());
1736  }
1737 
1738  if(exists)
1739  HT_DEBUG_OUT <<"event id '"<< id <<"' exists."<< HT_END;
1740 
1741  return exists;
1742 }
1743 
1744 /*
1745  * Records a new session in the state database.
      * Asserts that no session with this id exists, then stores the decimal
      * id as a data item under the SESSIONS_STR key (cursor put, DB_KEYLAST),
      * `addr` under get_session_key(id, SESSION_ADDR), and the string "0"
      * under get_session_key(id, SESSION_EXPIRED). Every put is asserted to
      * return 0.
      * A DbException is dispatched on its errno and logged; the per-branch
      * statements are not visible in this listing.
1746  */
1747 void
1748 BerkeleyDbFilesystem::create_session(BDbTxn &txn, uint64_t id, const String& addr)
1749 
1750 {
1751  int ret;
1752  DbtManaged keym, datam;
1753  String key_str;
1754  String expbuf; // not referenced in the visible body
1755  char numbuf[16]; // NOTE(review): "%llu" of a 64-bit id can need 21 bytes (20 digits + NUL) - confirm range or enlarge
1756  Dbc *cursorp = 0;
1757  HT_ON_SCOPE_EXIT(&close_db_cursor, &cursorp);
1758 
1759  HT_DEBUG_OUT <<"create_session txn="<< txn <<" create session addr='"<< addr
1760  << " id="<< id << HT_END;
1761  try {
1762 
1763  HT_ASSERT(!session_exists(txn, id));
1764  // Store id under "/SESSIONS/"
1765  String sessions_dir = SESSIONS_STR;
1766  keym.set_str(sessions_dir);
1767  sprintf(numbuf, "%llu", (Llu)id);
1768  datam.set_str(numbuf);
1769 
1770  txn.handle_state_db->cursor(txn.db_txn, &cursorp, 0);
1771  ret = cursorp->put(&keym, &datam, DB_KEYLAST);
1772  HT_ASSERT(ret == 0);
1773 
1774  // Store session addr
1775  key_str = get_session_key(id, SESSION_ADDR);
1776  keym.set_str(key_str);
1777  datam.set_str(addr);
1778 
1779  ret = txn.handle_state_db->put(txn.db_txn, &keym, &datam, 0);
1780  HT_ASSERT(ret == 0);
1781 
1782  // Store session expired status ("0" = not expired)
1783  key_str = get_session_key(id, SESSION_EXPIRED);
1784  keym.set_str(key_str);
1785  datam.set_str((String)"0");
1786 
1787  ret = txn.handle_state_db->put(txn.db_txn, &keym, &datam, 0);
1788  HT_ASSERT(ret == 0);
1789  }
1790  catch (DbException &e) {
1791  if (e.get_errno() == DB_LOCK_DEADLOCK)
1793  else if (e.get_errno() == DB_REP_HANDLE_DEAD)
1795  HT_ERRORF("Berkeley DB error: %s", e.what());
1797  }
1798 
1799  HT_DEBUG_OUT << "exitting create_session txn="<< txn
1800  <<" create session addr='"<< addr << " id="<< id << HT_END;
1801 
1802 }
1803 
1804 /*
1805  * Removes everything stored for session `id` from the state database.
      * NOTE(review): the line carrying the function name and parameters
      * (listing line 1808) is missing here; the debug output names it
      * delete_session.
      *
      * In order: the expired-status key is deleted (asserted), every data
      * item under the session's SESSION_HANDLES key is deleted through a
      * cursor, the session name is deleted if present, the address key is
      * deleted (asserted), and finally the id entry under SESSIONS_STR is
      * deleted (asserted).
      * A DbException is dispatched on its errno and logged; the per-branch
      * statements are not visible in this listing.
1806  */
1807 void
1809 {
1810  int ret;
1811  DbtManaged keym, datam;
1812  String key_str;
1813  char numbuf[16]; // NOTE(review): "%llu" of a 64-bit id can need 21 bytes (20 digits + NUL) - confirm range or enlarge
1814  Dbc *cursorp = 0;
1815  HT_ON_SCOPE_EXIT(&close_db_cursor, &cursorp);
1816 
1817  HT_DEBUG_OUT <<"delete_session txn="<< txn <<" session id="<< id << HT_END;
1818  try {
1819 
1820  // Delete session expired status
1821  key_str = get_session_key(id, SESSION_EXPIRED);
1822  keym.set_str(key_str);
1823 
1824  ret = txn.handle_state_db->del(txn.db_txn, &keym, 0);
1825  HT_ASSERT(ret==0);
1826 
1827  // Delete session handles
1828  String session_handles_dir = get_session_key(id, SESSION_HANDLES);
1829  keym.set_str(session_handles_dir);
1830  txn.handle_state_db->cursor(txn.db_txn, &cursorp, 0);
1831 
1832  ret = cursorp->get(&keym, &datam, DB_SET);
1833  while(ret != DB_NOTFOUND) { // walk the data items stored under this key
1834  HT_ASSERT(ret==0);
1835  cursorp->del(0);
1836  ret = cursorp->get(&keym, &datam, DB_NEXT_DUP);
1837  }
1838 
1839  // Delete session name if it exists
1840  key_str = get_session_key(id, SESSION_NAME);
1841  keym.set_str(key_str);
1842 
1843  ret = cursorp->get(&keym, &datam, DB_SET);
1844  if (ret != DB_NOTFOUND) {
1845  ret = cursorp->del(0);
1846  }
1847 
1848  // Delete session addr
1849  key_str = get_session_key(id, SESSION_ADDR);
1850  keym.set_str(key_str);
1851 
1852  ret = txn.handle_state_db->del(txn.db_txn, &keym, 0);
1853  HT_ASSERT(ret==0);
1854 
1855  // Delete id under "/SESSIONS/"
1856  String sessions_dir = SESSIONS_STR;
1857  keym.set_str(sessions_dir);
1858  sprintf(numbuf, "%llu", (Llu)id);
1859  datam.set_str(numbuf);
1860 
1861  cursorp->get(&keym, &datam, DB_GET_BOTH); // NOTE(review): result not checked before the delete below
1862  ret = cursorp->del(0);
1863  HT_ASSERT(ret==0);
1864  }
1865  catch (DbException &e) {
1866  if (e.get_errno() == DB_LOCK_DEADLOCK)
1868  else if (e.get_errno() == DB_REP_HANDLE_DEAD)
1870  HT_ERRORF("Berkeley DB error: %s", e.what());
1872  }
1873  HT_DEBUG_OUT << "exitting delete_session txn=" << txn << " session id=" << id << HT_END;
1874 }
1875 
1876 /*
1877  * Marks session `id` as expired in the state database.
      * NOTE(review): the line carrying the function name and parameters
      * (listing line 1880) is missing here; the debug output names it
      * expire_session.
      *
      * The cursor is positioned on the session's SESSION_EXPIRED key
      * (asserted to exist) and the value at that position is overwritten
      * with "1" (DB_CURRENT).
      * A DbException is dispatched on its errno and logged; the per-branch
      * statements are not visible in this listing.
1878  */
1879 void
1881 {
1882  int ret;
1883  DbtManaged keym, datam;
1884  String key_str;
1885  Dbc *cursorp = 0;
1886  HT_ON_SCOPE_EXIT(&close_db_cursor, &cursorp);
1887 
1888  HT_DEBUG_OUT <<"expire_session txn="<< txn <<" session id="<< id << HT_END;
1889 
1890  try {
1891  txn.handle_state_db->cursor(txn.db_txn, &cursorp, 0);
1892  // Set session expired status
1893  key_str = get_session_key(id, SESSION_EXPIRED);
1894  keym.set_str(key_str);
1895 
1896  ret = cursorp->get(&keym, &datam, DB_SET);
1897  HT_ASSERT(ret == 0);
1898  datam.set_str((String)"1");
1899 
1900  ret = cursorp->put(&keym, &datam, DB_CURRENT);
1901  HT_ASSERT(ret==0);
1902  }
1903  catch (DbException &e) {
1904  if (e.get_errno() == DB_LOCK_DEADLOCK)
1906  else if (e.get_errno() == DB_REP_HANDLE_DEAD)
1908  HT_ERRORF("Berkeley DB error: %s", e.what());
1910  }
1911 
1912  HT_DEBUG_OUT << "exitting expire_session txn="<< txn <<" session id=" << id << HT_END;
1913 
1914 }
1915 
1916 /*
1917  * Associates handle `handle_id` with session `id`.
      * The session is asserted to exist; the decimal handle id is then added
      * as a data item under get_session_key(id, SESSION_HANDLES) with a
      * cursor put (DB_KEYLAST), asserted to return 0.
      * A DbException is dispatched on its errno and logged; the per-branch
      * statements are not visible in this listing.
1918  */
1919 void
1920 BerkeleyDbFilesystem::add_session_handle(BDbTxn &txn, uint64_t id, uint64_t handle_id)
1921 {
1922  int ret;
1923  DbtManaged keym, datam;
1924  String key_str;
1925  char numbuf[17]; // NOTE(review): "%llu" of a 64-bit id can need 21 bytes (20 digits + NUL) - confirm range or enlarge
1926  Dbc *cursorp = 0;
1927  HT_ON_SCOPE_EXIT(&close_db_cursor, &cursorp);
1928 
1929  HT_DEBUG_OUT << "add_session_handle txn="<< txn <<" session id="<< id
1930  << " handle id=" << handle_id << HT_END;
1931  try {
1932  HT_ASSERT(session_exists(txn, id));
1933 
1934  String session_handles_dir = get_session_key(id, SESSION_HANDLES);
1935  keym.set_str(session_handles_dir);
1936  sprintf(numbuf, "%llu", (Llu)handle_id);
1937  datam.set_str(numbuf);
1938 
1939  txn.handle_state_db->cursor(txn.db_txn, &cursorp, 0);
1940  ret = cursorp->put(&keym, &datam, DB_KEYLAST);
1941  HT_ASSERT(ret == 0);
1942  }
1943  catch (DbException &e) {
1944  if (e.get_errno() == DB_LOCK_DEADLOCK)
1946  else if (e.get_errno() == DB_REP_HANDLE_DEAD)
1948  HT_ERRORF("Berkeley DB error: %s", e.what());
1950  }
1951  HT_DEBUG_OUT <<"exitting add_session_handle txn="<< txn <<" session id="<< id
1952  << " handle id=" << handle_id << HT_END;
1953 }
1954 
1955 /*
1956  * Appends to `handles` every handle id recorded for session `id`.
      * The session is asserted to exist. A cursor is positioned on
      * get_session_key(id, SESSION_HANDLES) and each data item stored under
      * that key is parsed as a number and pushed onto the vector.
      * A DbException is dispatched on its errno and logged; the per-branch
      * statements are not visible in this listing.
1957  */
1958 void
1959 BerkeleyDbFilesystem::get_session_handles(BDbTxn &txn, uint64_t id, std::vector<uint64_t> &handles)
1960 {
1961  int ret;
1962  DbtManaged keym, datam;
1963  String key_str;
1964  Dbc *cursorp = 0;
1965  HT_ON_SCOPE_EXIT(&close_db_cursor, &cursorp);
1966 
1967  HT_DEBUG_OUT <<"get_session_handles txn="<< txn <<" session id="<< id << HT_END;
1968 
1969  try {
1970  HT_ASSERT(session_exists(txn, id));
1971 
1972  String session_handles_dir = get_session_key(id, SESSION_HANDLES);
1973  keym.set_str(session_handles_dir);
1974  txn.handle_state_db->cursor(txn.db_txn, &cursorp, 0);
1975 
1976  ret = cursorp->get(&keym, &datam, DB_SET);
1977  while(ret != DB_NOTFOUND) {
1978  handles.push_back(strtoul((const char *)datam.get_data(), 0, 0)); // NOTE(review): strtoul yields unsigned long, which may be narrower than uint64_t on some platforms - strtoull may be intended
1979  ret = cursorp->get(&keym, &datam, DB_NEXT_DUP);
1980  }
1981  }
1982  catch (DbException &e) {
1983  if (e.get_errno() == DB_LOCK_DEADLOCK)
1985  else if (e.get_errno() == DB_REP_HANDLE_DEAD)
1987  HT_ERRORF("Berkeley DB error: %s", e.what());
1989  }
1990 
1991  HT_DEBUG_OUT <<"exitting get_session_handles txn="<< txn <<" session id="<< id << HT_END;
1992 }
1993 
1994 /*
1995  * Removes handle `handle_id` from the handles recorded for session `id`.
      * The session is asserted to exist. A cursor get with DB_GET_BOTH looks
      * for the pair (session handles key, decimal handle id); the result must
      * be 0 or DB_NOTFOUND (HT_EXPECT with HYPERSPACE_STATEDB_ERROR). When
      * found, the pair is deleted (asserted).
      * Returns true when a pair was deleted, false when none was found.
      * A DbException is dispatched on its errno and logged; the per-branch
      * statements are not visible in this listing.
1996  */
1997 bool
1998 BerkeleyDbFilesystem::delete_session_handle(BDbTxn &txn, uint64_t id, uint64_t handle_id)
1999 {
2000  int ret;
2001  DbtManaged keym, datam;
2002  String key_str;
2003  char numbuf[17]; // NOTE(review): "%llu" of a 64-bit id can need 21 bytes (20 digits + NUL) - confirm range or enlarge
2004  bool deleted = false;
2005  Dbc *cursorp = 0;
2006  HT_ON_SCOPE_EXIT(&close_db_cursor, &cursorp);
2007 
2008  HT_DEBUG_OUT <<"delete_session_handle txn="<< txn <<" session id="<< id
2009  << " handle_id=" << handle_id << HT_END;
2010  try {
2011  HT_ASSERT(session_exists(txn, id));
2012 
2013  // Delete session handle
2014  String session_handles_dir = get_session_key(id, SESSION_HANDLES);
2015  keym.set_str(session_handles_dir);
2016  sprintf(numbuf, "%llu", (Llu)handle_id);
2017  datam.set_str(numbuf);
2018  txn.handle_state_db->cursor(txn.db_txn, &cursorp, 0);
2019  ret = cursorp->get(&keym, &datam, DB_GET_BOTH);
2020 
2021  HT_EXPECT(ret == 0 || ret == DB_NOTFOUND, HYPERSPACE_STATEDB_ERROR);
2022 
2023  if (ret != DB_NOTFOUND) {
2024  ret = cursorp->del(0);
2025  HT_ASSERT(ret == 0);
2026  deleted = true;
2027  }
2028  }
2029  catch (DbException &e) {
2030  if (e.get_errno() == DB_LOCK_DEADLOCK)
2032  else if (e.get_errno() == DB_REP_HANDLE_DEAD)
2034  HT_ERRORF("Berkeley DB error: %s", e.what());
2036  }
2037 
2038  HT_DEBUG_OUT <<"exitting delete_session_handle txn="<< txn <<" session id="<< id
2039  << " handle_id=" << handle_id << " deleted=" << deleted << HT_END;
2040  return deleted;
2041 }
2042 
2043 /*
2044  * Reports whether session `id` is recorded in the state database.
      * NOTE(review): the line carrying the function name and parameters
      * (listing line 2047) is missing here; the debug output names it
      * session_exists.
      *
      * A cursor get with DB_GET_BOTH looks for the pair (SESSIONS_STR,
      * decimal id). The result is false only when that get returns
      * DB_NOTFOUND.
      * A DbException is dispatched on its errno and logged; the per-branch
      * statements are not visible in this listing.
2045  */
2046 bool
2048 {
2049  DbtManaged keym, datam;
2050  Dbc *cursorp = 0;
2051  HT_ON_SCOPE_EXIT(&close_db_cursor, &cursorp);
2052  bool exists = true;
2053  char numbuf[16]; // NOTE(review): "%llu" of a 64-bit id can need 21 bytes (20 digits + NUL) - confirm range or enlarge
2054 
2055  HT_DEBUG_OUT <<"session_exists txn="<< txn << " session id=" << id << HT_END;
2056 
2057  try {
2058  txn.handle_state_db->cursor(txn.db_txn, &cursorp, 0);
2059 
2060  // Check for id under "/SESSIONS/"
2061  String sessions_dir = SESSIONS_STR;
2062  keym.set_str(sessions_dir);
2063  sprintf(numbuf, "%llu", (Llu)id);
2064  datam.set_str(numbuf);
2065 
2066  if(cursorp->get(&keym, &datam, DB_GET_BOTH) == DB_NOTFOUND) {
2067  exists = false;
2068  }
2069  }
2070  catch (DbException &e) {
2071  if (e.get_errno() == DB_LOCK_DEADLOCK)
2073  else if (e.get_errno() == DB_REP_HANDLE_DEAD)
2075  HT_ERRORF("Berkeley DB error: %s", e.what());
2077  }
2078 
2079  if(exists)
2080  HT_DEBUG_OUT <<"session id '"<< id <<"' exists."<< HT_END;
2081 
2082  HT_DEBUG_OUT <<"exitting session_exists txn="<< txn << " session id=" << id << HT_END;
2083  return exists;
2084 }
2085 
2086 /*
2087  * Stores `name` as the name of session `id`, replacing any existing one.
      * NOTE(review): the line carrying the function name and parameters
      * (listing line 2090) is missing here; the debug output names it
      * set_session_name.
      *
      * A cursor is positioned on get_session_key(id, SESSION_NAME). When the
      * key is not found the name is inserted with a plain database put;
      * otherwise the value at the cursor position is overwritten
      * (DB_CURRENT). The write is asserted to return 0.
      * A DbException is dispatched on its errno and logged; the per-branch
      * statements are not visible in this listing.
2088  */
2089 void
2091 {
2092  int ret;
2093  DbtManaged keym, datam;
2094  String key_str;
2095  Dbc *cursorp = 0;
2096  HT_ON_SCOPE_EXIT(&close_db_cursor, &cursorp);
2097 
2098  HT_DEBUG_OUT <<"set_session_name txn="<< txn <<" name='"<< name << "' id="<< id << HT_END;
2099  try {
2100 
2101  txn.handle_state_db->cursor(txn.db_txn, &cursorp, 0);
2102  // Store session name/replace if exists
2103  key_str = get_session_key(id, SESSION_NAME);
2104  keym.set_str(key_str);
2105  ret = cursorp->get(&keym, &datam, DB_SET);
2106  datam.set_str(name); // replace whatever the get read with the new name
2107  if (ret == DB_NOTFOUND)
2108  ret = txn.handle_state_db->put(txn.db_txn, &keym, &datam, 0);
2109  else
2110  ret = cursorp->put(&keym, &datam, DB_CURRENT);
2111 
2112  HT_ASSERT(ret==0);
2113  }
2114  catch (DbException &e) {
2115  if (e.get_errno() == DB_LOCK_DEADLOCK)
2117  else if (e.get_errno() == DB_REP_HANDLE_DEAD)
2119  HT_ERRORF("Berkeley DB error: %s", e.what());
2121  }
2122 
2123  HT_DEBUG_OUT <<"exitting set_session_name txn="<< txn <<" name='"<< name << "'" << HT_END;
2124 }
2125 
2126 /*
2127  * Returns the name stored for session `id`, or "" when none is stored.
      * NOTE(review): the line carrying the function name and parameters
      * (listing line 2130) is missing here; the debug output names it
      * get_session_name. Two body lines (listing lines 2142 and 2144) are
      * also missing - presumably the session-existence check announced by
      * the comment and the setup of keym; confirm against the original.
      *
      * The cursor get with DB_SET must return 0 or DB_NOTFOUND (asserted).
      * A DbException is dispatched on its errno and logged; the per-branch
      * statements are not visible in this listing.
2128  */
2129 String
2131 {
2132  int ret;
2133  DbtManaged keym, datam;
2134  String key_str, name;
2135  Dbc *cursorp = 0;
2136  HT_ON_SCOPE_EXIT(&close_db_cursor, &cursorp);
2137 
2138  HT_DEBUG_OUT <<"get_session_name txn="<< txn <<" session id="<< id << HT_END;
2139 
2140  try {
2141  // Throw exception if session does not exist
2143 
2145  txn.handle_state_db->cursor(txn.db_txn, &cursorp, 0);
2146 
2147  ret = cursorp->get(&keym, &datam, DB_SET);
2148 
2149  HT_ASSERT(ret == 0 || ret == DB_NOTFOUND);
2150 
2151  if (ret == DB_NOTFOUND)
2152  name = "";
2153  else
2154  name = datam.get_str();
2155  }
2156  catch (DbException &e) {
2157  if (e.get_errno() == DB_LOCK_DEADLOCK)
2159  else if (e.get_errno() == DB_REP_HANDLE_DEAD)
2161  HT_ERRORF("Berkeley DB error: %s", e.what());
2163  }
2164 
2165  HT_DEBUG_OUT <<"exitting get_session_name txn="<< txn
2166  <<" session id=" << id << " name="<< name << HT_END;
2167  return name;
2168 }
2169 
2170 /*
2171  * Records a new handle in the state database.
      * NOTE(review): the line carrying the function name and leading
      * parameters (listing line 2174) is missing here; the debug output
      * names it create_handle and the body uses `id` and `node_name`.
      *
      * Asserts that no handle with this id exists, then stores the decimal id
      * as a data item under the HANDLES_STR key (cursor put, DB_KEYLAST) and
      * one key per field via get_handle_key(id, ...): node name, open flags,
      * deletion state, event mask, session id, and the locked flag as "1" or
      * "0". Numeric fields are stored as decimal strings. Every put is
      * asserted to return 0.
      * A DbException is dispatched on its errno and logged; the per-branch
      * statements are not visible in this listing.
2172  */
2173 void
2175  uint32_t open_flags, uint32_t event_mask, uint64_t session_id,
2176  bool locked, uint32_t del_state)
2177 {
2178  int ret;
2179  DbtManaged keym, datam;
2180  String key_str;
2181  char numbuf[17]; // NOTE(review): "%llu" of a 64-bit id can need 21 bytes (20 digits + NUL) - confirm range or enlarge
2182  String buf;
2183  Dbc *cursorp = 0;
2184  HT_ON_SCOPE_EXIT(&close_db_cursor, &cursorp);
2185 
2186  HT_DEBUG_OUT <<"create_handle txn="<< txn <<" id="<< id << " node='"<< node_name
2187  << "' open_flags=" << open_flags << " event_mask="<< event_mask
2188  << " session_id=" << session_id << " locked=" << locked
2189  << " del_state=" << del_state << HT_END;
2190  try {
2191 
2192  HT_ASSERT(!handle_exists(txn, id));
2193  // Store id under "/HANDLES/"
2194  String handles_dir = HANDLES_STR;
2195  keym.set_str(handles_dir);
2196  sprintf(numbuf, "%llu", (Llu)id);
2197  datam.set_str(numbuf);
2198 
2199  txn.handle_state_db->cursor(txn.db_txn, &cursorp, 0);
2200  ret = cursorp->put(&keym, &datam, DB_KEYLAST);
2201  HT_ASSERT(ret == 0);
2202 
2203  // Store handle node name
2204  key_str = get_handle_key(id, HANDLE_NODE_NAME);
2205  keym.set_str(key_str);
2206  datam.set_str(node_name);
2207 
2208  ret = txn.handle_state_db->put(txn.db_txn, &keym, &datam, 0);
2209  HT_ASSERT(ret == 0);
2210 
2211  // Store handle open_flags
2212  key_str = get_handle_key(id, HANDLE_OPEN_FLAGS);
2213  keym.set_str(key_str);
2214  sprintf(numbuf, "%lu", (Lu)open_flags);
2215  datam.set_str(numbuf);
2216 
2217  ret = txn.handle_state_db->put(txn.db_txn, &keym, &datam, 0);
2218  HT_ASSERT(ret == 0);
2219 
2220  // Store handle deletion state
2221  key_str = get_handle_key(id, HANDLE_DEL_STATE);
2222  keym.set_str(key_str);
2223  sprintf(numbuf, "%lu", (Lu)del_state);
2224  datam.set_str(numbuf);
2225 
2226  ret = txn.handle_state_db->put(txn.db_txn, &keym, &datam, 0);
2227  HT_ASSERT(ret == 0);
2228 
2229  // Store handle notification event_mask
2230  key_str = get_handle_key(id, HANDLE_EVENT_MASK);
2231  keym.set_str(key_str);
2232  sprintf(numbuf, "%lu", (Lu)event_mask);
2233  datam.set_str(numbuf);
2234 
2235  ret = txn.handle_state_db->put(txn.db_txn, &keym, &datam, 0);
2236  HT_ASSERT(ret == 0);
2237 
2238  // Store handle session id
2239  key_str = get_handle_key(id, HANDLE_SESSION_ID);
2240  keym.set_str(key_str);
2241  sprintf(numbuf, "%llu", (Llu)session_id);
2242  datam.set_str(numbuf);
2243 
2244  ret = txn.handle_state_db->put(txn.db_txn, &keym, &datam, 0);
2245  HT_ASSERT(ret == 0);
2246 
2247  // Store handle locked bool
2248  key_str = get_handle_key(id, HANDLE_LOCKED);
2249  keym.set_str(key_str);
2250  if(locked)
2251  buf = "1";
2252  else
2253  buf = "0";
2254  datam.set_str(buf);
2255 
2256  ret = txn.handle_state_db->put(txn.db_txn, &keym, &datam, 0);
2257  HT_ASSERT(ret == 0);
2258  }
2259  catch (DbException &e) {
2260  if (e.get_errno() == DB_LOCK_DEADLOCK)
2262  else if (e.get_errno() == DB_REP_HANDLE_DEAD)
2264  HT_ERRORF("Berkeley DB error: %s", e.what());
2266  }
2267 
2268  HT_DEBUG_OUT <<"exitting create_handle txn="<< txn <<" id="<< id << " node='"<< node_name
2269  << "' open_flags=" << open_flags << " event_mask="<< event_mask
2270  << " session_id=" << session_id << " locked=" << locked
2271  << " del_state=" << del_state << HT_END;
2272 }
2273 
2274 /*
2275  * Removes everything stored for handle `id` from the state database.
      * NOTE(review): the line carrying the function name and parameters
      * (listing line 2278) is missing here; the debug output names it
      * delete_handle.
      *
      * Returns immediately when the handle does not exist. Otherwise each
      * per-field key (locked, session id, event mask, open flags, deletion
      * state, node name) is deleted with the result asserted to be 0, and
      * finally the id entry under HANDLES_STR is located with DB_GET_BOTH
      * and deleted through a cursor (both asserted).
      * A DbException is dispatched on its errno and logged; the per-branch
      * statements are not visible in this listing.
2276  */
2277 void
2279 {
2280  int ret;
2281  DbtManaged keym, datam;
2282  String key_str;
2283  char numbuf[16]; // NOTE(review): "%llu" of a 64-bit id can need 21 bytes (20 digits + NUL) - confirm range or enlarge
2284  Dbc *cursorp = 0;
2285  HT_ON_SCOPE_EXIT(&close_db_cursor, &cursorp);
2286 
2287  HT_DEBUG_OUT <<"delete_handle txn="<< txn <<" handle id="<< id << HT_END;
2288  try {
2289  if (!handle_exists(txn, id))
2290  return;
2291  // Delete handle locked bool
2292  key_str = get_handle_key(id, HANDLE_LOCKED);
2293  keym.set_str(key_str);
2294 
2295  ret = txn.handle_state_db->del(txn.db_txn, &keym, 0);
2296  HT_ASSERT(ret == 0);
2297 
2298  // Delete handle session id
2299  key_str = get_handle_key(id, HANDLE_SESSION_ID);
2300  keym.set_str(key_str);
2301 
2302  ret = txn.handle_state_db->del(txn.db_txn, &keym, 0);
2303  HT_ASSERT(ret == 0);
2304 
2305  // Delete handle notification event_mask
2306  key_str = get_handle_key(id, HANDLE_EVENT_MASK);
2307  keym.set_str(key_str);
2308 
2309  ret = txn.handle_state_db->del(txn.db_txn, &keym, 0);
2310  HT_ASSERT(ret == 0);
2311 
2312  // Delete handle open_flags
2313  key_str = get_handle_key(id, HANDLE_OPEN_FLAGS);
2314  keym.set_str(key_str);
2315 
2316  ret = txn.handle_state_db->del(txn.db_txn, &keym, 0);
2317  HT_ASSERT(ret == 0);
2318 
2319  // Delete handle deletion state
2320  key_str = get_handle_key(id, HANDLE_DEL_STATE);
2321  keym.set_str(key_str);
2322 
2323  ret = txn.handle_state_db->del(txn.db_txn, &keym, 0);
2324  HT_ASSERT(ret == 0);
2325 
2326  // Delete handle node name
2327  key_str = get_handle_key(id, HANDLE_NODE_NAME);
2328  keym.set_str(key_str);
2329 
2330  ret = txn.handle_state_db->del(txn.db_txn, &keym, 0);
2331  HT_ASSERT(ret == 0);
2332 
2333  // Delete id under "/HANDLES/"
2334  String handles_dir = HANDLES_STR;
2335  keym.set_str(handles_dir);
2336  sprintf(numbuf, "%llu", (Llu)id);
2337  datam.set_str(numbuf);
2338 
2339  txn.handle_state_db->cursor(txn.db_txn, &cursorp, 0);
2340  ret = cursorp->get(&keym, &datam, DB_GET_BOTH);
2341  HT_ASSERT(ret==0);
2342  ret = cursorp->del(0);
2343  HT_ASSERT(ret==0);
2344 
2345  }
2346  catch (DbException &e) {
2347  if (e.get_errno() == DB_LOCK_DEADLOCK)
2349  else if (e.get_errno() == DB_REP_HANDLE_DEAD)
2351  HT_ERRORF("Berkeley DB error: %s", e.what());
2353  }
2354  HT_DEBUG_OUT << "exitting delete_handle txn="<< txn <<" handle id="
2355  << id << HT_END;
2356 
2357 }
2358 
2359 /*
2360  * Overwrites the stored deletion state of handle `id` with the decimal
      * text of `del_state`. Asserts that the handle exists, positions a cursor
      * on the HANDLE_DEL_STATE key (DB_SET) and rewrites that record in place
      * (DB_CURRENT); both cursor calls are asserted to return 0.
      * NOTE(review): the statements guarded by the catch-branch conditions
      * are not present in this listing.
2361  */
2362 void
2363 BerkeleyDbFilesystem::set_handle_del_state(BDbTxn &txn, uint64_t id, uint32_t del_state)
2364 {
2365  DbtManaged keym, datam;
2366  char numbuf[17];
2367  int ret;
2368  Dbc *cursorp = 0;
2369  HT_ON_SCOPE_EXIT(&close_db_cursor, &cursorp);
2370  String key_str;
2371 
2372  HT_DEBUG_OUT <<"set_handle_del_state txn="<< txn <<" handle id="
2373  << id << " del_state=" << del_state << HT_END;
2374  try {
2375  HT_ASSERT(handle_exists(txn, id));
2376  txn.handle_state_db->cursor(txn.db_txn, &cursorp, 0);
2377 
2378  // Replace existing deletion state
2379  key_str = get_handle_key(id, HANDLE_DEL_STATE);
2380  keym.set_str(key_str);
2381 
2382  ret = cursorp->get(&keym, &datam, DB_SET);
2383  HT_ASSERT(ret == 0);
2384 
2385  sprintf(numbuf, "%lu", (Lu)del_state);
2386  datam.set_str(numbuf);
2387 
2388  ret = cursorp->put(&keym, &datam, DB_CURRENT);
2389  HT_ASSERT(ret==0);
2390  }
2391  catch (DbException &e) {
2392  HT_ERRORF("Berkeley DB error: %s", e.what());
2393  if (e.get_errno() == DB_LOCK_DEADLOCK)
2395  else
2397  }
2398 
2399  HT_DEBUG_OUT <<"exitting set_handle_del_state txn="<< txn <<" handle id="
2400  << id << " del_state=" << del_state << HT_END;
2401 
2402 }
2403 
2404 /*
2405  * Overwrites the stored open flags of handle `id` with the decimal text of
      * `open_flags`. Positions a cursor on the HANDLE_OPEN_FLAGS key (DB_SET)
      * and rewrites that record in place (DB_CURRENT); both cursor calls are
      * asserted to return 0.
      * NOTE(review): one statement at the top of the try block and the
      * statements guarded by the catch-branch conditions are not present in
      * this listing.
2406  */
2407 void
2408 BerkeleyDbFilesystem::set_handle_open_flags(BDbTxn &txn, uint64_t id, uint32_t open_flags)
2409 {
2410  DbtManaged keym, datam;
2411  char numbuf[17];
2412  int ret;
2413  Dbc *cursorp = 0;
2414  HT_ON_SCOPE_EXIT(&close_db_cursor, &cursorp);
2415  String key_str;
2416 
2417  HT_DEBUG_OUT <<"set_handle_open_flags txn="<< txn <<" handle id="
2418  << id << " open_flags=" << open_flags << HT_END;
2419  try {
2421  txn.handle_state_db->cursor(txn.db_txn, &cursorp, 0);
2422 
2423  // Replace existing open flag
2424  key_str = get_handle_key(id, HANDLE_OPEN_FLAGS);
2425  keym.set_str(key_str);
2426 
2427  ret = cursorp->get(&keym, &datam, DB_SET);
2428  HT_ASSERT(ret == 0);
2429 
2430  sprintf(numbuf, "%lu", (Lu)open_flags);
2431  datam.set_str(numbuf);
2432 
2433  ret = cursorp->put(&keym, &datam, DB_CURRENT);
2434  HT_ASSERT(ret==0);
2435  }
2436  catch (DbException &e) {
2437  if (e.get_errno() == DB_LOCK_DEADLOCK)
2439  else if (e.get_errno() == DB_REP_HANDLE_DEAD)
2441  HT_ERRORF("Berkeley DB error: %s", e.what());
2443  }
2444 
2445  HT_DEBUG_OUT <<"exitting set_handle_open_flags txn="<< txn <<" handle id="
2446  << id << " open_flags=" << open_flags << HT_END;
2447 
2448 }
2449 
2450 /*
2451  * Overwrites the stored notification event mask of handle `id` with the
      * decimal text of `event_mask`. Positions a cursor on the
      * HANDLE_EVENT_MASK key (DB_SET) and rewrites that record in place
      * (DB_CURRENT); both cursor calls are asserted to return 0.
      * NOTE(review): one statement at the top of the try block and the
      * statements guarded by the catch-branch conditions are not present in
      * this listing.
2452  */
2453 void
2454 BerkeleyDbFilesystem::set_handle_event_mask(BDbTxn &txn, uint64_t id, uint32_t event_mask)
2455 {
2456  DbtManaged keym, datam;
2457  char numbuf[17];
2458  int ret;
2459  Dbc *cursorp = 0;
2460  HT_ON_SCOPE_EXIT(&close_db_cursor, &cursorp);
2461  String key_str;
2462 
2463  HT_DEBUG_OUT <<"set_handle_event_mask txn="<< txn <<" handle id="
2464  << id << " event_mask=" << event_mask << HT_END;
2465  try {
2467  txn.handle_state_db->cursor(txn.db_txn, &cursorp, 0);
2468 
2469  // Replace existing event_mask
2470  key_str = get_handle_key(id, HANDLE_EVENT_MASK);
2471  keym.set_str(key_str);
2472 
2473  ret = cursorp->get(&keym, &datam, DB_SET);
2474  HT_ASSERT(ret == 0);
2475 
2476  sprintf(numbuf, "%lu", (Lu)event_mask);
2477  datam.set_str(numbuf);
2478 
2479  ret = cursorp->put(&keym, &datam, DB_CURRENT);
2480  HT_ASSERT(ret==0);
2481  }
2482  catch (DbException &e) {
2483  if (e.get_errno() == DB_LOCK_DEADLOCK)
2485  else if (e.get_errno() == DB_REP_HANDLE_DEAD)
2487  HT_ERRORF("Berkeley DB error: %s", e.what());
2489  }
2490 
2491  HT_DEBUG_OUT <<"exitting set_handle_event_mask txn="<< txn <<" handle id="
2492  << id << " event_mask=" << event_mask << HT_END;
2493 
2494 }
2495 
2496 /*
2497  * Reads the notification event mask stored for handle `id`. Positions a
      * cursor on the HANDLE_EVENT_MASK key (DB_SET), asserts the lookup
      * returned 0, parses the record text with strtoull (base 0) and returns
      * the value narrowed to uint32_t.
      * NOTE(review): the signature line and the statements guarded by the
      * catch-branch conditions are not present in this listing.
2498  */
2499 uint32_t
2501 {
2502  DbtManaged keym, datam;
2503  int ret;
2504  Dbc *cursorp = 0;
2505  HT_ON_SCOPE_EXIT(&close_db_cursor, &cursorp);
2506  String key_str;
2507  uint32_t event_mask;
2508 
2509  HT_DEBUG_OUT <<"get_handle_event_mask txn="<< txn <<" handle id=" << id << HT_END;
2510 
2511  try {
2512  txn.handle_state_db->cursor(txn.db_txn, &cursorp, 0);
2513 
2514  // Look up existing event_mask (read-only; nothing is replaced here)
2515  key_str = get_handle_key(id, HANDLE_EVENT_MASK);
2516  keym.set_str(key_str);
2517 
2518  ret = cursorp->get(&keym, &datam, DB_SET);
2519  HT_ASSERT(ret == 0);
2520  event_mask = (uint32_t)strtoull(datam.get_str(), 0, 0);
2521  }
2522  catch (DbException &e) {
2523  if (e.get_errno() == DB_LOCK_DEADLOCK)
2525  else if (e.get_errno() == DB_REP_HANDLE_DEAD)
2527  HT_ERRORF("Berkeley DB error: %s", e.what());
2529  }
2530 
2531  HT_DEBUG_OUT <<"exitting get_handle_event_mask txn="<< txn <<" handle id=" << id
2532  << " event_mask=" << event_mask << HT_END;
2533 
2534  return event_mask;
2535 }
2536 
2537 /*
2538  * Overwrites the stored locked flag of handle `id`: writes "1" when
      * `locked` is true and "0" otherwise. Asserts that the handle exists,
      * positions a cursor on the HANDLE_LOCKED key (DB_SET) and rewrites that
      * record in place (DB_CURRENT); both cursor calls are asserted to
      * return 0.
      * NOTE(review): the statements guarded by the catch-branch conditions
      * are not present in this listing.
2539  */
2540 void
2541 BerkeleyDbFilesystem::set_handle_locked(BDbTxn &txn, uint64_t id, bool locked)
2542 {
2543  DbtManaged keym, datam;
2544  int ret;
2545  String buf;
2546  Dbc *cursorp = 0;
2547  HT_ON_SCOPE_EXIT(&close_db_cursor, &cursorp);
2548  String key_str;
2549 
2550  HT_DEBUG_OUT <<"set_handle_locked txn="<< txn <<" handle id=" << id
2551  << " locked=" << locked << HT_END;
2552  try {
2553  HT_ASSERT(handle_exists(txn, id));
2554  txn.handle_state_db->cursor(txn.db_txn, &cursorp, 0);
2555 
2556  // Replace existing lockedness
2557  key_str = get_handle_key(id, HANDLE_LOCKED);
2558  keym.set_str(key_str);
2559  ret = cursorp->get(&keym, &datam, DB_SET);
2560  HT_ASSERT(ret == 0);
2561  if(locked)
2562  buf = "1";
2563  else
2564  buf = "0";
2565  datam.set_str(buf);
2566 
2567  ret = cursorp->put(&keym, &datam, DB_CURRENT);
2568  HT_ASSERT(ret==0);
2569  }
2570  catch (DbException &e) {
2571  if (e.get_errno() == DB_LOCK_DEADLOCK)
2573  else if (e.get_errno() == DB_REP_HANDLE_DEAD)
2575  HT_ERRORF("Berkeley DB error: %s", e.what());
2577  }
2578  HT_DEBUG_OUT <<"exitting set_handle_locked txn="<< txn <<" handle id=" << id
2579  << " locked=" << locked << HT_END;
2580 }
2581 
2582 /*
2583  * Reports whether handle `id` is recorded in txn.handle_state_db. Looks
      * for the exact (HANDLES_STR, decimal id) key/data pair with DB_GET_BOTH.
      * Only a DB_NOTFOUND result yields false; any other return value leaves
      * the result at its initial value of true.
      * NOTE(review): the signature line and the statements guarded by the
      * catch-branch conditions are not present in this listing.
2584  */
2585 bool
2587 {
2588  DbtManaged keym, datam;
2589  Dbc *cursorp = 0;
2590  HT_ON_SCOPE_EXIT(&close_db_cursor, &cursorp);
2591  bool exists = true;
      // "%llu" can emit 20 digits for a 64-bit value; with the NUL that is 21
      // bytes, which the previous 17-byte buffer could not hold.
2592  char numbuf[32];
2593 
2594  HT_DEBUG_OUT <<"handle_exists txn="<< txn << " handle id=" << id << HT_END;
2595 
2596  try {
2597  txn.handle_state_db->cursor(txn.db_txn, &cursorp, 0);
2598 
2599  // Check for id under "/HANDLES/"
2600  String handles_dir = HANDLES_STR;
2601  keym.set_str(handles_dir);
2602  sprintf(numbuf, "%llu", (Llu)id);
2603  datam.set_str(numbuf);
2604 
2605  if(cursorp->get(&keym, &datam, DB_GET_BOTH) == DB_NOTFOUND) {
2606  exists = false;
2607  }
2608  }
2609  catch (DbException &e) {
2610  if (e.get_errno() == DB_LOCK_DEADLOCK)
2612  else if (e.get_errno() == DB_REP_HANDLE_DEAD)
2614  HT_ERRORF("Berkeley DB error: %s", e.what());
2616  }
2617 
2618  HT_DEBUG_OUT << "exitting handle_exists txn="<< txn << " handle id=" << id
2619  << " exists=" << exists << HT_END;
2620  return exists;
2621 }
2622 
2623 /*
2624  * Reads the locked flag stored for handle `id`. Asserts that the handle
      * exists, fetches the HANDLE_LOCKED record and maps its text to a bool:
      * "1" gives true, "0" gives false, anything else trips HT_ASSERT(false).
      * NOTE(review): the signature line and the statements guarded by the
      * catch-branch conditions are not present in this listing.
2625  */
2626 
2627 bool
2629 {
2630  DbtManaged keym, datam;
2631  int ret;
2632  String buf;
2633  bool locked;
2634  String key_str;
2635 
2636  HT_DEBUG_OUT << "handle_is_locked txn=" << txn << " handle id=" << id << HT_END;
2637 
2638  try {
2639  HT_ASSERT(handle_exists(txn, id));
2640 
2641  // Get locked-ness
2642  key_str = get_handle_key(id, HANDLE_LOCKED);
2643  keym.set_str(key_str);
2644  ret = txn.handle_state_db->get(txn.db_txn, &keym, &datam, 0);
2645  HT_ASSERT(ret == 0);
2646 
2647  buf = datam.get_str();
2648 
2649  if (buf == "1")
2650  locked = true;
2651  else if (buf == "0")
2652  locked = false;
2653  else {
2654  HT_ASSERT(false);
2655  }
2656 
2657  }
2658  catch (DbException &e) {
2659  if (e.get_errno() == DB_LOCK_DEADLOCK)
2661  else if (e.get_errno() == DB_REP_HANDLE_DEAD)
2663  HT_ERRORF("Berkeley DB error: %s", e.what());
2665  }
2666 
      // The exit trace previously named set_handle_locked, which made debug
      // logs attribute this read to the setter.
2667  HT_DEBUG_OUT <<"exitting handle_is_locked txn="<< txn <<" handle id=" << id
2668  << " locked=" << locked << HT_END;
2669  return locked;
2670 }
2671 
2672 /*
2673  * Fetches the node name stored for handle `id`. Asserts that the handle
      * exists, reads the HANDLE_NODE_NAME record (asserting the read returned
      * 0) and assigns its text to `node_name`, which is declared outside the
      * visible lines (presumably an output parameter - confirm).
      * NOTE(review): the signature line and the statements guarded by the
      * catch-branch conditions are not present in this listing.
2674  */
2675 void
2677 {
2678  DbtManaged keym, datam;
2679  int ret;
2680  String key_str;
2681 
2682  HT_DEBUG_OUT <<"get_handle_node_name txn="<< txn <<" handle id=" << id << HT_END;
2683 
2684  try {
2685  HT_ASSERT(handle_exists(txn, id));
2686 
2687  // Get existing node_name
2688  key_str = get_handle_key(id, HANDLE_NODE_NAME);
2689  keym.set_str(key_str);
2690 
2691  ret = txn.handle_state_db->get(txn.db_txn, &keym, &datam, 0);
2692  HT_ASSERT(ret == 0);
2693  node_name = datam.get_str();
2694 
2695  }
2696  catch (DbException &e) {
2697  if (e.get_errno() == DB_LOCK_DEADLOCK)
2699  else if (e.get_errno() == DB_REP_HANDLE_DEAD)
2701  HT_ERRORF("Berkeley DB error: %s", e.what());
2703  }
2704 
2705  HT_DEBUG_OUT <<"exitting get_handle_node_name txn="<< txn <<" handle id=" << id
2706  <<" node_name=" << node_name << HT_END;
2707 }
2708 
2709 /*
2710  * Reads the deletion state stored for handle `id`. Asserts that the handle
      * exists, fetches the HANDLE_DEL_STATE record (asserting the read
      * returned 0), parses its text with strtoul (base 0) and returns the
      * value as uint32_t.
      * NOTE(review): the signature line and the statements guarded by the
      * catch-branch conditions are not present in this listing.
2711  */
2712 uint32_t
2714 {
2715  DbtManaged keym, datam;
2716  int ret;
2717  String key_str;
      // Declared uint32_t (was int) so the local matches both the cast applied
      // to the parsed value and the return type, and large values are not
      // traced as negative numbers.
2718  uint32_t del_state;
2719 
2720  HT_DEBUG_OUT <<"get_handle_del_state txn="<< txn <<" handle id=" << id << HT_END;
2721 
2722  try {
2723  HT_ASSERT(handle_exists(txn, id));
2724 
2725  // Get existing deletion state
2726  key_str = get_handle_key(id, HANDLE_DEL_STATE);
2727  keym.set_str(key_str);
2728 
2729  ret = txn.handle_state_db->get(txn.db_txn, &keym, &datam, 0);
2730  HT_ASSERT(ret == 0);
2731 
2732  del_state = (uint32_t)strtoul(datam.get_str(), 0, 0);
2733  }
2734  catch (DbException &e) {
2735  HT_ERRORF("Berkeley DB error: %s", e.what());
2736  if (e.get_errno() == DB_LOCK_DEADLOCK)
2738  else
2740  }
2741 
2742  HT_DEBUG_OUT <<"exitting get_handle_del_state txn="<< txn <<" handle id="
2743  << id << " del_state=" << del_state << HT_END;
2744  return del_state;
2745 }
2746 
2747 /*
2748  * Reads the open flags stored for handle `id`. Asserts that the handle
      * exists, fetches the HANDLE_OPEN_FLAGS record (asserting the read
      * returned 0), parses its text with strtoul (base 0) and returns the
      * value as uint32_t.
      * NOTE(review): the signature line and the statements guarded by the
      * catch-branch conditions are not present in this listing.
2749  */
2750 uint32_t
2752 {
2753  DbtManaged keym, datam;
2754  int ret;
2755  String key_str;
2756  uint32_t open_flags;
2757 
2758  HT_DEBUG_OUT <<"get_handle_open_flags txn="<< txn <<" handle id=" << id << HT_END;
2759 
2760  try {
2761  HT_ASSERT(handle_exists(txn, id));
2762 
2763  // Get existing open flag
2764  key_str = get_handle_key(id, HANDLE_OPEN_FLAGS);
2765  keym.set_str(key_str);
2766 
2767  ret = txn.handle_state_db->get(txn.db_txn, &keym, &datam, 0);
2768  HT_ASSERT(ret == 0);
2769 
2770  open_flags = (uint32_t)strtoul(datam.get_str(), 0, 0);
2771 
2772  }
2773  catch (DbException &e) {
2774  if (e.get_errno() == DB_LOCK_DEADLOCK)
2776  else if (e.get_errno() == DB_REP_HANDLE_DEAD)
2778  HT_ERRORF("Berkeley DB error: %s", e.what());
2780  }
2781 
2782  HT_DEBUG_OUT <<"exitting get_handle_open_flags txn="<< txn <<" handle id="
2783  << id << " open_flags=" << open_flags << HT_END;
2784  return open_flags;
2785 }
2786 
2787 /*
2788  * Reads the session id stored for handle `id`. Asserts that the handle
      * exists, fetches the HANDLE_SESSION_ID record (asserting the read
      * returned 0), parses its text with strtoull (base 0) and returns the
      * value as uint64_t.
      * NOTE(review): the signature line and the statements guarded by the
      * catch-branch conditions are not present in this listing.
2789  */
2790 uint64_t
2792 {
2793  int ret;
2794  DbtManaged keym, datam;
2795  String key_str;
2796  uint64_t session_id;
2797 
2798  HT_DEBUG_OUT <<"get_handle_session txn="<< txn <<" id="<< id << HT_END;
2799 
2800  try {
2801 
2802  HT_ASSERT(handle_exists(txn, id));
2803 
2804  // Get handle session id
2805  key_str = get_handle_key(id, HANDLE_SESSION_ID);
2806  keym.set_str(key_str);
2807 
2808  ret = txn.handle_state_db->get(txn.db_txn, &keym, &datam, 0);
2809  HT_ASSERT(ret == 0);
2810 
2811  session_id = (uint64_t)strtoull(datam.get_str(), 0, 0);
2812  }
2813  catch (DbException &e) {
2814  if (e.get_errno() == DB_LOCK_DEADLOCK)
2816  else if (e.get_errno() == DB_REP_HANDLE_DEAD)
2818  HT_ERRORF("Berkeley DB error: %s", e.what());
2820  }
2821 
2822  HT_DEBUG_OUT <<"get_handle_session txn="<< txn <<" id="<< id << " session_id="
2823  << session_id << HT_END;
2824 
2825  return session_id;
2826 }
2827 
2828 
2829 /*
2830  *
2831  */
2832 void
2834  bool ephemeral, uint64_t lock_generation, uint32_t cur_lock_mode,
2835  uint64_t exclusive_handle)
2836 {
2837  int ret;
2838  DbtManaged keym, datam;
2839  String key_str;
2840  Dbc *cursorp = 0;
2841  HT_ON_SCOPE_EXIT(&close_db_cursor, &cursorp);
2842  String buf;
2843  char numbuf[17];
2844 
2845  HT_DEBUG_OUT <<"create_node txn="<< txn <<" create node ='"<< name
2846  << " ephemeral=" << ephemeral << " lock_mode=" << cur_lock_mode
2847  << " lock_generation=" << lock_generation << " exclusive_handle="
2848  << " exclusive_handle" << HT_END;
2849 
2850  try {
2851  HT_ASSERT(!node_exists(txn, name));
2852  // Store id under "/NODES/"
2853  String nodes_dir = NODES_STR;
2854  keym.set_str(nodes_dir);
2855  datam.set_str(name);
2856 
2857  txn.handle_state_db->cursor(txn.db_txn, &cursorp, 0);
2858  ret = cursorp->put(&keym, &datam, DB_KEYLAST);
2859  HT_ASSERT(ret == 0);
2860 
2861  // Store node ephemeral
2862  key_str = get_node_key(name, NODE_EPHEMERAL);
2863  keym.set_str(key_str);
2864  if(ephemeral)
2865  buf = "1";
2866  else
2867  buf = "0";
2868  datam.set_str(buf);
2869  ret = txn.handle_state_db->put(txn.db_txn, &keym, &datam, 0);
2870  HT_ASSERT(ret == 0);
2871 
2872 
2873  // Store node cur lock mode
2874  key_str = get_node_key(name, NODE_LOCK_MODE);
2875  keym.set_str(key_str);
2876  sprintf(numbuf, "%lu", (Lu)cur_lock_mode);
2877  datam.set_str(numbuf);
2878 
2879  ret = txn.handle_state_db->put(txn.db_txn, &keym, &datam, 0);
2880  HT_ASSERT(ret == 0);
2881 
2882  // Store node lock generation
2883  key_str = get_node_key(name, NODE_LOCK_GENERATION);
2884  keym.set_str(key_str);
2885  sprintf(numbuf, "%llu", (Llu)lock_generation);
2886  datam.set_str(numbuf);
2887 
2888  ret = txn.handle_state_db->put(txn.db_txn, &keym, &datam, 0);
2889  HT_ASSERT(ret == 0);
2890 
2891  // Store node exclusive_handle
2892  key_str = get_node_key(name, NODE_EXCLUSIVE_LOCK_HANDLE);
2893  keym.set_str(key_str);
2894  sprintf(numbuf, "%llu", (Llu)exclusive_handle);
2895  datam.set_str(numbuf);
2896 
2897  ret = txn.handle_state_db->put(txn.db_txn, &keym, &datam, 0);
2898  HT_ASSERT(ret == 0);
2899 
2900  }
2901  catch (DbException &e) {
2902  if (e.get_errno() == DB_LOCK_DEADLOCK)
2904  else if (e.get_errno() == DB_REP_HANDLE_DEAD)
2906  HT_ERRORF("Berkeley DB error: %s", e.what());
2908  }
2909  HT_DEBUG_OUT << "exitting create_node txn="<< txn <<" create node ='"
2910  << name << "' ephemeral=" << ephemeral << " lock_mode="
2911  << cur_lock_mode
2912  << " lock_generation=" << lock_generation << " exclusive_handle="
2913  << " exclusive_handle" << HT_END;
2914 
2915 }
2916 
2917 /*
2918  * Overwrites the stored lock generation of node `name` with the decimal
      * text of `lock_generation`. Asserts that the node exists, positions a
      * cursor on the NODE_LOCK_GENERATION key (DB_SET) and rewrites that
      * record in place (DB_CURRENT); both cursor calls are asserted to
      * return 0.
      * NOTE(review): the first signature line and the statements guarded by
      * the catch-branch conditions are not present in this listing.
2919  */
2920 void
2922  uint64_t lock_generation)
2923 {
2924  DbtManaged keym, datam;
      // A uint64_t printed with "%llu" needs up to 20 digits plus the NUL,
      // which the previous 17-byte buffer could not hold.
2925  char numbuf[32];
2926  int ret;
2927  Dbc *cursorp = 0;
2928  HT_ON_SCOPE_EXIT(&close_db_cursor, &cursorp);
2929 
2930  String key_str;
2931 
2932  HT_DEBUG_OUT <<"set_node_lock_generation txn="<< txn <<" node=" << name
2933  <<" lock_generation=" << lock_generation << HT_END;
2934  try {
2935  HT_ASSERT(node_exists(txn, name));
2936  txn.handle_state_db->cursor(txn.db_txn, &cursorp, 0);
2937 
2938  // Replace existing lock_generation
2939  key_str = get_node_key(name, NODE_LOCK_GENERATION);
2940  keym.set_str(key_str);
2941 
2942  ret = cursorp->get(&keym, &datam, DB_SET);
2943  HT_ASSERT(ret == 0);
2944 
2945  sprintf(numbuf, "%llu", (Llu)lock_generation);
2946  datam.set_str(numbuf);
2947 
2948  ret = cursorp->put(&keym, &datam, DB_CURRENT);
2949  HT_ASSERT(ret==0);
2950  }
2951  catch (DbException &e) {
2952  if (e.get_errno() == DB_LOCK_DEADLOCK)
2954  else if (e.get_errno() == DB_REP_HANDLE_DEAD)
2956  HT_ERRORF("Berkeley DB error: %s", e.what());
2958  }
2959 
2960  HT_DEBUG_OUT <<"exitting set_node_lock_generation txn="<< txn <<" node=" << name
2961  <<" lock_generation=" << lock_generation << HT_END;
2962 }
2963 
2964 /*
2965  * Increments the stored lock generation of node `name` by one and returns
      * the new value. Asserts that the node exists, positions a cursor on the
      * NODE_LOCK_GENERATION key (DB_SET), parses the current text with
      * strtoull (base 0), adds one and rewrites the record in place
      * (DB_CURRENT); both cursor calls are asserted to return 0.
      * NOTE(review): the signature line and the statements guarded by the
      * catch-branch conditions are not present in this listing.
2966  */
2967 uint64_t
2969 {
2970  DbtManaged keym, datam;
      // A uint64_t printed with "%llu" needs up to 20 digits plus the NUL,
      // which the previous 17-byte buffer could not hold.
2971  char numbuf[32];
2972  int ret;
2973  uint64_t lock_generation=0;
2974  Dbc *cursorp = 0;
2975  HT_ON_SCOPE_EXIT(&close_db_cursor, &cursorp);
2976 
2977  String key_str;
2978 
2979  HT_DEBUG_OUT <<"incr_node_lock_generation txn="<< txn <<" node=" << name << HT_END;
2980 
2981  try {
2982  HT_ASSERT(node_exists(txn, name));
2983  txn.handle_state_db->cursor(txn.db_txn, &cursorp, 0);
2984 
2985  // Replace existing lock_generation
2986  key_str = get_node_key(name, NODE_LOCK_GENERATION);
2987  keym.set_str(key_str);
2988 
2989  ret = cursorp->get(&keym, &datam, DB_SET);
2990  HT_ASSERT(ret == 0);
2991 
2992  lock_generation = (uint64_t)strtoull(datam.get_str(), 0, 0);
2993  ++lock_generation;
2994  sprintf(numbuf, "%llu", (Llu)lock_generation);
2995  datam.set_str(numbuf);
2996 
2997  ret = cursorp->put(&keym, &datam, DB_CURRENT);
2998  HT_ASSERT(ret==0);
2999  }
3000  catch (DbException &e) {
3001  if (e.get_errno() == DB_LOCK_DEADLOCK)
3003  else if (e.get_errno() == DB_REP_HANDLE_DEAD)
3005  HT_ERRORF("Berkeley DB error: %s", e.what());
3007  }
3008 
3009  HT_DEBUG_OUT <<"exitting incr_node_lock_generation txn="<< txn <<" node="<< name
3010  <<" lock_generation=" << lock_generation << HT_END;
3011 
3012  return lock_generation;
3013 }
3014 
3015 /*
3016  * Overwrites the stored ephemeral flag of node `name`: writes "1" when
      * `ephemeral` is true and "0" otherwise. Asserts that the node exists,
      * positions a cursor on the NODE_EPHEMERAL key (DB_SET) and rewrites
      * that record in place (DB_CURRENT); both cursor calls are asserted to
      * return 0.
      * NOTE(review): the first signature line and the statements guarded by
      * the catch-branch conditions are not present in this listing.
3017  */
3018 void
3020  bool ephemeral)
3021 {
3022  DbtManaged keym, datam;
3023  int ret;
3024  String buf;
3025  Dbc *cursorp = 0;
3026  HT_ON_SCOPE_EXIT(&close_db_cursor, &cursorp);
3027  String key_str;
3028 
3029  HT_DEBUG_OUT <<"set_node_ephemeral txn="<< txn <<" node_name=" << name
3030  <<" ephemeral=" << ephemeral << HT_END;
3031  try {
3032  HT_ASSERT(node_exists(txn, name));
3033  txn.handle_state_db->cursor(txn.db_txn, &cursorp, 0);
3034 
3035  // Replace existing ephemeral flag
3036  key_str = get_node_key(name, NODE_EPHEMERAL);
3037  keym.set_str(key_str);
3038  ret = cursorp->get(&keym, &datam, DB_SET);
3039  HT_ASSERT(ret == 0);
3040  if(ephemeral)
3041  buf = "1";
3042  else
3043  buf = "0";
3044  datam.set_str(buf);
3045 
3046  ret = cursorp->put(&keym, &datam, DB_CURRENT);
3047  HT_ASSERT(ret==0);
3048  }
3049  catch (DbException &e) {
3050  if (e.get_errno() == DB_LOCK_DEADLOCK)
3052  else if (e.get_errno() == DB_REP_HANDLE_DEAD)
3054  HT_ERRORF("Berkeley DB error: %s", e.what());
3056  }
3057 
3058  HT_DEBUG_OUT <<"exitting set_node_ephemeral txn="<< txn <<" node_name=" << name
3059  <<" ephemeral=" << ephemeral << HT_END;
3060 }
3061 
3062 /*
3063  * Reads the ephemeral flag stored for node `name`. Asserts that the node
      * exists, fetches the NODE_EPHEMERAL record (asserting the read returned
      * 0) and maps its text to a bool: "0" gives false, "1" gives true,
      * anything else trips HT_ASSERT(false).
      * NOTE(review): the signature line and the statements guarded by the
      * catch-branch conditions are not present in this listing.
3064  */
3065 bool
3067 {
3068  DbtManaged keym, datam;
3069  int ret;
3070  String buf;
3071  String key_str;
3072  bool ephemeral;
3073 
3074  HT_DEBUG_OUT << "node_is_ephemeral txn=" << txn << " node_name=" << name << HT_END;
3075 
3076  try {
3077  HT_ASSERT(node_exists(txn, name));
3078 
3079  // Get existing ephemeral flag (read-only; nothing is replaced here)
3080  key_str = get_node_key(name, NODE_EPHEMERAL);
3081  keym.set_str(key_str);
3082  ret = txn.handle_state_db->get(txn.db_txn, &keym, &datam, 0);
3083 
3084  HT_ASSERT(ret == 0);
3085 
3086  if (!strcmp(datam.get_str(), "0"))
3087  ephemeral = false;
3088  else if (!strcmp(datam.get_str(), "1"))
3089  ephemeral = true;
3090  else
3091  HT_ASSERT(false);
3092  }
3093  catch (DbException &e) {
3094  if (e.get_errno() == DB_LOCK_DEADLOCK)
3096  else if (e.get_errno() == DB_REP_HANDLE_DEAD)
3098  HT_ERRORF("Berkeley DB error: %s", e.what());
3100  }
3101 
3102  HT_DEBUG_OUT << "exitting node_is_ephemeral txn=" << txn << " node_name= " << name
3103  << " ephemeral=" << ephemeral << HT_END;
3104  return ephemeral;
3105 }
3106 
3107 /*
3108  * Overwrites the stored current lock mode of node `name` with the decimal
      * text of `lock_mode`. Asserts that the node exists, positions a cursor
      * on the NODE_LOCK_MODE key (DB_SET) and rewrites that record in place
      * (DB_CURRENT); both cursor calls are asserted to return 0.
      * NOTE(review): the first signature line and the statements guarded by
      * the catch-branch conditions are not present in this listing.
3109  */
3110 void
3112  uint32_t lock_mode)
3113 {
3114  DbtManaged keym, datam;
3115  char numbuf[16];
3116  int ret;
3117  Dbc *cursorp = 0;
3118  HT_ON_SCOPE_EXIT(&close_db_cursor, &cursorp);
3119  String key_str;
3120 
3121  HT_DEBUG_OUT <<"set_node_cur_lock_mode txn="<< txn <<" node=" << name
3122  <<" lock_mode=" << lock_mode << HT_END;
3123  try {
3124  HT_ASSERT(node_exists(txn, name));
3125  txn.handle_state_db->cursor(txn.db_txn, &cursorp, 0);
3126 
3127  // Replace existing lock_mode
3128  key_str = get_node_key(name, NODE_LOCK_MODE);
3129  keym.set_str(key_str);
3130 
3131  ret = cursorp->get(&keym, &datam, DB_SET);
3132  HT_ASSERT(ret == 0);
3133 
      // lock_mode is a uint32_t, so its decimal text is at most 10 digits and
      // fits the 16-byte buffer.
3134  sprintf(numbuf, "%lu", (Lu)lock_mode);
3135  datam.set_str(numbuf);
3136 
3137  ret = cursorp->put(&keym, &datam, DB_CURRENT);
3138  HT_ASSERT(ret==0);
3139  }
3140  catch (DbException &e) {
3141  if (e.get_errno() == DB_LOCK_DEADLOCK)
3143  else if (e.get_errno() == DB_REP_HANDLE_DEAD)
3145  HT_ERRORF("Berkeley DB error: %s", e.what());
3147  }
3148 
3149  HT_DEBUG_OUT << "exitting set_node_cur_lock_mode txn="<< txn <<" node=" << name
3150  << " lock_mode=" << lock_mode << HT_END;
3151 }
3152 
3153 /*
3154  * Reads the current lock mode stored for node `name`. Asserts that the
      * node exists, positions a cursor on the NODE_LOCK_MODE key (DB_SET),
      * asserts the lookup returned 0, parses the record text with strtoull
      * (base 0) and returns the value narrowed to uint32_t.
      * NOTE(review): the signature line and the statements guarded by the
      * catch-branch conditions are not present in this listing.
3155  */
3156 uint32_t
3158 {
3159  DbtManaged keym, datam;
3160  int ret;
3161  Dbc *cursorp = 0;
3162  HT_ON_SCOPE_EXIT(&close_db_cursor, &cursorp);
3163  String key_str;
3164  uint32_t lock_mode;
3165 
3166  HT_DEBUG_OUT <<"get_node_cur_lock_mode txn="<< txn <<" node=" << name << HT_END;
3167 
3168  try {
3169  HT_ASSERT(node_exists(txn, name));
3170  txn.handle_state_db->cursor(txn.db_txn, &cursorp, 0);
3171 
3172  // Get existing lock_mode
3173  key_str = get_node_key(name, NODE_LOCK_MODE);
3174  keym.set_str(key_str);
3175 
3176  ret = cursorp->get(&keym, &datam, DB_SET);
3177  HT_ASSERT(ret == 0);
3178  lock_mode = (uint32_t) strtoull(datam.get_str(), 0, 0);
3179  }
3180  catch (DbException &e) {
3181  if (e.get_errno() == DB_LOCK_DEADLOCK)
3183  else if (e.get_errno() == DB_REP_HANDLE_DEAD)
3185  HT_ERRORF("Berkeley DB error: %s", e.what());
3187  }
3188 
3189  HT_DEBUG_OUT << "exitting get_node_cur_lock_mode txn="<< txn <<" node=" << name
3190  << " lock_mode=" << lock_mode << HT_END;
3191  return lock_mode;
3192 }
3193 
3194 /*
3195  * Overwrites the stored exclusive lock handle of node `name` with the
      * decimal text of `exclusive_lock_handle`. Asserts that the node exists,
      * positions a cursor on the NODE_EXCLUSIVE_LOCK_HANDLE key (DB_SET) and
      * rewrites that record in place (DB_CURRENT); both cursor calls are
      * asserted to return 0.
      * NOTE(review): the first signature line and the statements guarded by
      * the catch-branch conditions are not present in this listing.
3196  */
3197 void
3199  const String &name, uint64_t exclusive_lock_handle)
3200 {
3201  DbtManaged keym, datam;
      // A uint64_t printed with "%llu" needs up to 20 digits plus the NUL,
      // which the previous 17-byte buffer could not hold.
3202  char numbuf[32];
3203  int ret;
3204  Dbc *cursorp = 0;
3205  HT_ON_SCOPE_EXIT(&close_db_cursor, &cursorp);
3206  String key_str;
3207 
3208  HT_DEBUG_OUT <<"set_node_exclusive_lock_handle txn="<< txn <<" node=" << name
3209  <<" exclusive_lock_handle=" << exclusive_lock_handle << HT_END;
3210  try {
3211  HT_ASSERT(node_exists(txn, name));
3212  txn.handle_state_db->cursor(txn.db_txn, &cursorp, 0);
3213 
3214  // Replace existing exclusive lock handle
3215  key_str = get_node_key(name, NODE_EXCLUSIVE_LOCK_HANDLE);
3216  keym.set_str(key_str);
3217 
3218  ret = cursorp->get(&keym, &datam, DB_SET);
3219  HT_ASSERT(ret == 0);
3220 
3221  sprintf(numbuf, "%llu", (Llu)exclusive_lock_handle);
3222  datam.set_str(numbuf);
3223 
3224  ret = cursorp->put(&keym, &datam, DB_CURRENT);
3225  HT_ASSERT(ret==0);
3226  }
3227  catch (DbException &e) {
3228  if (e.get_errno() == DB_LOCK_DEADLOCK)
3230  else if (e.get_errno() == DB_REP_HANDLE_DEAD)
3232  HT_ERRORF("Berkeley DB error: %s", e.what());
3234  }
3235  HT_DEBUG_OUT <<"exitting set_node_exclusive_lock_handle txn="<< txn <<" node="<< name
3236  <<" exclusive_lock_handle=" << exclusive_lock_handle << HT_END;
3237 }
3238 
3239 /*
3240  * Reads the exclusive lock handle stored for node `name`. Asserts that the
      * node exists, positions a cursor on the NODE_EXCLUSIVE_LOCK_HANDLE key
      * (DB_SET), asserts the lookup returned 0, parses the record text with
      * strtoull (base 0) and returns it as uint64_t.
      * NOTE(review): the signature line and the statements guarded by the
      * catch-branch conditions are not present in this listing.
3241  */
3242 uint64_t
3244 {
3245  DbtManaged keym, datam;
3246  int ret;
3247  Dbc *cursorp = 0;
3248  HT_ON_SCOPE_EXIT(&close_db_cursor, &cursorp);
3249  String key_str;
3250  uint64_t exclusive_lock_handle=0;
3251 
3252  HT_DEBUG_OUT <<"get_node_exclusive_lock_handle txn="<< txn <<" node=" << name << HT_END;
3253 
3254  try {
3255  HT_ASSERT(node_exists(txn, name));
3256  txn.handle_state_db->cursor(txn.db_txn, &cursorp, 0);
3257 
3258  // Get existing exclusive lock handle (read-only; nothing is replaced here)
3259  key_str = get_node_key(name, NODE_EXCLUSIVE_LOCK_HANDLE);
3260  keym.set_str(key_str);
3261 
3262  ret = cursorp->get(&keym, &datam, DB_SET);
3263  HT_ASSERT(ret == 0);
3264  exclusive_lock_handle = (uint64_t)strtoull(datam.get_str(), 0, 0);
3265  }
3266  catch (DbException &e) {
3267  if (e.get_errno() == DB_LOCK_DEADLOCK)
3269  else if (e.get_errno() == DB_REP_HANDLE_DEAD)
3271  HT_ERRORF("Berkeley DB error: %s", e.what());
3273  }
3274 
3275  HT_DEBUG_OUT <<"exitting get_node_exclusive_lock_handle txn=" << txn <<" node=" << name
3276  <<" exclusive_lock_handle=" << exclusive_lock_handle << HT_END;
3277 
3278  return exclusive_lock_handle;
3279 }
3280 
3281 /*
3282  * Records `handle_id` as one of the handles of node `name`. Asserts that
      * the node exists and adds the decimal text of handle_id as a data item
      * under the node's NODE_HANDLE_MAP key with a cursor put (DB_KEYLAST),
      * asserting the put returned 0.
      * NOTE(review): the first signature line and the statements guarded by
      * the catch-branch conditions are not present in this listing.
3283  */
3284 void
3286  uint64_t handle_id)
3287 {
3288  int ret;
3289  DbtManaged keym, datam;
3290  String key_str;
      // A uint64_t printed with "%llu" needs up to 20 digits plus the NUL,
      // which the previous 16-byte buffer could not hold.
3291  char numbuf[32];
3292  Dbc *cursorp = 0;
3293  HT_ON_SCOPE_EXIT(&close_db_cursor, &cursorp);
3294 
3295  HT_DEBUG_OUT <<"add_node_handle txn="<< txn <<" node="<< name << " handle id=" << handle_id
3296  << HT_END;
3297  try {
3298  HT_ASSERT(node_exists(txn, name));
3299 
3300  String node_handles_dir = get_node_key(name,
3301  NODE_HANDLE_MAP);
3302  keym.set_str(node_handles_dir);
3303  sprintf(numbuf, "%llu", (Llu)handle_id);
3304  datam.set_str(numbuf);
3305 
3306  txn.handle_state_db->cursor(txn.db_txn, &cursorp, 0);
3307  ret = cursorp->put(&keym, &datam, DB_KEYLAST);
3308  HT_ASSERT(ret == 0);
3309  }
3310  catch (DbException &e) {
3311  if (e.get_errno() == DB_LOCK_DEADLOCK)
3313  else if (e.get_errno() == DB_REP_HANDLE_DEAD)
3315  HT_ERRORF("Berkeley DB error: %s", e.what());
3317  }
3318  HT_DEBUG_OUT <<"exitting add_node_handle txn="<< txn <<" node="<< name
3319  << " handle id=" << handle_id << HT_END;
3320 }
3321 
3322 /*
3323  * Collects the handles of node `name` that are registered for any event in
      * `event_mask`. Asserts that the node exists, then walks the data items
      * stored under the node's NODE_HANDLE_MAP key (DB_SET, then DB_NEXT_DUP
      * until DB_NOTFOUND). For each handle whose stored event mask shares a
      * bit with event_mask, stores handles_to_sessions[handle] = the handle's
      * session id. Returns true when at least one entry was stored.
      * NOTE(review): the first signature line and the statements guarded by
      * the catch-branch conditions are not present in this listing.
3324  */
3325 bool
3327  uint32_t event_mask, NotificationMap &handles_to_sessions)
3328 {
3329  int ret;
3330  DbtManaged keym, datam;
3331  String key_str;
3332  uint64_t handle, session;
3333  uint32_t mask;
3334  Dbc *cursorp = 0;
3335  HT_ON_SCOPE_EXIT(&close_db_cursor, &cursorp);
3336  bool has_notifications = false;
3337 
3338  HT_DEBUG_OUT <<"get_node_event_notification_map txn="<< txn <<" node="<< name << HT_END;
3339 
3340  try {
3341  HT_ASSERT(node_exists(txn, name));
3342 
3343  String node_handles_dir = get_node_key(name, NODE_HANDLE_MAP);
3344  keym.set_str(node_handles_dir);
3345  txn.handle_state_db->cursor(txn.db_txn, &cursorp, 0);
3346  ret = cursorp->get(&keym, &datam, DB_SET);
3347 
3348  // iterate through all handles
3349  while (ret != DB_NOTFOUND) {
      // Any return other than 0 or DB_NOTFOUND is treated as fatal.
3350  HT_ASSERT(ret==0);
3351  handle = (uint64_t)strtoull(datam.get_str(), 0, 0);
3352  mask = get_handle_event_mask(txn, handle);
3353  // if this handle was registered for this event then store it in the notification map
3354  if ((mask&event_mask) != 0) {
3355  session = get_handle_session(txn, handle);
3356  handles_to_sessions[handle] = session;
3357  has_notifications = true;
3358  }
3359  ret = cursorp->get(&keym, &datam, DB_NEXT_DUP);
3360  }
3361  }
3362  catch (DbException &e) {
3363  if (e.get_errno() == DB_LOCK_DEADLOCK)
3365  else if (e.get_errno() == DB_REP_HANDLE_DEAD)
3367  HT_ERRORF("Berkeley DB error: %s", e.what());
3369  }
3370 
3371  HT_DEBUG_OUT << "exitting get_node_event_notification_map txn=" << txn << " node="<< name
3372  << " has_notifications=" << has_notifications << HT_END;
3373  return has_notifications;
3374 }
3375 
3376 /*
3377  * Removes `handle_id` from the handles recorded for node `name`. Asserts
      * that the node exists, positions a cursor on the exact
      * (NODE_HANDLE_MAP key, decimal handle_id) pair with DB_GET_BOTH and
      * deletes the record under the cursor; both calls are asserted to
      * return 0.
      * NOTE(review): the first signature line and the statements guarded by
      * the catch-branch conditions are not present in this listing.
3378  */
3379 void
3381  uint64_t handle_id)
3382 {
3383  int ret;
3384  DbtManaged keym, datam;
3385  String key_str;
3386  Dbc *cursorp=0;
3387  HT_ON_SCOPE_EXIT(&close_db_cursor, &cursorp);
      // A uint64_t printed with "%llu" needs up to 20 digits plus the NUL,
      // which the previous 17-byte buffer could not hold.
3388  char numbuf[32];
3389 
3390  HT_DEBUG_OUT <<"delete_node_handle txn="<< txn <<" node="<< name
3391  <<" handle_id=" << handle_id << HT_END;
3392  try {
3393  HT_ASSERT(node_exists(txn, name));
3394 
3395  // Delete handle
3396  String node_handles_dir = get_node_key(name, NODE_HANDLE_MAP);
3397  keym.set_str(node_handles_dir);
3398  sprintf(numbuf, "%llu", (Llu)handle_id);
3399  datam.set_str(numbuf);
3400  txn.handle_state_db->cursor(txn.db_txn, &cursorp, 0);
3401  ret = cursorp->get(&keym, &datam, DB_GET_BOTH);
3402  HT_ASSERT(ret ==0 );
3403 
3404  ret = cursorp->del(0);
3405  HT_ASSERT(ret == 0);
3406  }
3407  catch (DbException &e) {
3408  if (e.get_errno() == DB_LOCK_DEADLOCK)
3410  else if (e.get_errno() == DB_REP_HANDLE_DEAD)
3412  HT_ERRORF("Berkeley DB error: %s", e.what());
3414  }
3415  HT_DEBUG_OUT <<"exitting delete_node_handle txn="<< txn <<" node="<< name
3416  <<" handle_id=" << handle_id << HT_END;
3417 }
3418 
3419 
3420 
3421 /*
3422  * Queues `request` as a pending lock request on node `name`. Asserts that
      * the node exists, adds the decimal text of request.handle as a data
      * item under the node's NODE_PENDING_LOCK_REQUESTS key (cursor put with
      * DB_KEYLAST), then stores the decimal text of request.mode under
      * get_node_pending_lock_request_key(name, request.handle). Both puts are
      * asserted to return 0.
      * NOTE(review): the first signature line and the statements guarded by
      * the catch-branch conditions are not present in this listing.
3423  */
3424 void
3426  const String &name,
3427  LockRequest &request) {
3428  int ret;
3429  DbtManaged keym, datam;
3430  String key_str;
      // "%llu" can emit 20 digits for a 64-bit value; with the NUL that is 21
      // bytes, which the previous 17-byte buffer could not hold. The buffer
      // is reused for the "%lu" mode text below.
3431  char numbuf[32];
3432  Dbc *cursorp = 0;
3433  HT_ON_SCOPE_EXIT(&close_db_cursor, &cursorp);
3434 
3435  HT_DEBUG_OUT <<"add_node_pending_lock_request txn="<< txn <<" node=" << name
3436  << " handle id=" << request.handle << " mode=" << request.mode << HT_END;
3437  try {
3438  HT_ASSERT(node_exists(txn, name));
3439 
3440  String node_pending_locks_dir = get_node_key(name, NODE_PENDING_LOCK_REQUESTS);
3441  keym.set_str(node_pending_locks_dir);
3442  sprintf(numbuf, "%llu", (Llu)request.handle);
3443  datam.set_str(numbuf);
3444 
3445  txn.handle_state_db->cursor(txn.db_txn, &cursorp, 0);
3446  ret = cursorp->put(&keym, &datam, DB_KEYLAST);
3447  HT_ASSERT(ret == 0);
3448 
3449  key_str = get_node_pending_lock_request_key(name, request.handle);
3450  keym.set_str(key_str);
3451  sprintf(numbuf, "%lu", (Lu)request.mode);
3452  datam.set_str(numbuf);
3453  ret = txn.handle_state_db->put(txn.db_txn, &keym, &datam, 0);
3454  HT_ASSERT(ret == 0);
3455  }
3456  catch (DbException &e) {
3457  if (e.get_errno() == DB_LOCK_DEADLOCK)
3459  else if (e.get_errno() == DB_REP_HANDLE_DEAD)
3461  HT_ERRORF("Berkeley DB error: %s", e.what());
3463  }
3464 
3465  HT_DEBUG_OUT <<" exitting add_node_pending_lock_request txn="<< txn << " node=" << name
3466  <<" handle id=" << request.handle << " mode=" << request.mode << HT_END;
3467 }
3468 
3469 /*
3470  * Reports whether node `name` has at least one pending lock request.
      * Asserts that the node exists and positions a cursor on the node's
      * NODE_PENDING_LOCK_REQUESTS key (DB_SET); the result must be 0 or
      * DB_NOTFOUND. On 0 it also fetches the mode record for the first
      * pending handle and asserts that it exists (the fetched value itself
      * is not used), then returns true. On DB_NOTFOUND it returns false.
      * NOTE(review): the signature line and the statements guarded by the
      * catch-branch conditions are not present in this listing.
3471  */
3472 
3473 bool
3475 {
3476  int ret;
3477  DbtManaged keym, datam;
3478  String key_str;
3479  bool has_pending_lock_request = false;
3480  Dbc *cursorp = 0;
3481  HT_ON_SCOPE_EXIT(&close_db_cursor, &cursorp);
3482  uint64_t handle_id;
3483 
3484  HT_DEBUG_OUT << "node_has_pending_lock_request txn=" << txn << " node=" << name << HT_END;
3485 
3486  try {
3487  HT_ASSERT(node_exists(txn, name));
3488 
3489  // get top pending handle id of lock request (if any)
3490  String node_pending_locks_dir = get_node_key(name, NODE_PENDING_LOCK_REQUESTS);
3491  keym.set_str(node_pending_locks_dir);
3492  txn.handle_state_db->cursor(txn.db_txn, &cursorp, 0);
3493  ret = cursorp->get(&keym, &datam, DB_SET);
3494 
3495  HT_ASSERT(ret == 0 || ret == DB_NOTFOUND);
3496 
3497  if (ret == 0) {
3498  // get lock request mode (only checked for presence here)
3499  handle_id = (uint64_t) strtoull(datam.get_str(), 0, 0);
3500  key_str = get_node_pending_lock_request_key(name, handle_id);
3501  keym.set_str(key_str);
3502  ret = txn.handle_state_db->get(txn.db_txn, &keym, &datam, 0);
3503  HT_ASSERT(ret == 0);
3504  has_pending_lock_request = true;
3505  }
3506  }
3507  catch (DbException &e) {
3508  if (e.get_errno() == DB_LOCK_DEADLOCK)
3510  else if (e.get_errno() == DB_REP_HANDLE_DEAD)
3512  HT_ERRORF("Berkeley DB error: %s", e.what());
3514  }
3515 
3516  HT_DEBUG_OUT <<" exitting node_has_pending_lock_request txn="<< txn << " node=" << name
3517  <<" node_has_pending_lock_request=" << has_pending_lock_request << HT_END;
3518 
3519  return has_pending_lock_request;
3520 }
3521 
3522 /*
3523  *
3524  */
3525 
// Fetches the first pending lock request recorded for the node `name`.
//
// The NODE_PENDING_LOCK_REQUESTS key of the node is looked up with a cursor.
// When an entry is found, its data is parsed as the handle id, the per-handle
// pending-lock-request record is read, and its data is parsed as the lock
// mode. Both values are stored into `front_req` (handle, mode).
//
// Returns true when a pending request was found (front_req is filled in) and
// false on DB_NOTFOUND (front_req is left untouched). Asserts that the node
// exists and that the per-handle record can be read.
//
// NOTE(review): this listing skips the first signature row (3527) and the rows
// under the if / else-if in the catch handler (3565, 3567, 3569). The symbol
// index suggests the signature is get_node_pending_lock_request(BDbTxn &txn,
// const String &name, LockRequest &front_req) -- confirm against the real file.
3526 bool
3528  LockRequest &front_req)
3529 {
3530  int ret;
3531  DbtManaged keym, datam;
3532  String key_str;
3533  bool has_pending_lock_request = false;
3534  Dbc *cursorp = 0;
      // close_db_cursor(&cursorp) is registered to run when this scope is left.
3535  HT_ON_SCOPE_EXIT(&close_db_cursor, &cursorp);
3536  uint64_t handle_id;
3537 
3538  HT_DEBUG_OUT << "get_node_pending_lock_request txn=" << txn << " node=" << name << HT_END;
3539 
3540  try {
3541  HT_ASSERT(node_exists(txn, name));
3542 
3543  // get top pending handle id of lock request (if any)
3544  String node_pending_locks_dir = get_node_key(name, NODE_PENDING_LOCK_REQUESTS);
3545  keym.set_str(node_pending_locks_dir);
3546  txn.handle_state_db->cursor(txn.db_txn, &cursorp, 0);
3547  ret = cursorp->get(&keym, &datam, DB_SET);
3548  HT_ASSERT(ret == 0 || ret == DB_NOTFOUND);
3549 
3550  if (ret == 0) {
3551  // get lock request mode
3552  has_pending_lock_request = true;
      // Handle id and mode are both stored as text; base 0 lets strtoull pick
      // the radix.
3553  handle_id = (uint64_t) strtoull(datam.get_str(), 0, 0);
3554  key_str = get_node_pending_lock_request_key(name, handle_id);
3555  keym.set_str(key_str);
3556  ret = txn.handle_state_db->get(txn.db_txn, &keym, &datam, 0);
3557  HT_ASSERT(ret == 0);
3558 
3559  front_req.handle = handle_id;
3560  front_req.mode = (uint32_t) strtoull(datam.get_str(), 0, 0);
3561  }
3562  }
3563  catch (DbException &e) {
3564  if (e.get_errno() == DB_LOCK_DEADLOCK)
3566  else if (e.get_errno() == DB_REP_HANDLE_DEAD)
3568  HT_ERRORF("Berkeley DB error: %s", e.what());
3570  }
3571 
3572  HT_DEBUG_OUT <<" exitting get_node_pending_lock_request txn="<< txn << " node=" << name
3573  <<" has_pending_lock_request=" << has_pending_lock_request << HT_END;
3574 
3575  return has_pending_lock_request;
3576 }
3577 /*
3578  *
3579  */
// Removes the pending lock request made through `handle_id` from the node
// `name`.
//
// Two records are deleted from the handle-state database:
//   1. the (pending-lock-requests key, handle id) pair, located with a cursor
//      using DB_GET_BOTH and removed with the cursor's del();
//   2. the per-handle pending-lock-request record keyed by
//      get_node_pending_lock_request_key(name, handle_id).
// Asserts that the node exists and that both records are present and deleted.
//
// NOTE(review): this listing skips the first signature row (3581), the second
// get_node_key argument (3598, presumably NODE_PENDING_LOCK_REQUESTS as in the
// sibling functions) and the rows under the if / else-if in the catch handler
// (3616, 3618, 3620). The debug strings call this function
// "remove_node_pending_lock_request" while the symbol index lists
// delete_node_pending_lock_request -- confirm against the real source file.
3580 void
3582  const String &name, uint64_t handle_id)
3583 {
3584  int ret;
3585  DbtManaged keym, datam;
3586  String key_str;
      // A uint64_t printed with "%llu" needs up to 20 digits plus the NUL
      // terminator (21 bytes); the previous 16-byte buffer could be overrun by
      // sprintf for ids >= 10^15.
3587  char numbuf[24];
3588  Dbc *cursorp = 0;
      // close_db_cursor(&cursorp) is registered to run when this scope is left.
3589  HT_ON_SCOPE_EXIT(&close_db_cursor, &cursorp);
3590 
3591  HT_DEBUG_OUT <<"remove_node_pending_lock_request txn="<< txn <<" node=" << name
3592  <<" handle id=" << handle_id << HT_END;
3593  try {
3594  HT_ASSERT(node_exists(txn, name));
3595 
3596  // Delete node pending lock req
3597  String node_pending_locks_dir = get_node_key(name,
3599  keym.set_str(node_pending_locks_dir);
3600  sprintf(numbuf, "%llu", (Llu)handle_id);
3601  datam.set_str(numbuf);
3602  txn.handle_state_db->cursor(txn.db_txn, &cursorp, 0);
      // DB_GET_BOTH positions the cursor on the exact (key, data) pair so that
      // only this handle's entry is deleted.
3603  ret = cursorp->get(&keym, &datam, DB_GET_BOTH);
3604  HT_ASSERT(ret ==0);
3605 
3606  ret = cursorp->del(0);
3607  HT_ASSERT(ret == 0);
3608  //Delete pending lock request
3609  key_str = get_node_pending_lock_request_key(name, handle_id);
3610  keym.set_str(key_str);
3611  ret = txn.handle_state_db->del(txn.db_txn, &keym, 0);
3612  HT_ASSERT(ret==0);
3613  }
3614  catch (DbException &e) {
3615  if (e.get_errno() == DB_LOCK_DEADLOCK)
3617  else if (e.get_errno() == DB_REP_HANDLE_DEAD)
3619  HT_ERRORF("Berkeley DB error: %s", e.what());
3621  }
3622 
3623  HT_DEBUG_OUT <<"exitting remove_node_pending_lock_request txn="<< txn
3624  <<" node=" << name << " handle id=" << handle_id << HT_END;
3625 }
3626 
3627 /*
3628  *
3629  */
// Records `handle_id` as a holder of a shared lock on the node `name`.
//
// The handle id is formatted as decimal text and stored as a data item under
// the node's shared-lock-handles key in the handle-state database, using a
// cursor put with DB_KEYLAST. Asserts that the node exists and that the put
// succeeds.
//
// NOTE(review): this listing skips the first signature row (3631), the second
// get_node_key argument (3647, presumably NODE_SHARED_LOCK_HANDLES as in the
// sibling functions) and the rows under the if / else-if in the catch handler
// (3658, 3660, 3662). The symbol index suggests the signature is
// add_node_shared_lock_handle(BDbTxn &txn, const String &name,
// uint64_t handle) -- confirm against the real source file.
3630 void
3632  uint64_t handle_id)
3633 {
3634  int ret;
3635  DbtManaged keym, datam;
3636  String key_str;
      // A uint64_t printed with "%llu" needs up to 20 digits plus the NUL
      // terminator (21 bytes); the previous 16-byte buffer could be overrun by
      // sprintf for ids >= 10^15.
3637  char numbuf[24];
3638  Dbc *cursorp = 0;
      // close_db_cursor(&cursorp) is registered to run when this scope is left.
3639  HT_ON_SCOPE_EXIT(&close_db_cursor, &cursorp);
3640 
3641  HT_DEBUG_OUT <<"add_node_shared_lock_handle txn="<< txn <<" node="<< name
3642  << " handle id=" << handle_id << HT_END;
3643  try {
3644  HT_ASSERT(node_exists(txn, name));
3645 
3646  String node_shared_handles_dir = get_node_key(name,
3648  keym.set_str(node_shared_handles_dir);
3649  sprintf(numbuf, "%llu", (Llu)handle_id);
3650  datam.set_str(numbuf);
3651 
3652  txn.handle_state_db->cursor(txn.db_txn, &cursorp, 0);
3653  ret = cursorp->put(&keym, &datam, DB_KEYLAST);
3654  HT_ASSERT(ret == 0);
3655  }
3656  catch (DbException &e) {
3657  if (e.get_errno() == DB_LOCK_DEADLOCK)
3659  else if (e.get_errno() == DB_REP_HANDLE_DEAD)
3661  HT_ERRORF("Berkeley DB error: %s", e.what());
3663  }
3664  HT_DEBUG_OUT <<"exitting add_node_shared_lock_handle txn="<< txn <<" node="<< name
3665  << " handle id=" << handle_id << HT_END;
3666 }
3667 
3668 /*
3669  *
3670  */
// Reports whether any handle is recorded as holding a shared lock on the node
// `name`.
//
// The node's NODE_SHARED_LOCK_HANDLES key is looked up with a cursor. Returns
// true when an entry is found and false on DB_NOTFOUND; any other result
// trips the assertion. Asserts that the node exists.
//
// NOTE(review): this listing skips the signature row (3672) and the rows under
// the if / else-if in the catch handler (3697, 3699, 3701). The symbol index
// suggests the signature is
// node_has_shared_lock_handles(BDbTxn &txn, const String &name) -- confirm.
3671 bool
3673 {
3674  int ret;
3675  DbtManaged keym, datam;
3676  String key_str;
3677  Dbc *cursorp = 0;
      // close_db_cursor(&cursorp) is registered to run when this scope is left.
3678  HT_ON_SCOPE_EXIT(&close_db_cursor, &cursorp);
3679  bool has_shared_lock_handles=false;
3680 
3681  HT_DEBUG_OUT <<"node_has_shared_lock_handles txn="<< txn <<" node="<< name << HT_END;
3682 
3683  try {
3684  HT_ASSERT(node_exists(txn, name));
3685 
3686  String node_shared_handles_dir = get_node_key(name, NODE_SHARED_LOCK_HANDLES);
3687  keym.set_str(node_shared_handles_dir);
3688  txn.handle_state_db->cursor(txn.db_txn, &cursorp, 0);
3689  ret = cursorp->get(&keym, &datam, DB_SET);
3690  HT_ASSERT(ret == 0 || ret == DB_NOTFOUND);
3691 
      // After the assertion above, "not DB_NOTFOUND" means ret == 0.
3692  if (ret != DB_NOTFOUND)
3693  has_shared_lock_handles = true;
3694  }
3695  catch (DbException &e) {
3696  if (e.get_errno() == DB_LOCK_DEADLOCK)
3698  else if (e.get_errno() == DB_REP_HANDLE_DEAD)
3700  HT_ERRORF("Berkeley DB error: %s", e.what());
3702  }
3703 
3704  HT_DEBUG_OUT <<"exitting node_has_shared_lock_handle txn="<< txn <<" node="<< name
3705  <<" has_shared_lock_handles=" << has_shared_lock_handles << HT_END;
3706  return has_shared_lock_handles;
3707 }
3708 
3709 /*
3710  *
3711  */
// Removes `handle_id` from the set of shared-lock holders of the node `name`.
//
// The (NODE_SHARED_LOCK_HANDLES key, handle id) pair is located in the
// handle-state database with a cursor using DB_GET_BOTH and removed with the
// cursor's del(). Asserts that the node exists, that the pair is present and
// that the delete succeeds.
//
// NOTE(review): this listing skips the first signature row (3713) and the rows
// under the if / else-if in the catch handler (3742, 3744, 3746). The debug
// strings call this function "remove_node_shared_lock_handle" while the symbol
// index lists delete_node_shared_lock_handle -- confirm against the real file.
3712 void
3714  uint64_t handle_id)
3715 {
3716  int ret;
3717  DbtManaged keym, datam;
3718  String key_str;
3719  Dbc *cursorp=0;
      // close_db_cursor(&cursorp) is registered to run when this scope is left.
3720  HT_ON_SCOPE_EXIT(&close_db_cursor, &cursorp);
      // A uint64_t printed with "%llu" needs up to 20 digits plus the NUL
      // terminator (21 bytes); the previous 17-byte buffer could be overrun by
      // sprintf for ids >= 10^16.
3721  char numbuf[24];
3722 
3723  HT_DEBUG_OUT <<"remove_node_shared_lock_handle txn="<< txn <<" node="<< name
3724  <<" handle_id=" << handle_id << HT_END;
3725  try {
3726  HT_ASSERT(node_exists(txn, name));
3727 
3728  // Delete shared lock handle
3729  String node_shared_handles_dir = get_node_key(name, NODE_SHARED_LOCK_HANDLES );
3730  keym.set_str(node_shared_handles_dir);
3731  sprintf(numbuf, "%llu", (Llu)handle_id);
3732  datam.set_str(numbuf);
3733  txn.handle_state_db->cursor(txn.db_txn, &cursorp, 0);
      // DB_GET_BOTH positions the cursor on the exact (key, data) pair so that
      // only this handle's entry is deleted.
3734  ret = cursorp->get(&keym, &datam, DB_GET_BOTH);
3735  HT_ASSERT(ret ==0);
3736 
3737  ret = cursorp->del(0);
3738  HT_ASSERT(ret == 0);
3739  }
3740  catch (DbException &e) {
3741  if (e.get_errno() == DB_LOCK_DEADLOCK)
3743  else if (e.get_errno() == DB_REP_HANDLE_DEAD)
3745  HT_ERRORF("Berkeley DB error: %s", e.what());
3747  }
3748  HT_DEBUG_OUT <<"exitting remove_node_shared_lock_handle txn="<< txn <<" node="<< name
3749  <<" handle_id=" << handle_id << HT_END;
3750 }
3751 
3752 /*
3753  *
3754  */
// Deletes the state records of the node `name` from the handle-state database.
//
// Returns false without touching the database when the node does not exist.
// Otherwise removes, in this order and asserting each step succeeds:
//   - the (NODES_STR, name) pair, via a cursor positioned with DB_GET_BOTH;
//   - the NODE_EPHEMERAL record;
//   - the NODE_LOCK_MODE record;
//   - the NODE_LOCK_GENERATION record;
//   - the NODE_EXCLUSIVE_LOCK_HANDLE record;
// and then returns true.
//
// NOTE(review): only the records listed above are deleted here; no handle-map,
// shared-lock-handle or pending-lock-request entries are removed in this
// function -- verify whether callers are expected to clear those first.
// NOTE(review): this listing skips the signature row (3756) and the rows under
// the if / else-if in the catch handler (3809, 3811, 3813). The symbol index
// suggests the signature is delete_node(BDbTxn &txn, const String &name).
3755 bool
3757 {
3758  int ret;
3759  DbtManaged keym, datam;
3760  String key_str;
3761  Dbc *cursorp = 0;
      // close_db_cursor(&cursorp) is registered to run when this scope is left.
3762  HT_ON_SCOPE_EXIT(&close_db_cursor, &cursorp);
3763 
3764  HT_DEBUG_OUT <<"delete_node txn="<< txn <<" node ="<< name << HT_END;
3765  try {
      // Early exit: this path skips the "exitting" debug message below.
3766  if (!node_exists(txn, name))
3767  return false;
3768  // Delete name under "/NODES/"
3769  String nodes_dir = NODES_STR;
3770  keym.set_str(nodes_dir);
3771  datam.set_str(name);
3772 
3773  txn.handle_state_db->cursor(txn.db_txn, &cursorp, 0);
3774  ret = cursorp->get(&keym, &datam, DB_GET_BOTH);
3775  HT_ASSERT(ret==0);
3776  ret = cursorp->del(0);
3777  HT_ASSERT(ret==0);
3778 
3779  // Delete node ephemeral bool
3780  key_str = get_node_key(name, NODE_EPHEMERAL);
3781  keym.set_str(key_str);
3782 
3783  ret = txn.handle_state_db->del(txn.db_txn, &keym, 0);
3784  HT_ASSERT(ret == 0);
3785 
3786  // Delete node cur_lock_mode
3787  key_str = get_node_key(name, NODE_LOCK_MODE);
3788  keym.set_str(key_str);
3789 
3790  ret = txn.handle_state_db->del(txn.db_txn, &keym, 0);
3791  HT_ASSERT(ret == 0);
3792 
3793  // Delete node lock_generation
3794  key_str = get_node_key(name, NODE_LOCK_GENERATION);
3795  keym.set_str(key_str);
3796 
3797  ret = txn.handle_state_db->del(txn.db_txn, &keym, 0);
3798  HT_ASSERT(ret == 0);
3799 
3800  // Delete node exclusive_lock_handle
3801  key_str = get_node_key(name, NODE_EXCLUSIVE_LOCK_HANDLE);
3802  keym.set_str(key_str);
3803 
3804  ret = txn.handle_state_db->del(txn.db_txn, &keym, 0);
3805  HT_ASSERT(ret == 0);
3806  }
3807  catch (DbException &e) {
3808  if (e.get_errno() == DB_LOCK_DEADLOCK)
3810  else if (e.get_errno() == DB_REP_HANDLE_DEAD)
3812  HT_ERRORF("Berkeley DB error: %s", e.what());
3814  }
3815  HT_DEBUG_OUT << "exitting delete_node txn="<< txn <<" node="
3816  << name << HT_END;
3817  return true;
3818 }
3819 
3820 
3821 /*
3822  *
3823  */
// Reports whether a node called `name` is registered in the handle-state
// database.
//
// A cursor lookup with DB_GET_BOTH is made for the (NODES_STR, name) pair.
// The result starts out as true and is set to false only when the lookup
// returns DB_NOTFOUND.
//
// NOTE(review): any other non-zero return code from the cursor get leaves
// `exists` at true; unlike the sibling functions there is no assertion on the
// return value here -- confirm this is intended.
// NOTE(review): this listing skips the signature row (3825) and the rows under
// the if / else-if in the catch handler (3849, 3851, 3853). The symbol index
// suggests the signature is node_exists(BDbTxn &txn, const String &name).
3824 bool
3826 {
3827  DbtManaged keym, datam;
3828  Dbc *cursorp = 0;
      // close_db_cursor(&cursorp) is registered to run when this scope is left.
3829  HT_ON_SCOPE_EXIT(&close_db_cursor, &cursorp);
3830  bool exists = true;
3831 
3832  HT_DEBUG_OUT <<"node_exists txn="<< txn << " node name="
3833  << name << HT_END;
3834 
3835  try {
3836  txn.handle_state_db->cursor(txn.db_txn, &cursorp, 0);
3837 
3838  // Check for id under "/NODES/"
3839  String nodes_dir = NODES_STR;
3840  keym.set_str(nodes_dir);
3841  datam.set_str(name);
3842 
3843  if(cursorp->get(&keym, &datam, DB_GET_BOTH) == DB_NOTFOUND) {
3844  exists = false;
3845  }
3846  }
3847  catch (DbException &e) {
3848  if (e.get_errno() == DB_LOCK_DEADLOCK)
3850  else if (e.get_errno() == DB_REP_HANDLE_DEAD)
3852  HT_ERRORF("Berkeley DB error: %s", e.what());
3854  }
3855 
3856  HT_DEBUG_OUT <<"exitting node_exists txn="<< txn << " node name =" << name
3857  <<" exists=" << exists << HT_END;
3858  return exists;
3859 }
3860 
3861 /*
3862  *
3863  */
// Appends the ids of all handles recorded for the node `name` to `handles`.
//
// The node's NODE_HANDLE_MAP key is looked up with a cursor (DB_SET) and the
// cursor is then advanced with DB_NEXT_DUP until DB_NOTFOUND. Each data item
// is parsed with strtoull (base 0) and pushed onto `handles`. The vector is
// not cleared first, so existing elements are kept. Asserts that the node
// exists and that every cursor step returns either 0 or DB_NOTFOUND.
//
// NOTE(review): this listing skips the first signature row (3865) and the rows
// under the if / else-if in the catch handler (3893, 3895, 3897). The symbol
// index suggests the signature is get_node_handles(BDbTxn &txn,
// const String &name, std::vector<uint64_t> &handles) -- confirm.
3864 void
3866  std::vector<uint64_t> &handles)
3867 {
3868  int ret;
3869  DbtManaged keym, datam;
3870  String key_str;
3871  Dbc *cursorp = 0;
      // close_db_cursor(&cursorp) is registered to run when this scope is left.
3872  HT_ON_SCOPE_EXIT(&close_db_cursor, &cursorp);
3873 
3874  HT_DEBUG_OUT << "get_node_handles txn=" << txn << " node=" << name << HT_END;
3875 
3876  try {
3877  HT_ASSERT(node_exists(txn, name));
3878 
3879  // Iterate through all handles
3880  String node_handles_dir = get_node_key(name, NODE_HANDLE_MAP);
3881  keym.set_str(node_handles_dir);
3882  txn.handle_state_db->cursor(txn.db_txn, &cursorp, 0);
3883  ret = cursorp->get(&keym, &datam, DB_SET);
3884 
      // The loop ends on DB_NOTFOUND; any other non-zero code trips the
      // assertion at the top of the loop body.
3885  while (ret != DB_NOTFOUND) {
3886  HT_ASSERT(ret==0);
3887  handles.push_back((uint64_t)strtoull(datam.get_str(), 0, 0));
3888  ret = cursorp->get(&keym, &datam, DB_NEXT_DUP);
3889  }
3890  }
3891  catch (DbException &e) {
3892  if (e.get_errno() == DB_LOCK_DEADLOCK)
3894  else if (e.get_errno() == DB_REP_HANDLE_DEAD)
3896  HT_ERRORF("Berkeley DB error: %s", e.what());
3898  }
3899 
      // Exit trace; the text is the same as the entry trace above.
3900  HT_DEBUG_OUT << "get_node_handles txn=" << txn << " node=" << name << HT_END;
3901 }
3902 
3903 /*
3904  *
3905  */
// Reports whether at least one handle is recorded for the node `name`.
//
// The node's NODE_HANDLE_MAP key is looked up with a cursor (DB_SET). When an
// entry is found, the handle id stored in it is parsed and logged, and the
// function returns true; on DB_NOTFOUND it returns false. Asserts that the
// node exists and that the lookup returns 0 or DB_NOTFOUND.
//
// NOTE(review): this listing skips the signature row (3907) and the rows under
// the if / else-if in the catch handler (3938, 3940, 3942). The symbol index
// suggests the signature is
// node_has_open_handles(BDbTxn &txn, const String &name) -- confirm.
3906 bool
3908 {
3909  int ret;
3910  DbtManaged keym, datam;
3911  String key_str;
3912  Dbc *cursorp=0;
      // close_db_cursor(&cursorp) is registered to run when this scope is left.
3913  HT_ON_SCOPE_EXIT(&close_db_cursor, &cursorp);
3914  bool has_open_handles = false;
3915  uint64_t open_handle;
3916 
3917  HT_DEBUG_OUT << "node_has_open_handles txn=" << txn << " node=" << name << HT_END;
3918 
3919  try {
3920  HT_ASSERT(node_exists(txn, name));
3921 
3922  // Check to see if there is even one handle open to this node
3923  String node_handles_dir = get_node_key(name, NODE_HANDLE_MAP);
3924  keym.set_str(node_handles_dir);
3925  txn.handle_state_db->cursor(txn.db_txn, &cursorp, 0);
3926  ret = cursorp->get(&keym, &datam, DB_SET);
3927 
3928  HT_ASSERT(ret == 0 || ret == DB_NOTFOUND);
3929  if (ret == 0) {
      // open_handle is only used for the debug message below.
3930  open_handle = (uint64_t) strtoull((const char *)datam.get_data(), 0, 0);
3931  HT_DEBUG_OUT << "node_has_open_handles txn=" << txn << " node=" << name
3932  << " at least one open_handle=" << open_handle << HT_END;
3933  has_open_handles = true;
3934  }
3935  }
3936  catch (DbException &e) {
3937  if (e.get_errno() == DB_LOCK_DEADLOCK)
3939  else if (e.get_errno() == DB_REP_HANDLE_DEAD)
3941  HT_ERRORF("Berkeley DB error: %s", e.what());
3943  }
3944 
3945  HT_DEBUG_OUT << "node_has_open_handles txn=" << txn << " node=" << name
3946  << " has_open_handles=" << has_open_handles << HT_END;
3947  return has_open_handles;
3948 }
3949 
3950 /*
3951  *
3952  */
// Returns the next 64-bit identifier of the requested kind and optionally
// advances the stored counter.
//
// `id_type` selects the counter record in the handle-state database:
// SESSION -> NEXT_SESSION_ID, HANDLE -> NEXT_HANDLE_ID, EVENT -> NEXT_EVENT_ID.
// Any other value raises HYPERSPACE_STATEDB_BAD_KEY via HT_THROWF.
//
// The record is read through a cursor (DB_SET, asserted to succeed) and parsed
// with strtoull (base 0); that value is the return value. When `increment` is
// true the record under the cursor is then overwritten (DB_CURRENT) with the
// decimal text of value + 1, so the caller receives the pre-increment value.
//
// NOTE(review): this listing skips the signature row (3954) and the rows under
// the if / else-if in the catch handler (3997, 3999, 4001). The symbol index
// suggests the signature is get_next_id_i64(BDbTxn &txn,
// IdentifierType id_type, bool increment=false) -- confirm.
3953 uint64_t
3955 {
3956  DbtManaged keym, datam;
3957  int ret;
3958  uint64_t retval=0;
3959  Dbc *cursorp = 0;
      // close_db_cursor(&cursorp) is registered to run when this scope is left.
3960  HT_ON_SCOPE_EXIT(&close_db_cursor, &cursorp);
      // A uint64_t printed with "%llu" needs up to 20 digits plus the NUL
      // terminator (21 bytes); the previous 17-byte buffer could be overrun by
      // sprintf once the counter reaches 10^16.
3961  char numbuf[24];
3962 
3963  HT_DEBUG_OUT <<"get_next_id_i64 txn="<< txn << " id_type=" << id_type << " increment="
3964  << increment << HT_END;
3965  try {
3966  txn.handle_state_db->cursor(txn.db_txn, &cursorp, 0);
3967 
3968  // Get next id
3969  switch (id_type) {
3970  case SESSION:
3971  keym.set_str(NEXT_SESSION_ID);
3972  break;
3973  case HANDLE:
3974  keym.set_str(NEXT_HANDLE_ID);
3975  break;
3976  case EVENT:
3977  keym.set_str(NEXT_EVENT_ID);
3978  break;
3979  default:
3980  HT_THROWF(HYPERSPACE_STATEDB_BAD_KEY, "Unknown i64 id type:%d",id_type);
3981  }
3982 
3983  ret = cursorp->get(&keym, &datam, DB_SET);
3984 
3985  HT_ASSERT(ret == 0);
3986 
3987  retval = strtoull((const char *)datam.get_data(), 0, 0);
3988  if (increment) {
      // The cast binds tighter than '+', so this formats ((Llu)retval) + 1.
3989  sprintf(numbuf, "%llu", (Llu)retval+1);
3990  datam.set_str(numbuf);
      // DB_CURRENT overwrites the record the cursor is positioned on.
3991  ret = cursorp->put(&keym, &datam, DB_CURRENT);
3992  HT_ASSERT(ret==0);
3993  }
3994  }
3995  catch (DbException &e) {
3996  if (e.get_errno() == DB_LOCK_DEADLOCK)
3998  else if (e.get_errno() == DB_REP_HANDLE_DEAD)
4000  HT_ERRORF("Berkeley DB error: %s", e.what());
4002  }
4003 
4004  HT_DEBUG_OUT <<"get_next_id_i64 txn="<< txn << " id_type=" << id_type << " increment="
4005  << increment << " retval=" << retval << HT_END;
4006  return retval;
4007 }
4008 
// Writes a human-readable description of `txn` to `out` and returns `out`.
//
// The output has the form
//   {BDbTxn m_handle_namespace_db=<v>, m_handle_state_db=<v>, m_db_txn=<v>}
// where each <v> is the streamed value of the corresponding BDbTxn member
// (handle_namespace_db, handle_state_db, db_txn). The "m_" prefixes appear
// only in the printed labels; the members themselves are accessed without it.
4009 std::ostream& Hyperspace::operator<<(std::ostream &out, const BDbTxn &txn) {
4010  out << "{BDbTxn m_handle_namespace_db=" << txn.handle_namespace_db
4011  << ", m_handle_state_db=" << txn.handle_state_db
4012  << ", m_db_txn=" << txn.db_txn << "}";
4013  return out;
4014 }
Session identifier.
uint32_t get_handle_del_state(BDbTxn &txn, uint64_t id)
Retrieves system information (hardware, installation directory, etc)
uint64_t handle
Node handle ID.
void set_session_name(BDbTxn &txn, uint64_t id, const String &name)
const String NEXT_SESSION_ID
Definition: StateDbKeys.h:131
void set_handle_open_flags(BDbTxn &txn, uint64_t id, uint32_t open_flags)
void add_node_pending_lock_request(BDbTxn &txn, const String &name, LockRequest &request)
Adds a lock request.
const String NEXT_EVENT_ID
Definition: StateDbKeys.h:132
bool node_has_shared_lock_handles(BDbTxn &txn, const String &name)
std::chrono::steady_clock::duration m_log_gc_interval
bool is_dir
Definition: DirEntryAttr.h:66
static void db_msg_callback(const DbEnv *dbenv, const char *msg)
String get_event_key(uint64_t id, uint32_t type)
Definition: StateDbKeys.cc:36
std::string String
A String is simply a typedef to std::string.
Definition: String.h:44
Compatibility class for boost::filesystem::path.
static void db_err_callback(const DbEnv *dbenv, const char *errpfx, const char *msg)
void set_str(const std::string &str)
Definition: DbtManaged.h:41
void delete_node_shared_lock_handle(BDbTxn &txn, const String &name, uint64_t handle_id)
bool get_xattr_i32(BDbTxn &txn, const String &fname, const String &aname, uint32_t *valuep)
static Endpoint parse_endpoint(const char *endpoint, int defport=0)
Parse an endpoint string in (host:port) format.
Definition: InetAddr.cc:181
void finish_election()
Finish master election.
Db * handle_namespace_db
Filesystem namespace database handle.
StaticBuffer attr
Definition: DirEntryAttr.h:63
void get_handle_node(BDbTxn &txn, uint64_t id, String &node_name)
void add_node_handle(BDbTxn &txn, const String &name, uint64_t handle)
Manages transaction state.
bool incr_attr(BDbTxn &txn, const String &fname, const String &aname, uint64_t *valuep)
String get_session_key(uint64_t id, uint32_t type)
Definition: StateDbKeys.cc:64
void create_handle(BDbTxn &txn, uint64_t id, String node_name, uint32_t open_flags, uint32_t event_mask, uint64_t session_id, bool locked, uint32_t del_state)
void create_node(BDbTxn &txn, const String &name, bool ephemeral=false, uint64_t lock_generation=0, uint32_t cur_lock_mode=0, uint64_t exclusive_handle=0)
const String EVENTS_STR
Definition: StateDbKeys.h:122
long long unsigned int Llu
Shortcut for printf formats.
Definition: String.h:50
Po::typed_value< String > * str(String *v=0)
Definition: Properties.h:166
void create(BDbTxn &txn, const String &fname, bool temp)
const String SESSIONS_STR
Definition: StateDbKeys.h:100
std::shared_ptr< BDbHandles > BDbHandlesPtr
Smart pointer to BDbHandles.
static bool exists(const String &fname)
Checks if a file or directory exists.
Definition: FileUtils.cc:420
Declarations for BerkeleyDbFilesystem.
#define HT_INFO(msg)
Definition: Logger.h:271
#define HT_FATAL(msg)
Definition: Logger.h:339
void delete_session(BDbTxn &txn, uint64_t id)
bool node_is_ephemeral(BDbTxn &txn, const String &name)
void set_node_cur_lock_mode(BDbTxn &txn, const String &name, uint32_t lock_mode)
#define HT_ON_SCOPE_EXIT(...)
Definition: ScopeGuard.h:301
bool node_has_pending_lock_request(BDbTxn &txn, const String &name)
Encapsulates replication state.
uint8_t * ptr
Pointer to the end of the used part of the buffer.
void add_node_shared_lock_handle(BDbTxn &txn, const String &name, uint64_t handle)
A dynamic, resizable and reference counted memory buffer.
Definition: DynamicBuffer.h:42
const String NODES_STR
Definition: StateDbKeys.h:109
void set_handle_event_mask(BDbTxn &txn, uint64_t id, uint32_t event_mask)
void get_session_handles(BDbTxn &txn, uint64_t id, std::vector< uint64_t > &handles)
Hyperspace definitions
#define HT_EXPECT(_e_, _code_)
Definition: Logger.h:388
#define HT_ASSERT(_e_)
Definition: Logger.h:396
bool election_finished()
Check if master election is finished.
Definition: DirEntryAttr.h:40
bool exists(BDbTxn &txn, String fname, bool *is_dir_p=0)
Compatibility class for boost::filesystem::path.
Definition: Path.h:45
void get_all_names(BDbTxn &txn, std::vector< String > &names)
std::unordered_map< uint64_t, uint64_t > NotificationMap
Hash map from Node handle ID to Session ID.
std::string name
Definition: DirEntryAttr.h:62
bool node_has_open_handles(BDbTxn &txn, const String &name)
void set_node_ephemeral(BDbTxn &txn, const String &name, bool ephemeral)
bool has_attr
Boolean value indicating whether or not this entry is a directory.
Definition: DirEntryAttr.h:65
bool status(ContextPtr &context, Timer &timer, Status &status)
Runs a status check on the master.
Definition: Utility.cc:408
void do_checkpoint()
Checkpoints the BerkeleyDB database.
#define HT_DEBUG_ATTR_(_txn_, _fn_, _an_, _k_, _v_, _l_)
bool handle_is_locked(BDbTxn &txn, uint64_t id)
bool delete_session_handle(BDbTxn &txn, uint64_t id, uint64_t handle_id)
static bool is_ipv4(const char *ip)
Tests whether the input string is in n.n.n.n format (base 10)
Definition: InetAddr.cc:113
bool delete_node(BDbTxn &txn, const String &name)
std::shared_ptr< Properties > PropertiesPtr
Definition: Properties.h:447
bool get_node_event_notification_map(BDbTxn &txn, const String &name, uint32_t event_mask, NotificationMap &handles_to_sessions)
Logging routines and macros.
bool get_node_pending_lock_request(BDbTxn &txn, const String &name, LockRequest &front_req)
Check if a node has any pending lock requests from non-expired handles.
static void db_event_callback(DbEnv *dbenv, uint32_t which, void *info)
void encode_i32(uint8_t **bufp, uint32_t val)
Encode a 32-bit integer in little-endian order.
uint64_t get_node_exclusive_lock_handle(BDbTxn &txn, const String &name)
void start_transaction(BDbTxn &txn)
Creates a new BerkeleyDB transaction.
void set_handle_locked(BDbTxn &txn, uint64_t id, bool locked)
Compatibility Macros for C/C++.
bool get_xattr(BDbTxn &txn, const String &fname, const String &aname, Hypertable::DynamicBuffer &vbuf)
void delete_node_handle(BDbTxn &txn, const String &name, uint64_t handle)
void set_handle_del_state(BDbTxn &txn, uint64_t id, uint32_t del_state)
void encode_i64(uint8_t **bufp, uint64_t val)
Encode a 64-bit integer in little-endian order.
#define HT_END
Definition: Logger.h:220
uint32_t get_handle_event_mask(BDbTxn &txn, uint64_t id)
Functions to serialize/deserialize primitives to/from a memory buffer.
Db * handle_state_db
Transient state database handle.
std::unordered_map< int, String > replica_map
void free()
Clears the data; if this object is owner of the data then the allocated buffer is delete[]d...
Definition: StaticBuffer.h:185
Time related declarations.
void init_db_handles(const std::vector< Thread::id > &thread_ids)
DbTxn * db_txn
BerkeleyDB transaction object.
Definition: DirEntry.h:34
bool exists_xattr(BDbTxn &txn, const String &fname, const String &aname)
void create_event(BDbTxn &txn, uint32_t type, uint64_t id, uint32_t mask)
void mkdir(BDbTxn &txn, const String &name)
void add_session_handle(BDbTxn &txn, uint64_t id, uint64_t handle_id)
uint32_t get_node_cur_lock_mode(BDbTxn &txn, const String &name)
Hypertable definitions
Handle identifier.
bool is_master()
Check if we're the current master.
const String NEXT_HANDLE_ID
Definition: StateDbKeys.h:133
#define HT_FATALF(msg,...)
Definition: Logger.h:343
bool event_exists(BDbTxn &txn, uint64_t id)
void set_xattr(BDbTxn &txn, const String &fname, const String &aname, const void *value, size_t value_len)
std::ostream & operator<<(std::ostream &out, const BDbTxn &txn)
Writes human-readable version of txn to an ostream.
void expire_session(BDbTxn &txn, uint64_t id)
bool handle_exists(BDbTxn &txn, uint64_t id)
void delete_node_pending_lock_request(BDbTxn &txn, const String &name, uint64_t handle)
void delete_event(BDbTxn &txn, uint64_t id)
uint32_t m_checkpoint_size_kb
Checkpoint size threshold in kilobytes.
#define HT_INFOF(msg,...)
Definition: Logger.h:272
void set_node_lock_generation(BDbTxn &txn, const String &name, uint64_t lock_generation)
#define HT_THROWF(_code_, _fmt_,...)
Definition: Error.h:490
uint8_t * base
Pointer to the allocated memory buffer.
uint64_t get_handle_session(BDbTxn &txn, uint64_t id)
void set_xattr_i32(BDbTxn &txn, const String &fname, const String &aname, uint32_t value)
bool is_dir
Boolean value indicating whether or not this entry is a directory.
Definition: DirEntry.h:38
uint64_t get_next_id_i64(BDbTxn &txn, IdentifierType id_type, bool increment=false)
Encapsulates a lock request for a file node.
#define HT_DEBUG_ATTR(_txn_, _fn_, _an_, _k_, _v_)
bool list_xattr(BDbTxn &txn, const String &fname, std::vector< String > &anames)
const String HANDLES_STR
Definition: StateDbKeys.h:90
uint32_t mode
Lock mode.
bool session_exists(BDbTxn &txn, uint64_t id)
A String class based on std::string.
long unsigned int Lu
Shortcut for printf formats.
Definition: String.h:47
void set_event_notification_handles(BDbTxn &txn, uint64_t id, const std::vector< uint64_t > &handles)
#define HT_ERRORF(msg,...)
Definition: Logger.h:300
void create_session(BDbTxn &txn, uint64_t id, const String &addr)
String get_session_name(BDbTxn &txn, uint64_t id)
Event identifier.
bool get_xattr_i64(BDbTxn &txn, const String &fname, const String &aname, uint64_t *valuep)
void close_db_cursor(Dbc **cursor)
String get_handle_key(uint64_t id, uint32_t type)
Definition: StateDbKeys.cc:86
void unlink(BDbTxn &txn, const String &name)
std::chrono::steady_clock::time_point m_last_log_gc_time
void delete_handle(BDbTxn &txn, uint64_t id)
void del_xattr(BDbTxn &txn, const String &fname, const String &aname)
const char * get_str()
Definition: DbtManaged.h:50
bool node_exists(BDbTxn &txn, const String &name)
System information and statistics based on libsigar.
String get_node_pending_lock_request_key(const String &name, uint64_t handle_id)
Definition: StateDbKeys.cc:152
#define HT_THROW(_code_, _msg_)
Definition: Error.h:478
std::string name
Directory entry name.
Definition: DirEntry.h:36
static const NetInfo & net_info()
Retrieves updated Network information (see SystemInfo.h)
Definition: SystemInfo.cc:360
#define HT_FATAL_OUT
Definition: Logger.h:347
void get_directory_listing(BDbTxn &txn, String fname, std::vector< DirEntry > &listing)
uint32_t get_handle_open_flags(BDbTxn &txn, uint64_t id)
uint8_t * add_unchecked(const void *data, size_t len)
Adds additional data without boundary checks.
IdentifierType
Enumeration for object identifier types.
void get_node_handles(BDbTxn &txn, const String &name, std::vector< uint64_t > &handles)
void wait_for_election()
Waits for master election to finish.
#define HT_DEBUG_OUT
Definition: Logger.h:261
uint64_t incr_node_lock_generation(BDbTxn &txn, const String &name)
void get_directory_attr_listing(BDbTxn &txn, String fname, const String &aname, bool include_sub_entries, std::vector< DirEntryAttr > &listing)
String get_node_key(const String &name, uint32_t type)
Definition: StateDbKeys.cc:114
void set_xattr_i64(BDbTxn &txn, const String &fname, const String &aname, uint64_t value)
void build_attr_key(BDbTxn &, String &keystr, const String &aname, Dbt &key)
void reserve(size_t len, bool nocopy=false)
Reserve space for additional data. Will grow the space to exactly what's needed.
Definition: DynamicBuffer.h:95
Executes user-defined functions when leaving the current scope.
High-level entry point to a service; wraps a host:port pair.
Definition: InetAddr.h:44
void set_node_exclusive_lock_handle(BDbTxn &txn, const String &name, uint64_t exclusive_lock_handle)