0.9.8.10
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
GroupCommit.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2007-2015 Hypertable, Inc.
3  *
4  * This file is part of Hypertable.
5  *
6  * Hypertable is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; version 3 of the
9  * License, or any later version.
10  *
11  * Hypertable is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19  * 02110-1301, USA.
20  */
21 #include <Common/Compat.h>
22 #include "GroupCommit.h"
23 
27 
28 #include <Common/Config.h>
29 
30 using namespace Hypertable;
31 using namespace Hypertable::Config;
32 using namespace std;
33 
/// Constructor.
/// Stores the supplied RangeServer pointer in m_range_server and caches the
/// value of the Hypertable.RangeServer.CommitInterval property in
/// m_commit_interval.
/// @param range_server Pointer to the RangeServer (stored, not copied)
34 GroupCommit::GroupCommit(Apps::RangeServer *range_server) : m_range_server(range_server) {
35 
// Read once here so later code can use the cached member.
36  m_commit_interval = get_i32("Hypertable.RangeServer.CommitInterval");
37 
38 }
39 
40 
/// Adds a batch of updates to the group commit queue.
/// Holds m_mutex for the whole call.  The update is wrapped in a newly
/// allocated UpdateRequest and appended to the UpdateRecTable entry in
/// m_table_map keyed by (cluster_id, table); an entry is created if none
/// exists yet.
/// @param event Event of the originating update request; its deadline()
///        is used as the expiration time
/// @param cluster_id Cluster from which these updates originated
/// @param schema Consulted only when a new table entry is created (for the
///        group commit interval)
/// @param table Identifier of the destination table
/// @param count Count of serialized key/value pairs in buffer
/// @param buffer Update buffer; assigned into the request
/// @param flags Not referenced in the visible body
// NOTE(review): the request allocated with `new` is not freed here; it is
// presumably released by whoever consumes UpdateRecTable::requests -- confirm.
41 void
42 GroupCommit::add(EventPtr &event, uint64_t cluster_id, SchemaPtr &schema,
43  const TableIdentifier &table, uint32_t count,
44  StaticBuffer &buffer, uint32_t flags) {
45  lock_guard<mutex> lock(m_mutex);
46  UpdateRequest *request = new UpdateRequest();
47  auto expire_time = event->deadline();
48  ClusterTableIdPair key = std::make_pair(cluster_id, table);
49 
// Replace the key's id pointer with the one returned by the flyweight string
// cache, presumably so the map key does not point at caller-owned memory.
50  key.second.id = m_flyweight_strings.get(table.id);
51 
52  request->buffer = buffer;
53  request->count = count;
54  request->event = event;
55 
56  auto iter = m_table_map.find(key);
57  if (iter == m_table_map.end()) {
58 
// First pending update for this cluster/table pair: create the per-table
// record, seed its totals from this request and register it in the map.
59  UpdateRecTable *tu = new UpdateRecTable();
60  tu->cluster_id = cluster_id;
61  tu->id = key.second;
62  tu->commit_interval = schema->get_group_commit_interval();
// NOTE(review): the listing jumps from line 62 to line 64 here, so one
// statement is not visible.  commit_iteration is read elsewhere in this file
// but is not assigned in the visible lines; presumably it is set on the
// missing line -- confirm against the real source.
64  tu->total_count = count;
65  tu->total_buffer_size = buffer.size;
66  tu->expire_time = expire_time;
67  tu->requests.push_back(request);
68  m_table_map[key] = tu;
69  return;
70  }
71 
// Existing entry: keep the later of the two expiration times and accumulate
// the counters before queuing the request.
72  if (expire_time > (*iter).second->expire_time)
73  (*iter).second->expire_time = expire_time;
74  (*iter).second->total_count += count;
75  (*iter).second->total_buffer_size += buffer.size;
76  (*iter).second->requests.push_back(request);
77 }
78 
79 
80 
// Body of the routine that processes queued updates that are ready to be
// committed.  NOTE(review): the signature line (listing line 81) is missing
// from this extract.
// The mutex is held from here to the end of the visible scope, which includes
// the batch_update() call below.
82  lock_guard<mutex> lock(m_mutex);
83  std::vector<UpdateRecTable *> updates;
// Default-constructed; raised below to the latest expire_time among the
// entries selected in this pass.
84  ClockT::time_point expire_time;
85 
86  m_counter++;
87 
88  auto iter = m_table_map.begin();
89  while (iter != m_table_map.end()) {
// An entry is selected when the counter is a multiple of its
// commit_iteration.  NOTE(review): a commit_iteration of 0 would make this
// modulo undefined; its assignment is not visible in this extract.
90  if ((m_counter % (*iter).second->commit_iteration) == 0) {
91  auto remove_iter = iter;
92  if (iter->second->expire_time > expire_time)
93  expire_time = iter->second->expire_time;
// Advance before erasing so the loop iterator never refers to the erased
// element.
94  ++iter;
95  updates.push_back((*remove_iter).second);
96  m_table_map.erase(remove_iter);
97  }
98  else
99  ++iter;
100  }
101 
// Hand the selected records to the RangeServer in one call; nothing is sent
// when no entry was selected.
102  if (!updates.empty())
103  m_range_server->batch_update(updates, expire_time);
104 
105 }
uint32_t m_commit_interval
Cached copy of Hypertable.RangeServer.CommitInterval property.
Definition: GroupCommit.h:80
A memory buffer of static size.
Definition: StaticBuffer.h:45
uint32_t count
Count of serialized key/value pairs in buffer.
Definition: UpdateRequest.h:59
std::vector< UpdateRequest * > requests
Vector of corresponding client requests.
chrono::time_point< fast_clock > time_point
Definition: fast_clock.h:42
Holds updates destined for a specific table.
virtual void trigger()
Processes queued updates that are ready to be committed.
Definition: GroupCommit.cc:81
std::map< ClusterTableIdPair, UpdateRecTable *, lt_ctip > m_table_map
Definition: GroupCommit.h:86
std::shared_ptr< Event > EventPtr
Smart pointer to Event.
Definition: Event.h:228
STL namespace.
Declarations for RangeServer.
FlyweightString m_flyweight_strings
String cache for holding table IDs
Definition: GroupCommit.h:84
ClockT::time_point expire_time
Request expiration time.
StaticBuffer buffer
Update buffer containing serialized key/value pairs.
Definition: UpdateRequest.h:57
EventPtr event
Event object of originating update request.
Definition: UpdateRequest.h:61
GroupCommit(Apps::RangeServer *range_server)
Constructor.
Definition: GroupCommit.cc:34
void batch_update(std::vector< UpdateRecTable * > &updates, ClockT::time_point expire_time)
Compatibility Macros for C/C++.
Apps::RangeServer * m_range_server
Pointer to RangeServer.
Definition: GroupCommit.h:77
virtual void add(EventPtr &event, uint64_t cluster_id, SchemaPtr &schema, const TableIdentifier &table, uint32_t count, StaticBuffer &buffer, uint32_t flags)
Adds a batch of updates to the group commit queue.
Definition: GroupCommit.cc:42
uint64_t cluster_id
Cluster from which these updates originated.
Hypertable definitions
TableIdentifier id
Table identifier for destination table.
const char * get(const char *str)
Returns a copy of the string; this string is valid till the FlyweightString set is destructed...
Declarations for UpdateRequest.
Holds client update request and error state.
Definition: UpdateRequest.h:54
std::pair< uint64_t, TableIdentifier > ClusterTableIdPair
Structure for holding cluster ID / table identifier pair.
Definition: GroupCommit.h:39
std::shared_ptr< Schema > SchemaPtr
Smart pointer to Schema.
Definition: Schema.h:465
Configuration settings.
Declarations for UpdateRecTable.
int m_counter
Trigger iteration counter.
Definition: GroupCommit.h:82
std::mutex m_mutex
Mutex to serialize concurrent access
Definition: GroupCommit.h:75