0.9.8.10
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
ReplayBuffer.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2007-2015 Hypertable, Inc
3  *
4  * This file is part of Hypertable.
5  *
6  * Hypertable is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; version 3 of the
9  * License, or any later version.
10  *
11  * Hypertable is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19  * 02110-1301, USA.
20  */
21 
22 #include <Common/Compat.h>
23 
24 #include "ReplayBuffer.h"
25 #include "ReplayDispatchHandler.h"
26 
27 using namespace std;
28 using namespace Hypertable;
29 using namespace Hypertable::Lib;
30 using namespace Hypertable::Property;
31 
32 ReplayBuffer::ReplayBuffer(PropertiesPtr &props, Comm *comm,
34  const String &location,
35  int32_t plan_generation)
36  : m_comm(comm), m_plan(plan), m_location(location),
37  m_plan_generation(plan_generation) {
39  (size_t)props->get_i64("Hypertable.RangeServer.Failover.FlushLimit.Aggregate");
41  (size_t)props->get_i32("Hypertable.RangeServer.Failover.FlushLimit.PerRange");
42  m_timeout_ms = props->get_i32("Hypertable.Failover.Timeout");
43 
44  StringSet locations;
45  m_plan.get_locations(locations);
46  for (const auto &location : locations) {
47  vector<QualifiedRangeSpec> specs;
48  m_plan.get_range_specs(location, specs);
49  for (auto &spec : specs) {
50  RangeReplayBufferPtr replay_buffer
51  = make_shared<RangeReplayBuffer>(location, spec);
52  m_buffer_map[spec] = replay_buffer;
53  }
54  }
55 }
56 
58  ByteString &value) {
59  const char *row = key.row();
60  QualifiedRangeSpec range;
61  // skip over any cells that are not in the recovery plan
62  if (m_plan.get_range_spec(table, row, range)) {
63  ReplayBufferMap::iterator it = m_buffer_map.find(range);
64  if (it == m_buffer_map.end())
65  return;
66  m_memory_used += it->second->add(key, value);
68  it->second->memory_used() > m_flush_limit_per_range) {
69 #if 0
70  HT_DEBUG_OUT << "flushing replay buffer for fragment " << m_fragment
71  << ", total mem=" << m_memory_used << " range mem used="
72  << it->second->memory_used() << ", total limit="
73  << m_flush_limit_aggregate << ", per range limit="
74  << m_flush_limit_per_range << " key=" << row << HT_END;
75 #endif
76  flush();
77  }
78  }
79  else {
80  HT_DEBUG_OUT << "Skipping key " << row << " for table " << table.id
81  << " because it is not in recovery plan" << HT_END;
82  }
83 }
84 
87 
88  for (auto &vv : m_buffer_map) {
89 
90  if (vv.second->memory_used() > 0) {
91  RangeReplayBuffer &buffer = *(vv.second.get());
92  CommAddress &addr = buffer.get_comm_address();
93  QualifiedRangeSpec &range = buffer.get_range();
94  StaticBuffer updates;
95  buffer.get_updates(updates);
96  handler.add(addr, range, m_fragment, updates);
97  buffer.clear();
98  }
99  }
100 
101  handler.wait_for_completion();
102 
103  m_memory_used=0;
104 }
std::set< String > StringSet
STL Set managing Strings.
Definition: StringExt.h:42
A memory buffer of static size.
Definition: StaticBuffer.h:45
std::string String
A String is simply a typedef to std::string.
Definition: String.h:44
const char * row() const
Definition: SerializedKey.h:53
std::shared_ptr< RangeReplayBuffer > RangeReplayBufferPtr
void get_range_specs(vector< QualifiedRangeSpec > &specs) const
Definition: ReceiverPlan.cc:80
STL namespace.
void add(const CommAddress &addr, const QualifiedRangeSpec &range, uint32_t fragment, StaticBuffer &buffer)
A class managing one or more serializable ByteStrings.
Definition: ByteString.h:47
std::shared_ptr< Properties > PropertiesPtr
Definition: Properties.h:447
Compatibility Macros for C/C++.
const RangeServerRecovery::ReceiverPlan & m_plan
Definition: ReplayBuffer.h:62
ReplayBufferMap m_buffer_map
Definition: ReplayBuffer.h:64
#define HT_END
Definition: Logger.h:220
void add(const TableIdentifier &table, SerializedKey &key, ByteString &value)
Definition: ReplayBuffer.cc:57
Hypertable library.
Definition: CellInterval.h:30
Hypertable definitions
Entry point to AsyncComm service.
Definition: Comm.h:61
Qualified (with table identifier) range specification.
void get_updates(StaticBuffer &updates)
void get_locations(StringSet &locations) const
Definition: ReceiverPlan.cc:58
QualifiedRangeSpec & get_range()
bool get_range_spec(const TableIdentifier &table, const char *row, QualifiedRangeSpec &spec) const
Address abstraction to hold either proxy name or IPv4:port address.
Definition: CommAddress.h:52
#define HT_DEBUG_OUT
Definition: Logger.h:261
RangeServer recovery receiver plan.
Definition: ReceiverPlan.h:48