From 2c7b0bc6dc79304571a93977e806125959d1c865 Mon Sep 17 00:00:00 2001 From: Yason Khaburzaniya Date: Wed, 12 Feb 2025 22:22:12 -0800 Subject: [PATCH 1/5] simulate echo wip --- interactive_process/interactive_process.py | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/interactive_process/interactive_process.py b/interactive_process/interactive_process.py index dd1206b..a5489e1 100644 --- a/interactive_process/interactive_process.py +++ b/interactive_process/interactive_process.py @@ -1,4 +1,5 @@ import os +import shlex from ptyprocess import PtyProcessUnicode import platform @@ -19,12 +20,24 @@ def __init__(self, env={"PS1": "", "TERM": "dumb"}, echo=False): shell = '/bin/bash' self.process = PtyProcessUnicode.spawn([shell, '--noprofile', '--norc'], env=env, echo=echo) - def send_command(self, command): + def send_command(self, command, end_marker=None): try: - self.process.write(f"{command}" + os.linesep) + escaped_command = shlex.quote(command) + echo_text = f"echo $ {escaped_command}" + self.process.write(echo_text + os.linesep) + + if end_marker: + shell_command = f"{command} && echo {end_marker} || echo {end_marker}" + self.process.write(f"{shell_command}" + os.linesep) + else: + self.process.write(f"{command}" + os.linesep) + except OSError as e: raise ReadWriteError(f"Failed to write to stdin due to OSError") from e + def send_input(self, input_text: str): + self.send_command(input_text, None) + def read_nonblocking(self, timeout=0.1): """ Reads from stdout and std_err. Timeout is used to wait for data. But as soon as data is read, From 009094cd3b00321c59f3a9ea25a6560fe7e833a4 Mon Sep 17 00:00:00 2001 From: Yason Khaburzaniya Date: Sat, 15 Feb 2025 15:36:52 -0800 Subject: [PATCH 2/5] fixing up the tests --- tests/test_interactive_process.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_interactive_process.py b/tests/test_interactive_process.py index 3cbfd42..1297727 100644 --- a/tests/test_interactive_process.py +++ b/tests/test_interactive_process.py @@ -17,14 +17,14 @@ class TestInteractiveProcess: @pytest.fixture(autouse=True) def process(self): self.process = InteractiveProcess() - self.process.send_command("echo flush\n") + self.process.send_command("echo flush", "Completed flush") while True: try: flushed = self.process.read_nonblocking(0.001) # clear buffer except TimeoutError: continue else: - if "flush" in flushed: + if "Completed flush" in flushed: break return self.process From d6150e9f300fbfdec36784d5329398ff5a80ea9d Mon Sep 17 00:00:00 2001 From: Yason Khaburzaniya Date: Sun, 16 Feb 2025 13:31:25 -0800 Subject: [PATCH 3/5] fix tests, send_input, and read_to_text --- interactive_process/interactive_process.py | 55 ++++++++++++++++++++-- tests/test_interactive_process.py | 42 +++++++++-------- 2 files changed, 73 insertions(+), 24 deletions(-) diff --git a/interactive_process/interactive_process.py b/interactive_process/interactive_process.py index a5489e1..ca234c6 100644 --- a/interactive_process/interactive_process.py +++ b/interactive_process/interactive_process.py @@ -1,5 +1,8 @@ import os +import secrets import shlex +import string +import time from ptyprocess import PtyProcessUnicode import platform @@ -13,17 +16,27 @@ class ReadWriteError(Exception): class InteractiveProcess: - def __init__(self, env={"PS1": "", "TERM": "dumb"}, echo=False): + def __init__(self, env={"PS1": "", "TERM": "dumb"}, shell_prompt = "", echo=False): if platform.system() == 'Windows': shell = 'cmd.exe' else: shell = '/bin/bash' + self.shell_prompt = shell_prompt + self.buffer = "" self.process = PtyProcessUnicode.spawn([shell, '--noprofile', '--norc'], env=env, echo=echo) + @classmethod + def with_random_prompt(cls) -> "InteractiveProcess": + alphabet = string.ascii_letters + string.digits + random_string = ''.join(secrets.choice(alphabet) for i in range(8)) + prompt = f"user-{random_string}$" + + return cls(shell_prompt=prompt) + def send_command(self, command, end_marker=None): try: escaped_command = shlex.quote(command) - echo_text = f"echo $ {escaped_command}" + echo_text = f"echo {self.shell_prompt} {escaped_command}" self.process.write(echo_text + os.linesep) if end_marker: @@ -36,7 +49,30 @@ def send_command(self, command, end_marker=None): raise ReadWriteError(f"Failed to write to stdin due to OSError") from e def send_input(self, input_text: str): - self.send_command(input_text, None) + try: + input_text = f"{input_text}" + os.linesep + self.buffer += input_text + self.process.write(f"{input_text}" + os.linesep) + except OSError as e: + raise ReadWriteError(f"Failed to write to stdin due to OSError") from e + + def read_to_text(self, text: str, inclusive = True, timeout=0.5): + start_time = time.monotonic() + output = "" + while True: + try: + output += self.read_nonblocking(0.01) + index = output.find(text) + if index != -1: + if inclusive: + index = index + len(text) + self.buffer = output[index:] + return output[:index] + except TimeoutError as e: + if time.monotonic() - start_time > timeout: + self.buffer = output # Just save the buffer, so that you can get it by calling read_nonblocking + raise e + continue def read_nonblocking(self, timeout=0.1): """ @@ -49,16 +85,25 @@ def read_nonblocking(self, timeout=0.1): """ if not self.process.isalive(): raise TerminatedProcessError(f"Process is terminated with return code {self.process.status}") - readables, _, _ = select([self.process.fd], [], [], timeout) + output = "" + if self.buffer: + output = self.buffer + self.buffer = "" + timeout = 0 # since we already have some output, just collect whatever else is already waiting + + readables, _, _ = select([self.process.fd], [], [], timeout) if readables: try: - return self.process.read().replace("\r\n", "\n") + output += self.process.read().replace("\r\n", "\n") except EOFError as e: return "" except OSError as e: raise ReadWriteError(f"Failed to read due to OSError") from e + if output: + return output + raise TimeoutError(f"No data read before reaching timout of {timeout}s") def close(self): diff --git a/tests/test_interactive_process.py b/tests/test_interactive_process.py index 1297727..5a3ef8f 100644 --- a/tests/test_interactive_process.py +++ b/tests/test_interactive_process.py @@ -16,34 +16,33 @@ def error_commands(request): class TestInteractiveProcess: @pytest.fixture(autouse=True) def process(self): - self.process = InteractiveProcess() + self.process = InteractiveProcess(shell_prompt="testing$") self.process.send_command("echo flush", "Completed flush") - while True: - try: - flushed = self.process.read_nonblocking(0.001) # clear buffer - except TimeoutError: - continue - else: - if "Completed flush" in flushed: - break + flushed = self.process.read_to_text("Completed flush") + print(f"\nFlushed before reading input:\n{flushed}") return self.process - def test_stream_nonblocking(self): + def test_read_nonblocking(self): self.process.send_command("echo Hello") - + time.sleep(0.2) # wait for output to show up in the terminal output = self.process.read_nonblocking(2) - assert output.strip() == "Hello" # newline is part of echo command + assert output.strip() == "testing$ echo Hello\nHello" # newline is part of echo command - def test_stream_nonblocking_sleeping_command(self): + def test_read_nonblocking_sleeping_command(self): self.process.send_command("sleep 0.2 && echo Hello") + time.sleep(0.3) output = self.process.read_nonblocking(2) - assert output.strip() == "Hello" + assert output.strip() == "testing$ sleep 0.2 && echo Hello\nHello" def test_stream_nonblocking_sleeping_command_timeout(self): self.process.send_command("sleep 1 && echo Hello") + time.sleep(0.2) + echo = self.process.read_nonblocking(0.1) + assert echo.strip() == "testing$ sleep 1 && echo Hello" + time.sleep(0.3) with pytest.raises(TimeoutError): self.process.read_nonblocking(0.1) @@ -57,21 +56,24 @@ def test_read_with_process_closed(self): def test_read_with_intput_response(self): self.process.send_command('read -p "Please enter your name: " user_name') + time.sleep(0.1) prompt = self.process.read_nonblocking(0.1) - assert prompt == "Please enter your name: " + assert prompt.strip() == """testing$ read -p "Please enter your name: " user_name\nPlease enter your name:""" # Check for timeout after we read the prompt, maybe should be own test with pytest.raises(TimeoutError): self.process.read_nonblocking(0.01) - self.process.send_command('dog') + self.process.send_input('dog') self.process.send_command('echo $user_name') + time.sleep(0.1) output_result = self.process.read_nonblocking(0.1) - assert output_result.strip() == 'dog' + assert output_result.strip() == 'dog\ntesting$ echo $user_name\ndog' def test_read_std_err(self, error_commands): command, expect_output =error_commands self.process.send_command(command) + time.sleep(0.1) output = self.process.read_nonblocking(0.2) @@ -96,6 +98,8 @@ def test_read_nonblocking_read_error(self): def test_read_nonblocking_clear_command(self): self.process.send_command('clear') # with "dumb" terminal clear command FAIL silently self.process.send_command('echo Completed 4e556f02-38a1-4eec-8e0c-2d8afcd37ae7') - time.sleep(1) + time.sleep(0.1) value = self.process.read_nonblocking(1) - assert value.strip() == "Completed 4e556f02-38a1-4eec-8e0c-2d8afcd37ae7" \ No newline at end of file + assert value.strip() == ('testing$ clear\n' + 'testing$ echo Completed 4e556f02-38a1-4eec-8e0c-2d8afcd37ae7\n' + 'Completed 4e556f02-38a1-4eec-8e0c-2d8afcd37ae7') \ No newline at end of file From 9b312de74649aaf0a8e547a70e6a3f201b95ad6b Mon Sep 17 00:00:00 2001 From: Yason Khaburzaniya Date: Sun, 16 Feb 2025 16:41:45 -0800 Subject: [PATCH 4/5] built in flushing of output --- interactive_process/interactive_process.py | 5 +++++ tests/test_interactive_process.py | 3 +-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/interactive_process/interactive_process.py b/interactive_process/interactive_process.py index ca234c6..9271793 100644 --- a/interactive_process/interactive_process.py +++ b/interactive_process/interactive_process.py @@ -33,6 +33,10 @@ def with_random_prompt(cls) -> "InteractiveProcess": return cls(shell_prompt=prompt) + def flush_output(self): + self.process.write("echo flushed" + os.linesep) + return self.read_to_text("flushed" + os.linesep) + def send_command(self, command, end_marker=None): try: escaped_command = shlex.quote(command) @@ -56,6 +60,7 @@ def send_input(self, input_text: str): except OSError as e: raise ReadWriteError(f"Failed to write to stdin due to OSError") from e + # TODO: need more tests for this def read_to_text(self, text: str, inclusive = True, timeout=0.5): start_time = time.monotonic() output = "" diff --git a/tests/test_interactive_process.py b/tests/test_interactive_process.py index 5a3ef8f..de1651b 100644 --- a/tests/test_interactive_process.py +++ b/tests/test_interactive_process.py @@ -17,8 +17,7 @@ class TestInteractiveProcess: @pytest.fixture(autouse=True) def process(self): self.process = InteractiveProcess(shell_prompt="testing$") - self.process.send_command("echo flush", "Completed flush") - flushed = self.process.read_to_text("Completed flush") + flushed = self.process.flush_output() print(f"\nFlushed before reading input:\n{flushed}") return self.process From 8647dd9d34b6f5af52ddb2250692e2538a8deffb Mon Sep 17 00:00:00 2001 From: Yason Khaburzaniya Date: Sun, 16 Feb 2025 21:20:36 -0800 Subject: [PATCH 5/5] minor cleanup --- interactive_process/interactive_process.py | 2 +- tests/test_interactive_process.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/interactive_process/interactive_process.py b/interactive_process/interactive_process.py index 9271793..4b5e05e 100644 --- a/interactive_process/interactive_process.py +++ b/interactive_process/interactive_process.py @@ -55,7 +55,7 @@ def send_command(self, command, end_marker=None): def send_input(self, input_text: str): try: input_text = f"{input_text}" + os.linesep - self.buffer += input_text + self.buffer += input_text # keep input in the buffer for the next read to pick it up self.process.write(f"{input_text}" + os.linesep) except OSError as e: raise ReadWriteError(f"Failed to write to stdin due to OSError") from e diff --git a/tests/test_interactive_process.py b/tests/test_interactive_process.py index de1651b..1d409ba 100644 --- a/tests/test_interactive_process.py +++ b/tests/test_interactive_process.py @@ -36,7 +36,7 @@ def test_read_nonblocking_sleeping_command(self): assert output.strip() == "testing$ sleep 0.2 && echo Hello\nHello" - def test_stream_nonblocking_sleeping_command_timeout(self): + def test_read_nonblocking_sleeping_command_timeout(self): self.process.send_command("sleep 1 && echo Hello") time.sleep(0.2) echo = self.process.read_nonblocking(0.1)