From 501eea223b540e6ed1986c332bde7bda3d384934 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Mon, 9 Feb 2026 12:07:17 -0800 Subject: [PATCH 1/3] Updating dispatcher to include options for aclf-seg and nersc-seg forge flows --- orchestration/flows/bl832/dispatcher.py | 49 ++++++++++++++++++++++--- 1 file changed, 44 insertions(+), 5 deletions(-) diff --git a/orchestration/flows/bl832/dispatcher.py b/orchestration/flows/bl832/dispatcher.py index cf1d0c64..8964b441 100644 --- a/orchestration/flows/bl832/dispatcher.py +++ b/orchestration/flows/bl832/dispatcher.py @@ -17,6 +17,9 @@ class FlowParameterMapper: "alcf_recon_flow/alcf_recon_flow": [ "file_path", "config"], + "alcf_forge_recon_segment_flow/alcf_forge_recon_segment_flow": [ + "file_path", + "config"], # From move.py "new_832_file_flow/new_file_832": [ "file_path", @@ -25,7 +28,12 @@ class FlowParameterMapper: # From nersc.py "nersc_recon_flow/nersc_recon_flow": [ "file_path", - "config"] + "config"], + "nersc_recon_multinode_flow/nersc_recon_multinode_flow": [ + "file_path", + "num_nodes", + "config" + ] } @classmethod @@ -55,23 +63,36 @@ class DecisionFlowInputModel(BaseModel): @task(name="setup_decision_settings") -def setup_decision_settings(alcf_recon: bool, nersc_recon: bool, new_file_832: bool) -> dict: +def setup_decision_settings( + alcf_recon: bool, + alcf_forge_recon_segment: bool, + nersc_recon: bool, + nersc_recon_multinode: bool, + new_file_832: bool +) -> dict: """ This task is used to define the settings for the decision making process of the BL832 beamline. :param alcf_recon: Boolean indicating whether to run the ALCF reconstruction flow. + :param alcf_forge_recon_segment: Boolean indicating whether to run the ALCF Forge reconstruction segment flow. :param nersc_recon: Boolean indicating whether to run the NERSC reconstruction flow. - :param nersc_move: Boolean indicating whether to move files to NERSC. + :param nersc_recon_multinode: Boolean indicating whether to run the NERSC multinode reconstruction flow. + :param new_file_832: Boolean indicating whether to run the new 832 file processing flow. :return: A dictionary containing the settings for each flow. """ logger = get_run_logger() try: logger.info(f"Setting up decision settings: alcf_recon={alcf_recon}, " - f"nersc_recon={nersc_recon}, new_file_832={new_file_832}") + f"alcf_forge_recon_segment={alcf_forge_recon_segment}, " + f"nersc_recon={nersc_recon}, " + f"nersc_recon_multinode={nersc_recon_multinode}, " + f"new_file_832={new_file_832}") # Define which flows to run based on the input settings settings = { "alcf_recon_flow/alcf_recon_flow": alcf_recon, + "alcf_forge_recon_segment_flow/alcf_forge_recon_segment_flow": alcf_forge_recon_segment, "nersc_recon_flow/nersc_recon_flow": nersc_recon, + "nersc_recon_multinode_flow/nersc_recon_multinode_flow": nersc_recon_multinode, "new_832_file_flow/new_file_832": new_file_832 } # Save the settings in a JSON block for later retrieval by other flows @@ -145,10 +166,22 @@ async def dispatcher( alcf_params = FlowParameterMapper.get_flow_parameters("alcf_recon_flow/alcf_recon_flow", available_params) tasks.append(run_recon_flow_async("alcf_recon_flow/alcf_recon_flow", alcf_params)) + if decision_settings.get("alcf_forge_recon_segment_flow/alcf_forge_recon_segment_flow"): + alcf_forge_params = FlowParameterMapper.get_flow_parameters( + "alcf_forge_recon_segment_flow/alcf_forge_recon_segment_flow", + available_params + ) + tasks.append(run_recon_flow_async("alcf_forge_recon_segment_flow/alcf_forge_recon_segment_flow", alcf_forge_params)) + if decision_settings.get("nersc_recon_flow/nersc_recon_flow"): nersc_params = FlowParameterMapper.get_flow_parameters("nersc_recon_flow/nersc_recon_flow", available_params) tasks.append(run_recon_flow_async("nersc_recon_flow/nersc_recon_flow", nersc_params)) + if decision_settings.get("nersc_recon_multinode_flow/nersc_recon_multinode_flow"): + nersc_multinode_params = FlowParameterMapper.get_flow_parameters( + "nersc_recon_multinode_flow/nersc_recon_multinode_flow", available_params) + tasks.append(run_recon_flow_async("nersc_recon_multinode_flow/nersc_recon_multinode_flow", nersc_multinode_params)) + # Run ALCF and NERSC flows in parallel, if any if tasks: try: @@ -169,7 +202,13 @@ async def dispatcher( """ try: # Setup decision settings based on input parameters - setup_decision_settings(alcf_recon=True, nersc_recon=True, new_file_832=True) + setup_decision_settings( + alcf_recon=True, + alcf_forge_recon_segment=False, + nersc_recon=True, + nersc_recon_multinode=False, + new_file_832=True + ) # Run the main decision flow with the specified parameters # asyncio.run(dispatcher( # config={}, # PYTEST, ALCF, NERSC From c5391d4e18e7367447ade1a50311f15c10928fc3 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Mon, 9 Feb 2026 12:09:44 -0800 Subject: [PATCH 2/3] Setting defaults for setup_decision_settings --- orchestration/flows/bl832/dispatcher.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/orchestration/flows/bl832/dispatcher.py b/orchestration/flows/bl832/dispatcher.py index 8964b441..5945f23b 100644 --- a/orchestration/flows/bl832/dispatcher.py +++ b/orchestration/flows/bl832/dispatcher.py @@ -64,11 +64,11 @@ class DecisionFlowInputModel(BaseModel): @task(name="setup_decision_settings") def setup_decision_settings( - alcf_recon: bool, - alcf_forge_recon_segment: bool, - nersc_recon: bool, - nersc_recon_multinode: bool, - new_file_832: bool + alcf_recon: bool = False, + alcf_forge_recon_segment: bool = False, + nersc_recon: bool = False, + nersc_recon_multinode: bool = False, + new_file_832: bool = True ) -> dict: """ This task is used to define the settings for the decision making process of the BL832 beamline. From ccbfb45ce33a99f60e68bb9af7c98ba9a3950772 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Tue, 10 Feb 2026 10:28:36 -0800 Subject: [PATCH 3/3] Making multinode recon and multinode recon+seg on nersc separate options --- orchestration/flows/bl832/dispatcher.py | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/orchestration/flows/bl832/dispatcher.py b/orchestration/flows/bl832/dispatcher.py index 5945f23b..595a3a43 100644 --- a/orchestration/flows/bl832/dispatcher.py +++ b/orchestration/flows/bl832/dispatcher.py @@ -32,8 +32,11 @@ class FlowParameterMapper: "nersc_recon_multinode_flow/nersc_recon_multinode_flow": [ "file_path", "num_nodes", - "config" - ] + "config"], + "nersc_forge_recon_segment_flow/nersc_forge_recon_segment_flow": [ + "file_path", + "num_nodes", + "config"] } @classmethod @@ -68,6 +71,7 @@ def setup_decision_settings( alcf_forge_recon_segment: bool = False, nersc_recon: bool = False, nersc_recon_multinode: bool = False, + nersc_forge_recon_segment: bool = False, new_file_832: bool = True ) -> dict: """ @@ -86,6 +90,7 @@ def setup_decision_settings( f"alcf_forge_recon_segment={alcf_forge_recon_segment}, " f"nersc_recon={nersc_recon}, " f"nersc_recon_multinode={nersc_recon_multinode}, " + f"nersc_forge_recon_segment={nersc_forge_recon_segment}, " f"new_file_832={new_file_832}") # Define which flows to run based on the input settings settings = { @@ -93,6 +98,7 @@ def setup_decision_settings( "alcf_forge_recon_segment_flow/alcf_forge_recon_segment_flow": alcf_forge_recon_segment, "nersc_recon_flow/nersc_recon_flow": nersc_recon, "nersc_recon_multinode_flow/nersc_recon_multinode_flow": nersc_recon_multinode, + "nersc_forge_recon_segment_flow/nersc_forge_recon_segment_flow": nersc_forge_recon_segment, "new_832_file_flow/new_file_832": new_file_832 } # Save the settings in a JSON block for later retrieval by other flows @@ -182,6 +188,11 @@ async def dispatcher( "nersc_recon_multinode_flow/nersc_recon_multinode_flow", available_params) tasks.append(run_recon_flow_async("nersc_recon_multinode_flow/nersc_recon_multinode_flow", nersc_multinode_params)) + if decision_settings.get("nersc_forge_recon_segment_flow/nersc_forge_recon_segment_flow"): + nersc_forge_recon_segment_params = FlowParameterMapper.get_flow_parameters( + "nersc_forge_recon_segment_flow/nersc_forge_recon_segment_flow", available_params) + tasks.append(run_recon_flow_async( + "nersc_forge_recon_segment_flow/nersc_forge_recon_segment_flow", nersc_forge_recon_segment_params)) # Run ALCF and NERSC flows in parallel, if any if tasks: try: @@ -207,6 +218,7 @@ async def dispatcher( alcf_forge_recon_segment=False, nersc_recon=True, nersc_recon_multinode=False, + nersc_forge_recon_segment=False, new_file_832=True ) # Run the main decision flow with the specified parameters