diff --git a/tests/test_helpers.py b/tests/test_helpers.py
index d3f70b0b..af549278 100644
--- a/tests/test_helpers.py
+++ b/tests/test_helpers.py
@@ -51,7 +51,7 @@ def _install_WfCommons_on_container(container):
     exit_code, output = container.exec_run("sudo python3 -m pip install . --break-system-packages",
                                            workdir="/tmp/WfCommons", stdout=True, stderr=True)
     if exit_code != 0:
-        raise RuntimeError("Failed to install WfCommons on the container");
+        raise RuntimeError("Failed to install WfCommons on the container")

 def _start_docker_container(backend, mounted_dir, working_dir, bin_dir, command=None):
     if command is None:
@@ -62,13 +62,13 @@ def _start_docker_container(backend, mounted_dir, working_dir, bin_dir, command=

     try:
         image = client.images.get(image_name)
-        sys.stderr.write(f"Image '{image_name}' is available locally\n")
+        sys.stderr.write(f"[{backend}] Image '{image_name}' is available locally\n")
     except ImageNotFound:
-        sys.stderr.write(f"Pulling image '{image_name}'...\n")
+        sys.stderr.write(f"[{backend}] Pulling image '{image_name}'...\n")
         client.images.pull(image_name)

     # Launch the docker container to actually run the translated workflow
-    sys.stderr.write("Starting Docker container...\n")
+    sys.stderr.write(f"[{backend}] Starting Docker container...\n")
     container = client.containers.run(
         image=image_name,
         command=command,
@@ -83,7 +83,7 @@

     # Copy over the wfbench and cpu-benchmark executables to where they should go on the container
     if bin_dir:
-        sys.stderr.write("Copying wfbench and cpu-benchmark...\n")
+        sys.stderr.write(f"[{backend}] Copying wfbench and cpu-benchmark...\n")
         exit_code, output = container.exec_run(["sh", "-c", "sudo cp -f `which wfbench` " + bin_dir],
                                                stdout=True, stderr=True)
         if exit_code != 0:
@@ -93,13 +93,27 @@
         if exit_code != 0:
             raise RuntimeError("Failed to copy cpu-benchmark executable to the bin directory")
     else:
-        sys.stderr.write("Not Copying wfbench and cpu-benchmark...\n")
+        sys.stderr.write(f"[{backend}] Not Copying wfbench and cpu-benchmark...\n")

+    container.backend = backend
     return container

-def _shutdown_docker_container(container):
-    container.stop()
-    container.remove()
+def _shutdown_docker_container_and_remove_image(container):
+    image = container.image
+    sys.stderr.write(f"[{container.backend}] Terminating container if need be...\n")
+    try:
+        container.stop()
+        container.remove()
+    except Exception as e:
+        pass
+
+    # Remove the images as we go, if running on GitHub
+    if os.getenv('CI') or os.getenv('GITHUB_ACTIONS'):
+        sys.stderr.write(f"[{container.backend}] Removing Docker image...\n")
+        try:
+            image.remove(force=True)
+        except Exception as e:
+            sys.stderr.write(f"[{container.backend}] Warning: Error while removing image: {e}\n")

 def _get_total_size_of_directory(directory_path: str):
     total_size = 0
diff --git a/tests/translators_loggers/test_translators_loggers.py b/tests/translators_loggers/test_translators_loggers.py
index da2059d1..7c094739 100644
--- a/tests/translators_loggers/test_translators_loggers.py
+++ b/tests/translators_loggers/test_translators_loggers.py
@@ -19,7 +19,7 @@
 from tests.test_helpers import _create_fresh_local_dir
 from tests.test_helpers import _remove_local_dir_if_it_exists
 from tests.test_helpers import _start_docker_container
-from tests.test_helpers import _shutdown_docker_container
+from tests.test_helpers import _shutdown_docker_container_and_remove_image
 from tests.test_helpers import _compare_workflows

 from wfcommons import BlastRecipe
@@ -116,8 +116,6 @@ def _additional_setup_swiftt(container):

 def run_workflow_dask(container, num_tasks, str_dirpath):
     exit_code, output = container.exec_run("python ./dask_workflow.py", stdout=True, stderr=True)
-    # Kill the container
-    container.remove(force=True)
     # Check sanity
     assert (exit_code == 0)
     assert (output.decode().count("completed!") == num_tasks)
@@ -126,8 +124,6 @@
 def run_workflow_parsl(container, num_tasks, str_dirpath):
     exit_code, output = container.exec_run("python ./parsl_workflow.py", stdout=True, stderr=True)
     ignored, output = container.exec_run(f"cat {str_dirpath}/runinfo/000/parsl.log", stdout=True, stderr=True)
-    # Kill the container
-    container.remove(force=True)
     # Check sanity
     assert (exit_code == 0)
     assert ("completed" in output.decode())
@@ -137,8 +133,6 @@ def run_workflow_nextflow(container, num_tasks, str_dirpath):
     # Run the workflow!
     exit_code, output = container.exec_run(f"nextflow run ./workflow.nf --pwd .", stdout=True, stderr=True)
     ignored, task_exit_codes = container.exec_run("find . -name .exitcode -exec cat {} \;", stdout=True, stderr=True)
-    # Kill the container
-    container.remove(force=True)
     # Check sanity
     assert (exit_code == 0)
     assert (task_exit_codes.decode() == num_tasks * "0")
@@ -149,9 +143,6 @@ def run_workflow_airflow(container, num_tasks, str_dirpath):
     exit_code, output = container.exec_run(cmd=["sh", "-c",
                                                 "cd /home/wfcommons/ && sudo /bin/bash /run_a_workflow.sh Blast-Benchmark"],
                                            stdout=True, stderr=True)
-    # Kill the container
-    container.remove(force=True)
-
     # Check sanity
     assert (exit_code == 0)
     assert (output.decode().count("completed") == num_tasks * 2)
@@ -159,8 +150,6 @@
 def run_workflow_bash(container, num_tasks, str_dirpath):
     # Run the workflow!
     exit_code, output = container.exec_run(cmd="/bin/bash ./run_workflow.sh", stdout=True, stderr=True)
-    # Kill the container
-    container.remove(force=True)
     # Check sanity
     assert (exit_code == 0)
     assert (output.decode().count("completed") == num_tasks)
@@ -168,9 +157,7 @@
 def run_workflow_taskvine(container, num_tasks, str_dirpath):
     # Run the workflow!
     exit_code, output = container.exec_run(cmd=["bash", "-c", "source ~/conda/etc/profile.d/conda.sh && conda activate && python3 ./taskvine_workflow.py"],
                                            stdout=True, stderr=True)
-    # Kill the container
-    container.remove(force=True)
-    # # Check sanity
+    # Check sanity
     assert (exit_code == 0)
     assert (output.decode().count("completed") == num_tasks)
@@ -178,8 +165,6 @@ def run_workflow_cwl(container, num_tasks, str_dirpath):
     # Run the workflow!
     # Note that the input file is hardcoded and Blast-specific
     exit_code, output = container.exec_run(cmd="cwltool ./main.cwl --split_fasta_00000001_input ./data/workflow_infile_0001 ", stdout=True, stderr=True)
-    # Kill the container
-    container.remove(force=True)
     # Check sanity
     assert (exit_code == 0)
     # this below is ugly (the 3 is for "workflow", "compile_output_files" and "compile_log_files",
@@ -189,8 +174,6 @@
 def run_workflow_pegasus(container, num_tasks, str_dirpath):
     # Run the workflow!
     exit_code, output = container.exec_run(cmd="bash /home/wfcommons/run_workflow.sh", stdout=True, stderr=True)
-    # Kill the container
-    container.remove(force=True)
     # Check sanity
     assert(exit_code == 0)
     assert("success" in output.decode())
@@ -198,8 +181,6 @@
 def run_workflow_swiftt(container, num_tasks, str_dirpath):
     # Run the workflow!
     exit_code, output = container.exec_run(cmd="swift-t workflow.swift", stdout=True, stderr=True)
-    # Kill the container
-    container.remove(force=True)
     # sys.stderr.write(output.decode())
     # Check sanity
     assert(exit_code == 0)
@@ -236,7 +217,7 @@ class TestTranslators:

     @pytest.mark.parametrize(
         "backend", [
-            # "swiftt",
+            "swiftt",
             "dask",
             "parsl",
             "nextflow",
@@ -259,22 +240,21 @@ def test_translator(self, backend) -> None:
         _remove_local_dir_if_it_exists(str_dirpath)

         # Perform the translation
-        sys.stderr.write("\nTranslating workflow...\n")
+        sys.stderr.write(f"\n[{backend}] Translating workflow...\n")
         translator = translator_classes[backend](benchmark.workflow)
         translator.translate(output_folder=dirpath)

         # Start the Docker container
-        sys.stderr.write("Starting Docker container...\n")
         container = _start_docker_container(backend, str_dirpath, str_dirpath, str_dirpath + "bin/")

         # Do whatever necessary setup
         additional_setup_methods[backend](container)

         # Run the workflow
-        sys.stderr.write("Running workflow...\n")
+        sys.stderr.write(f"[{backend}] Running workflow...\n")
         start_time = time.time()
         run_workflow_methods[backend](container, num_tasks, str_dirpath)
-        sys.stderr.write("Workflow ran in %.2f seconds\n" % (time.time() - start_time))
+        sys.stderr.write(f"[{backend}] Workflow ran in %.2f seconds\n" % (time.time() - start_time))

         # Run the log parser if any
         if backend == "pegasus":
@@ -285,7 +265,7 @@ def test_translator(self, backend) -> None:
             parser = None

         if parser:
-            sys.stderr.write("\nParsing the logs...\n")
+            sys.stderr.write(f"\n[{backend}] Parsing the logs...\n")
             reconstructed_workflow : Workflow = parser.build_workflow("reconstructed_workflow")
             reconstructed_workflow.write_json(pathlib.Path("/tmp/reconstructed_workflow.json"))

@@ -293,6 +273,6 @@ def test_translator(self, backend) -> None:

             _compare_workflows(original_workflow, reconstructed_workflow)

-        # Shutdown the container
-        # _shutdown_docker_container(container)
+        # Shutdown the container (weirdly, container is already shutdown by now... not sure how)
+        _shutdown_docker_container_and_remove_image(container)

diff --git a/tests/wfbench/test_wfbench.py b/tests/wfbench/test_wfbench.py
index 6b88b577..07593690 100644
--- a/tests/wfbench/test_wfbench.py
+++ b/tests/wfbench/test_wfbench.py
@@ -17,6 +17,7 @@

 from tests.test_helpers import _create_fresh_local_dir
 from tests.test_helpers import _start_docker_container
+from tests.test_helpers import _shutdown_docker_container_and_remove_image
 from tests.test_helpers import _remove_local_dir_if_it_exists
 from tests.test_helpers import _get_total_size_of_directory
 from tests.test_helpers import _compare_workflows
@@ -166,7 +167,7 @@ def test_create_from_recipe(self) -> None:
         exit_code, output = container.exec_run(cmd="/bin/bash ./run_workflow.sh", stdout=True, stderr=True)

         # Kill the container
-        container.remove(force=True)
+        _shutdown_docker_container_and_remove_image(container)

         # Inspect the data after execution
         _actual_data_files_as_expected(dirpath, benchmark.workflow, data_spec)
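For reference, a minimal usage sketch of the pattern this patch introduces: start a container with _start_docker_container, run a command in it, and always clean up with the new _shutdown_docker_container_and_remove_image helper. The run_and_clean_up wrapper and the try/finally are hypothetical conveniences, not part of the patch; the helper names and signatures come from tests/test_helpers.py above.

    from tests.test_helpers import _start_docker_container
    from tests.test_helpers import _shutdown_docker_container_and_remove_image

    def run_and_clean_up(backend, str_dirpath, command):
        # Start the container the same way the tests do (mounted dir,
        # working dir, and bin dir all point at the test directory).
        container = _start_docker_container(backend, str_dirpath, str_dirpath, str_dirpath + "bin/")
        try:
            exit_code, output = container.exec_run(command, stdout=True, stderr=True)
            assert exit_code == 0
            return output
        finally:
            # Stops and removes the container (tolerating one that has
            # already exited) and, when CI or GITHUB_ACTIONS is set,
            # force-removes the image to save disk space on runners.
            _shutdown_docker_container_and_remove_image(container)

Putting the shutdown call in a finally block ensures the container and, on CI, its image are reclaimed even when an assertion fails mid-test, which is the failure mode the helper's try/except blocks are designed to tolerate.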