diff --git a/orchestration/flows/bl832/dispatcher.py b/orchestration/flows/bl832/dispatcher.py index cf1d0c6..595a3a4 100644 --- a/orchestration/flows/bl832/dispatcher.py +++ b/orchestration/flows/bl832/dispatcher.py @@ -17,6 +17,9 @@ class FlowParameterMapper: "alcf_recon_flow/alcf_recon_flow": [ "file_path", "config"], + "alcf_forge_recon_segment_flow/alcf_forge_recon_segment_flow": [ + "file_path", + "config"], # From move.py "new_832_file_flow/new_file_832": [ "file_path", @@ -25,6 +28,14 @@ class FlowParameterMapper: # From nersc.py "nersc_recon_flow/nersc_recon_flow": [ "file_path", + "config"], + "nersc_recon_multinode_flow/nersc_recon_multinode_flow": [ + "file_path", + "num_nodes", + "config"], + "nersc_forge_recon_segment_flow/nersc_forge_recon_segment_flow": [ + "file_path", + "num_nodes", "config"] } @@ -55,23 +66,39 @@ class DecisionFlowInputModel(BaseModel): @task(name="setup_decision_settings") -def setup_decision_settings(alcf_recon: bool, nersc_recon: bool, new_file_832: bool) -> dict: +def setup_decision_settings( + alcf_recon: bool = False, + alcf_forge_recon_segment: bool = False, + nersc_recon: bool = False, + nersc_recon_multinode: bool = False, + nersc_forge_recon_segment: bool = False, + new_file_832: bool = True +) -> dict: """ This task is used to define the settings for the decision making process of the BL832 beamline. :param alcf_recon: Boolean indicating whether to run the ALCF reconstruction flow. + :param alcf_forge_recon_segment: Boolean indicating whether to run the ALCF Forge reconstruction segment flow. :param nersc_recon: Boolean indicating whether to run the NERSC reconstruction flow. - :param nersc_move: Boolean indicating whether to move files to NERSC. + :param nersc_recon_multinode: Boolean indicating whether to run the NERSC multinode reconstruction flow. + :param new_file_832: Boolean indicating whether to run the new 832 file processing flow. :return: A dictionary containing the settings for each flow. """ logger = get_run_logger() try: logger.info(f"Setting up decision settings: alcf_recon={alcf_recon}, " - f"nersc_recon={nersc_recon}, new_file_832={new_file_832}") + f"alcf_forge_recon_segment={alcf_forge_recon_segment}, " + f"nersc_recon={nersc_recon}, " + f"nersc_recon_multinode={nersc_recon_multinode}, " + f"nersc_forge_recon_segment={nersc_forge_recon_segment}, " + f"new_file_832={new_file_832}") # Define which flows to run based on the input settings settings = { "alcf_recon_flow/alcf_recon_flow": alcf_recon, + "alcf_forge_recon_segment_flow/alcf_forge_recon_segment_flow": alcf_forge_recon_segment, "nersc_recon_flow/nersc_recon_flow": nersc_recon, + "nersc_recon_multinode_flow/nersc_recon_multinode_flow": nersc_recon_multinode, + "nersc_forge_recon_segment_flow/nersc_forge_recon_segment_flow": nersc_forge_recon_segment, "new_832_file_flow/new_file_832": new_file_832 } # Save the settings in a JSON block for later retrieval by other flows @@ -145,10 +172,27 @@ async def dispatcher( alcf_params = FlowParameterMapper.get_flow_parameters("alcf_recon_flow/alcf_recon_flow", available_params) tasks.append(run_recon_flow_async("alcf_recon_flow/alcf_recon_flow", alcf_params)) + if decision_settings.get("alcf_forge_recon_segment_flow/alcf_forge_recon_segment_flow"): + alcf_forge_params = FlowParameterMapper.get_flow_parameters( + "alcf_forge_recon_segment_flow/alcf_forge_recon_segment_flow", + available_params + ) + tasks.append(run_recon_flow_async("alcf_forge_recon_segment_flow/alcf_forge_recon_segment_flow", alcf_forge_params)) + if decision_settings.get("nersc_recon_flow/nersc_recon_flow"): nersc_params = FlowParameterMapper.get_flow_parameters("nersc_recon_flow/nersc_recon_flow", available_params) tasks.append(run_recon_flow_async("nersc_recon_flow/nersc_recon_flow", nersc_params)) + if decision_settings.get("nersc_recon_multinode_flow/nersc_recon_multinode_flow"): + nersc_multinode_params = FlowParameterMapper.get_flow_parameters( + "nersc_recon_multinode_flow/nersc_recon_multinode_flow", available_params) + tasks.append(run_recon_flow_async("nersc_recon_multinode_flow/nersc_recon_multinode_flow", nersc_multinode_params)) + + if decision_settings.get("nersc_forge_recon_segment_flow/nersc_forge_recon_segment_flow"): + nersc_forge_recon_segment_params = FlowParameterMapper.get_flow_parameters( + "nersc_forge_recon_segment_flow/nersc_forge_recon_segment_flow", available_params) + tasks.append(run_recon_flow_async( + "nersc_forge_recon_segment_flow/nersc_forge_recon_segment_flow", nersc_forge_recon_segment_params)) # Run ALCF and NERSC flows in parallel, if any if tasks: try: @@ -169,7 +213,14 @@ async def dispatcher( """ try: # Setup decision settings based on input parameters - setup_decision_settings(alcf_recon=True, nersc_recon=True, new_file_832=True) + setup_decision_settings( + alcf_recon=True, + alcf_forge_recon_segment=False, + nersc_recon=True, + nersc_recon_multinode=False, + nersc_forge_recon_segment=False, + new_file_832=True + ) # Run the main decision flow with the specified parameters # asyncio.run(dispatcher( # config={}, # PYTEST, ALCF, NERSC