0.9.8.10
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
SshSocketHandler.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2007-2015 Hypertable, Inc.
3  *
4  * This file is part of Hypertable.
5  *
6  * Hypertable is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; either version 3
9  * of the License, or any later version.
10  *
11  * Hypertable is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19  * 02110-1301, USA.
20  */
21 
26 
27 #include <Common/Compat.h>
28 
29 #include "SshSocketHandler.h"
30 
31 #include <Common/FileUtils.h>
32 #include <Common/Logger.h>
33 
34 #include <AsyncComm/PollEvent.h>
35 
36 #include <cerrno>
37 #include <chrono>
38 #include <cstdlib>
39 #include <cstring>
40 #include <set>
41 #include <thread>
42 
43 #include <fcntl.h>
44 #include <strings.h>
45 #include <sys/ioctl.h>
46 
47 using namespace Hypertable;
48 using namespace std;
49 
50 #define SSH_READ_PAGE_SIZE 8192
51 
52 #define LOG_PREFIX "[" << m_hostname << "] === SshSocketHandler.cc:" << __LINE__ << " === "
53 
54 namespace {
55 
56  enum {
57  STATE_INITIAL = 0,
58  STATE_CREATE_SESSION = 1,
59  STATE_COMPLETE_CONNECTION = 2,
60  STATE_VERIFY_KNOWNHOST = 3,
61  STATE_AUTHENTICATE = 4,
62  STATE_CONNECTED = 5,
63  STATE_COMPLETE_CHANNEL_SESSION_OPEN = 6,
64  STATE_CHANNEL_REQUEST_EXEC = 7,
65  STATE_CHANNEL_REQUEST_READ = 8
66  };
67 
68  const char *state_str(int state) {
69  switch (state) {
70  case (STATE_INITIAL):
71  return "INITIAL";
72  case (STATE_COMPLETE_CONNECTION):
73  return "COMPLETE_CONNECTION";
74  case (STATE_CREATE_SESSION):
75  return "CREATE_SESSION";
76  case (STATE_VERIFY_KNOWNHOST):
77  return "VERIFY_KNOWNHOST";
78  case (STATE_AUTHENTICATE):
79  return "AUTHENTICATE";
80  case (STATE_CONNECTED):
81  return "CONNECTED";
82  case (STATE_COMPLETE_CHANNEL_SESSION_OPEN):
83  return "STATE_COMPLETE_CHANNEL_SESSION_OPEN";
84  case (STATE_CHANNEL_REQUEST_EXEC):
85  return "STATE_CHANNEL_REQUEST_EXEC";
86  case (STATE_CHANNEL_REQUEST_READ):
87  return "STATE_CHANNEL_REQUEST_READ";
88  default:
89  break;
90  }
91  return "UNKNOWN";
92  }
93 
94  void log_callback_function(ssh_session session, int priority,
95  const char *message, void *userdata) {
96  ((SshSocketHandler *)userdata)->log_callback(session, priority, message);
97  }
98 
99  int auth_callback_function(const char *prompt, char *buf, size_t len,
100  int echo, int verify, void *userdata) {
101  return ((SshSocketHandler *)userdata)->auth_callback(prompt, buf, len, echo, verify);
102  }
103 
104  void connect_status_callback_function(void *userdata, float status) {
105  ((SshSocketHandler *)userdata)->connect_status_callback(status);
106  }
107 
108  void global_request_callback_function(ssh_session session,
109  ssh_message message, void *userdata) {
110  ((SshSocketHandler *)userdata)->global_request_callback(session, message);
111  }
112 
113  void exit_status_callback_function(ssh_session session,
114  ssh_channel channel,
115  int exit_status,
116  void *userdata) {
117  ((SshSocketHandler *)userdata)->set_exit_status(exit_status);
118  }
119 
120 }
121 
123 int SshSocketHandler::ms_libssh_verbosity {SSH_LOG_PROTOCOL};
124 
125 void SshSocketHandler::enable_debug() { ms_debug_enabled = true; }
126 
127 void SshSocketHandler::set_libssh_verbosity(const std::string &value) {
128  if (!strcasecmp(value.c_str(), "none"))
129  ms_libssh_verbosity = SSH_LOG_NOLOG;
130  else if (!strcasecmp(value.c_str(), "warning"))
131  ms_libssh_verbosity = SSH_LOG_WARNING;
132  else if (!strcasecmp(value.c_str(), "protocol"))
133  ms_libssh_verbosity = SSH_LOG_PROTOCOL;
134  else if (!strcasecmp(value.c_str(), "packet"))
135  ms_libssh_verbosity = SSH_LOG_PACKET;
136  else if (!strcasecmp(value.c_str(), "functions"))
137  ms_libssh_verbosity = SSH_LOG_FUNCTIONS;
138  else {
139  cout << "Unrecognized libssh logging level: " << value << endl;
140  quick_exit(EXIT_FAILURE);
141  }
142 }
143 
144 SshSocketHandler::SshSocketHandler(const string &hostname)
145  : m_hostname(hostname), m_log_collector(1024),
146  m_stdout_collector(SSH_READ_PAGE_SIZE), m_stderr_collector(1024) {
147 
148  m_sd = socket(AF_INET, SOCK_STREAM, 0);
149  if (m_sd < 0) {
150  m_error = string("socket(AF_INET, SOCK_STREAM, 0) fialed - ") + strerror(errno);
151  return;
152  }
153 
154  // Set to non-blocking
155  FileUtils::set_flags(m_sd, O_NONBLOCK);
156 
157  struct hostent *server = gethostbyname(m_hostname.c_str());
158  if (server == nullptr) {
159  m_error = string("gethostbyname('") + m_hostname + "') failed - " + hstrerror(h_errno);
160  deregister(m_sd);
161  return;
162  }
163 
164  struct sockaddr_in serv_addr;
165  bzero((char *) &serv_addr, sizeof(serv_addr));
166  serv_addr.sin_family = AF_INET;
167  bcopy((char *)server->h_addr,
168  (char *)&serv_addr.sin_addr.s_addr,
169  server->h_length);
170  serv_addr.sin_port = htons(22);
171 
172  m_comm_address = CommAddress(serv_addr);
173 
175 
177 
178  m_state = STATE_CREATE_SESSION;
179 
180  while (connect(m_sd, (struct sockaddr *)&serv_addr,sizeof(serv_addr)) < 0) {
181  if (errno == EINTR) {
182  this_thread::sleep_for(chrono::milliseconds(1000));
183  continue;
184  }
185  else if (errno != EINPROGRESS) {
186  m_error = string("connect(") + InetAddr::format(serv_addr) + ") failed - " + strerror(errno);
187  deregister(m_sd);
188  return;
189  }
190  m_state = STATE_INITIAL;
191  break;
192  }
193 
194  int rc = m_comm->register_socket(m_sd, m_comm_address, this);
195  if (rc != Error::OK) {
196  m_error = string("Comm::register_socket(") + InetAddr::format(serv_addr) + ") failed - " + strerror(errno);
197  deregister(m_sd);
198  }
199 }
200 
201 
203  if (m_state != STATE_INITIAL && m_ssh_session) {
204  ssh_disconnect(m_ssh_session);
205  ssh_free(m_ssh_session);
206  }
207 }
208 
209 bool SshSocketHandler::handle(int sd, int events) {
210  lock_guard<mutex> lock(m_mutex);
211  int rc;
212 
213  if (ms_debug_enabled)
214  cerr << LOG_PREFIX << "Entering handler (events="
215  << PollEvent::to_string(events) << ", state=" << state_str(m_state)
216  << ")\n";
217 
218  while (true) {
219  switch (m_state) {
220 
221  case (STATE_INITIAL):
222  {
223  int sockerr = 0;
224  socklen_t sockerr_len = sizeof(sockerr);
225  if (getsockopt(m_sd, SOL_SOCKET, SO_ERROR, &sockerr, &sockerr_len) < 0) {
226  m_error = string("getsockopt(SO_ERROR) failed (") + strerror(errno) + ")";
227  m_cond.notify_all();
228  return false;
229  }
230  if (sockerr) {
231  m_error = string("connect() completion error (") + strerror(errno) + ")";
232  m_cond.notify_all();
233  return false;
234  }
235  }
237  m_state = STATE_CREATE_SESSION;
238 
239  case (STATE_CREATE_SESSION):
240  {
241  m_ssh_session = ssh_new();
242 
244 
245  char *home = getenv("HOME");
246  if (home == nullptr)
247  HT_FATAL("Environment variable HOME is not set");
248  string ssh_dir(home);
249  ssh_dir.append("/.ssh");
250  ssh_options_set(m_ssh_session, SSH_OPTIONS_SSH_DIR, ssh_dir.c_str());
251 
252  int verbosity = ms_libssh_verbosity;
253  ssh_options_set(m_ssh_session, SSH_OPTIONS_LOG_VERBOSITY, &verbosity);
254 
255  ssh_options_set(m_ssh_session, SSH_OPTIONS_HOST, m_hostname.c_str());
256 
257  ssh_options_set(m_ssh_session, SSH_OPTIONS_FD, &m_sd);
258 
259  ssh_set_blocking(m_ssh_session, 0);
260 
261  // set callbacks
262  memset(&m_callbacks, 0, sizeof(m_callbacks));
263  m_callbacks.userdata = this;
264  m_callbacks.auth_function = auth_callback_function;
265  m_callbacks.log_function = log_callback_function;
266  m_callbacks.connect_status_function = connect_status_callback_function;
267  m_callbacks.global_request_function = global_request_callback_function;
268  ssh_callbacks_init(&m_callbacks);
269 
270  rc = ssh_set_callbacks(m_ssh_session, &m_callbacks);
271  if (rc == SSH_ERROR) {
272  m_error = string("ssh_set_callbacks() failed - ") + ssh_get_error(m_ssh_session);
273  m_cond.notify_all();
274  return false;
275  }
276 
277  // First load system config
278  if (FileUtils::exists("/etc/ssh/ssh_config"))
279  ssh_options_parse_config(m_ssh_session, "/etc/ssh/ssh_config");
280 
281  // Then load ~/.ssh/config
282  string config_file = ssh_dir + "/config";
283  if (FileUtils::exists(config_file))
284  ssh_options_parse_config(m_ssh_session, config_file.c_str());
285 
286  rc = ssh_connect(m_ssh_session);
287  if (rc == SSH_OK) {
288  m_state = STATE_VERIFY_KNOWNHOST;
289  continue;
290  }
291  if (rc == SSH_ERROR) {
292  m_error = string("ssh_connect() failed - ") + ssh_get_error(m_ssh_session);
293  m_cond.notify_all();
294  return false;
295  }
296  else if (rc == SSH_AGAIN) {
297  m_state = STATE_COMPLETE_CONNECTION;
298  break;
299  }
300  }
301 
302  case (STATE_COMPLETE_CONNECTION):
303  rc = ssh_connect(m_ssh_session);
304  if (rc == SSH_AGAIN)
305  break;
306  else if (rc == SSH_ERROR) {
307  m_error = string("ssh_connect() failed - ") + ssh_get_error(m_ssh_session);
308  m_cond.notify_all();
309  return false;
310  }
311  HT_ASSERT(rc == SSH_OK);
312  m_state = STATE_VERIFY_KNOWNHOST;
313 
314  case (STATE_VERIFY_KNOWNHOST):
315  if (!verify_knownhost()) {
316  m_cond.notify_all();
317  return false;
318  }
319  m_state = STATE_AUTHENTICATE;
320 
321  case (STATE_AUTHENTICATE):
322  rc = ssh_userauth_publickey_auto(m_ssh_session, nullptr, nullptr);
323  if (rc == SSH_AUTH_ERROR) {
324  m_error = string("authentication failure - ") + ssh_get_error(m_ssh_session);
325  m_cond.notify_all();
326  return false;
327  }
328  else if (rc == SSH_AUTH_AGAIN) {
330  if (socket_has_data())
331  continue;
332  break;
333  }
334  else if (rc == SSH_AUTH_DENIED) {
335  m_error = string("publickey authentication denied");
336  m_cond.notify_all();
337  return false;
338  }
339 
340  HT_ASSERT(rc == SSH_AUTH_SUCCESS);
341  m_state = STATE_CONNECTED;
343  m_cond.notify_all();
344  break;
345 
346  case (STATE_COMPLETE_CHANNEL_SESSION_OPEN):
347  rc = ssh_channel_open_session(m_channel);
348  if (rc == SSH_AGAIN) {
349  if (socket_has_data())
350  continue;
351  break;
352  }
353  else if (rc == SSH_ERROR) {
354  ssh_channel_free(m_channel);
355  m_channel = 0;
356  m_error = string("ssh_channel_open_session() failed - ") + ssh_get_error(m_ssh_session);
357  m_cond.notify_all();
358  return false;
359  }
360  HT_ASSERT(rc == SSH_OK);
361  m_state = STATE_CHANNEL_REQUEST_EXEC;
362 
363  case (STATE_CHANNEL_REQUEST_EXEC):
364  rc = ssh_channel_request_exec(m_channel, m_command.c_str());
366  if (rc == SSH_AGAIN) {
367  if (socket_has_data())
368  continue;
369  break;
370  }
371  else if (rc == SSH_ERROR) {
372  m_error = string("ssh_request_exec() failed - ") + ssh_get_error(m_ssh_session);
373  ssh_channel_close(m_channel);
374  ssh_channel_free(m_channel);
375  m_channel = 0;
376  m_cond.notify_all();
377  return false;
378  }
379  HT_ASSERT(rc == SSH_OK);
380  m_state = STATE_CHANNEL_REQUEST_READ;
381 
382  case (STATE_CHANNEL_REQUEST_READ):
383 
384  while (true) {
385 
386  if (m_stdout_buffer.base == 0)
388 
389  int nbytes = ssh_channel_read(m_channel,
392  0);
393 
394  if (nbytes == SSH_ERROR) {
395  m_error = string("ssh_channel_read() failed - ") + ssh_get_error(m_ssh_session);
396  ssh_channel_close(m_channel);
397  ssh_channel_free(m_channel);
398  m_channel = 0;
399  m_cond.notify_all();
400  return false;
401  }
402  else if (nbytes == SSH_EOF) {
403  m_channel_is_eof = true;
404  break;
405  }
406  else if (nbytes <= 0)
407  break;
408 
409  if (nbytes > 0 && *m_stdout_buffer.ptr == 0)
410  continue;
411 
412  if (m_terminal_output)
413  write_to_stdout((const char *)m_stdout_buffer.ptr, nbytes);
414 
415  m_stdout_buffer.ptr += (size_t)nbytes;
419  }
420  }
421 
422  while (true) {
423 
424  if (m_stderr_buffer.base == 0)
426 
427  int nbytes = ssh_channel_read(m_channel,
430  1);
431 
432  if (nbytes == SSH_ERROR) {
433  m_error = string("ssh_channel_read() failed - ") + ssh_get_error(m_ssh_session);
434  ssh_channel_close(m_channel);
435  ssh_channel_free(m_channel);
436  m_channel = 0;
437  m_cond.notify_all();
438  return false;
439  }
440  else if (nbytes == SSH_EOF) {
441  m_channel_is_eof = true;
442  break;
443  }
444  else if (nbytes <= 0)
445  break;
446 
447  if (nbytes > 0 && *m_stderr_buffer.ptr == 0)
448  continue;
449 
450  if (m_terminal_output)
451  write_to_stderr((const char *)m_stderr_buffer.ptr, nbytes);
452 
453  m_stderr_buffer.ptr += (size_t)nbytes;
457  }
458  }
459 
460  if (m_channel_is_eof || ssh_channel_is_eof(m_channel)) {
461  int exit_status = ssh_channel_get_exit_status(m_channel);
462  // If ssh_channel_get_exit_status() returns -1 and the exit status has not yet
464  if (ms_debug_enabled)
465  cerr << LOG_PREFIX << "At EOF (exit_status=" << exit_status
466  << ", status_is_set="
467  << (m_command_exit_status_is_set ? "true" : "false") << ")\n";
468  if (exit_status == -1 && !m_command_exit_status_is_set)
469  break;
473  }
478  ssh_channel_close(m_channel);
479  ssh_channel_free(m_channel);
480  m_channel = 0;
481  m_state = STATE_CONNECTED;
482  m_poll_interest = 0;
483  m_cond.notify_all();
484  }
485  break;
486 
487  case (STATE_CONNECTED):
488  break;
489 
490  default:
491  HT_FATALF("Unrecognize state - %d", m_state);
492  }
493  break;
494  }
495 
496  if (ms_debug_enabled)
497  cerr << LOG_PREFIX << "Leaving handler (poll_interest="
499  << ", state=" << state_str(m_state) << ")\n";
500 
501  return true;
502 }
503 
505  ::close(m_sd);
506  m_sd = -1;
508 }
509 
510 void SshSocketHandler::log_callback(ssh_session session, int priority, const char *message) {
511  size_t len;
512  if (ms_debug_enabled) {
513  cerr << "[" << m_hostname << "] " << message << "\n";
514  return;
515  }
516  if (ms_libssh_verbosity <= SSH_LOG_PROTOCOL && priority <= 1)
517  return;
518  if (m_log_buffer.base == 0)
520  while (m_log_buffer.remain() < strlen(message)) {
521  len = m_log_buffer.remain();
522  m_log_buffer.add(message, len);
523  message += len;
526  }
527  if (*message) {
528  len = strlen(message);
529  m_log_buffer.add(message, len);
530  }
531  // Add newline
532  if (m_log_buffer.remain() == 0) {
535  }
536  m_log_buffer.add("\n", 1);
537 }
538 
539 int SshSocketHandler::auth_callback(const char *prompt, char *buf, size_t len, int echo, int verify) {
540  if (ms_debug_enabled)
541  cerr << LOG_PREFIX << "auth_callback (" << prompt << ", buflen=" << len
542  << ", echo=" << echo << ", verify=" << verify << ")\n";
543  return -1;
544 }
545 
547  if (ms_debug_enabled)
548  cerr << LOG_PREFIX << "connect_status_callback " << (int)(status*100.0) << "%\n";
549 }
550 
551 void SshSocketHandler::global_request_callback(ssh_session session, ssh_message message) {
552  if (ms_debug_enabled)
553  cerr << LOG_PREFIX << "global_request_callback (type=" << ssh_message_type(message)
554  << ", subtype=" << ssh_message_subtype(message) << ")\n";
555 }
556 
560  m_channel_is_eof = true;
561 }
562 
563 bool SshSocketHandler::wait_for_connection(chrono::system_clock::time_point deadline) {
564  unique_lock<mutex> lock(m_mutex);
565  while (m_state != STATE_CONNECTED && m_error.empty() && !m_cancelled) {
566  if (m_cond.wait_until(lock, deadline) == cv_status::timeout) {
567  m_error = "timeout";
568  return false;
569  }
570  }
571  return m_error.empty();
572 }
573 
574 bool SshSocketHandler::issue_command(const std::string &command) {
575  lock_guard<mutex> lock(m_mutex);
576 
577  m_command = command;
580 
581  m_channel = ssh_channel_new(m_ssh_session);
582  m_channel_is_eof = false;
584 
585  ssh_channel_set_blocking(m_channel, 0);
586 
587  // set callbacks
588  memset(&m_channel_callbacks, 0, sizeof(m_channel_callbacks));
589  m_channel_callbacks.userdata = this;
590  m_channel_callbacks.channel_exit_status_function = exit_status_callback_function;
591  ssh_callbacks_init(&m_channel_callbacks);
592  int rc = ssh_set_channel_callbacks(m_channel, &m_channel_callbacks);
593  if (rc == SSH_ERROR) {
594  m_error = string("ssh_set_channel_callbacks() failed - ") + ssh_get_error(m_ssh_session);
595  return false;
596  }
597 
598  while (true) {
599  rc = ssh_channel_open_session(m_channel);
600  if (rc == SSH_AGAIN) {
601  m_state = STATE_COMPLETE_CHANNEL_SESSION_OPEN;
603  if (socket_has_data())
604  continue;
605  return true;
606  }
607  else if (rc == SSH_ERROR) {
608  ssh_channel_free(m_channel);
609  m_channel = 0;
610  m_error = string("ssh_channel_open_session() failed - ") + ssh_get_error(m_ssh_session);
611  m_state = STATE_CONNECTED;
612  return false;
613  }
614  break;
615  }
616 
617  HT_ASSERT(rc == SSH_OK);
618  m_state = STATE_CHANNEL_REQUEST_EXEC;
620  return true;
621 }
622 
624  unique_lock<mutex> lock(m_mutex);
625  while (m_state != STATE_CONNECTED && m_error.empty() &&
627  m_cond.wait(lock);
628  return m_error.empty() && m_command_exit_status == 0 &&
629  (!m_cancelled || m_state == STATE_CONNECTED);
630 }
631 
633  unique_lock<mutex> lock(m_mutex);
634  m_cancelled = true;
635  if (m_channel) {
636  ssh_channel_close(m_channel);
637  ssh_channel_free(m_channel);
638  m_channel = 0;
639  }
640  if (m_state != STATE_INITIAL && m_ssh_session) {
641  ssh_disconnect(m_ssh_session);
642  ssh_free(m_ssh_session);
643  }
644  m_cond.notify_all();
645 }
646 
647 
649  unique_lock<mutex> lock(m_mutex);
650 
651  m_terminal_output = val;
652  if (!m_terminal_output)
653  return;
654 
655  // Send stdout collected so far to output stream
656  bool first = true;
657  if (m_stdout_buffer.fill()) {
660  }
662  if (!m_stdout_collector.empty()) {
663  for (auto & line : m_stdout_collector) {
664  if (first)
665  first = false;
666  else
667  cout << "\n";
668  cout << "[" << m_hostname << "] " << line;
669  }
670  if (!m_stdout_collector.last_line_is_partial())
671  cout << "\n";
672  else
674  cout << flush;
675  }
676 
677  // Send stderr collected so far to output stream
678  first = true;
679  if (m_stderr_buffer.fill()) {
682  }
684  if (!m_stderr_collector.empty()) {
685  for (auto & line : m_stderr_collector) {
686  if (first)
687  first = false;
688  else
689  cerr << "\n";
690  cerr << "[" << m_hostname << "] " << line;
691  }
692  if (!m_stderr_collector.last_line_is_partial())
693  cerr << "\n";
694  else
696  cerr << flush;
697  }
698 }
699 
700 void SshSocketHandler::dump_log(std::ostream &out) {
701  if (m_log_buffer.fill()) {
704  }
705  if (!m_error.empty()) {
706  for (auto & line : m_log_collector)
707  out << "[" << m_hostname << "] " << line << "\n";
708  out << "[" << m_hostname << "] ERROR " << m_error << endl;
709  }
714  out << flush;
715 }
716 
717 
719  unsigned char *hash {};
720  size_t hlen {};
721  int rc;
722 
723  int state = ssh_is_server_known(m_ssh_session);
724 
725  ssh_key key;
726  rc = ssh_get_publickey(m_ssh_session, &key);
727  if (rc != SSH_OK) {
728  m_error = string("unable to obtain public key - ") + ssh_get_error(m_ssh_session);
729  return false;
730  }
731 
732  rc = ssh_get_publickey_hash(key, SSH_PUBLICKEY_HASH_SHA1, &hash, &hlen);
733  if (rc != SSH_OK) {
734  ssh_key_free(key);
735  m_error = "problem computing public key hash";
736  return false;
737  }
738 
739  ssh_key_free(key);
740 
741  switch (state) {
742 
743  case SSH_SERVER_KNOWN_OK:
744  break;
745 
746  case SSH_SERVER_KNOWN_CHANGED:
747  m_error = "host key has changed";
748  free(hash);
749  return false;
750 
751  case SSH_SERVER_FOUND_OTHER:
752  m_error = "Key mis-match with one in known_hosts";
753  free(hash);
754  return false;
755 
756  case SSH_SERVER_FILE_NOT_FOUND:
757  case SSH_SERVER_NOT_KNOWN:
758 
759  if (ssh_write_knownhost(m_ssh_session) < 0) {
760  m_error = "problem writing known hosts file";
761  free(hash);
762  return false;
763  }
764  break;
765 
766  case SSH_SERVER_ERROR:
767  m_error = ssh_get_error(m_ssh_session);
768  return false;
769  }
770  free(hash);
771  return true;
772 }
773 
774 
775 void SshSocketHandler::write_to_stdout(const char *output, size_t len) {
776  const char *base = output;
777  const char *end = output + len;
778  const char *ptr;
779 
780  while (base < end) {
782  cout << "[" << m_hostname << "] ";
783 
784  for (ptr = base; ptr<end; ptr++) {
785  if (*ptr == '\n')
786  break;
787  }
788 
789  if (ptr < end) {
790  cout << string(base, ptr-base) << endl;
791  base = ptr+1;
793  }
794  else {
795  cout << string(base, ptr-base);
797  break;
798  }
799  }
800  cout << flush;
801 }
802 
803 
804 void SshSocketHandler::write_to_stderr(const char *output, size_t len) {
805  const char *base = output;
806  const char *end = output + len;
807  const char *ptr;
808 
809  while (base < end) {
811  cerr << "[" << m_hostname << "] ";
812 
813  for (ptr = base; ptr<end; ptr++) {
814  if (*ptr == '\n')
815  break;
816  }
817 
818  if (ptr < end) {
819  cerr << string(base, ptr-base) << endl;
820  base = ptr+1;
822  }
823  else {
824  cerr << string(base, ptr-base);
826  break;
827  }
828  }
829  cerr << flush;
830 }
831 
832 
834  int count;
835  ioctl(m_sd, FIONREAD, &count);
836  return count != 0;
837 }
static Comm * instance()
Creates/returns singleton instance of the Comm class.
Definition: Comm.h:72
void set_exit_status(int exit_status)
libssh exit status callback This function sets m_command_exit_status to exit_status, sets m_command_exit_status_is_set to true, and then signals m_cond.
void connect_status_callback(float status)
libssh connection status callback.
int register_socket(int sd, const CommAddress &addr, RawSocketHandler *handler)
Registers an externally managed socket with comm event loop.
Definition: Comm.cc:109
std::condition_variable m_cond
Condition variable signalling connection and command completion.
virtual ~SshSocketHandler()
Destructor.
void write_to_stderr(const char *output, size_t len)
Writes output to stderr Writes output to stderr, prefixing each line with '[' hostname ']'...
SshOutputCollector::Buffer m_stderr_buffer
Current stderr output buffer.
std::string m_command
Current command being issued.
SshOutputCollector m_stderr_collector
Output collector for stderr.
bool m_line_prefix_needed_stderr
Line prefix needs to be emitted on next stderr output.
int m_sd
Socket descriptor.
bool issue_command(const std::string &command)
Asynchronously issues a command.
SshOutputCollector m_stdout_collector
Output collector for stdout.
SshOutputCollector m_log_collector
Output collector for logging.
size_t remain() const
Returns amount of unused space remaining in buffer.
static bool exists(const String &fname)
Checks if a file or directory exists.
Definition: FileUtils.cc:420
SshOutputCollector::Buffer m_stdout_buffer
Current stdout output buffer.
int auth_callback(const char *prompt, char *buf, size_t len, int echo, int verify)
libssh authorization callback.
STL namespace.
#define HT_FATAL(msg)
Definition: Logger.h:339
std::mutex m_mutex
Mutex for serialzing access to members
int m_state
Current handler state.
SshSocketHandler(const std::string &hostname)
Constructor.
std::string m_error
Error message
Data available to read.
Definition: PollEvent.h:42
bool m_line_prefix_needed_stdout
Line prefix needs to be emitted on next stdout output.
#define HT_ASSERT(_e_)
Definition: Logger.h:396
bool wait_for_connection(std::chrono::system_clock::time_point deadline)
Waits for connection establishment.
File system utility functions.
static int ms_libssh_verbosity
Libssh logging verbosity level.
#define LOG_PREFIX
std::string to_string(int events)
Returns a string representation of polling events.
Definition: PollEvent.cc:36
#define SSH_READ_PAGE_SIZE
virtual bool handle(int sd, int events)
Socket event handler function.
bool status(ContextPtr &context, Timer &timer, Status &status)
Runs a status check on the master.
Definition: Utility.cc:408
void add(Buffer buf)
Adds filled buffer to collector.
Fixed-size buffer to hold a portion of output.
bool m_channel_is_eof
Flag indicating that current channel is EOF.
Comm * m_comm
Pointer to comm layer.
Logging routines and macros.
virtual void deregister(int sd)
Deregisters socket.
Compatibility Macros for C/C++.
bool verify_knownhost()
Verifies host with public key method.
static void set_libssh_verbosity(const std::string &value)
Sets libssh logging verbosity level.
ssh_channel m_channel
libssh channel object
char * base
Pointer to beginning of buffer memory.
Declarations for PollEvent.
std::string m_hostname
Name of host to connect to.
ssh_callbacks_struct m_callbacks
libssh callbacks
String format(int sep= ':') const
Returns a string with a dotted notation ("127.0.0.1:8080") including the port.
Definition: InetAddr.h:132
static bool set_flags(int fd, int flags)
Sets fcntl flags of a socket.
Definition: FileUtils.cc:256
int m_poll_interest
Current polling interest.
Hypertable definitions
bool m_command_exit_status_is_set
Flag indicating that the exit status has been set.
#define HT_FATALF(msg,...)
Definition: Logger.h:343
Writing can be performed without blocking.
Definition: PollEvent.h:46
void cancel()
Cancels outstanding connection establishment or command execution.
bool m_terminal_output
Redirect output to terminal.
void add(const char *data, size_t len)
Adds data to buffer.
static void enable_debug()
Enables debug logging output.
char * ptr
Pointer to next unused position in buffer memory.
bool m_cancelled
Flag indicating that outstanding operations should be cancelled.
SshOutputCollector::Buffer m_log_buffer
Current logging output buffer.
Raw socket handler for ssh protocol driver.
void log_callback(ssh_session session, int priority, const char *message)
Writes log messages to logging output collector.
bool empty() const
Returns true if no output has been collected Returns true if there are no collected buffers or if non...
CommAddress m_comm_address
Address of connection.
int m_command_exit_status
Command exit status.
size_t fill() const
Returns amount of buffer filled.
void global_request_callback(ssh_session session, ssh_message message)
libssh global request callback.
void write_to_stdout(const char *output, size_t len)
Writes output to stdout Writes output to stdout, prefixing each line with '[' hostname ']'...
Buffer allocate_buffer()
Allocate a buffer.
bool wait_for_command_completion()
Waits for command completion.
void dump_log(std::ostream &out)
Writes collected log messages to output stream.
Declarations for SshSocketHandler.
ssh_session m_ssh_session
libssh sesison object
Address abstraction to hold either proxy name or IPv4:port address.
Definition: CommAddress.h:52
static bool ms_debug_enabled
Flag for enabling debugging output.
bool socket_has_data()
Determines if data available on socket for reading Checks socket descriptor m_sd to see if there is a...
ssh_channel_callbacks_struct m_channel_callbacks
libssh channel callbacks
void set_terminal_output(bool val)
Tells handler to send collected output subsequent output to terminal If val is true, sends any collected stdout or stderr output to terminal and sets m_terminal_output to true which causes any subsequent output collected to be sent to the terminal.