0.9.8.10
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
HandlerMap.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2007-2015 Hypertable, Inc.
3  *
4  * This file is part of Hypertable.
5  *
6  * Hypertable is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; either version 3
9  * of the License, or any later version.
10  *
11  * Hypertable is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19  * 02110-1301, USA.
20  */
21 
28 #include <Common/Compat.h>
29 
30 #include "IOHandlerAccept.h"
31 #include "HandlerMap.h"
32 #include "ReactorFactory.h"
33 
34 #include <chrono>
35 
36 using namespace Hypertable;
37 using namespace std;
38 
// Inserts an accept handler into m_accept_handler_map, keyed by the
// handler's local address. Runs with m_mutex held for the whole call.
39 void HandlerMap::insert_handler(IOHandlerAccept *handler) {
40  lock_guard<mutex> lock(m_mutex);
// The duplicate check must use the same key as the insertion below (the
// local address); otherwise operator[] could overwrite an existing entry
// without the assertion ever firing.
41  HT_ASSERT(m_accept_handler_map.find(handler->get_local_address())
42  == m_accept_handler_map.end());
43  m_accept_handler_map[handler->get_local_address()] = handler;
44 }
45 
// Inserts a data (TCP) I/O handler into m_data_handler_map, keyed by
// handler->get_address(). Asserts that no entry already exists for that
// address. When checkout is true the handler's reference count is
// incremented before returning. Runs with m_mutex held for the whole call.
46 void HandlerMap::insert_handler(IOHandlerData *handler, bool checkout) {
47  lock_guard<mutex> lock(m_mutex);
48  HT_ASSERT(m_data_handler_map.find(handler->get_address())
49  == m_data_handler_map.end());
50  m_data_handler_map[handler->get_address()] = handler;
51  if (checkout)
52  handler->increment_reference_count();
53 }
54 
56  lock_guard<mutex> lock(m_mutex);
57  HT_ASSERT(m_datagram_handler_map.find(handler->get_local_address())
58  == m_datagram_handler_map.end());
59  m_datagram_handler_map[handler->get_local_address()] = handler;
60 }
61 
63  lock_guard<mutex> lock(m_mutex);
64  HT_ASSERT(m_raw_handler_map.find(handler->get_address())
65  == m_raw_handler_map.end());
66  m_raw_handler_map[handler->get_address()] = handler;
67 }
68 
69 
71  IOHandlerAccept **handler) {
72  lock_guard<mutex> lock(m_mutex);
73 
74  if ((*handler = lookup_accept_handler(addr.inet)) == 0)
76 
77  HT_ASSERT(!(*handler)->is_decomissioned());
78 
79  (*handler)->increment_reference_count();
80 
81  return Error::OK;
82 }
83 
84 
86  IOHandlerData **handler) {
87  lock_guard<mutex> lock(m_mutex);
88  InetAddr inet_addr;
89  int error;
90 
91  if ((error = translate_address(addr, &inet_addr)) != Error::OK)
92  return error;
93 
94  if ((*handler = lookup_data_handler(inet_addr)) == 0)
96 
97  HT_ASSERT(!(*handler)->is_decomissioned());
98 
99  (*handler)->increment_reference_count();
100 
101  return Error::OK;
102 }
103 
105  IOHandlerDatagram **handler) {
106  lock_guard<mutex> lock(m_mutex);
107 
108  if ((*handler = lookup_datagram_handler(addr.inet)) == 0)
110 
111  HT_ASSERT(!(*handler)->is_decomissioned());
112 
113  (*handler)->increment_reference_count();
114 
115  return Error::OK;
116 }
117 
119  IOHandlerRaw **handler) {
120  lock_guard<mutex> lock(m_mutex);
121 
122  if ((*handler = lookup_raw_handler(addr.inet)) == 0)
124 
125  HT_ASSERT(!(*handler)->is_decomissioned());
126 
127  (*handler)->increment_reference_count();
128 
129  return Error::OK;
130 }
131 
133  lock_guard<mutex> lock(m_mutex);
134  handler->decrement_reference_count();
135 }
136 
138  lock_guard<mutex> lock(m_mutex);
139  IOHandlerData *handler;
140  InetAddr inet_addr;
141  int error;
142 
143  if ((error = translate_address(addr, &inet_addr)) != Error::OK)
144  return error;
145 
146  if ((handler = lookup_data_handler(inet_addr)) == 0)
148 
149  return Error::OK;
150 }
151 
// Sets an alias address for an existing TCP address in the map: the data
// handler registered under addr is told about the alias via set_alias()
// and is additionally registered in m_data_handler_map under alias.
// Returns Error::OK on the path shown. Runs with m_mutex held.
// NOTE(review): listing lines 154, 157 and 160 are absent from this
// extract - presumably the declaration of iter and the statements guarded
// by the two checks below; confirm against the original file.
152 int HandlerMap::set_alias(const InetAddr &addr, const InetAddr &alias) {
153  lock_guard<mutex> lock(m_mutex);
155 
// Check whether the alias is already a key in the data handler map.
156  if (m_data_handler_map.find(alias) != m_data_handler_map.end())
158 
// Check whether addr is missing from the data handler map.
159  if ((iter = m_data_handler_map.find(addr)) == m_data_handler_map.end())
161 
162  (*iter).second->set_alias(alias);
163  m_data_handler_map[alias] = (*iter).second;
164 
165  return Error::OK;
166 }
167 
173  InetAddr local_addr = handler->get_local_address();
174  InetAddr remote_addr;
175  int error;
176 
177  if ((error = translate_address(handler->get_address(), &remote_addr)) != Error::OK)
178  return error;
179  if ((diter = m_data_handler_map.find(remote_addr)) != m_data_handler_map.end()) {
180  HT_ASSERT(handler == diter->second);
181  m_data_handler_map.erase(diter);
182  // Remove alias
183  remote_addr = handler->get_alias();
184  if ((diter = m_data_handler_map.find(remote_addr)) != m_data_handler_map.end()) {
185  HT_ASSERT(handler == diter->second);
186  m_data_handler_map.erase(diter);
187  }
188  }
189  else if ((dgiter = m_datagram_handler_map.find(local_addr))
190  != m_datagram_handler_map.end()) {
191  HT_ASSERT(handler == dgiter->second);
192  m_datagram_handler_map.erase(dgiter);
193  }
194  else if ((aiter = m_accept_handler_map.find(local_addr))
195  != m_accept_handler_map.end()) {
196  HT_ASSERT(handler == aiter->second);
197  m_accept_handler_map.erase(aiter);
198  }
199  else if ((riter = m_raw_handler_map.find(remote_addr))
200  != m_raw_handler_map.end()) {
201  HT_ASSERT(handler == riter->second);
202  m_raw_handler_map.erase(riter);
203  }
204  else
206  return Error::OK;
207 }
208 
210  lock_guard<mutex> lock(m_mutex);
211  return remove_handler_unlocked(handler);
212 }
213 
215  if (remove_handler_unlocked(handler) != Error::OK) {
216  HT_ASSERT(m_decomissioned_handlers.count(handler) > 0);
217  return;
218  }
219  m_decomissioned_handlers.insert(handler);
220  handler->decomission();
221 }
222 
224  lock_guard<mutex> lock(m_mutex);
229 
230  // IOHandlerData
231  for (diter = m_data_handler_map.begin(); diter != m_data_handler_map.end(); ++diter) {
232  m_decomissioned_handlers.insert(diter->second);
233  diter->second->decomission();
234  }
235  m_data_handler_map.clear();
236 
237  // IOHandlerDatagram
238  for (dgiter = m_datagram_handler_map.begin();
239  dgiter != m_datagram_handler_map.end(); ++dgiter) {
240  m_decomissioned_handlers.insert(dgiter->second);
241  dgiter->second->decomission();
242  }
243  m_datagram_handler_map.clear();
244 
245  // IOHandlerAccept
246  for (aiter = m_accept_handler_map.begin();
247  aiter != m_accept_handler_map.end(); ++aiter) {
248  m_decomissioned_handlers.insert(aiter->second);
249  aiter->second->decomission();
250  }
251  m_accept_handler_map.clear();
252 
253  // IOHandlerRaw
254  for (riter = m_raw_handler_map.begin();
255  riter != m_raw_handler_map.end(); ++riter) {
256  m_decomissioned_handlers.insert(riter->second);
257  riter->second->decomission();
258  }
259  m_raw_handler_map.clear();
260 
261 }
262 
264  lock_guard<mutex> lock(m_mutex);
265  bool is_decomissioned = m_decomissioned_handlers.count(handler) > 0;
266  HT_ASSERT(!is_decomissioned || handler->is_decomissioned());
267  return is_decomissioned && handler->reference_count() == 0;
268 }
269 
271  InetAddr inet_addr;
272  String hostname;
273  HT_ASSERT(proxy_addr.is_proxy());
274  if (!m_proxy_map.get_mapping(proxy_addr.proxy, hostname, inet_addr))
275  return false;
276  if (addr)
277  *addr = inet_addr;
278  return true;
279 }
280 
282  unique_lock<mutex> lock(m_mutex);
283  m_cond.wait(lock, [this](){ return m_decomissioned_handlers.empty(); });
284 }
285 
287  {
288  lock_guard<mutex> lock(m_mutex);
289  HT_ASSERT(m_decomissioned_handlers.count(handler) > 0);
290  HT_ASSERT(handler->reference_count() == 0);
291  m_decomissioned_handlers.erase(handler);
292  if (m_decomissioned_handlers.empty())
293  m_cond.notify_all();
294  }
295  handler->disconnect();
296  delete handler;
297 }
298 
// Adds or updates proxy information.
// Calls m_proxy_map.update_mapping() for (proxy, hostname, addr), then for
// every entry of the resulting new_map looks up the data handler
// registered at that entry's address and, if one exists, sets its proxy
// name to the entry's key. Returns the result of
// propagate_proxy_map(new_map). Runs with m_mutex held.
299 int HandlerMap::add_proxy(const String &proxy, const String &hostname, const InetAddr &addr) {
300  lock_guard<mutex> lock(m_mutex);
301  ProxyMapT new_map, invalidated_map;
302 
// invalidated_map is filled here but not read again in this function.
303  m_proxy_map.update_mapping(proxy, hostname, addr, invalidated_map, new_map);
304 
305  for (const auto &v : new_map) {
306  IOHandler *handler = lookup_data_handler(v.second.addr);
307  if (handler)
308  handler->set_proxy(v.first);
309  }
310 
311  return propagate_proxy_map(new_map);
312 }
313 
// Removes a proxy name from the proxy map.
// m_proxy_map.remove_mapping() fills remove_map; if it is non-empty, every
// data handler registered at one of the removed addresses is decomissioned
// via decomission_handler_unlocked() and the result of
// propagate_proxy_map(remove_map) is returned. Returns Error::OK when
// remove_map is empty. Runs with m_mutex held.
314 int HandlerMap::remove_proxy(const String &proxy) {
315  lock_guard<mutex> lock(m_mutex);
316  ProxyMapT remove_map;
317  m_proxy_map.remove_mapping(proxy, remove_map);
318  if (!remove_map.empty()) {
319  IOHandler *handler;
320  for (const auto &v : remove_map) {
321  handler = lookup_data_handler(v.second.addr);
322  if (handler)
323  decomission_handler_unlocked(handler);
324  }
325  return propagate_proxy_map(remove_map);
326  }
327  return Error::OK;
328 }
329 
330 
332  m_proxy_map.get_map(proxy_map);
333 }
334 
335 
// Updates the proxy map with a proxy map update message received from the
// proxy master.
// message/message_len are copied into a String and handed to
// m_proxy_map.update_mappings(). Afterwards m_proxies_loaded is set and
// m_cond_proxy is notified. Runs with m_mutex held.
// NOTE(review): listing line 341 is absent from this extract; confirm
// against the original file.
336 void HandlerMap::update_proxy_map(const char *message, size_t message_len) {
337  lock_guard<mutex> lock(m_mutex);
338  String mappings(message, message_len);
339  ProxyMapT new_map, invalidated_map;
340 
342 
343  m_proxy_map.update_mappings(mappings, invalidated_map, new_map);
344 
// Invalidated entries whose hostname is the "--DELETED--" marker have their
// data handler (if one is registered at that address) decomissioned.
345  for (const auto &v : invalidated_map) {
346  IOHandler *handler = lookup_data_handler(v.second.addr);
347  if (handler) {
348  if (v.second.hostname == "--DELETED--")
349  decomission_handler_unlocked(handler);
350  }
351  }
352 
// Data handlers registered at a newly mapped address get the proxy name
// (the map key) set on them.
353  for (const auto &v : new_map) {
354  IOHandler *handler = lookup_data_handler(v.second.addr);
355  if (handler)
356  handler->set_proxy(v.first);
357  }
358 
359  //HT_INFOF("Updated proxy map = %s", m_proxy_map.to_str().c_str());
360 
361  m_proxies_loaded = true;
362  m_cond_proxy.notify_all();
363 }
364 
366  lock_guard<mutex> lock(m_mutex);
368  CommBufPtr comm_buf = m_proxy_map.create_update_message();
369  comm_buf->write_header_and_reset();
370  return handler->send_message(comm_buf);
371 }
372 
373 
375  unique_lock<mutex> lock(m_mutex);
376  timer.start();
377  auto drop_time = chrono::steady_clock::now() +
378  chrono::milliseconds(timer.remaining());
379  return m_cond_proxy.wait_until(lock, drop_time,
380  [this](){ return m_proxies_loaded; });
381 }
382 
384  int last_error = Error::OK;
385 
386  if (mappings.empty())
387  return Error::OK;
388 
390  String mapping;
391 
392  for (const auto &v : mappings)
393  mapping += v.first + "\t" + v.second.hostname + "\t" + InetAddr::format(v.second.addr) + "\n";
394 
395  uint8_t *buffer = new uint8_t [ mapping.length() + 1 ];
396  strcpy((char *)buffer, mapping.c_str());
397  boost::shared_array<uint8_t> payload(buffer);
398  CommHeader header;
400  std::vector<IOHandler *> decomission;
401  for (iter = m_data_handler_map.begin(); iter != m_data_handler_map.end(); ++iter) {
402  IOHandlerData *handler = iter->second;
403  if (handler) {
404  CommBufPtr comm_buf = make_shared<CommBuf>(header, 0, payload, mapping.length()+1);
405  comm_buf->write_header_and_reset();
406  int error = handler->send_message(comm_buf);
407  if (error != Error::OK) {
408  decomission.push_back(handler);
409  HT_ERRORF("Unable to propagate proxy mappings to %s - %s",
410  InetAddr(handler->get_address()).format().c_str(),
411  Error::get_text(error));
412  last_error = error;
413  }
414  }
415  }
416 
417  // Decomission any handlers that failed
418  for (auto handler : decomission)
419  decomission_handler_unlocked(handler);
420 
421  return last_error;
422 }
423 
// Translates addr to an InetAddr (IP address) written to *inet_addr.
// Asserts that addr is set. A proxy address is resolved through
// m_proxy_map.get_mapping(); any other address has its inet member copied
// into *inet_addr. Returns Error::OK on the path shown. This function does
// not lock m_mutex in the lines shown.
// NOTE(review): listing line 431 is absent from this extract - presumably
// the statement executed when the proxy lookup fails; confirm against the
// original file.
424 int HandlerMap::translate_address(const CommAddress &addr, InetAddr *inet_addr) {
425  String hostname;
426 
427  HT_ASSERT(addr.is_set());
428 
429  if (addr.is_proxy()) {
430  if (!m_proxy_map.get_mapping(addr.proxy, hostname, *inet_addr))
432  }
433  else
434  memcpy(inet_addr, &addr.inet, sizeof(InetAddr));
435 
436  return Error::OK;
437 }
438 
440  SockAddrMap<IOHandlerAccept *>::iterator iter = m_accept_handler_map.find(addr);
441  if (iter != m_accept_handler_map.end())
442  return iter->second;
443  return 0;
444 }
445 
447  SockAddrMap<IOHandlerData *>::iterator iter = m_data_handler_map.find(addr);
448  if (iter != m_data_handler_map.end())
449  return iter->second;
450  return 0;
451 }
452 
454  SockAddrMap<IOHandlerDatagram *>::iterator iter = m_datagram_handler_map.find(addr);
455  if (iter != m_datagram_handler_map.end())
456  return iter->second;
457  return 0;
458 }
459 
461  SockAddrMap<IOHandlerRaw *>::iterator iter = m_raw_handler_map.find(addr);
462  if (iter != m_raw_handler_map.end())
463  return iter->second;
464  return 0;
465 }
void set_proxy(const String &proxy)
Sets the proxy name for this connection.
Definition: IOHandler.h:239
virtual void disconnect()
Disconnect connection.
Definition: IOHandler.h:399
InetAddr get_local_address()
Get local socket address for connection.
Definition: IOHandler.h:234
String proxy
Proxy name.
Definition: CommAddress.h:175
std::string String
A String is simply a typedef to std::string.
Definition: String.h:44
I/O handler for datagram (UDP) sockets.
int checkout_handler(const CommAddress &addr, IOHandlerAccept **handler)
Checks out accept I/O handler associated with addr.
Definition: HandlerMap.cc:70
String format(const char *fmt,...)
Returns a String using printf-like format facilities. Vanilla snprintf is about 1.5x faster than this...
Definition: String.cc:37
uint16_t flags
Flags.
Definition: CommHeader.h:139
bool wait_for_proxy_map(Timer &timer)
Waits for proxy map to get updated from a proxy map update message received from the master...
Definition: HandlerMap.cc:374
void decomission_handler_unlocked(IOHandler *handler)
Decomissions handler.
Definition: HandlerMap.cc:214
InetAddr get_address()
Gets the handler socket address.
Definition: IOHandler.h:229
void update_proxy_map(const char *message, size_t message_len)
Updates the proxy map with a proxy map update message received from the proxy master.
Definition: HandlerMap.cc:336
void increment_reference_count()
Increment reference count.
Definition: IOHandler.h:343
int set_alias(const InetAddr &addr, const InetAddr &alias)
Sets an alias address for an existing TCP address in map.
Definition: HandlerMap.cc:152
STL namespace.
int send_message(CommBufPtr &cbp, uint32_t timeout_ms=0, DispatchHandler *disp_handler=nullptr)
Sends message pointed to by cbp over socket associated with this I/O handler.
uint32_t remaining()
Returns the remaining time till expiry.
Definition: Timer.h:101
bool destroy_ok(IOHandler *handler)
Determines if handler can be destroyed.
Definition: HandlerMap.cc:263
int remove_handler_unlocked(IOHandler *handler)
Removes handler from map without locking m_mutex.
Definition: HandlerMap.cc:168
#define HT_ASSERT(_e_)
Definition: Logger.h:396
void decrement_reference_count()
Decrement reference count.
Definition: IOHandler.h:355
Unordered map specialization for InetAddr keys.
Definition: SockAddrMap.h:57
Declarations for IOHandlerAccept.
IOHandlerAccept * lookup_accept_handler(const InetAddr &addr)
Finds accept I/O handler associated with addr.
Definition: HandlerMap.cc:439
int remove_proxy(const String &proxy)
Removes a proxy name from the proxy map.
Definition: HandlerMap.cc:314
const char * get_text(int error)
Returns a descriptive error message.
Definition: Error.cc:330
Encapsulate an internet address.
Definition: InetAddr.h:66
std::shared_ptr< CommBuf > CommBufPtr
Smart pointer to CommBuf.
Definition: CommBuf.h:305
void decomission()
Decomission handler.
Definition: IOHandler.h:382
IOHandlerRaw * lookup_raw_handler(const InetAddr &addr)
Finds raw I/O handler associated with addr.
Definition: HandlerMap.cc:460
int remove_handler(IOHandler *handler)
Removes handler from map.
Definition: HandlerMap.cc:209
std::unordered_map< String, ProxyAddressInfo > ProxyMapT
Forward mapping hash type from proxy name to ProxyAddressInfo.
Definition: ProxyMap.h:57
IOHandlerData * lookup_data_handler(const InetAddr &addr)
Finds data (TCP) I/O handler associated with addr.
Definition: HandlerMap.cc:446
Compatibility Macros for C/C++.
void insert_handler(IOHandlerAccept *handler)
Inserts an accept handler.
Definition: HandlerMap.cc:39
Base class for socket descriptor I/O handlers.
Definition: IOHandler.h:76
I/O handler for TCP sockets.
Definition: IOHandlerData.h:51
void purge_handler(IOHandler *handler)
Purges (removes) handler.
Definition: HandlerMap.cc:286
Declarations for HandlerMap.
String format(int sep= ':') const
Returns a string with a dotted notation ("127.0.0.1:8080") including the port.
Definition: InetAddr.h:132
void wait_for_empty()
Waits for map to become empty.
Definition: HandlerMap.cc:281
bool is_set() const
Returns true if address has been initialized.
Definition: CommAddress.h:159
Hypertable definitions
InetAddr get_alias()
Get alias address for this connection.
Definition: IOHandler.h:327
void get_proxy_map(ProxyMapT &proxy_map)
Returns the proxy map.
Definition: HandlerMap.cc:331
Header for messages transmitted via AsyncComm.
Definition: CommHeader.h:40
void start()
Starts the timer.
Definition: Timer.h:64
int contains_data_handler(const CommAddress &addr)
Checks to see if addr is contained in map.
Definition: HandlerMap.cc:137
void decomission_all()
Decomissions all handlers.
Definition: HandlerMap.cc:223
Declarations for ReactorFactory.
A timer class to keep timeout states across AsyncComm related calls.
Definition: Timer.h:44
bool translate_proxy_address(const CommAddress &proxy_addr, InetAddr *addr)
Translates proxy_addr to its corresponding IPV4 address.
Definition: HandlerMap.cc:270
size_t reference_count()
Return reference count.
Definition: IOHandler.h:368
void decrement_reference_count(IOHandler *handler)
Decrements the reference count of handler.
Definition: HandlerMap.cc:132
int translate_address(const CommAddress &addr, InetAddr *inet_addr)
Translates addr to an InetAddr (IP address).
Definition: HandlerMap.cc:424
#define HT_ERRORF(msg,...)
Definition: Logger.h:300
InetAddr inet
IPv4:port address.
Definition: CommAddress.h:176
I/O handler for accept (listen) sockets.
int32_t propagate_proxy_map(IOHandlerData *handler)
Sends the current proxy map over connection identified by handler.
Definition: HandlerMap.cc:365
void alias(const String &cmdline_opt, const String &file_opt, bool overwrite)
Setup command line option alias for config file option.
Definition: Config.cc:607
bool is_proxy() const
Returns true if address is of type CommAddress::PROXY.
Definition: CommAddress.h:147
bool is_decomissioned()
Checks to see if handler is decomissioned.
Definition: IOHandler.h:393
IOHandlerDatagram * lookup_datagram_handler(const InetAddr &addr)
Finds datagram I/O handler associated with addr.
Definition: HandlerMap.cc:453
int add_proxy(const String &proxy, const String &hostname, const InetAddr &addr)
Adds or updates proxy information.
Definition: HandlerMap.cc:299
I/O handler for raw sockets.
Definition: IOHandlerRaw.h:42
Address abstraction to hold either proxy name or IPv4:port address.
Definition: CommAddress.h:52
static bool proxy_master
Set to true if this process is acting as "Proxy Master".