diff --git a/interactive_process/interactive_process.py b/interactive_process/interactive_process.py index dd1206b..4b5e05e 100644 --- a/interactive_process/interactive_process.py +++ b/interactive_process/interactive_process.py @@ -1,4 +1,8 @@ import os +import secrets +import shlex +import string +import time from ptyprocess import PtyProcessUnicode import platform @@ -12,19 +16,69 @@ class ReadWriteError(Exception): class InteractiveProcess: - def __init__(self, env={"PS1": "", "TERM": "dumb"}, echo=False): + def __init__(self, env={"PS1": "", "TERM": "dumb"}, shell_prompt = "", echo=False): if platform.system() == 'Windows': shell = 'cmd.exe' else: shell = '/bin/bash' + self.shell_prompt = shell_prompt + self.buffer = "" self.process = PtyProcessUnicode.spawn([shell, '--noprofile', '--norc'], env=env, echo=echo) - def send_command(self, command): + @classmethod + def with_random_prompt(cls) -> "InteractiveProcess": + alphabet = string.ascii_letters + string.digits + random_string = ''.join(secrets.choice(alphabet) for i in range(8)) + prompt = f"user-{random_string}$" + + return cls(shell_prompt=prompt) + + def flush_output(self): + self.process.write("echo flushed" + os.linesep) + return self.read_to_text("flushed" + os.linesep) + + def send_command(self, command, end_marker=None): try: - self.process.write(f"{command}" + os.linesep) + escaped_command = shlex.quote(command) + echo_text = f"echo {self.shell_prompt} {escaped_command}" + self.process.write(echo_text + os.linesep) + + if end_marker: + shell_command = f"{command} && echo {end_marker} || echo {end_marker}" + self.process.write(f"{shell_command}" + os.linesep) + else: + self.process.write(f"{command}" + os.linesep) + except OSError as e: raise ReadWriteError(f"Failed to write to stdin due to OSError") from e + def send_input(self, input_text: str): + try: + input_text = f"{input_text}" + os.linesep + self.buffer += input_text # keep input in the buffer for the next read to pick it up + self.process.write(f"{input_text}" + os.linesep) + except OSError as e: + raise ReadWriteError(f"Failed to write to stdin due to OSError") from e + + # TODO: need more tests for this + def read_to_text(self, text: str, inclusive = True, timeout=0.5): + start_time = time.monotonic() + output = "" + while True: + try: + output += self.read_nonblocking(0.01) + index = output.find(text) + if index != -1: + if inclusive: + index = index + len(text) + self.buffer = output[index:] + return output[:index] + except TimeoutError as e: + if time.monotonic() - start_time > timeout: + self.buffer = output # Just save the buffer, so that you can get it by calling read_nonblocking + raise e + continue + def read_nonblocking(self, timeout=0.1): """ Reads from stdout and std_err. Timeout is used to wait for data. But as soon as data is read, @@ -36,16 +90,25 @@ def read_nonblocking(self, timeout=0.1): """ if not self.process.isalive(): raise TerminatedProcessError(f"Process is terminated with return code {self.process.status}") - readables, _, _ = select([self.process.fd], [], [], timeout) + output = "" + if self.buffer: + output = self.buffer + self.buffer = "" + timeout = 0 # since we already have some output, just collect whatever else is already waiting + + readables, _, _ = select([self.process.fd], [], [], timeout) if readables: try: - return self.process.read().replace("\r\n", "\n") + output += self.process.read().replace("\r\n", "\n") except EOFError as e: return "" except OSError as e: raise ReadWriteError(f"Failed to read due to OSError") from e + if output: + return output + raise TimeoutError(f"No data read before reaching timout of {timeout}s") def close(self): diff --git a/tests/test_interactive_process.py b/tests/test_interactive_process.py index 3cbfd42..1d409ba 100644 --- a/tests/test_interactive_process.py +++ b/tests/test_interactive_process.py @@ -16,34 +16,32 @@ def error_commands(request): class TestInteractiveProcess: @pytest.fixture(autouse=True) def process(self): - self.process = InteractiveProcess() - self.process.send_command("echo flush\n") - while True: - try: - flushed = self.process.read_nonblocking(0.001) # clear buffer - except TimeoutError: - continue - else: - if "flush" in flushed: - break + self.process = InteractiveProcess(shell_prompt="testing$") + flushed = self.process.flush_output() + print(f"\nFlushed before reading input:\n{flushed}") return self.process - def test_stream_nonblocking(self): + def test_read_nonblocking(self): self.process.send_command("echo Hello") - + time.sleep(0.2) # wait for output to show up in the terminal output = self.process.read_nonblocking(2) - assert output.strip() == "Hello" # newline is part of echo command + assert output.strip() == "testing$ echo Hello\nHello" # newline is part of echo command - def test_stream_nonblocking_sleeping_command(self): + def test_read_nonblocking_sleeping_command(self): self.process.send_command("sleep 0.2 && echo Hello") + time.sleep(0.3) output = self.process.read_nonblocking(2) - assert output.strip() == "Hello" + assert output.strip() == "testing$ sleep 0.2 && echo Hello\nHello" - def test_stream_nonblocking_sleeping_command_timeout(self): + def test_read_nonblocking_sleeping_command_timeout(self): self.process.send_command("sleep 1 && echo Hello") + time.sleep(0.2) + echo = self.process.read_nonblocking(0.1) + assert echo.strip() == "testing$ sleep 1 && echo Hello" + time.sleep(0.3) with pytest.raises(TimeoutError): self.process.read_nonblocking(0.1) @@ -57,21 +55,24 @@ def test_read_with_process_closed(self): def test_read_with_intput_response(self): self.process.send_command('read -p "Please enter your name: " user_name') + time.sleep(0.1) prompt = self.process.read_nonblocking(0.1) - assert prompt == "Please enter your name: " + assert prompt.strip() == """testing$ read -p "Please enter your name: " user_name\nPlease enter your name:""" # Check for timeout after we read the prompt, maybe should be own test with pytest.raises(TimeoutError): self.process.read_nonblocking(0.01) - self.process.send_command('dog') + self.process.send_input('dog') self.process.send_command('echo $user_name') + time.sleep(0.1) output_result = self.process.read_nonblocking(0.1) - assert output_result.strip() == 'dog' + assert output_result.strip() == 'dog\ntesting$ echo $user_name\ndog' def test_read_std_err(self, error_commands): command, expect_output =error_commands self.process.send_command(command) + time.sleep(0.1) output = self.process.read_nonblocking(0.2) @@ -96,6 +97,8 @@ def test_read_nonblocking_read_error(self): def test_read_nonblocking_clear_command(self): self.process.send_command('clear') # with "dumb" terminal clear command FAIL silently self.process.send_command('echo Completed 4e556f02-38a1-4eec-8e0c-2d8afcd37ae7') - time.sleep(1) + time.sleep(0.1) value = self.process.read_nonblocking(1) - assert value.strip() == "Completed 4e556f02-38a1-4eec-8e0c-2d8afcd37ae7" \ No newline at end of file + assert value.strip() == ('testing$ clear\n' + 'testing$ echo Completed 4e556f02-38a1-4eec-8e0c-2d8afcd37ae7\n' + 'Completed 4e556f02-38a1-4eec-8e0c-2d8afcd37ae7') \ No newline at end of file