Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 23 additions & 9 deletions tests/test_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def _install_WfCommons_on_container(container):
exit_code, output = container.exec_run("sudo python3 -m pip install . --break-system-packages",
workdir="/tmp/WfCommons", stdout=True, stderr=True)
if exit_code != 0:
raise RuntimeError("Failed to install WfCommons on the container");
raise RuntimeError("Failed to install WfCommons on the container")

def _start_docker_container(backend, mounted_dir, working_dir, bin_dir, command=None):
if command is None:
Expand All @@ -62,13 +62,13 @@ def _start_docker_container(backend, mounted_dir, working_dir, bin_dir, command=

try:
image = client.images.get(image_name)
sys.stderr.write(f"Image '{image_name}' is available locally\n")
sys.stderr.write(f"[{backend}] Image '{image_name}' is available locally\n")
except ImageNotFound:
sys.stderr.write(f"Pulling image '{image_name}'...\n")
sys.stderr.write(f"[{backend}] Pulling image '{image_name}'...\n")
client.images.pull(image_name)

# Launch the docker container to actually run the translated workflow
sys.stderr.write("Starting Docker container...\n")
sys.stderr.write(f"[{backend}] Starting Docker container...\n")
container = client.containers.run(
image=image_name,
command=command,
Expand All @@ -83,7 +83,7 @@ def _start_docker_container(backend, mounted_dir, working_dir, bin_dir, command=

# Copy over the wfbench and cpu-benchmark executables to where they should go on the container
if bin_dir:
sys.stderr.write("Copying wfbench and cpu-benchmark...\n")
sys.stderr.write(f"[{backend}] Copying wfbench and cpu-benchmark...\n")
exit_code, output = container.exec_run(["sh", "-c", "sudo cp -f `which wfbench` " + bin_dir],
stdout=True, stderr=True)
if exit_code != 0:
Expand All @@ -93,13 +93,27 @@ def _start_docker_container(backend, mounted_dir, working_dir, bin_dir, command=
if exit_code != 0:
raise RuntimeError("Failed to copy cpu-benchmark executable to the bin directory")
else:
sys.stderr.write("Not Copying wfbench and cpu-benchmark...\n")
sys.stderr.write(f"[{backend}] Not Copying wfbench and cpu-benchmark...\n")

container.backend = backend
return container

def _shutdown_docker_container(container):
container.stop()
container.remove()
def _shutdown_docker_container_and_remove_image(container):
image = container.image
sys.stderr.write(f"[{container.backend}] Terminating container if need be...\n")
try:
container.stop()
container.remove()
except Exception as e:
pass

# Remove the images as we go, if running on GitHub
if os.getenv('CI') or os.getenv('GITHUB_ACTIONS'):
sys.stderr.write(f"[{container.backend}] Removing Docker image...\n")
try:
image.remove(force=True)
except Exception as e:
sys.stderr.write(f"[{container.backend}] Warning: Error while removing image: {e}\n")

def _get_total_size_of_directory(directory_path: str):
total_size = 0
Expand Down
38 changes: 9 additions & 29 deletions tests/translators_loggers/test_translators_loggers.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from tests.test_helpers import _create_fresh_local_dir
from tests.test_helpers import _remove_local_dir_if_it_exists
from tests.test_helpers import _start_docker_container
from tests.test_helpers import _shutdown_docker_container
from tests.test_helpers import _shutdown_docker_container_and_remove_image
from tests.test_helpers import _compare_workflows

from wfcommons import BlastRecipe
Expand Down Expand Up @@ -116,8 +116,6 @@ def _additional_setup_swiftt(container):

def run_workflow_dask(container, num_tasks, str_dirpath):
exit_code, output = container.exec_run("python ./dask_workflow.py", stdout=True, stderr=True)
# Kill the container
container.remove(force=True)
# Check sanity
assert (exit_code == 0)
assert (output.decode().count("completed!") == num_tasks)
Expand All @@ -126,8 +124,6 @@ def run_workflow_dask(container, num_tasks, str_dirpath):
def run_workflow_parsl(container, num_tasks, str_dirpath):
exit_code, output = container.exec_run("python ./parsl_workflow.py", stdout=True, stderr=True)
ignored, output = container.exec_run(f"cat {str_dirpath}/runinfo/000/parsl.log", stdout=True, stderr=True)
# Kill the container
container.remove(force=True)
# Check sanity
assert (exit_code == 0)
assert ("completed" in output.decode())
Expand All @@ -137,8 +133,6 @@ def run_workflow_nextflow(container, num_tasks, str_dirpath):
# Run the workflow!
exit_code, output = container.exec_run(f"nextflow run ./workflow.nf --pwd .", stdout=True, stderr=True)
ignored, task_exit_codes = container.exec_run("find . -name .exitcode -exec cat {} \;", stdout=True, stderr=True)
# Kill the container
container.remove(force=True)
# Check sanity
assert (exit_code == 0)
assert (task_exit_codes.decode() == num_tasks * "0")
Expand All @@ -149,37 +143,28 @@ def run_workflow_airflow(container, num_tasks, str_dirpath):
exit_code, output = container.exec_run(cmd=["sh", "-c", "cd /home/wfcommons/ && sudo /bin/bash /run_a_workflow.sh Blast-Benchmark"],
stdout=True,
stderr=True)
# Kill the container
container.remove(force=True)

# Check sanity
assert (exit_code == 0)
assert (output.decode().count("completed") == num_tasks * 2)

def run_workflow_bash(container, num_tasks, str_dirpath):
# Run the workflow!
exit_code, output = container.exec_run(cmd="/bin/bash ./run_workflow.sh", stdout=True, stderr=True)
# Kill the container
container.remove(force=True)
# Check sanity
assert (exit_code == 0)
assert (output.decode().count("completed") == num_tasks)

def run_workflow_taskvine(container, num_tasks, str_dirpath):
# Run the workflow!
exit_code, output = container.exec_run(cmd=["bash", "-c", "source ~/conda/etc/profile.d/conda.sh && conda activate && python3 ./taskvine_workflow.py"], stdout=True, stderr=True)
# Kill the container
container.remove(force=True)
# # Check sanity
# Check sanity
assert (exit_code == 0)
assert (output.decode().count("completed") == num_tasks)

def run_workflow_cwl(container, num_tasks, str_dirpath):
# Run the workflow!
# Note that the input file is hardcoded and Blast-specific
exit_code, output = container.exec_run(cmd="cwltool ./main.cwl --split_fasta_00000001_input ./data/workflow_infile_0001 ", stdout=True, stderr=True)
# Kill the container
container.remove(force=True)
# Check sanity
assert (exit_code == 0)
# this below is ugly (the 3 is for "workflow", "compile_output_files" and "compile_log_files",
Expand All @@ -189,17 +174,13 @@ def run_workflow_cwl(container, num_tasks, str_dirpath):
def run_workflow_pegasus(container, num_tasks, str_dirpath):
# Run the workflow!
exit_code, output = container.exec_run(cmd="bash /home/wfcommons/run_workflow.sh", stdout=True, stderr=True)
# Kill the container
container.remove(force=True)
# Check sanity
assert(exit_code == 0)
assert("success" in output.decode())

def run_workflow_swiftt(container, num_tasks, str_dirpath):
# Run the workflow!
exit_code, output = container.exec_run(cmd="swift-t workflow.swift", stdout=True, stderr=True)
# Kill the container
container.remove(force=True)
# sys.stderr.write(output.decode())
# Check sanity
assert(exit_code == 0)
Expand Down Expand Up @@ -236,7 +217,7 @@ class TestTranslators:
@pytest.mark.parametrize(
"backend",
[
# "swiftt",
"swiftt",
"dask",
"parsl",
"nextflow",
Expand All @@ -259,22 +240,21 @@ def test_translator(self, backend) -> None:
_remove_local_dir_if_it_exists(str_dirpath)

# Perform the translation
sys.stderr.write("\nTranslating workflow...\n")
sys.stderr.write(f"\n[{backend}] Translating workflow...\n")
translator = translator_classes[backend](benchmark.workflow)
translator.translate(output_folder=dirpath)

# Start the Docker container
sys.stderr.write("Starting Docker container...\n")
container = _start_docker_container(backend, str_dirpath, str_dirpath, str_dirpath + "bin/")

# Do whatever necessary setup
additional_setup_methods[backend](container)

# Run the workflow
sys.stderr.write("Running workflow...\n")
sys.stderr.write(f"[{backend}] Running workflow...\n")
start_time = time.time()
run_workflow_methods[backend](container, num_tasks, str_dirpath)
sys.stderr.write("Workflow ran in %.2f seconds\n" % (time.time() - start_time))
sys.stderr.write(f"[{backend}] Workflow ran in %.2f seconds\n" % (time.time() - start_time))

# Run the log parser if any
if backend == "pegasus":
Expand All @@ -285,14 +265,14 @@ def test_translator(self, backend) -> None:
parser = None

if parser:
sys.stderr.write("\nParsing the logs...\n")
sys.stderr.write(f"\n[{backend}] Parsing the logs...\n")
reconstructed_workflow : Workflow = parser.build_workflow("reconstructed_workflow")
reconstructed_workflow.write_json(pathlib.Path("/tmp/reconstructed_workflow.json"))

original_workflow : Workflow = benchmark.workflow

_compare_workflows(original_workflow, reconstructed_workflow)

# Shutdown the container
# _shutdown_docker_container(container)
# Shutdown the container (weirdly, container is already shutdown by now... not sure how)
_shutdown_docker_container_and_remove_image(container)

3 changes: 2 additions & 1 deletion tests/wfbench/test_wfbench.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

from tests.test_helpers import _create_fresh_local_dir
from tests.test_helpers import _start_docker_container
from tests.test_helpers import _shutdown_docker_container_and_remove_image
from tests.test_helpers import _remove_local_dir_if_it_exists
from tests.test_helpers import _get_total_size_of_directory
from tests.test_helpers import _compare_workflows
Expand Down Expand Up @@ -166,7 +167,7 @@ def test_create_from_recipe(self) -> None:
exit_code, output = container.exec_run(cmd="/bin/bash ./run_workflow.sh", stdout=True, stderr=True)

# Kill the container
container.remove(force=True)
_shutdown_docker_container_and_remove_image(container)

# Inspect the data after execution
_actual_data_files_as_expected(dirpath, benchmark.workflow, data_spec)
Expand Down