0.9.8.10
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
Protocol.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2007-2015 Hypertable, Inc.
3  *
4  * This file is part of Hypertable.
5  *
6  * Hypertable is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; either version 3
9  * of the License, or any later version.
10  *
11  * Hypertable is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19  * 02110-1301, USA.
20  */
21 
28 #include <Common/Compat.h>
29 
30 #include "Protocol.h"
31 
32 #include <AsyncComm/CommHeader.h>
33 
34 #include <Common/Error.h>
35 #include <Common/Serialization.h>
36 #include <Common/StringExt.h>
37 #include <Common/FailureInducer.h>
38 
39 #include <cassert>
40 #include <iostream>
41 
42 using namespace std;
43 using namespace Hyperspace;
44 using namespace Hypertable;
45 using namespace Serialization;
46 
48  "keepalive",
49  "handshake",
50  "open",
51  "stat",
52  "cancel",
53  "close",
54  "poison",
55  "mkdir",
56  "attrset",
57  "attrget",
58  "attrdel",
59  "attrexists",
60  "attrlist",
61  "exists",
62  "delete",
63  "readdir",
64  "lock",
65  "release",
66  "checksequencer",
67  "status",
68  "redirect",
69  "readdirattr",
70  "attrincr",
71  "readpathattr",
72  "shutdown"
73 };
74 
75 
76 /*
77  *
78  */
79 const char *Hyperspace::Protocol::command_text(uint64_t command) {
80  if (command >= COMMAND_MAX)
81  return "UNKNOWN";
82  return command_strs[command];
83 }
84 
85 /*
86  *
87  */
88 CommBuf *
90  std::set<uint64_t> &delivered_events, bool destroy_session) {
91  CommHeader header(COMMAND_KEEPALIVE);
92  header.flags |= CommHeader::FLAGS_BIT_URGENT;
93  CommBuf *cbuf = new CommBuf(header, 8 + 4 + (8*delivered_events.size()) + 1);
94  cbuf->append_i64(session_id);
95  cbuf->append_i32(delivered_events.size());
96  for (auto event_id : delivered_events)
97  cbuf->append_i64(event_id);
98  cbuf->append_bool(destroy_session);
99  return cbuf;
100 }
101 
102 
103 /*
104  *
105  */
106 CommBuf *
108  int error) {
109  CommHeader header(COMMAND_KEEPALIVE);
110  header.flags |= CommHeader::FLAGS_BIT_URGENT;
111  CommBuf *cbuf = new CommBuf(header, 16);
112  cbuf->append_i64(session_id);
113  cbuf->append_i32(error);
114  cbuf->append_i32(0);
115  return cbuf;
116 }
117 
118 
119 /*
120  *
121  */
122 CommBuf *
124  SessionDataPtr &session_data) {
125  uint32_t len = 16;
126  CommBuf *cbuf = 0;
127  CommHeader header(COMMAND_KEEPALIVE);
128  header.flags |= CommHeader::FLAGS_BIT_URGENT;
129 
130  cbuf = session_data->serialize_notifications_for_keepalive(header, len);
131  return cbuf;
132 }
133 
134 /*
135  *
136  */
138  CommHeader header(COMMAND_REDIRECT);
139  header.flags |= CommHeader::FLAGS_BIT_URGENT;
140  size_t len = encoded_length_vstr(host);
141  CommBuf *cbuf = new CommBuf(header, len);
142  cbuf->append_vstr(host);
143  return cbuf;
144 }
145 
146 /*
147  *
148  */
150  const std::string &name) {
151  CommHeader header(COMMAND_HANDSHAKE);
152  header.flags |= CommHeader::FLAGS_BIT_URGENT;
153  size_t len = 8 + encoded_length_vstr(name);
154  CommBuf *cbuf = new CommBuf(header, len);
155  cbuf->append_i64(session_id);
156  cbuf->append_vstr(name);
157  return cbuf;
158 }
159 
160 
161 /*
162  *
163  */
164 CommBuf *
166  uint32_t flags, HandleCallbackPtr &callback,
167  const std::vector<Attribute> &init_attrs) {
168  size_t len = 16 + encoded_length_vstr(name.size());
169  CommHeader header(COMMAND_OPEN);
170  for (size_t i = 0; i < init_attrs.size(); i++)
171  len += encoded_length_vstr(init_attrs[i].name)
172  + encoded_length_vstr(init_attrs[i].value_len);
173 
174  CommBuf *cbuf = new CommBuf(header, len);
175 
176  if (HT_FAILURE_SIGNALLED("bad-hyperspace-version")) {
177  cbuf->append_i32(Protocol::Version + 1);
178  }
179  else {
180  cbuf->append_i32(Protocol::Version);
181  }
182  cbuf->append_i32(flags);
183  if (callback)
184  cbuf->append_i32(callback->get_event_mask());
185  else
186  cbuf->append_i32(0);
187  cbuf->append_vstr(name);
188 
189  // append initial attributes
190  cbuf->append_i32(init_attrs.size());
191  for (size_t i=0; i<init_attrs.size(); i++) {
192  cbuf->append_vstr(init_attrs[i].name);
193  cbuf->append_vstr(init_attrs[i].value, init_attrs[i].value_len);
194  }
195 
196  return cbuf;
197 }
198 
199 
201  CommHeader header(COMMAND_CLOSE);
202  header.gid = (uint32_t)((handle ^ (handle >> 32)) & 0x0FFFFFFFFLL);
203  CommBuf *cbuf = new CommBuf(header, 8);
204  cbuf->append_i64(handle);
205  return cbuf;
206 }
207 
208 CommBuf *Hyperspace::Protocol::create_mkdir_request(const std::string &name, bool create_intermediate, const std::vector<Attribute> *init_attrs) {
209  size_t attrs_len = 4;
210  if (init_attrs) {
211  for (const auto& attr : *init_attrs)
212  attrs_len += encoded_length_vstr(attr.name)
213  + encoded_length_vstr(attr.value_len);
214  }
215  CommHeader header(COMMAND_MKDIR);
216  header.gid = filename_to_group(name);
217  CommBuf *cbuf = new CommBuf(header, encoded_length_vstr(name.size()) + 1 + attrs_len);
218  cbuf->append_vstr(name);
219  cbuf->append_bool(create_intermediate);
220  if (init_attrs) {
221  cbuf->append_i32(init_attrs->size());
222  for (const auto& attr : *init_attrs) {
223  cbuf->append_vstr(attr.name);
224  cbuf->append_vstr(attr.value, attr.value_len);
225  }
226  }
227  else
228  cbuf->append_i32(0);
229  return cbuf;
230 }
231 
232 
234  CommHeader header(COMMAND_DELETE);
235  header.gid = filename_to_group(name);
236  CommBuf *cbuf = new CommBuf(header, encoded_length_vstr(name.size()));
237  cbuf->append_vstr(name);
238  return cbuf;
239 }
240 
241 
242 CommBuf *
243 Hyperspace::Protocol::create_attr_set_request(uint64_t handle, const std::string *name,
244  uint32_t oflags, const std::string &attr, const void *value, size_t value_len) {
245  size_t attr_len = 4 + encoded_length_vstr(attr.size())
246  + encoded_length_vstr(value_len);
247  CommHeader header(COMMAND_ATTRSET);
248  CommBuf *cbuf;
249  if (name && !name->empty()) {
250  header.gid = filename_to_group(*name);
251  cbuf = new CommBuf(header, 1 + encoded_length_vstr(name->size()) + 4
252  + attr_len);
253  cbuf->append_bool(true);
254  cbuf->append_vstr(*name);
255  cbuf->append_i32(oflags);
256  }
257  else {
258  header.gid = (uint32_t)((handle ^ (handle >> 32)) & 0x0FFFFFFFFLL);
259  cbuf = new CommBuf(header, 1 + 8 + attr_len);
260  cbuf->append_bool(false);
261  cbuf->append_i64(handle);
262  }
263  cbuf->append_i32(1); // one attribute follows
264  cbuf->append_vstr(attr);
265  cbuf->append_vstr(value, value_len);
266  return cbuf;
267 }
268 
269 CommBuf *
270 Hyperspace::Protocol::create_attr_set_request(uint64_t handle, const std::string *name,
271  uint32_t oflags, const std::vector<Attribute> &attrs) {
272  size_t attrs_len = 4;
273  for (const auto& attr : attrs)
274  attrs_len += encoded_length_vstr(attr.name)
275  + encoded_length_vstr(attr.value_len);
276 
277  CommHeader header(COMMAND_ATTRSET);
278  CommBuf *cbuf;
279  if (name && !name->empty()) {
280  header.gid = filename_to_group(*name);
281  cbuf = new CommBuf(header, 1 + encoded_length_vstr(name->size()) + 4
282  + attrs_len);
283  cbuf->append_bool(true);
284  cbuf->append_vstr(*name);
285  cbuf->append_i32(oflags);
286  }
287  else {
288  header.gid = (uint32_t)((handle ^ (handle >> 32)) & 0x0FFFFFFFFLL);
289  cbuf = new CommBuf(header, 1 + 8 + attrs_len);
290  cbuf->append_bool(false);
291  cbuf->append_i64(handle);
292  }
293  cbuf->append_i32(attrs.size());
294  for (const auto& attr : attrs) {
295  cbuf->append_vstr(attr.name);
296  cbuf->append_vstr(attr.value, attr.value_len);
297  }
298  return cbuf;
299 }
300 
301 CommBuf *
303  const std::string *name,
304  const std::string &attr) {
306  CommBuf *cbuf;
307  if (name && !name->empty()) {
308  header.gid = filename_to_group(*name);
309  cbuf = new CommBuf(header, 1 + encoded_length_vstr(name->size())
310  + encoded_length_vstr(attr.size()));
311  cbuf->append_bool(true);
312  cbuf->append_vstr(*name);
313  }
314  else {
315  header.gid = (uint32_t)((handle ^ (handle >> 32)) & 0x0FFFFFFFFLL);
316  cbuf = new CommBuf(header, 1 + 8 + encoded_length_vstr(attr.size()));
317  cbuf->append_bool(false);
318  cbuf->append_i64(handle);
319  }
320  cbuf->append_vstr(attr);
321  return cbuf;
322 }
323 
324 CommBuf *
326  const std::string *name,
327  const std::string &attr) {
328  CommHeader header(COMMAND_ATTRGET);
329  CommBuf *cbuf;
330  if (name && !name->empty()) {
331  header.gid = filename_to_group(*name);
332  cbuf = new CommBuf(header, 1 + encoded_length_vstr(name->size())
333  + 4 + encoded_length_vstr(attr.size()));
334  cbuf->append_bool(true);
335  cbuf->append_vstr(*name);
336  }
337  else {
338  header.gid = (uint32_t)((handle ^ (handle >> 32)) & 0x0FFFFFFFFLL);
339  cbuf = new CommBuf(header, 1 + 8 + 4 + encoded_length_vstr(attr.size()));
340  cbuf->append_bool(false);
341  cbuf->append_i64(handle);
342  }
343  cbuf->append_i32(1); // one attr follows
344  cbuf->append_vstr(attr);
345  return cbuf;
346 }
347 
348 CommBuf *
350  const std::string *name,
351  const std::vector<std::string> &attrs) {
352  size_t len = 0;
353  for (const auto &attr : attrs)
354  len += encoded_length_vstr(attr.size());
355 
356  CommHeader header(COMMAND_ATTRGET);
357  CommBuf *cbuf;
358  if (name && !name->empty()) {
359  header.gid = filename_to_group(*name);
360  cbuf = new CommBuf(header, 1 + encoded_length_vstr(name->size())
361  + 4 + len);
362  cbuf->append_bool(true);
363  cbuf->append_vstr(*name);
364  }
365  else {
366  header.gid = (uint32_t)((handle ^ (handle >> 32)) & 0x0FFFFFFFFLL);
367  cbuf = new CommBuf(header, 1 + 8 + 4 + len);
368  cbuf->append_bool(false);
369  cbuf->append_i64(handle);
370  }
371  cbuf->append_i32(attrs.size());
372  for (const auto &attr : attrs)
373  cbuf->append_vstr(attr);
374  return cbuf;
375 }
376 
377 
378 CommBuf *
380  const std::string &name) {
381  CommHeader header(COMMAND_ATTRDEL);
382  header.gid = (uint32_t)((handle ^ (handle >> 32)) & 0x0FFFFFFFFLL);
383  CommBuf *cbuf = new CommBuf(header, 8 + encoded_length_vstr(name.size()));
384  cbuf->append_i64(handle);
385  cbuf->append_vstr(name);
386  return cbuf;
387 }
388 
389 CommBuf *
391  const std::string &attr) {
393  CommBuf *cbuf;
394  header.gid = (uint32_t)((handle ^ (handle >> 32)) & 0x0FFFFFFFFLL);
395  cbuf = new CommBuf(header, 1 + 8 + encoded_length_vstr(attr));
396  cbuf->append_bool(false);
397  cbuf->append_i64(handle);
398  cbuf->append_vstr(attr);
399  return cbuf;
400 }
401 
402 CommBuf *
404  const std::string &attr) {
406  CommBuf *cbuf;
407  header.gid = filename_to_group(name);
408  cbuf = new CommBuf(header, 1 + encoded_length_vstr(name)
409  + encoded_length_vstr(attr));
410  cbuf->append_bool(true);
411  cbuf->append_vstr(name);
412  cbuf->append_vstr(attr);
413  return cbuf;
414 }
415 
416 CommBuf *
419  header.gid = (uint32_t)((handle ^ (handle >> 32)) & 0x0FFFFFFFFLL);
420  CommBuf *cbuf = new CommBuf(header, 8);
421  cbuf->append_i64(handle);
422  return cbuf;
423 }
424 
426  CommHeader header(COMMAND_READDIR);
427  header.gid = (uint32_t)((handle ^ (handle >> 32)) & 0x0FFFFFFFFLL);
428  CommBuf *cbuf = new CommBuf(header, 8);
429  cbuf->append_i64(handle);
430  return cbuf;
431 }
432 
434  const std::string *name, const std::string &attr, bool include_sub_entries) {
436  CommBuf *cbuf;
437  if (name && !name->empty()) {
438  header.gid = filename_to_group(*name);
439  cbuf = new CommBuf(header, 1 + encoded_length_vstr(name->size()) +
440  encoded_length_vstr(attr.size()) + 1);
441  cbuf->append_bool(true);
442  cbuf->append_vstr(*name);
443  }
444  else {
445  header.gid = (uint32_t)((handle ^ (handle >> 32)) & 0x0FFFFFFFFLL);
446  cbuf = new CommBuf(header, 1 + 8 + encoded_length_vstr(attr.size()) + 1);
447  cbuf->append_bool(false);
448  cbuf->append_i64(handle);
449  }
450  cbuf->append_vstr(attr);
451  cbuf->append_bool(include_sub_entries);
452  return cbuf;
453 }
454 
456  const std::string *name, const std::string &attr) {
458  CommBuf *cbuf;
459  if (name && !name->empty()) {
460  header.gid = filename_to_group(*name);
461  cbuf = new CommBuf(header, 1 + encoded_length_vstr(name->size()) +
462  encoded_length_vstr(attr.size()));
463  cbuf->append_bool(true);
464  cbuf->append_vstr(*name);
465  }
466  else {
467  header.gid = (uint32_t)((handle ^ (handle >> 32)) & 0x0FFFFFFFFLL);
468  cbuf = new CommBuf(header, 1 + 8 + encoded_length_vstr(attr.size()));
469  cbuf->append_bool(false);
470  cbuf->append_i64(handle);
471  }
472  cbuf->append_vstr(attr);
473  return cbuf;
474 }
475 
477  CommHeader header(COMMAND_EXISTS);
478  header.gid = filename_to_group(name);
479  CommBuf *cbuf = new CommBuf(header, encoded_length_vstr(name.size()));
480  cbuf->append_vstr(name);
481  return cbuf;
482 }
483 
484 
485 CommBuf *
486 Hyperspace::Protocol::create_lock_request(uint64_t handle, uint32_t mode,
487  bool try_lock) {
488  CommHeader header(COMMAND_LOCK);
489  header.gid = (uint32_t)((handle ^ (handle >> 32)) & 0x0FFFFFFFFLL);
490  CommBuf *cbuf = new CommBuf(header, 13);
491  cbuf->append_i64(handle);
492  cbuf->append_i32(mode);
493  cbuf->append_byte(try_lock);
494  return cbuf;
495 }
496 
497 
499  CommHeader header(COMMAND_RELEASE);
500  header.gid = (uint32_t)((handle ^ (handle >> 32)) & 0x0FFFFFFFFLL);
501  CommBuf *cbuf = new CommBuf(header, 8);
502  cbuf->append_i64(handle);
503  return cbuf;
504 }
505 
506 
507 /*
508  *
509  */
511  CommHeader header(COMMAND_STATUS);
512  header.flags |= CommHeader::FLAGS_BIT_URGENT;
513  CommBuf *cbuf = new CommBuf(header, 0);
514  return cbuf;
515 }
516 
517 
520  CommBuf *cbuf = new CommBuf(header, 0);
521  return cbuf;
522 }
static CommBuf * create_lock_request(uint64_t handle, uint32_t mode, bool try_lock)
Definition: Protocol.cc:486
static CommBuf * create_attr_exists_request(uint64_t handle, const std::string &attr)
Creates attr_exists request message.
Definition: Protocol.cc:390
static CommBuf * create_handshake_request(uint64_t session_id, const std::string &name)
Definition: Protocol.cc:149
The FailureInducer simulates errors.
uint16_t flags
Flags.
Definition: CommHeader.h:139
static CommBuf * create_server_keepalive_request(uint64_t session_id, int error)
Definition: Protocol.cc:107
Declarations for Protocol.
std::shared_ptr< SessionData > SessionDataPtr
Definition: SessionData.h:156
static CommBuf * create_readdir_request(uint64_t handle)
Definition: Protocol.cc:425
STL namespace.
size_t encoded_length_vstr(size_t len)
Computes the encoded length of vstr (vint64, data, null)
static CommBuf * create_attr_list_request(uint64_t handle)
Definition: Protocol.cc:417
static CommBuf * create_attr_set_request(uint64_t handle, const std::string *name, uint32_t oflags, const std::string &attr, const void *value, size_t value_len)
Definition: Protocol.cc:243
void append_vstr(const char *str)
Appends a c-style string to the primary buffer.
Definition: CommBuf.h:250
Hyperspace definitions
static CommBuf * create_readdir_attr_request(uint64_t handle, const std::string *name, const std::string &attr, bool include_sub_entries)
Definition: Protocol.cc:433
uint32_t gid
Group ID (see ApplicationQueue)
Definition: CommHeader.h:142
static CommBuf * create_shutdown_request()
Definition: Protocol.cc:518
static CommBuf * create_close_request(uint64_t handle)
Definition: Protocol.cc:200
virtual const char * command_text(uint64_t command)=0
Returns the string representation of a command code.
static CommBuf * create_mkdir_request(const std::string &name, bool create_intermediate, const std::vector< Attribute > *init_attrs)
Definition: Protocol.cc:208
Compatibility Macros for C/C++.
static CommBuf * create_attr_incr_request(uint64_t handle, const std::string *name, const std::string &attr)
Definition: Protocol.cc:302
void append_i32(uint32_t ival)
Appends a 32-bit integer to the primary buffer.
Definition: CommBuf.h:230
Functions to serialize/deserialize primitives to/from a memory buffer.
static const char * command_strs[COMMAND_MAX]
Definition: Protocol.h:207
static CommBuf * create_delete_request(const std::string &name)
Definition: Protocol.cc:233
static CommBuf * create_readpath_attr_request(uint64_t handle, const std::string *name, const std::string &attr)
Definition: Protocol.cc:455
static CommBuf * create_attrs_get_request(uint64_t handle, const std::string *name, const std::vector< std::string > &attrs)
Definition: Protocol.cc:349
std::shared_ptr< HandleCallback > HandleCallbackPtr
void append_i64(uint64_t lval)
Appends a 64-bit integer to the primary buffer.
Definition: CommBuf.h:239
Hypertable definitions
static CommBuf * create_attr_get_request(uint64_t handle, const std::string *name, const std::string &attr)
Definition: Protocol.cc:325
static CommBuf * create_status_request()
Definition: Protocol.cc:510
static CommBuf * create_client_keepalive_request(uint64_t session_id, std::set< uint64_t > &delivered_events, bool destroy_session=false)
Definition: Protocol.cc:89
Header for messages transmitted via AsyncComm.
Definition: CommHeader.h:40
static CommBuf * create_server_redirect_request(const std::string &host)
Definition: Protocol.cc:137
#define HT_FAILURE_SIGNALLED(_label_)
Message buffer for holding data to be transmitted over a network.
Definition: CommBuf.h:79
void append_byte(uint8_t bval)
Appends a byte of data to the primary buffer.
Definition: CommBuf.h:179
static CommBuf * create_release_request(uint64_t handle)
Definition: Protocol.cc:498
Declarations for CommHeader.
String extensions and helpers: sets, maps, append operators etc.
Error codes, Exception handling, error logging.
static CommBuf * create_exists_request(const std::string &name)
Definition: Protocol.cc:476
void append_bool(bool bval)
Appends a boolean value to the primary buffer.
Definition: CommBuf.h:173
static CommBuf * create_attr_del_request(uint64_t handle, const std::string &name)
Creates attr_del request message.
Definition: Protocol.cc:379
static CommBuf * create_open_request(const std::string &name, uint32_t flags, HandleCallbackPtr &callback, const std::vector< Attribute > &init_attrs)
Definition: Protocol.cc:165