0.9.8.10
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
RangeServerConnectionManager.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2007-2015 Hypertable, Inc.
3  *
4  * This file is part of Hypertable.
5  *
6  * Hypertable is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; version 3 of the
9  * License, or any later version.
10  *
11  * Hypertable is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19  * 02110-1301, USA.
20  */
21 
22 #include <Common/Compat.h>
23 
25 
26 #include <Common/Config.h>
27 
28 #include <algorithm>
29 #include <iterator>
30 
31 using namespace Hypertable;
32 using namespace std;
33 
// NOTE(review): the line that opens this definition is absent from this
// listing; presumably this is the body of the constructor.
// Parks the iteration cursor at end() (the server-selection code further down
// treats end() as "start over from begin()") and reads the disk threshold
// from the "Hypertable.Master.DiskThreshold.Percentage" property as an i32.
35  m_connections_iter = m_connections.end();
36  m_disk_threshold = Config::properties->get_i32("Hypertable.Master.DiskThreshold.Percentage");
37 }
38 
// NOTE(review): the signature line and the declaration of tmp_rsc are absent
// from this listing; presumably this is the body of add_server(rsc).
// Appends a new entry for rsc to m_connections while holding m_mutex.
// Aborts via HT_FATALF if an entry with the same location already exists, or
// if the container reports that the insertion did not take place.
40  lock_guard<mutex> lock(m_mutex);
41 
42  {
 // Explicit duplicate check by location, done with the lock already held
 // (hence the _unlocked lookup variant).
44  if (find_server_by_location_unlocked(rsc->location(), tmp_rsc))
45  HT_FATALF("Attempt to add %s which conflicts with previously added %s",
46  rsc->to_str().c_str(), tmp_rsc->to_str().c_str());
47  }
48 
 // result.second is false when the container refused the insertion.
49  auto result = m_connections.push_back( RangeServerConnectionEntry(rsc) );
50  if (!result.second)
51  HT_FATALF("Attempt to add %s which conflicts with previously added entry",
52  rsc->to_str().c_str());
53 }
54 
// NOTE(review): the line carrying the function name and parameters is absent
// from this listing; presumably remove_server(location, rsc).
// Removes the entry whose location equals `location`, under m_mutex.
// On success rsc is set to the removed connection, that connection is flagged
// via set_removed(), and true is returned. Returns false, leaving rsc
// untouched, when no entry has that location.
55 bool
58  lock_guard<mutex> lock(m_mutex);
59 
60  LocationIndex &hash_index = m_connections.get<1>();
61  auto iter = hash_index.find(location);
62  if (iter != hash_index.end()) {
63  rsc = iter->rsc;
64  rsc->set_removed();
65  hash_index.erase(iter);
 // Restart the cursor from begin() after the erase; presumably because the
 // cursor may have referred to the erased entry.
66  m_connections_iter = m_connections.begin();
67  return true;
68  }
69  return false;
70 }
71 
73  const String &hostname, InetAddr local_addr, InetAddr public_addr) {
 // NOTE(review): the first signature line is absent from this listing;
 // presumably connect_server(rsc, hostname, local_addr, public_addr).
 // Registers or re-registers the connection for rsc->location(), under m_mutex.
 // Returns false only when an entry for that location already exists and is
 // already connected; returns true otherwise. On the existing-entry path rsc
 // is overwritten with the stored connection object.
74  lock_guard<mutex> lock(m_mutex);
75  LocationIndex &location_index = m_connections.get<1>();
76  LocationIndex::iterator orig_iter;
77 
78  HT_INFOF("connect_server(%s, '%s', local=%s, public=%s)",
79  rsc->location().c_str(), hostname.c_str(),
80  local_addr.format().c_str(), public_addr.format().c_str());
81 
82  if ((orig_iter = location_index.find(rsc->location())) == location_index.end()) {
83 
 // Unknown location: connect the caller's object and append a new entry.
 // m_conn_count is bumped only when connect() returns true.
84  if (rsc->connect(hostname, local_addr, public_addr))
85  m_conn_count++;
86 
87  m_connections.push_back(RangeServerConnectionEntry(rsc));
88  }
89  else {
 // Known location: reuse the stored connection object.
90  bool needs_reindex = false;
91 
92  rsc = orig_iter->rsc;
93 
94  if (rsc->connected()) {
95  HT_ERRORF("Attempted to connect '%s' but failed because already connected.",
96  rsc->location().c_str());
97  return false;
98  }
99 
 // Any change of hostname or address forces an erase + re-insert below;
 // presumably so the indexes keyed on those fields see the new values.
100  if (hostname != rsc->hostname()) {
101  HT_INFOF("Changing hostname for %s from '%s' to '%s'",
102  rsc->location().c_str(), rsc->hostname().c_str(),
103  hostname.c_str());
104  needs_reindex = true;
105  }
106 
107  if (local_addr != rsc->local_addr()) {
108  HT_INFOF("Changing local address for %s from '%s' to '%s'",
109  rsc->location().c_str(), rsc->local_addr().format().c_str(),
110  local_addr.format().c_str());
111  needs_reindex = true;
112  }
113 
114  if (public_addr != rsc->public_addr()) {
115  HT_INFOF("Changing public address for %s from '%s' to '%s'",
116  rsc->location().c_str(), rsc->public_addr().format().c_str(),
117  public_addr.format().c_str());
118  needs_reindex = true;
119  }
120 
121  if (orig_iter->rsc->connect(hostname, local_addr, public_addr))
122  m_conn_count++;
123 
124  if (needs_reindex) {
 // Drop the old entry, append a fresh one for the same connection, and
 // restart the cursor from begin() after the erase.
125  location_index.erase(orig_iter);
126  m_connections.push_back(RangeServerConnectionEntry(rsc));
127  m_connections_iter = m_connections.begin();
128  }
129  }
130  return true;
131 }
132 
// NOTE(review): the signature line is absent from this listing; presumably
// this is the body of disconnect_server(rsc).
// Calls rsc->disconnect() under m_mutex. When that returns true, decrements
// m_conn_count (asserting it was positive) and returns true; otherwise
// returns false and leaves the counter alone.
134  lock_guard<mutex> lock(m_mutex);
135  if (rsc->disconnect()) {
136  HT_ASSERT(m_conn_count > 0);
137  m_conn_count--;
138  return true;
139  }
140  return false;
141 }
142 
// NOTE(review): the signature line is absent from this listing; the function
// name is not visible here (presumably a connected-check taking `location`).
// Under m_mutex, returns true only when an entry with the given location
// exists and its connection reports connected(); false otherwise.
144  lock_guard<mutex> lock(m_mutex);
145  LocationIndex &location_index = m_connections.get<1>();
146  LocationIndex::iterator iter = location_index.find(location);
147  if (iter != location_index.end() && iter->rsc->connected())
148  return true;
149  return false;
150 }
151 
// NOTE(review): the signature line is absent from this listing; presumably
// this is the body of find_server_by_location(location, rsc).
// Takes m_mutex and delegates to the unlocked lookup, returning its result.
154  lock_guard<mutex> lock(m_mutex);
155  return find_server_by_location_unlocked(location, rsc);
156 }
157 
// NOTE(review): the signature line is absent from this listing; presumably
// this is the body of find_server_by_location_unlocked(location, rsc).
// Looks `location` up in index 1 of m_connections without taking m_mutex
// itself; the callers visible in this file hold the lock before calling.
// On a hit, stores the connection in rsc and returns true. On a miss, resets
// rsc to 0 and returns false.
159  LocationIndex &hash_index = m_connections.get<1>();
160  LocationIndex::iterator lookup_iter;
161 
162  if ((lookup_iter = hash_index.find(location)) == hash_index.end()) {
163  //HT_DEBUG_OUT << "can't find server with location=" << location << HT_END;
164  //for (Sequence::iterator iter = m_connections.begin(); iter != m_connections.end(); ++iter) {
165  // HT_DEBUGF("Contains %s host=%s local=%s public=%s", iter->location().c_str(),
166  // iter->hostname().c_str(), iter->local_addr().format().c_str(),
167  // iter->public_addr().format().c_str());
168  //}
169  rsc = 0;
170  return false;
171  }
172  rsc = lookup_iter->rsc;
173  return true;
174 }
175 
176 
// NOTE(review): the signature line is absent from this listing; presumably
// this is the body of find_server_by_hostname(hostname, rsc).
// Looks `hostname` up in index 2 of m_connections while holding m_mutex.
// Succeeds (returns true with rsc set) only when exactly one entry matches.
// With several matches rsc is reset to 0 and false is returned; with no match
// rsc is left untouched and false is returned.
179  lock_guard<mutex> lock(m_mutex);
180  HostnameIndex &hash_index = m_connections.get<2>();
181 
182  pair<HostnameIndex::iterator, HostnameIndex::iterator> p
183  = hash_index.equal_range(hostname);
184  if (p.first != p.second) {
185  rsc = p.first->rsc;
 // Stepping once must land on the end of the range, i.e. a single match.
186  if (++p.first == p.second)
187  return true;
188  rsc = 0;
189  }
190  return false;
191 }
192 
// NOTE(review): the signature line is absent from this listing; presumably
// this is the body of find_server_by_public_addr(addr, rsc).
// Looks `addr` up in index 3 of m_connections while holding m_mutex.
// On a hit, stores the connection in rsc and returns true. On a miss, resets
// rsc to 0 and returns false.
195  lock_guard<mutex> lock(m_mutex);
196  PublicAddrIndex &hash_index = m_connections.get<3>();
197  PublicAddrIndex::iterator lookup_iter;
198 
199  if ((lookup_iter = hash_index.find(addr)) == hash_index.end()) {
200  rsc = 0;
201  return false;
202  }
203  rsc = lookup_iter->rsc;
204  return true;
205 }
206 
// NOTE(review): the signature line is absent from this listing; presumably
// this is the body of find_server_by_local_addr(addr, rsc).
// Walks every entry of index 4 that matches `addr`, under m_mutex, and
// returns the first one for which connected() is true (rsc set, returns true).
// Returns false when no matching entry is connected; unlike the other lookups
// in this file, rsc is left untouched in that case rather than reset to 0.
209  lock_guard<mutex> lock(m_mutex);
210  LocalAddrIndex &hash_index = m_connections.get<4>();
211 
212  for (pair<LocalAddrIndex::iterator, LocalAddrIndex::iterator> p
213  = hash_index.equal_range(addr);
214  p.first != p.second; ++p.first) {
215  if (p.first->connected()) {
216  rsc = p.first->rsc;
217  return true;
218  }
219  }
220  return false;
221 }
222 
223 
225  bool urgent) {
 // NOTE(review): the first signature line is absent from this listing;
 // presumably next_available_server(rsc, urgent).
 // Picks the next usable server by walking m_connections circularly from the
 // position remembered in m_connections_iter, under m_mutex.
 // A server is eligible when it is connected and not recovering. The first
 // eligible server whose disk fill percentage is below m_disk_threshold is
 // returned immediately. If none qualifies and `urgent` is set, the eligible
 // server with the lowest disk fill percentage (strictly below 100) is
 // returned instead. Returns false without assigning rsc when the container
 // is empty or nothing suitable was found.
226  lock_guard<mutex> lock(m_mutex);
227  double minimum_disk_use = 100;
228  RangeServerConnectionPtr urgent_server;
229 
230  if (m_connections.empty())
231  return false;
232 
233  if (m_connections_iter == m_connections.end())
234  m_connections_iter = m_connections.begin();
235 
236  auto saved_iter = m_connections_iter;
237 
238  do {
 // Advance before inspecting, wrapping at end(); the starting position is
 // therefore the last entry examined before the loop terminates.
239  ++m_connections_iter;
240  if (m_connections_iter == m_connections.end())
241  m_connections_iter = m_connections.begin();
242  if (m_connections_iter->rsc->connected() &&
243  !m_connections_iter->rsc->is_recovering()) {
244  if (m_connections_iter->rsc->get_disk_fill_percentage() <
245  (double)m_disk_threshold) {
246  rsc = m_connections_iter->rsc;
247  return true;
248  }
 // Over the threshold: remember the least-full candidate as a fallback.
249  if (m_connections_iter->rsc->get_disk_fill_percentage() < minimum_disk_use) {
250  minimum_disk_use = m_connections_iter->rsc->get_disk_fill_percentage();
251  urgent_server = m_connections_iter->rsc;
252  }
253  }
254  } while (m_connections_iter != saved_iter);
255 
256  if (urgent && urgent_server) {
257  rsc = urgent_server;
258  return true;
259  }
260 
261  return false;
262 }
263 
// NOTE(review): the signature line is absent from this listing; the function
// name is not visible here (presumably a server-count accessor).
// Under m_mutex, counts the entries of m_connections whose connection is not
// flagged as removed and returns that count.
265  lock_guard<mutex> lock(m_mutex);
266  size_t count = 0;
267  for (auto &entry : m_connections) {
268  if (!entry.rsc->get_removed())
269  count++;
270  }
271  return count;
272 }
273 
274 
275 void RangeServerConnectionManager::get_servers(vector<RangeServerConnectionPtr> &servers) {
276  lock_guard<mutex> lock(m_mutex);
277  for (auto &entry : m_connections) {
278  if (!entry.rsc->get_removed())
279  servers.push_back(entry.rsc);
280  }
281 }
282 
284  vector<RangeServerConnectionPtr> &connections) {
 // NOTE(review): the first signature line is absent from this listing;
 // presumably get_valid_connections(locations, connections).
 // For each location in `locations`, looks the entry up in index 1 of
 // m_connections (under m_mutex) and appends its connection to `connections`
 // when the entry exists and is not recovering. Unknown locations are skipped.
285  lock_guard<mutex> lock(m_mutex);
286  LocationIndex &location_index = m_connections.get<1>();
 // NOTE(review): this outer `iter` is never used; the loop below declares its
 // own `iter`, which shadows it.
287  LocationIndex::iterator iter;
288 
289  for (auto &location : locations) {
290  auto iter = location_index.find(location);
291  if (iter != location_index.end() && !iter->rsc->is_recovering())
292  connections.push_back(iter->rsc);
293  }
294 }
295 
296 void RangeServerConnectionManager::set_servers_balanced(const vector<RangeServerConnectionPtr> &unbalanced) {
297  lock_guard<mutex> lock(m_mutex);
298  for (const auto rsc : unbalanced)
299  rsc->set_balanced();
300 }
301 
// NOTE(review): the signature line is absent from this listing; the function
// name is not visible here (presumably an "any unbalanced server?" query).
// Under m_mutex, returns true as soon as it finds an entry whose connection
// is not removed, not recovering and not balanced; false if there is none.
303  lock_guard<mutex> lock(m_mutex);
304  for (auto &entry : m_connections) {
305  if (!entry.rsc->get_removed() && !entry.rsc->is_recovering() && !entry.rsc->get_balanced())
306  return true;
307  }
308  return false;
309 }
310 
311 void RangeServerConnectionManager::set_range_server_state(vector<RangeServerState> &states) {
312  lock_guard<mutex> lock(m_mutex);
313  LocationIndex &hash_index = m_connections.get<1>();
314  LocationIndex::iterator lookup_iter;
315 
316  for (auto &state : states) {
317  if ((lookup_iter = hash_index.find(state.location)) == hash_index.end())
318  continue;
319  lookup_iter->rsc->set_disk_fill_percentage(state.disk_usage);
320  }
321 }
bool find_server_by_location(const String &location, RangeServerConnectionPtr &rsc)
std::set< String > StringSet
STL Set managing Strings.
Definition: StringExt.h:42
ConnectionList::nth_index< 1 >::type LocationIndex
bool find_server_by_hostname(const String &hostname, RangeServerConnectionPtr &rsc)
std::shared_ptr< RangeServerConnection > RangeServerConnectionPtr
PropertiesPtr properties
This singleton map stores all options.
Definition: Config.cc:47
std::string String
A String is simply a typedef to std::string.
Definition: String.h:44
bool next_available_server(RangeServerConnectionPtr &rsc, bool urgent=false)
STL namespace.
void get_servers(std::vector< RangeServerConnectionPtr > &servers)
void set_servers_balanced(const std::vector< RangeServerConnectionPtr > &servers)
void set_range_server_state(std::vector< RangeServerState > &states)
#define HT_ASSERT(_e_)
Definition: Logger.h:396
bool connect_server(RangeServerConnectionPtr &rsc, const String &hostname, InetAddr local_addr, InetAddr public_addr)
bool find_server_by_location_unlocked(const String &location, RangeServerConnectionPtr &rsc)
Encapsulate an internet address.
Definition: InetAddr.h:66
bool disconnect_server(RangeServerConnectionPtr &rsc)
Compatibility Macros for C/C++.
ConnectionList::nth_index< 2 >::type HostnameIndex
ConnectionList::nth_index< 3 >::type PublicAddrIndex
String format(int sep= ':') const
Returns a string with a dotted notation ("127.0.0.1:8080") including the port.
Definition: InetAddr.h:132
Hypertable definitions
#define HT_FATALF(msg,...)
Definition: Logger.h:343
bool find_server_by_local_addr(InetAddr addr, RangeServerConnectionPtr &rsc)
void add_server(RangeServerConnectionPtr &rsc)
Adds a range server connection.
#define HT_INFOF(msg,...)
Definition: Logger.h:272
ConnectionList::nth_index< 4 >::type LocalAddrIndex
bool remove_server(const std::string &location, RangeServerConnectionPtr &rsc)
Removes range server connection.
#define HT_ERRORF(msg,...)
Definition: Logger.h:300
void get_valid_connections(StringSet &locations, std::vector< RangeServerConnectionPtr > &connections)
Configuration settings.
bool find_server_by_public_addr(InetAddr addr, RangeServerConnectionPtr &rsc)