0.9.8.10
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
DispatchHandlerOperation.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2007-2015 Hypertable, Inc.
3  *
4  * This file is part of Hypertable.
5  *
6  * Hypertable is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; version 3 of the
9  * License, or any later version.
10  *
11  * Hypertable is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19  * 02110-1301, USA.
20  */
21 
29 #include <Common/Compat.h>
30 
31 #include "Context.h"
33 
34 #include <AsyncComm/Protocol.h>
35 
36 #include <Common/Error.h>
37 #include <Common/Logger.h>
38 
39 using namespace Hypertable;
40 
42  : m_context(context), m_rsclient(Comm::instance()), m_outstanding(0), m_error_count(0) {
43 }
44 
45 
47 
48  m_results.clear();
49  m_error_count = 0;
50  m_locations = locations;
51  m_outstanding = locations.size();
52 
53  for (StringSet::iterator iter = m_locations.begin(); iter != m_locations.end(); ++iter) {
54  try {
55  start(*iter);
56  }
57  catch (Exception &e) {
58  HT_INFOF("%s - %s", Error::get_text(e.code()), e.what());
59  if (e.code() == Error::COMM_NOT_CONNECTED ||
62  lock_guard<mutex> lock(m_mutex);
63  Result result(*iter);
64  m_outstanding--;
65  result.error = e.code();
66  result.msg = "Send error";
67  m_results.insert(result);
68  m_error_count++;
69  }
70  }
71  }
72 }
73 
74 
79  lock_guard<mutex> lock(m_mutex);
80 
81  if (m_events.count(event) > 0) {
82  HT_INFOF("Skipping second event - %s", event->to_str().c_str());
83  return;
84  }
85 
87  m_events.insert(event);
88  m_outstanding--;
89  if (m_outstanding == 0)
90  m_cond.notify_all();
91 }
92 
93 
96 
97  for (const auto &event : m_events) {
98 
99  if (m_context->rsc_manager->find_server_by_local_addr(event->addr, rsc)) {
100  Result result(rsc->location());
101  if (event->type == Event::MESSAGE) {
102  if ((result.error = Protocol::response_code(event)) != Error::OK) {
103  m_error_count++;
104  result.msg = Protocol::string_format_message(event);
105  m_results.insert(result);
106  }
107  }
108  else {
109  m_error_count++;
110  result.error = event->error;
111  result.msg = "";
112  m_results.insert(result);
113  }
114  }
115  else
116  HT_WARNF("Couldn't locate connection object for %s",
117  InetAddr(event->addr).format().c_str());
118 
119  result_callback(event);
120  }
121 }
122 
123 
125  unique_lock<mutex> lock(m_mutex);
126  m_cond.wait(lock, [this](){ return m_outstanding == 0; });
127  process_events();
128  return m_error_count == 0;
129 }
130 
131 
132 void DispatchHandlerOperation::get_results(std::set<Result> &results) {
133  lock_guard<mutex> lock(m_mutex);
134  results = m_results;
135 }
std::set< String > StringSet
STL Set managing Strings.
Definition: StringExt.h:42
std::mutex m_mutex
Mutex for serializing concurrent access
#define HT_WARNF(msg,...)
Definition: Logger.h:290
static int32_t response_code(const Event *event)
Returns the response code from an event event generated in response to a request message.
Definition: Protocol.cc:39
std::shared_ptr< RangeServerConnection > RangeServerConnectionPtr
static String string_format_message(const Event *event)
Returns error message decoded standard error MESSAGE generated in response to a request message...
Definition: Protocol.cc:51
DispatchHandlerOperation(ContextPtr &context)
Constructor.
std::shared_ptr< Event > EventPtr
Smart pointer to Event.
Definition: Event.h:228
void start(StringSet &locations)
Starts asynchronous request.
std::set< Result > m_results
Set of result objects.
std::shared_ptr< Context > ContextPtr
Smart pointer to Context.
Definition: Context.h:265
#define HT_ASSERT(_e_)
Definition: Logger.h:396
Declarations for DispatchHandlerOperation.
void process_events()
Processes m_events set.
const char * get_text(int error)
Returns a descriptive error message.
Definition: Error.cc:330
Encapsulate an internet address.
Definition: InetAddr.h:66
Logging routines and macros.
Compatibility Macros for C/C++.
bool wait_for_completion()
Waits for requests to complete.
String format(int sep= ':') const
Returns a string with a dotted notation ("127.0.0.1:8080") including the port.
Definition: InetAddr.h:132
Hypertable definitions
void get_results(std::set< Result > &results)
Returns the Result set.
Entry point to AsyncComm service.
Definition: Comm.h:61
virtual void result_callback(const EventPtr &event)
Post-request hook method.
std::condition_variable m_cond
Condition variable used to wait for completion.
StringSet m_locations
Set of servers participating in operation.
Declarations for Protocol.
#define HT_INFOF(msg,...)
Definition: Logger.h:272
Request/response message event.
Definition: Event.h:63
This is a generic exception class for Hypertable.
Definition: Error.h:314
std::set< EventPtr, LtEventPtr > m_events
Set of events generated by range server responses.
Error codes, Exception handling, error logging.
Declarations for Context.
int m_outstanding
Outstanding request count that have not completed.
virtual void handle(EventPtr &event)
Process response event.
int code() const
Returns the error code.
Definition: Error.h:391