0.9.8.10
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
BalanceAlgorithmLoad.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2007-2015 Hypertable, Inc.
3  *
4  * This file is part of Hypertable.
5  *
6  * Hypertable is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; version 3 of the
9  * License, or any later version.
10  *
11  * Hypertable is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19  * 02110-1301, USA.
20  */
21 
22 #include <Common/Compat.h>
23 
24 #include "BalanceAlgorithmLoad.h"
25 
27 
28 using namespace Hypertable;
29 using namespace Hypertable::Lib;
30 using namespace Hypertable::Lib::RS_METRICS;
31 using namespace std;
32 
34  std::vector<RangeServerStatistics> &statistics)
35  : m_context(context) {
36 
37  m_loadavg_deviation_threshold = m_context->props->get_f64("Hypertable.LoadBalancer.LoadavgThreshold");
38 
39  for (auto &rs : statistics)
40  m_rsstats[rs.location] = rs;
41 }
42 
43 
// Computes a load-based balance plan.
//
// Summarizes every usable RangeServer, then repeatedly takes the server at
// begin() of the load-ordered set (treated as the heaviest) and tries to move
// its ranges, in ranges_desc_load order, to the server at rbegin() (treated as
// the lightest).  Every accepted move is appended to plan->moves.  The
// `balanced` argument is not referenced in the lines visible here.
//
// NOTE(review): this listing is missing several original lines (the first
// line of the signature and the declarations of `ss` and `rsc`, among
// others); confirm details against the upstream file.
45  std::vector<RangeServerConnectionPtr> &balanced) {
// Load the per-server metrics through the RS_METRICS table reader.
46  vector<ServerMetrics> server_metrics;
47  RS_METRICS::ReaderTable rs_metrics(m_context->rs_metrics_table);
48  rs_metrics.get_server_metrics(server_metrics);
49 
51  ServerSetDescLoad servers_desc_load;
52  int num_servers;
53  int num_loaded_servers=0;
54  double mean_loadavg=0;
55  double mean_loadavg_per_loadestimate=0;
56 
// Summarize each usable server and accumulate the sums used for the means.
57  for (const auto &sm : server_metrics) {
58  // only assign ranges if this RangeServer is connected
// NOTE(review): `rsc` is used below but its declaration is not visible in
// this listing.  When no rsc_manager is set, no server is skipped.
60  if (m_context->rsc_manager &&
61  (!m_context->rsc_manager->find_server_by_location(sm.get_id(), rsc)
62  || !rsc->connected() || rsc->get_removed() || rsc->is_recovering())) {
63  HT_INFOF("RangeServer %s not connected, skipping", sm.get_id().c_str());
64  continue;
65  }
66 
// NOTE(review): `ss` is inserted here, but the lines that declare and fill
// it are not visible in this listing; presumably it is filled by
// calculate_server_summary(sm, ss) -- confirm.
68  servers_desc_load.insert(ss);
69  mean_loadavg += ss.loadavg;
// Servers with a zero load ratio are left out of the ratio mean.
70  if (ss.loadavg_per_loadestimate > 0) {
71  mean_loadavg_per_loadestimate += ss.loadavg_per_loadestimate;
72  num_loaded_servers++;
73  }
74  }
75  num_servers = servers_desc_load.size();
76 
// Balancing needs at least two servers and at least one server with a
// positive load ratio (this also keeps the divisions below from dividing
// by zero).
77  if (num_servers < 2 || num_loaded_servers < 1) {
78  HT_INFOF("No balancing required, num_servers=%d, num_loaded_servers=%d",
79  num_servers, num_loaded_servers);
80  return;
81  }
82 
// Turn the accumulated sums into means.
83  mean_loadavg /= num_servers;
84  mean_loadavg_per_loadestimate /= num_loaded_servers;
85 
86  HT_INFOF("meand_loadavg=%f, num_servers=%u, mean_loadavg_per_loadestimate=%f"
87  ", num_loaded_servers=%u, loadavg_deviation_threshold=%f",
88  mean_loadavg, (unsigned)num_servers, mean_loadavg_per_loadestimate,
89  (unsigned)num_loaded_servers, m_loadavg_deviation_threshold);
90 
// Each pass handles the current heaviest server and then removes it from
// the set, so the loop ends once fewer than two servers remain or the
// heaviest server is within the threshold.
91  while (1) {
92  if (servers_desc_load.size() < 2)
93  break;
// Work on copies: begin() is taken as the heaviest server and rbegin() as
// the lightest.
94  ServerMetricSummary heaviest = *(servers_desc_load.begin());
95  ServerMetricSummary lightest = *(servers_desc_load.rbegin());
96 
// Servers without a load ratio of their own fall back to the mean ratio
// (applied to these local copies only).
97  if (lightest.loadavg_per_loadestimate == 0)
98  lightest.loadavg_per_loadestimate = mean_loadavg_per_loadestimate;
99  if (heaviest.loadavg_per_loadestimate == 0)
100  heaviest.loadavg_per_loadestimate = mean_loadavg_per_loadestimate;
101 
102  HT_DEBUG_OUT << "heaviest_server=" << heaviest << ", lightest_server="
103  << lightest << HT_END;
104 
105  // the heaviest server doesn't have enough load to justify any more moves,
106  // so we're done
107  if (heaviest.loadavg < m_loadavg_deviation_threshold + mean_loadavg) {
108  HT_DEBUG_OUT << "Heaviest loaded server now has estimated loadavg of "
109  << heaviest.loadavg << " which is within the acceptable threshold ("
110  << m_loadavg_deviation_threshold << ") of the mean_loadavg " << mean_loadavg
111  << HT_END;
112  break;
113  }
114 
// Fetch the heaviest server's range metrics and build its load-ordered set
// of candidate ranges.
115  RangeMetricsMap range_metrics;
116  RangeSetDescLoad ranges_desc_load;
117 
118  rs_metrics.get_range_metrics(heaviest.server_id, range_metrics);
119  populate_range_load_set(range_metrics, ranges_desc_load);
120 
121  RangeSetDescLoad::iterator ranges_desc_load_it = ranges_desc_load.begin();
122 
// Walk the candidate ranges until the heaviest server's estimated loadavg
// is no longer above the threshold or the ranges are exhausted.
123  while (heaviest.loadavg > m_loadavg_deviation_threshold + mean_loadavg &&
124  ranges_desc_load_it != ranges_desc_load.end()) {
125  if (check_move(heaviest, lightest, ranges_desc_load_it->loadestimate,
126  mean_loadavg)) {
127  // add move to balance plan
128  RangeMoveSpecPtr move = make_shared<RangeMoveSpec>(heaviest.server_id,
129  lightest.server_id, ranges_desc_load_it->table_id,
130  ranges_desc_load_it->start_row, ranges_desc_load_it->end_row);
131  HT_DEBUG_OUT << "Added move to plan: " << *(move.get()) << HT_END;
132  plan->moves.push_back(move);
133 
134  // recompute loadavgs
// The source estimate is clamped at zero; the destination estimate grows by
// the range's load estimate scaled by the destination's load ratio.
135  heaviest.loadavg -=
136  heaviest.loadavg_per_loadestimate * ranges_desc_load_it->loadestimate;
137  heaviest.loadavg = (heaviest.loadavg < 0) ? 0 : heaviest.loadavg;
138  lightest.loadavg +=
139  lightest.loadavg_per_loadestimate * ranges_desc_load_it->loadestimate;
140 
141  // erase old lightest server and reinsert with new loadavg estimate
142  ServerSetDescLoad::iterator it = servers_desc_load.end();
143  --it;
144  servers_desc_load.erase(it);
145  servers_desc_load.insert(lightest);
// NOTE(review): the element now at rbegin() may be a different server whose
// loadavg_per_loadestimate is still 0; the mean fallback applied at the top
// of the outer loop is not re-applied here -- confirm this is intended.
146  lightest = *(servers_desc_load.rbegin());
147 
148  // no need to erase this range from the heaviest loaded range
149  // since we will skip to next range and delete the heaviest server from
150  // the set of servers used in balancing after all moves are done for it
151  }
152  else {
153  HT_DEBUG_OUT << "Moving range " << *ranges_desc_load_it
154  << " is not viable." << HT_END;
155  }
156  ranges_desc_load_it++;
157  }
158 
159  // erase old heaviest server, since it won't be a source or destination
160  // for balancing anymore
161  servers_desc_load.erase(servers_desc_load.begin());
162  }
163 }
164 
165 void
167  ServerMetricSummary &summary) {
168  summary.server_id = metrics.get_id().c_str();
169  double loadestimate = 0;
170 
171  // calculate average loadavg for server
172  const vector<ServerMeasurement> &measurements = metrics.get_measurements();
173  if (measurements.size() > 0) {
174  for (const auto &measurement : measurements) {
175  summary.loadavg += measurement.loadavg;
176  loadestimate += measurement.bytes_written_rate
177  + measurement.bytes_scanned_rate;
178  }
179  summary.loadavg /= measurements.size();
180  summary.loadavg_per_loadestimate = summary.loadavg
181  / (loadestimate / measurements.size());
182  }
183 
184  StatisticsSet::iterator it = m_rsstats.find(metrics.get_id());
185  if (it != m_rsstats.end())
186  summary.disk_full = !m_context->can_accept_ranges(it->second);
187 }
188 
190  RangeMetricSummary &summary) {
191 
192  bool start_row_set;
193  summary.table_id = metrics.get_table_id().c_str();
194  summary.start_row = metrics.get_start_row(&start_row_set).c_str();
195  summary.end_row = metrics.get_end_row().c_str();
196 
197  // calculate the average loadestimate for this range
198  const vector<RangeMeasurement> &measurements = metrics.get_measurements();
199  if (measurements.size() > 0) {
200  for (const auto &measurement : measurements)
201  summary.loadestimate += measurement.byte_read_rate + measurement.byte_write_rate;
202  summary.loadestimate /= measurements.size();
203  }
204 }
205 
206 
208 
209  ranges_desc_load.clear();
210  for (const auto &vv : range_metrics) {
211  // don't consider ranges that can't be moved
212  if (!vv.second.is_moveable())
213  continue;
214  RangeMetricSummary summary;
215  calculate_range_summary(vv.second, summary);
216  ranges_desc_load.insert(summary);
217  }
218 }
219 
221  const ServerMetricSummary &destination, double range_loadestimate,
222  double mean_loadavg) {
223  // make sure that this move doesn't increase the loadavg of the target more than that of the source
224  double loadavg_destination = destination.loadavg;
225  double delta_destination;
226 
227  delta_destination = destination.loadavg_per_loadestimate * range_loadestimate;
228  loadavg_destination += delta_destination;
229  return (loadavg_destination < m_loadavg_deviation_threshold + mean_loadavg);
230 }
231 
// Writes a one-line, brace-delimited description of a server summary
// (server_id, loadavg and loadavg_per_loadestimate) to `out` and returns
// `out` so calls can be chained.
// NOTE(review): the line carrying the second parameter and the opening brace
// is missing from this listing; the body refers to that parameter as
// `summary` -- restore it from the upstream file.
233 ostream &Hypertable::operator<<(ostream &out,
235  out << "{ServerMetricSummary: server_id=" << summary.server_id << ", loadavg="
236  << summary.loadavg << ", loadavg_per_loadestimate=" << summary.loadavg_per_loadestimate
237  << "}";
238  return out;
239 }
240 
// Writes a one-line, brace-delimited description of a range summary
// (table_id, start_row, end_row and loadestimate) to `out` and returns `out`
// so calls can be chained.
// NOTE(review): the line carrying the second parameter and the opening brace
// is missing from this listing; the body refers to that parameter as
// `summary` -- restore it from the upstream file.
242 ostream &Hypertable::operator<<(ostream &out,
244  out << "{RangeMetricSummary: table_id=" << summary.table_id << ", start_row="
245  << summary.start_row << ", end_row=" << summary.end_row
246  << ", loadestimate=" << summary.loadestimate << "}";
247  return out;
248 }
249 
bool check_move(const ServerMetricSummary &source, const ServerMetricSummary &destination, double range_loadestimate, double mean_loadavg)
void calculate_range_summary(const Lib::RS_METRICS::RangeMetrics &metrics, RangeMetricSummary &summary)
std::shared_ptr< RangeServerConnection > RangeServerConnectionPtr
const std::vector< ServerMeasurement > & get_measurements() const
Definition: ServerMetrics.h:76
const std::vector< RangeMeasurement > & get_measurements() const
Definition: RangeMetrics.h:90
std::shared_ptr< BalancePlan > BalancePlanPtr
Definition: BalancePlan.h:81
std::map< String, RangeMetrics > RangeMetricsMap
Definition: RangeMetrics.h:104
Facilities for reading and writing sys/RS_METRICS table.
STL namespace.
virtual void compute_plan(BalancePlanPtr &plan, std::vector< RangeServerConnectionPtr > &balanced)
const String & get_start_row(bool *isset) const
Definition: RangeMetrics.h:80
std::shared_ptr< Context > ContextPtr
Smart pointer to Context.
Definition: Context.h:265
std::multiset< RangeMetricSummary, GtRangeMetricSummary > RangeSetDescLoad
void calculate_server_summary(const Lib::RS_METRICS::ServerMetrics &metrics, ServerMetricSummary &summary)
std::multiset< ServerMetricSummary, GtServerMetricSummary > ServerSetDescLoad
Declarations for ReaderTable.
Compatibility Macros for C/C++.
std::ostream & operator<<(std::ostream &os, const crontab_entry &entry)
Helper function to write crontab_entry to an ostream.
Definition: Crontab.cc:301
Aggregates metrics for an individual range.
Definition: RangeMetrics.h:66
#define HT_END
Definition: Logger.h:220
BalanceAlgorithmLoad(ContextPtr &context, std::vector< RangeServerStatistics > &statistics)
Hypertable library.
Definition: CellInterval.h:30
Hypertable definitions
#define HT_INFOF(msg,...)
Definition: Logger.h:272
Reads metrics from the sys/RS_METRICS table.
Definition: ReaderTable.h:47
std::shared_ptr< RangeMoveSpec > RangeMoveSpecPtr
Aggregates metrics for an individual RangeServer.
Definition: ServerMetrics.h:68
virtual void get_range_metrics(const char *server_id, RangeMetricsMap &range_metrics)
Definition: ReaderTable.cc:43
const String & get_table_id() const
Definition: RangeMetrics.h:79
virtual void get_server_metrics(vector< ServerMetrics > &server_metrics)
Definition: ReaderTable.cc:83
void populate_range_load_set(const Lib::RS_METRICS::RangeMetricsMap &range_metrics, RangeSetDescLoad &ranges_desc_load)
#define HT_DEBUG_OUT
Definition: Logger.h:261