0.9.8.10
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
BalanceAlgorithmEvenRanges.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2007-2015 Hypertable, Inc.
3  *
4  * This file is part of Hypertable.
5  *
6  * Hypertable is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; version 3 of the
9  * License, or any later version.
10  *
11  * Hypertable is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19  * 02110-1301, USA.
20  */
21 
#include <Common/Compat.h>

#include <Hypertable/Lib/Client.h>

#include <algorithm>
#include <map>
#include <memory>
#include <random>
#include <vector>
31 
32 using namespace Hypertable;
33 using namespace std;
34 
35 namespace {
36 
37  typedef std::map<String, uint32_t> RangeDistributionMap;
38 
39  class TableSummary {
40  public:
41  TableSummary() : total_ranges(0), ranges_per_server(0) { }
42  uint32_t total_ranges;
43  uint32_t ranges_per_server;
44  RangeDistributionMap range_dist;
45  };
46 
47  typedef std::shared_ptr<TableSummary> TableSummaryPtr;
48 
49  // map of table_id to TableSummary
50  typedef std::map<const String, TableSummaryPtr> TableSummaryMap;
51 
52  typedef struct {
53  vector<String> servers;
54  TableSummaryMap table_summaries;
55  size_t num_ranges;
56  uint32_t num_empty_servers;
57  } balance_state_t;
58 
59  bool maybe_add_to_plan(const String &table, const String &src_server,
60  const String &start_row, const String &end_row,
61  balance_state_t &state, BalancePlanPtr &plan) {
62  StringSet saturated_tables;
63 
64  // no one can take any more ranges from this table
65  if (saturated_tables.find(table) != saturated_tables.end()) {
66  HT_DEBUG_OUT << "Moves for table " << table << " saturated"
67  << " Dont move " << src_server << ":"<< table << "[" << start_row
68  << ".." << end_row << "] " << HT_END;
69  return false;
70  }
71 
72  // we dont have summary info on this table
73  TableSummaryMap::iterator table_it = state.table_summaries.find(table);
74  if (table_it == state.table_summaries.end()) {
75  HT_DEBUG_OUT << "Summary info for table " << table << " not found."
76  << " Dont move " << src_server << ":"<< table << "[" << start_row
77  << ".." << end_row << "] " << HT_END;
78  return false;
79  }
80 
81  // src server isn't overloaded on ranges from this table
82  if (table_it->second->ranges_per_server == 0)
83  table_it->second->ranges_per_server = table_it->second->total_ranges / state.servers.size() + 1;
84 
85  uint32_t ranges_per_server = table_it->second->ranges_per_server;
86  RangeDistributionMap::iterator src_range_dist_it = table_it->second->range_dist.find(src_server);
87  if (src_range_dist_it == table_it->second->range_dist.end()) {
88  HT_DEBUG_OUT << "No monitoring info for range."
89  << " Dont move " << src_server << ":"<< table << "[" << start_row
90  << ".." << end_row << "] " << HT_END;
91  return false;
92  }
93 
94  if (src_range_dist_it->second <= ranges_per_server) {
95  HT_DEBUG_OUT << "src server has " << src_range_dist_it->second
96  << " ranges from this table which is below average "
97  << ranges_per_server << " Dont move " << src_server << ":" << table
98  << "[" << start_row << ".." << end_row << "] " << HT_END;
99  return false;
100  }
101 
102  // find a destination server that can take this range
103  RangeDistributionMap::iterator dst_range_dist_it;
104  String dst_server;
105 
106  bool found_dst_server = false;
107  for (const auto &target_server : state.servers) {
108  if (!target_server.compare(src_server))
109  continue;
110  dst_range_dist_it = table_it->second->range_dist.find(target_server);
111  if (dst_range_dist_it == table_it->second->range_dist.end()) {
112  pair<RangeDistributionMap::iterator, bool> ret =
113  table_it->second->range_dist.insert(make_pair(target_server, 0));
114  dst_range_dist_it = ret.first;
115  }
116  if (dst_range_dist_it->second < ranges_per_server) {
117  found_dst_server = true;
118  dst_server = target_server;
119  break;
120  }
121  }
122 
123  // none of the empty servers can accomodate any more ranges from this table
124  if (!found_dst_server) {
125  saturated_tables.insert(table_it->first);
126  HT_DEBUG_OUT << "Can't find destination for ranges from table " << table
127  << " (saturated)." << " Dont move " << src_server << ":" << table
128  << "[" << start_row << ".." << end_row << "] " << HT_END;
129  return false;
130  }
131 
132  // update range distribution info and add move to balance plan
133  --(src_range_dist_it->second);
134  ++(dst_range_dist_it->second);
135  RangeMoveSpecPtr move =
136  make_shared<RangeMoveSpec>(src_server.c_str(), dst_server.c_str(),
137  table.c_str(), start_row.c_str(), end_row.c_str());
138  plan->moves.push_back(move);
139 
140  // randomly shuffle the contents of the empty_servers vector to avoid
141  // adjacent ranges from accumulating on the same empty range server
142  random_shuffle(state.servers.begin(), state.servers.end());
143 
144  return true;
145  }
146 
147 }
148 
149 
151  std::vector<RangeServerStatistics> &statistics)
152  : m_context(context), m_statistics(statistics) {
153 }
154 
155 
156 
158  std::vector<RangeServerConnectionPtr> &balanced) {
159  balance_state_t state;
160  StringSet locations;
161  int32_t min_ranges = -1;
162  int32_t max_ranges = -1;
163 
164  state.num_ranges = 0;
165  state.num_empty_servers = 0;
166 
167  for (auto &rs : m_statistics) {
168 
169  if (!rs.stats->live) {
170  HT_INFOF("Aborting balance because %s is not yet live", rs.location.c_str());
171  plan->moves.clear();
172  return;
173  }
174 
175  if (!rs.stats || !m_context->can_accept_ranges(rs))
176  continue;
177 
178  locations.insert(rs.location);
179 
180  if (min_ranges == -1)
181  min_ranges = max_ranges = rs.stats->range_count;
182  else {
183  if (min_ranges > rs.stats->range_count)
184  min_ranges = rs.stats->range_count;
185  if (max_ranges < rs.stats->range_count)
186  max_ranges = rs.stats->range_count;
187  }
188 
189  if (rs.stats->range_count > 0) {
190  TableSummaryPtr ts;
191  state.num_ranges += rs.stats->range_count;
192  for (auto &table : rs.stats->tables) {
193  TableSummaryMap::iterator it = state.table_summaries.find(table.table_id.c_str());
194  if (it == state.table_summaries.end()) {
195  ts = make_shared<TableSummary>();
196  state.table_summaries[table.table_id.c_str()] = ts;
197  }
198  else
199  ts = it->second;
200  ts->range_dist.insert(make_pair(rs.location, table.range_count));
201  ts->total_ranges += table.range_count;
202  }
203  }
204  else
205  state.num_empty_servers++;
206  }
207 
208  if (locations.size() <= 1)
210  "Not enough available servers (%u)", (unsigned)locations.size());
211 
216  std::vector<RangeServerConnectionPtr> connections;
217  m_context->rsc_manager->get_valid_connections(locations, connections);
218  for (auto &rsc : connections) {
219  if (!rsc->get_balanced())
220  balanced.push_back(rsc);
221  state.servers.push_back(rsc->location());
222  }
223 
224  // Only balance if the max variance is at least 3
225  if ((max_ranges - min_ranges) < 3) {
226  HT_INFO("Aborting balance because max variance < 3");
227  return;
228  }
229 
233  {
234  ScanSpec scan_spec;
235  Cell cell;
236  String last_key, last_location, last_start_row;
237  bool read_start_row = false;
238  String location("Location"), start_row("StartRow");
239 
240  scan_spec.columns.push_back(location.c_str());
241  scan_spec.columns.push_back(start_row.c_str());
242  scan_spec.max_versions = 1;
243 
244  TableScannerPtr scanner(m_context->metadata_table->create_scanner(scan_spec));
245  while (scanner->next(cell)) {
246  // don't move root METADATA range
247  if (!strcmp(cell.row_key, Key::END_ROOT_ROW)) {
248  HT_DEBUG_OUT << "Skipping METADATA root range " << cell.row_key << HT_END;
249  continue;
250  }
251 
252  if (last_key == cell.row_key) {
253  if (location == cell.column_family)
254  last_location = String((const char *)cell.value, cell.value_len);
255  else {
256  read_start_row = true;
257  last_start_row = String((const char *)cell.value, cell.value_len);
258  }
259  }
260  else {
261  last_key = cell.row_key;
262  if (location == cell.column_family) {
263  last_start_row.clear();
264  read_start_row = false;
265  last_location = String((const char *)cell.value, cell.value_len);
266  }
267  else {
268  last_location.clear();
269  read_start_row = true;
270  last_start_row = String((const char *)cell.value, cell.value_len);
271  }
272  }
273 
274  HT_DEBUG_OUT << "last_key=" << last_key << ", last_location="
275  << last_location << ", last_start_row=" << last_start_row << HT_END;
276 
277  if (last_location.size() > 0 && read_start_row) {
278  size_t pos = last_key.find(':');
279  HT_ASSERT(pos != string::npos);
280  String table(last_key, 0, pos);
281  String end_row(last_key, pos+1);
282  if (maybe_add_to_plan(table, last_location, last_start_row, end_row, state, plan)) {
283  HT_DEBUG_OUT << "Added move for range " << table << ":" << end_row
284  << ", start_row=" << last_start_row << ", location="
285  << last_location << HT_END;
286  }
287  }
288  }
289  }
290 }
std::set< String > StringSet
STL Set managing Strings.
Definition: StringExt.h:42
std::vector< RangeServerStatistics > m_statistics
std::string String
A String is simply a typedef to std::string.
Definition: String.h:44
std::shared_ptr< BalancePlan > BalancePlanPtr
Definition: BalancePlan.h:81
#define HT_INFO(msg)
Definition: Logger.h:271
STL namespace.
BalanceAlgorithmEvenRanges(ContextPtr &context, std::vector< RangeServerStatistics > &statistics)
std::shared_ptr< TableScanner > TableScannerPtr
Smart pointer to TableScanner.
Definition: TableScanner.h:124
std::shared_ptr< Context > ContextPtr
Smart pointer to Context.
Definition: Context.h:265
#define HT_ASSERT(_e_)
Definition: Logger.h:396
Scan predicate and control specification.
Definition: ScanSpec.h:56
Compatibility Macros for C/C++.
const char * row_key
Definition: Cell.h:66
#define HT_END
Definition: Logger.h:220
Hypertable definitions
const char * column_family
Definition: Cell.h:67
#define HT_INFOF(msg,...)
Definition: Logger.h:272
#define HT_THROWF(_code_, _fmt_,...)
Definition: Error.h:490
static const char * END_ROOT_ROW
Definition: Key.h:50
uint32_t value_len
Definition: Cell.h:72
std::shared_ptr< RangeMoveSpec > RangeMoveSpecPtr
Encapsulates decomposed key and value.
Definition: Cell.h:32
virtual void compute_plan(BalancePlanPtr &plan, std::vector< RangeServerConnectionPtr > &balanced)
#define HT_DEBUG_OUT
Definition: Logger.h:261
const uint8_t * value
Definition: Cell.h:71