0.9.8.10
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
Utility.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2007-2015 Hypertable, Inc.
3  *
4  * This file is part of Hypertable.
5  *
6  * Hypertable is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; either version 3
9  * of the License, or any later version.
10  *
11  * Hypertable is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19  * 02110-1301, USA.
20  */
21 
26 
27 #include <Common/Compat.h>
28 
29 #include "Utility.h"
30 
32 
33 #include <Common/Error.h>
34 #include <Common/Logger.h>
35 
36 #include <cstdio>
37 
38 using namespace std;
39 using namespace Hypertable;
40 
41 namespace {
42  const int BUFFER_SIZE = 32768;
43 }
44 
45 void FsBroker::Lib::copy(ClientPtr &client, const std::string &from,
46  const std::string &to, int64_t offset) {
47  DispatchHandlerSynchronizer sync_handler;
48  int32_t from_fd {};
49  int32_t to_fd {};
50 
51  try {
52 
53  from_fd = client->open(from, 0);
54  if (offset > 0)
55  client->seek(from_fd, offset);
56 
57  to_fd = client->create(to, Filesystem::OPEN_FLAG_OVERWRITE, -1, -1, -1);
58 
59  client->read(from_fd, BUFFER_SIZE, &sync_handler);
60  client->read(from_fd, BUFFER_SIZE, &sync_handler);
61  client->read(from_fd, BUFFER_SIZE, &sync_handler);
62 
63  EventPtr event;
64  uint8_t *dst;
65  uint32_t amount;
66 
67  while (sync_handler.wait_for_reply(event)) {
68 
69  client->decode_response_pread(event, (const void **)&dst, (uint64_t *)&offset, &amount);
70 
71  StaticBuffer send_buf;
72  if (amount > 0) {
73  send_buf.set(dst, amount, false);
74  client->append(to_fd, send_buf);
75  }
76 
77  if (amount < (uint32_t)BUFFER_SIZE) {
78  sync_handler.wait_for_reply(event);
79  break;
80  }
81 
82  client->read(from_fd, BUFFER_SIZE, &sync_handler);
83  }
84 
85  client->close(from_fd);
86  client->close(to_fd);
87 
88  }
89  catch (Exception &e) {
90  if (from_fd)
91  client->close(from_fd);
92  if (to_fd)
93  client->close(to_fd);
94  throw;
95  }
96 
97 }
98 
99 
100 void FsBroker::Lib::copy_from_local(ClientPtr &client, const string &from, const string &to,
101  int64_t offset) {
102  DispatchHandlerSynchronizer sync_handler;
103  int32_t fd = 0;
104  FILE *fp = 0;
105  size_t nread;
106  uint8_t *buf;
107  StaticBuffer send_buf;
108 
109  try {
110 
111  if ((fp = fopen(from.c_str(), "r")) == 0)
112  HT_THROW(Error::EXTERNAL, strerror(errno));
113 
114  if (offset > 0) {
115  if (fseek(fp, (long)offset, SEEK_SET) != 0)
116  HT_THROW(Error::EXTERNAL, strerror(errno));
117  }
118 
119  fd = client->create(to, Filesystem::OPEN_FLAG_OVERWRITE, -1, -1, -1);
120 
121  // send 3 appends
122  for (int i=0; i<3; i++) {
123  buf = new uint8_t [BUFFER_SIZE];
124  if ((nread = fread(buf, 1, BUFFER_SIZE, fp)) == 0)
125  goto done;
126  send_buf.set(buf, nread, true);
127  client->append(fd, send_buf);
128  }
129 
130  while (true) {
131  buf = new uint8_t [BUFFER_SIZE];
132  if ((nread = fread(buf, 1, BUFFER_SIZE, fp)) == 0)
133  break;
134  send_buf.set(buf, nread, true);
135  client->append(fd, send_buf);
136  }
137 
138  done:
139  client->close(fd);
140 
141  }
142  catch (Exception &e) {
143  if (fp)
144  fclose(fp);
145  throw;
146  }
147 }
148 
149 
150 void FsBroker::Lib::copy_to_local(ClientPtr &client, const string &from, const string &to,
151  int64_t offset) {
152  DispatchHandlerSynchronizer sync_handler;
153  FILE *fp = 0;
154 
155  try {
156 
157  if ((fp = fopen(to.c_str(), "w+")) == 0)
158  HT_THROW(Error::EXTERNAL, strerror(errno));
159 
160  int32_t fd = client->open(from, 0);
161 
162  if (offset > 0)
163  client->seek(fd, offset);
164 
165  client->read(fd, BUFFER_SIZE, &sync_handler);
166  client->read(fd, BUFFER_SIZE, &sync_handler);
167  client->read(fd, BUFFER_SIZE, &sync_handler);
168 
169  EventPtr event;
170  const uint8_t *dst;
171  uint32_t amount;
172 
173  while (sync_handler.wait_for_reply(event)) {
174 
175  client->decode_response_pread(event, (const void **)&dst, (uint64_t *)&offset, &amount);
176 
177  if (amount > 0) {
178  if (fwrite(dst, amount, 1, fp) != 1)
179  HT_THROW(Error::EXTERNAL, strerror(errno));
180  }
181 
182  if (amount < (uint32_t)BUFFER_SIZE) {
183  sync_handler.wait_for_reply(event);
184  break;
185  }
186 
187  client->read(fd, BUFFER_SIZE, &sync_handler);
188  }
189 
190  client->close(fd);
191 
192  fclose(fp);
193 
194  }
195  catch (Exception &e) {
196  if (fp)
197  fclose(fp);
198  throw;
199  }
200 
201 }
202 
A memory buffer of static size.
Definition: StaticBuffer.h:45
void set(uint8_t *data, uint32_t len, bool take_ownership=true)
Sets data pointer; the existing buffer is discarded and deleted.
Definition: StaticBuffer.h:175
std::shared_ptr< Event > EventPtr
Smart pointer to Event.
Definition: Event.h:228
STL namespace.
void copy(ClientPtr &client, const std::string &from, const std::string &to, int64_t offset=0)
Definition: Utility.cc:45
void copy_to_local(ClientPtr &client, const std::string &from, const std::string &to, int64_t offset=0)
Definition: Utility.cc:150
bool wait_for_reply(EventPtr &event)
This method is used by a client to synchronize.
#define BUFFER_SIZE
std::shared_ptr< Client > ClientPtr
Smart pointer to Client.
Definition: Client.h:233
Logging routines and macros.
Compatibility Macros for C/C++.
Hypertable definitions
DispatchHandler class used to synchronize with response messages.
void copy_from_local(ClientPtr &client, const std::string &from, const std::string &to, int64_t offset=0)
Definition: Utility.cc:100
This is a generic exception class for Hypertable.
Definition: Error.h:314
Declarations of utility functions.
Error codes, Exception handling, error logging.
#define HT_THROW(_code_, _msg_)
Definition: Error.h:478
Declarations for DispatchHandlerSynchronizer.