0.9.8.10
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
Event.h
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2007-2015 Hypertable, Inc.
3  *
4  * This file is part of Hypertable.
5  *
6  * Hypertable is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; either version 3
9  * of the License, or any later version.
10  *
11  * Hypertable is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19  * 02110-1301, USA.
20  */
21 
22 #ifndef Hyperspace_Event_h
23 #define Hyperspace_Event_h
24 
25 #include <Common/Compat.h>
26 
27 #include "HandleCallback.h"
28 #include "BerkeleyDbFilesystem.h"
29 
30 #include <AsyncComm/CommBuf.h>
31 
32 #include <Common/Random.h>
33 #include <Common/Serialization.h>
34 #include <Common/System.h>
35 
36 #include <condition_variable>
37 #include <chrono>
38 #include <iostream>
39 #include <memory>
40 #include <mutex>
41 #include <string>
42 #include <thread>
43 
44 #define HT_BDBTXN_EVT_BEGIN(parent_txn) \
45  do { \
46  BDbTxn txn; \
47  ms_bdb_fs->start_transaction(txn); \
48  try
49 
50 #define HT_BDBTXN_EVT_END_CB(_cb_) \
51  catch (Exception &e) { \
52  if (e.code() != Error::HYPERSPACE_BERKELEYDB_DEADLOCK) { \
53  if (e.code() == Error::HYPERSPACE_BERKELEYDB_ERROR) \
54  HT_ERROR_OUT << e << HT_END; \
55  else \
56  HT_WARNF("%s - %s", Error::get_text(e.code()), e.what()); \
57  txn.abort(); \
58  _cb_->error(e.code(), e.what()); \
59  return; \
60  } \
61  HT_WARN_OUT << "Berkeley DB deadlock encountered in txn "<< txn << HT_END; \
62  txn.abort(); \
63  std::this_thread::sleep_for(Random::duration_millis(3000)); \
64  continue; \
65  } \
66  break; \
67  } while (true)
68 
69 #define HT_BDBTXN_EVT_END(...) \
70  catch (Exception &e) { \
71  if (e.code() != Error::HYPERSPACE_BERKELEYDB_DEADLOCK) { \
72  if (e.code() == Error::HYPERSPACE_BERKELEYDB_ERROR) \
73  HT_ERROR_OUT << e << HT_END; \
74  else \
75  HT_WARNF("%s - %s", Error::get_text(e.code()), e.what()); \
76  txn.abort(); \
77  return __VA_ARGS__; \
78  } \
79  HT_WARN_OUT << "Berkeley DB deadlock encountered in txn "<< txn << HT_END; \
80  txn.abort(); \
81  std::this_thread::sleep_for(Random::duration_millis(3000)); \
82  continue; \
83  } \
84  break; \
85  } while (true)
86 
87 namespace Hyperspace {
88  using namespace Hypertable;
89  enum {
94  };
95 
96  class Event {
97  public:
98  Event(uint64_t id, uint32_t mask) : m_id(id), m_mask(mask) { }
99  virtual ~Event() { }
100 
101  uint64_t get_id() { return m_id; }
102 
103  uint32_t get_mask() { return m_mask; }
104 
106  std::lock_guard<std::mutex> lock(m_mutex);
107  m_notification_count++;
108  }
109 
111  std::lock_guard<std::mutex> lock(m_mutex);
112  m_notification_count--;
113  if (m_notification_count == 0) {
114  // all notifications received, so delete event from BDB
116  ms_bdb_fs->delete_event(txn, m_id);
117  txn.commit();
118  }
119  HT_BDBTXN_EVT_END(BOOST_PP_EMPTY());
120  m_cond.notify_all();
121  }
122  }
123 
125  std::unique_lock<std::mutex> lock(m_mutex);
126  m_cond.wait(lock, [this](){ return m_notification_count == 0;});
127  }
128 
129  virtual uint32_t encoded_length() = 0;
130  virtual void encode(Hypertable::CommBuf *cbuf) = 0;
131 
132  static void set_bdb_fs(BerkeleyDbFilesystem *bdb_fs) {
133  ms_bdb_fs = bdb_fs;
134  }
135 
136  protected:
139  std::condition_variable m_cond;
140  uint64_t m_id {};
141  uint32_t m_mask {};
142  uint32_t m_notification_count {};
143  };
144 
145  typedef std::shared_ptr<Event> HyperspaceEventPtr;
146 
147  /*
148  * EventNamed class. Encapsulates named events (e.g. ATTR_SET,
149  * ATTR_DEL, CHILD_NODE_ADDED, CHILD_NODE_REMOVED)
150  */
151  class EventNamed : public Event {
152  public:
153  EventNamed(uint64_t id, uint32_t mask, const std::string &name)
154  : Event(id, mask), m_name(name) { return; }
155 
156  virtual uint32_t encoded_length() {
158  }
159 
160  virtual void encode(Hypertable::CommBuf *cbuf) {
161  cbuf->append_i64(m_id);
162  cbuf->append_i32(m_mask);
163  cbuf->append_vstr(m_name);
164  }
165 
166  private:
167  std::string m_name;
168  };
169 
170 
171  /*
172  * EventLockAcquired class. Used to notify handles when a lock is acquired
173  * on the node it points to.
174  */
175  class EventLockAcquired : public Event {
176  public:
177  EventLockAcquired(uint64_t id, uint32_t mode)
178  : Event(id, EVENT_MASK_LOCK_ACQUIRED), m_mode(mode) { return; }
179 
180  virtual uint32_t encoded_length() { return 16; }
181  virtual void encode(Hypertable::CommBuf *cbuf) {
182  cbuf->append_i64(m_id);
183  cbuf->append_i32(m_mask);
184  cbuf->append_i32(m_mode);
185  }
186  private:
187  uint32_t m_mode;
188  };
189 
190  /*
191  * EventLockReleased class. Used to notify handles when a lock is released
192  * on the node it points to.
193  */
194  class EventLockReleased : public Event {
195  public:
196  EventLockReleased(uint64_t id) : Event(id, EVENT_MASK_LOCK_RELEASED) { return; }
197  virtual uint32_t encoded_length() { return 12; }
198  virtual void encode(Hypertable::CommBuf *cbuf) {
199  cbuf->append_i64(m_id);
200  cbuf->append_i32(m_mask);
201  }
202  };
203 
204  /*
205  * EventLockGranted class. Used to notify handles that a prior lock request
206  * been granted.
207  */
208  class EventLockGranted : public Event {
209  public:
210  EventLockGranted(uint64_t id, uint32_t mode, uint64_t generation)
211  : Event(id, EVENT_MASK_LOCK_GRANTED), m_mode(mode), m_generation(generation)
212  { }
213 
214  virtual uint32_t encoded_length() { return 24; }
215  virtual void encode(Hypertable::CommBuf *cbuf) {
216  cbuf->append_i64(m_id);
217  cbuf->append_i32(m_mask);
218  cbuf->append_i32(m_mode);
219  cbuf->append_i64(m_generation);
220  }
221  private:
222  uint32_t m_mode;
223  uint64_t m_generation;
224  };
225 } // namespace Hyperspace
226 
227 #endif // Hyperspace_Event_h
Retrieves system information (hardware, installation directory, etc)
static std::mutex mutex
Definition: Logger.cc:43
virtual void encode(Hypertable::CommBuf *cbuf)
Definition: Event.h:215
static BerkeleyDbFilesystem * ms_bdb_fs
Definition: Event.h:137
EventLockGranted(uint64_t id, uint32_t mode, uint64_t generation)
Definition: Event.h:210
Network communication event.
Definition: Event.h:54
uint32_t get_mask()
Definition: Event.h:103
void decrement_notification_count()
Definition: Event.h:110
virtual ~Event()
Definition: Event.h:99
Event(uint64_t id, uint32_t mask)
Definition: Event.h:98
static void set_bdb_fs(BerkeleyDbFilesystem *bdb_fs)
Definition: Event.h:132
uint64_t get_id()
Definition: Event.h:101
Declarations for BerkeleyDbFilesystem.
std::shared_ptr< Event > HyperspaceEventPtr
Definition: Event.h:145
size_t encoded_length_vstr(size_t len)
Computes the encoded length of vstr (vint64, data, null)
void wait_for_notifications()
Definition: Event.h:124
uint64_t m_id
Definition: Event.h:140
void append_vstr(const char *str)
Appends a c-style string to the primary buffer.
Definition: CommBuf.h:250
Hyperspace definitions
virtual uint32_t encoded_length()
Definition: Event.h:214
#define HT_BDBTXN_EVT_END(...)
Definition: Event.h:69
EventLockReleased(uint64_t id)
Definition: Event.h:196
std::string m_name
Definition: Event.h:167
Compatibility Macros for C/C++.
uint32_t m_mask
Definition: Event.h:141
#define HT_BDBTXN_EVT_BEGIN(parent_txn)
Definition: Event.h:44
void append_i32(uint32_t ival)
Appends a 32-bit integer to the primary buffer.
Definition: CommBuf.h:230
Functions to serialize/deserialize primitives to/from a memory buffer.
EventNamed(uint64_t id, uint32_t mask, const std::string &name)
Definition: Event.h:153
virtual uint32_t encoded_length()
Definition: Event.h:197
EventLockAcquired(uint64_t id, uint32_t mode)
Definition: Event.h:177
virtual void encode(Hypertable::CommBuf *cbuf)
Definition: Event.h:198
virtual uint32_t encoded_length()
Definition: Event.h:180
Hyperspace filesystem implementation on top of BerkeleyDB.
void append_i64(uint64_t lval)
Appends a 64-bit integer to the primary buffer.
Definition: CommBuf.h:239
Hypertable definitions
Declarations for CommBuf.
Random number generator for int32, int64, double and ascii arrays.
Message buffer for holding data to be transmitted over a network.
Definition: CommBuf.h:79
virtual uint32_t encoded_length()
Definition: Event.h:156
virtual void encode(Hypertable::CommBuf *cbuf)
Definition: Event.h:181
std::condition_variable m_cond
Definition: Event.h:139
virtual void encode(Hypertable::CommBuf *cbuf)
Definition: Event.h:160
std::mutex m_mutex
Definition: Event.h:138
void increment_notification_count()
Definition: Event.h:105