Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 68 additions & 5 deletions interactive_process/interactive_process.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
import os
import secrets
import shlex
import string
import time

from ptyprocess import PtyProcessUnicode
import platform
Expand All @@ -12,19 +16,69 @@ class ReadWriteError(Exception):


class InteractiveProcess:
def __init__(self, env={"PS1": "", "TERM": "dumb"}, echo=False):
def __init__(self, env={"PS1": "", "TERM": "dumb"}, shell_prompt = "", echo=False):
if platform.system() == 'Windows':
shell = 'cmd.exe'
else:
shell = '/bin/bash'
self.shell_prompt = shell_prompt
self.buffer = ""
self.process = PtyProcessUnicode.spawn([shell, '--noprofile', '--norc'], env=env, echo=echo)

def send_command(self, command):
@classmethod
def with_random_prompt(cls) -> "InteractiveProcess":
alphabet = string.ascii_letters + string.digits
random_string = ''.join(secrets.choice(alphabet) for i in range(8))
prompt = f"user-{random_string}$"

return cls(shell_prompt=prompt)

def flush_output(self):
self.process.write("echo flushed" + os.linesep)
return self.read_to_text("flushed" + os.linesep)

def send_command(self, command, end_marker=None):
try:
self.process.write(f"{command}" + os.linesep)
escaped_command = shlex.quote(command)
echo_text = f"echo {self.shell_prompt} {escaped_command}"
self.process.write(echo_text + os.linesep)

if end_marker:
shell_command = f"{command} && echo {end_marker} || echo {end_marker}"
self.process.write(f"{shell_command}" + os.linesep)
else:
self.process.write(f"{command}" + os.linesep)

except OSError as e:
raise ReadWriteError(f"Failed to write to stdin due to OSError") from e

def send_input(self, input_text: str):
try:
input_text = f"{input_text}" + os.linesep
self.buffer += input_text # keep input in the buffer for the next read to pick it up
self.process.write(f"{input_text}" + os.linesep)
except OSError as e:
raise ReadWriteError(f"Failed to write to stdin due to OSError") from e

# TODO: need more tests for this
def read_to_text(self, text: str, inclusive = True, timeout=0.5):
start_time = time.monotonic()
output = ""
while True:
try:
output += self.read_nonblocking(0.01)
index = output.find(text)
if index != -1:
if inclusive:
index = index + len(text)
self.buffer = output[index:]
return output[:index]
except TimeoutError as e:
if time.monotonic() - start_time > timeout:
self.buffer = output # Just save the buffer, so that you can get it by calling read_nonblocking
raise e
continue

def read_nonblocking(self, timeout=0.1):
"""
Reads from stdout and std_err. Timeout is used to wait for data. But as soon as data is read,
Expand All @@ -36,16 +90,25 @@ def read_nonblocking(self, timeout=0.1):
"""
if not self.process.isalive():
raise TerminatedProcessError(f"Process is terminated with return code {self.process.status}")
readables, _, _ = select([self.process.fd], [], [], timeout)

output = ""
if self.buffer:
output = self.buffer
self.buffer = ""
timeout = 0 # since we already have some output, just collect whatever else is already waiting

readables, _, _ = select([self.process.fd], [], [], timeout)
if readables:
try:
return self.process.read().replace("\r\n", "\n")
output += self.process.read().replace("\r\n", "\n")
except EOFError as e:
return ""
except OSError as e:
raise ReadWriteError(f"Failed to read due to OSError") from e

if output:
return output

raise TimeoutError(f"No data read before reaching timout of {timeout}s")

def close(self):
Expand Down
45 changes: 24 additions & 21 deletions tests/test_interactive_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,34 +16,32 @@ def error_commands(request):
class TestInteractiveProcess:
@pytest.fixture(autouse=True)
def process(self):
self.process = InteractiveProcess()
self.process.send_command("echo flush\n")
while True:
try:
flushed = self.process.read_nonblocking(0.001) # clear buffer
except TimeoutError:
continue
else:
if "flush" in flushed:
break
self.process = InteractiveProcess(shell_prompt="testing$")
flushed = self.process.flush_output()
print(f"\nFlushed before reading input:\n{flushed}")
return self.process

def test_stream_nonblocking(self):
def test_read_nonblocking(self):
self.process.send_command("echo Hello")

time.sleep(0.2) # wait for output to show up in the terminal
output = self.process.read_nonblocking(2)

assert output.strip() == "Hello" # newline is part of echo command
assert output.strip() == "testing$ echo Hello\nHello" # newline is part of echo command

def test_stream_nonblocking_sleeping_command(self):
def test_read_nonblocking_sleeping_command(self):
self.process.send_command("sleep 0.2 && echo Hello")
time.sleep(0.3)

output = self.process.read_nonblocking(2)

assert output.strip() == "Hello"
assert output.strip() == "testing$ sleep 0.2 && echo Hello\nHello"

def test_stream_nonblocking_sleeping_command_timeout(self):
def test_read_nonblocking_sleeping_command_timeout(self):
self.process.send_command("sleep 1 && echo Hello")
time.sleep(0.2)
echo = self.process.read_nonblocking(0.1)
assert echo.strip() == "testing$ sleep 1 && echo Hello"
time.sleep(0.3)

with pytest.raises(TimeoutError):
self.process.read_nonblocking(0.1)
Expand All @@ -57,21 +55,24 @@ def test_read_with_process_closed(self):

def test_read_with_intput_response(self):
self.process.send_command('read -p "Please enter your name: " user_name')
time.sleep(0.1)
prompt = self.process.read_nonblocking(0.1)
assert prompt == "Please enter your name: "
assert prompt.strip() == """testing$ read -p "Please enter your name: " user_name\nPlease enter your name:"""
# Check for timeout after we read the prompt, maybe should be own test
with pytest.raises(TimeoutError):
self.process.read_nonblocking(0.01)
self.process.send_command('dog')
self.process.send_input('dog')

self.process.send_command('echo $user_name')
time.sleep(0.1)
output_result = self.process.read_nonblocking(0.1)

assert output_result.strip() == 'dog'
assert output_result.strip() == 'dog\ntesting$ echo $user_name\ndog'

def test_read_std_err(self, error_commands):
command, expect_output =error_commands
self.process.send_command(command)
time.sleep(0.1)

output = self.process.read_nonblocking(0.2)

Expand All @@ -96,6 +97,8 @@ def test_read_nonblocking_read_error(self):
def test_read_nonblocking_clear_command(self):
self.process.send_command('clear') # with "dumb" terminal clear command FAIL silently
self.process.send_command('echo Completed 4e556f02-38a1-4eec-8e0c-2d8afcd37ae7')
time.sleep(1)
time.sleep(0.1)
value = self.process.read_nonblocking(1)
assert value.strip() == "Completed 4e556f02-38a1-4eec-8e0c-2d8afcd37ae7"
assert value.strip() == ('testing$ clear\n'
'testing$ echo Completed 4e556f02-38a1-4eec-8e0c-2d8afcd37ae7\n'
'Completed 4e556f02-38a1-4eec-8e0c-2d8afcd37ae7')