From 1a08293d45e8b9e6225e9f72bf38575b49c5fa7d Mon Sep 17 00:00:00 2001 From: imcdo Date: Mon, 21 Nov 2022 14:19:45 -0800 Subject: [PATCH 1/3] Show stdout and stderr with failure in remote account --- ducktape/cluster/remoteaccount.py | 50 ++++++++++++++++++++++--------- 1 file changed, 36 insertions(+), 14 deletions(-) diff --git a/ducktape/cluster/remoteaccount.py b/ducktape/cluster/remoteaccount.py index fa2399e7e..e57330e3d 100644 --- a/ducktape/cluster/remoteaccount.py +++ b/ducktape/cluster/remoteaccount.py @@ -17,6 +17,9 @@ import os from paramiko import SSHClient, SSHConfig, MissingHostKeyPolicy from paramiko.ssh_exception import SSHException, NoValidConnectionsError +from dataclasses import dataclass +from collections import deque +from typing import Iterable, Iterator import shutil import signal import socket @@ -29,6 +32,9 @@ from ducktape.errors import DucktapeError +NUM_RING_LINES = 33 + + def check_ssh(method): def wrapper(self, *args, **kwargs): try: @@ -113,21 +119,36 @@ def __str__(self): return "%s: %s" % (self.account_str, self.msg) +@dataclass class RemoteCommandError(RemoteAccountError): """This exception is raised when a process run by ssh*() returns a non-zero exit status. """ - - def __init__(self, account, cmd, exit_status, msg): - self.account_str = str(account) - self.exit_status = exit_status - self.cmd = cmd - self.msg = msg + account: "RemoteAccount" + exit_status: int + cmd: str + msg: str = "" + stdout: Iterable[str] = [] + stderr: Iterable[str] = [] def __str__(self): - msg = "%s: Command '%s' returned non-zero exit status %d." % (self.account_str, self.cmd, self.exit_status) + lines = [f"{self.account}: Command {self.cmd} returned non-zero exit status {self.exit_status}.{self.msg}"] if self.msg: - msg += " Remote error message: %s" % self.msg - return msg + lines.append(f"\tMessage: {self.msg}") + if self.stdout: + lines.append(f"\tStdout:") + lines.extend(f"\t\t{line}" for line in self.stdout) + if self.stderr: + lines.append(f"\tStderr:") + lines.extend(f"\t\t{line}" for line in self.stderr) + return "\n".join(lines) + + +def ring_process(stream: Iterator, buffersize=NUM_RING_LINES): + dq = deque(maxlen=buffersize) + dq.extend(stream) + if len(dq) == buffersize: + dq.appendleft(f"Showing the last {buffersize-1} lines...") + return dq class RemoteAccount(HttpMixin): @@ -302,12 +323,12 @@ def ssh(self, cmd, allow_fail=False): # Unfortunately we need to read over the channel to ensure that recv_exit_status won't hang. See: # http://docs.paramiko.org/en/2.0/api/channel.html#paramiko.channel.Channel.recv_exit_status - stdout.read() + ring_buff_stdout = ring_process(stdout) exit_status = stdout.channel.recv_exit_status() try: if exit_status != 0: if not allow_fail: - raise RemoteCommandError(self, cmd, exit_status, stderr.read()) + raise RemoteCommandError(self, cmd, exit_status, stdout=ring_buff_stdout, stderr=ring_process(stderr)) else: self._log(logging.DEBUG, "Running ssh command '%s' exited with status %d and message: %s" % (cmd, exit_status, stderr.read())) @@ -349,10 +370,11 @@ def ssh_capture(self, cmd, allow_fail=False, callback=None, combine_stderr=True, stdout = chan.makefile('r', -1) stderr = chan.makefile_stderr('r', -1) + stdout_buff = deque(maxlen=NUM_RING_LINES) def output_generator(): for line in iter(stdout.readline, ''): - + stdout_buff.append(line) if callback is None: yield line else: @@ -361,7 +383,7 @@ def output_generator(): exit_status = stdout.channel.recv_exit_status() if exit_status != 0: if not allow_fail: - raise RemoteCommandError(self, cmd, exit_status, stderr.read()) + raise RemoteCommandError(self, cmd, exit_status, stdout=stdout_buff, stderr=ring_process(stderr)) else: self._log(logging.DEBUG, "Running ssh command '%s' exited with status %d and message: %s" % (cmd, exit_status, stderr.read())) @@ -404,7 +426,7 @@ def ssh_output(self, cmd, allow_fail=False, combine_stderr=True, timeout_sec=Non exit_status = stdin.channel.recv_exit_status() if exit_status != 0: if not allow_fail: - raise RemoteCommandError(self, cmd, exit_status, stderr.read()) + raise RemoteCommandError(self, cmd, exit_status, stdout=stdoutdata.split("\n"), stderr=ring_process(stderr)) else: self._log(logging.DEBUG, "Running ssh command '%s' exited with status %d and message: %s" % (cmd, exit_status, stderr.read())) From ef25c202355c763d0aa034585336326ee1c93cfb Mon Sep 17 00:00:00 2001 From: imcdo Date: Mon, 21 Nov 2022 14:30:14 -0800 Subject: [PATCH 2/3] fix override --- ducktape/cluster/remoteaccount.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ducktape/cluster/remoteaccount.py b/ducktape/cluster/remoteaccount.py index e57330e3d..70b09fd1b 100644 --- a/ducktape/cluster/remoteaccount.py +++ b/ducktape/cluster/remoteaccount.py @@ -147,7 +147,7 @@ def ring_process(stream: Iterator, buffersize=NUM_RING_LINES): dq = deque(maxlen=buffersize) dq.extend(stream) if len(dq) == buffersize: - dq.appendleft(f"Showing the last {buffersize-1} lines...") + dq[0] = (f"Showing the last {buffersize-1} lines...") return dq From 0526604ec6540a57ec87a0453ad4d3f3554d26cf Mon Sep 17 00:00:00 2001 From: imcdo Date: Mon, 28 Nov 2022 12:20:51 -0800 Subject: [PATCH 3/3] only ring buffer on ssh capture --- ducktape/cluster/remoteaccount.py | 16 ++++------------ 1 file changed, 4 insertions(+), 12 deletions(-) diff --git a/ducktape/cluster/remoteaccount.py b/ducktape/cluster/remoteaccount.py index 70b09fd1b..6c4b32e47 100644 --- a/ducktape/cluster/remoteaccount.py +++ b/ducktape/cluster/remoteaccount.py @@ -143,14 +143,6 @@ def __str__(self): return "\n".join(lines) -def ring_process(stream: Iterator, buffersize=NUM_RING_LINES): - dq = deque(maxlen=buffersize) - dq.extend(stream) - if len(dq) == buffersize: - dq[0] = (f"Showing the last {buffersize-1} lines...") - return dq - - class RemoteAccount(HttpMixin): """RemoteAccount is the heart of interaction with cluster nodes, and every allocated cluster node has a reference to an instance of RemoteAccount. @@ -323,12 +315,12 @@ def ssh(self, cmd, allow_fail=False): # Unfortunately we need to read over the channel to ensure that recv_exit_status won't hang. See: # http://docs.paramiko.org/en/2.0/api/channel.html#paramiko.channel.Channel.recv_exit_status - ring_buff_stdout = ring_process(stdout) + stdout_lines = stdout.readlines() exit_status = stdout.channel.recv_exit_status() try: if exit_status != 0: if not allow_fail: - raise RemoteCommandError(self, cmd, exit_status, stdout=ring_buff_stdout, stderr=ring_process(stderr)) + raise RemoteCommandError(self, cmd, exit_status, stdout=stdout_lines, stderr=stderr.readlines()) else: self._log(logging.DEBUG, "Running ssh command '%s' exited with status %d and message: %s" % (cmd, exit_status, stderr.read())) @@ -383,7 +375,7 @@ def output_generator(): exit_status = stdout.channel.recv_exit_status() if exit_status != 0: if not allow_fail: - raise RemoteCommandError(self, cmd, exit_status, stdout=stdout_buff, stderr=ring_process(stderr)) + raise RemoteCommandError(self, cmd, exit_status, stdout=stdout_buff, stderr=stderr.readlines()) else: self._log(logging.DEBUG, "Running ssh command '%s' exited with status %d and message: %s" % (cmd, exit_status, stderr.read())) @@ -426,7 +418,7 @@ def ssh_output(self, cmd, allow_fail=False, combine_stderr=True, timeout_sec=Non exit_status = stdin.channel.recv_exit_status() if exit_status != 0: if not allow_fail: - raise RemoteCommandError(self, cmd, exit_status, stdout=stdoutdata.split("\n"), stderr=ring_process(stderr)) + raise RemoteCommandError(self, cmd, exit_status, stdout=stdoutdata.split("\n"), stderr=stderr.readlines()) else: self._log(logging.DEBUG, "Running ssh command '%s' exited with status %d and message: %s" % (cmd, exit_status, stderr.read()))