Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 28 additions & 14 deletions ducktape/cluster/remoteaccount.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
import os
from paramiko import SSHClient, SSHConfig, MissingHostKeyPolicy
from paramiko.ssh_exception import SSHException, NoValidConnectionsError
from dataclasses import dataclass
from collections import deque
from typing import Iterable, Iterator
import shutil
import signal
import socket
Expand All @@ -29,6 +32,9 @@
from ducktape.errors import DucktapeError


NUM_RING_LINES = 33


def check_ssh(method):
def wrapper(self, *args, **kwargs):
try:
Expand Down Expand Up @@ -113,21 +119,28 @@ def __str__(self):
return "%s: %s" % (self.account_str, self.msg)


@dataclass
class RemoteCommandError(RemoteAccountError):
"""This exception is raised when a process run by ssh*() returns a non-zero exit status.
"""

def __init__(self, account, cmd, exit_status, msg):
self.account_str = str(account)
self.exit_status = exit_status
self.cmd = cmd
self.msg = msg
account: "RemoteAccount"
exit_status: int
cmd: str
msg: str = ""
stdout: Iterable[str] = []
stderr: Iterable[str] = []

def __str__(self):
msg = "%s: Command '%s' returned non-zero exit status %d." % (self.account_str, self.cmd, self.exit_status)
lines = [f"{self.account}: Command {self.cmd} returned non-zero exit status {self.exit_status}.{self.msg}"]
if self.msg:
msg += " Remote error message: %s" % self.msg
return msg
lines.append(f"\tMessage: {self.msg}")
if self.stdout:
lines.append(f"\tStdout:")
lines.extend(f"\t\t{line}" for line in self.stdout)
if self.stderr:
lines.append(f"\tStderr:")
lines.extend(f"\t\t{line}" for line in self.stderr)
return "\n".join(lines)


class RemoteAccount(HttpMixin):
Expand Down Expand Up @@ -302,12 +315,12 @@ def ssh(self, cmd, allow_fail=False):

# Unfortunately we need to read over the channel to ensure that recv_exit_status won't hang. See:
# http://docs.paramiko.org/en/2.0/api/channel.html#paramiko.channel.Channel.recv_exit_status
stdout.read()
stdout_lines = stdout.readlines()
exit_status = stdout.channel.recv_exit_status()
try:
if exit_status != 0:
if not allow_fail:
raise RemoteCommandError(self, cmd, exit_status, stderr.read())
raise RemoteCommandError(self, cmd, exit_status, stdout=stdout_lines, stderr=stderr.readlines())
else:
self._log(logging.DEBUG, "Running ssh command '%s' exited with status %d and message: %s" %
(cmd, exit_status, stderr.read()))
Expand Down Expand Up @@ -349,10 +362,11 @@ def ssh_capture(self, cmd, allow_fail=False, callback=None, combine_stderr=True,
stdout = chan.makefile('r', -1)
stderr = chan.makefile_stderr('r', -1)

stdout_buff = deque(maxlen=NUM_RING_LINES)
def output_generator():

for line in iter(stdout.readline, ''):

stdout_buff.append(line)
if callback is None:
yield line
else:
Expand All @@ -361,7 +375,7 @@ def output_generator():
exit_status = stdout.channel.recv_exit_status()
if exit_status != 0:
if not allow_fail:
raise RemoteCommandError(self, cmd, exit_status, stderr.read())
raise RemoteCommandError(self, cmd, exit_status, stdout=stdout_buff, stderr=stderr.readlines())
else:
self._log(logging.DEBUG, "Running ssh command '%s' exited with status %d and message: %s" %
(cmd, exit_status, stderr.read()))
Expand Down Expand Up @@ -404,7 +418,7 @@ def ssh_output(self, cmd, allow_fail=False, combine_stderr=True, timeout_sec=Non
exit_status = stdin.channel.recv_exit_status()
if exit_status != 0:
if not allow_fail:
raise RemoteCommandError(self, cmd, exit_status, stderr.read())
raise RemoteCommandError(self, cmd, exit_status, stdout=stdoutdata.split("\n"), stderr=stderr.readlines())
else:
self._log(logging.DEBUG, "Running ssh command '%s' exited with status %d and message: %s" %
(cmd, exit_status, stderr.read()))
Expand Down