1919from tests .test_helpers import _create_fresh_local_dir
2020from tests .test_helpers import _remove_local_dir_if_it_exists
2121from tests .test_helpers import _start_docker_container
22+ from tests .test_helpers import _shutdown_docker_container_and_remove_image
2223from tests .test_helpers import _compare_workflows
2324
2425from wfcommons import BlastRecipe
@@ -115,8 +116,6 @@ def _additional_setup_swiftt(container):
115116
116117def run_workflow_dask (container , num_tasks , str_dirpath ):
117118 exit_code , output = container .exec_run ("python ./dask_workflow.py" , stdout = True , stderr = True )
118- # Kill the container
119- container .remove (force = True )
120119 # Check sanity
121120 assert (exit_code == 0 )
122121 assert (output .decode ().count ("completed!" ) == num_tasks )
@@ -125,8 +124,6 @@ def run_workflow_dask(container, num_tasks, str_dirpath):
125124def run_workflow_parsl (container , num_tasks , str_dirpath ):
126125 exit_code , output = container .exec_run ("python ./parsl_workflow.py" , stdout = True , stderr = True )
127126 ignored , output = container .exec_run (f"cat { str_dirpath } /runinfo/000/parsl.log" , stdout = True , stderr = True )
128- # Kill the container
129- container .remove (force = True )
130127 # Check sanity
131128 assert (exit_code == 0 )
132129 assert ("completed" in output .decode ())
@@ -136,8 +133,6 @@ def run_workflow_nextflow(container, num_tasks, str_dirpath):
136133 # Run the workflow!
137134 exit_code , output = container .exec_run (f"nextflow run ./workflow.nf --pwd ." , stdout = True , stderr = True )
138135 ignored , task_exit_codes = container .exec_run ("find . -name .exitcode -exec cat {} \;" , stdout = True , stderr = True )
139- # Kill the container
140- container .remove (force = True )
141136 # Check sanity
142137 assert (exit_code == 0 )
143138 assert (task_exit_codes .decode () == num_tasks * "0" )
@@ -148,37 +143,28 @@ def run_workflow_airflow(container, num_tasks, str_dirpath):
148143 exit_code , output = container .exec_run (cmd = ["sh" , "-c" , "cd /home/wfcommons/ && sudo /bin/bash /run_a_workflow.sh Blast-Benchmark" ],
149144 stdout = True ,
150145 stderr = True )
151- # Kill the container
152- container .remove (force = True )
153-
154146 # Check sanity
155147 assert (exit_code == 0 )
156148 assert (output .decode ().count ("completed" ) == num_tasks * 2 )
157149
158150def run_workflow_bash (container , num_tasks , str_dirpath ):
159151 # Run the workflow!
160152 exit_code , output = container .exec_run (cmd = "/bin/bash ./run_workflow.sh" , stdout = True , stderr = True )
161- # Kill the container
162- container .remove (force = True )
163153 # Check sanity
164154 assert (exit_code == 0 )
165155 assert (output .decode ().count ("completed" ) == num_tasks )
166156
167157def run_workflow_taskvine (container , num_tasks , str_dirpath ):
168158 # Run the workflow!
169159 exit_code , output = container .exec_run (cmd = ["bash" , "-c" , "source ~/conda/etc/profile.d/conda.sh && conda activate && python3 ./taskvine_workflow.py" ], stdout = True , stderr = True )
170- # Kill the container
171- container .remove (force = True )
172- # # Check sanity
160+ # Check sanity
173161 assert (exit_code == 0 )
174162 assert (output .decode ().count ("completed" ) == num_tasks )
175163
176164def run_workflow_cwl (container , num_tasks , str_dirpath ):
177165 # Run the workflow!
178166 # Note that the input file is hardcoded and Blast-specific
179167 exit_code , output = container .exec_run (cmd = "cwltool ./main.cwl --split_fasta_00000001_input ./data/workflow_infile_0001 " , stdout = True , stderr = True )
180- # Kill the container
181- container .remove (force = True )
182168 # Check sanity
183169 assert (exit_code == 0 )
184170 # this below is ugly (the 3 is for "workflow", "compile_output_files" and "compile_log_files",
@@ -188,17 +174,13 @@ def run_workflow_cwl(container, num_tasks, str_dirpath):
188174def run_workflow_pegasus (container , num_tasks , str_dirpath ):
189175 # Run the workflow!
190176 exit_code , output = container .exec_run (cmd = "bash /home/wfcommons/run_workflow.sh" , stdout = True , stderr = True )
191- # Kill the container
192- container .remove (force = True )
193177 # Check sanity
194178 assert (exit_code == 0 )
195179 assert ("success" in output .decode ())
196180
197181def run_workflow_swiftt (container , num_tasks , str_dirpath ):
198182 # Run the workflow!
199183 exit_code , output = container .exec_run (cmd = "swift-t workflow.swift" , stdout = True , stderr = True )
200- # Kill the container
201- container .remove (force = True )
202184 # sys.stderr.write(output.decode())
203185 # Check sanity
204186 assert (exit_code == 0 )
@@ -235,6 +217,7 @@ class TestTranslators:
235217 @pytest .mark .parametrize (
236218 "backend" ,
237219 [
220+ "swiftt" ,
238221 "dask" ,
239222 "parsl" ,
240223 "nextflow" ,
@@ -243,53 +226,53 @@ class TestTranslators:
243226 "taskvine" ,
244227 "cwl" ,
245228 "pegasus" ,
246- "swiftt" ,
247229 ])
248230 @pytest .mark .unit
249231 # @pytest.mark.skip(reason="tmp")
250232 def test_translator (self , backend ) -> None :
251233 # Create workflow benchmark
252234 benchmark , num_tasks = _create_workflow_benchmark ()
253235
254-
255236 # Create a local translation directory
256237 str_dirpath = "/tmp/" + backend + "_translated_workflow/"
257238 dirpath = pathlib .Path (str_dirpath )
258239 # dirpath = _create_fresh_local_dir(str_dirpath)
259240 _remove_local_dir_if_it_exists (str_dirpath )
260241
261242 # Perform the translation
262- sys .stderr .write (" \n Translating workflow...\n " )
243+ sys .stderr .write (f" \n [ { backend } ] Translating workflow...\n " )
263244 translator = translator_classes [backend ](benchmark .workflow )
264245 translator .translate (output_folder = dirpath )
265246
266247 # Start the Docker container
267- sys .stderr .write ("Starting Docker container...\n " )
268248 container = _start_docker_container (backend , str_dirpath , str_dirpath , str_dirpath + "bin/" )
269249
270250 # Do whatever necessary setup
271251 additional_setup_methods [backend ](container )
272252
273253 # Run the workflow
274- sys .stderr .write (" Running workflow...\n " )
254+ sys .stderr .write (f"[ { backend } ] Running workflow...\n " )
275255 start_time = time .time ()
276256 run_workflow_methods [backend ](container , num_tasks , str_dirpath )
277- sys .stderr .write (" Workflow ran in %.2f seconds\n " % (time .time () - start_time ))
257+ sys .stderr .write (f"[ { backend } ] Workflow ran in %.2f seconds\n " % (time .time () - start_time ))
278258
279259 # Run the log parser if any
280260 if backend == "pegasus" :
281261 parser = PegasusLogsParser (dirpath / "work/wfcommons/pegasus/Blast-Benchmark/run0001/" )
282262 elif backend == "taskvine" :
283- parser = TaskVineLogsParser (dirpath / "vine-run-info/" , filenames_to_ignore = ["cpu-benchmark" ,"stress-ng" , "wfbench" ])
263+ parser = TaskVineLogsParser (dirpath / "vine-run-info/most-recent/vine-logs " , filenames_to_ignore = ["cpu-benchmark" ,"stress-ng" , "wfbench" ])
284264 else :
285265 parser = None
286266
287267 if parser :
288- sys .stderr .write (" \n Parsing the logs...\n " )
268+ sys .stderr .write (f" \n [ { backend } ] Parsing the logs...\n " )
289269 reconstructed_workflow : Workflow = parser .build_workflow ("reconstructed_workflow" )
290270 reconstructed_workflow .write_json (pathlib .Path ("/tmp/reconstructed_workflow.json" ))
291271
292272 original_workflow : Workflow = benchmark .workflow
293273
294274 _compare_workflows (original_workflow , reconstructed_workflow )
295275
276+ # Shutdown the container (weirdly, container is already shutdown by now... not sure how)
277+ _shutdown_docker_container_and_remove_image (container )
278+
0 commit comments