From 8d21e25b562031a00795d0d6e9702560d97a51ee Mon Sep 17 00:00:00 2001 From: Jun Aishima Date: Fri, 20 Feb 2026 14:26:03 -0500 Subject: [PATCH 1/3] refactor to use dry_run, move Tiled jobs into tasks --- data_validation.py | 42 ++++++++++++++++++++++++++++++------------ end_of_run_workflow.py | 14 +++++++------- 2 files changed, 37 insertions(+), 19 deletions(-) diff --git a/data_validation.py b/data_validation.py index 60ad578..70ecc94 100644 --- a/data_validation.py +++ b/data_validation.py @@ -5,24 +5,42 @@ @task(retries=2, retry_delay_seconds=10) -def read_all_streams(uid, beamline_acronym): +def read_run(client, uid, dry_run=dry_run): + logger = get_run_logger() + if dry_run: + logger.info(f"Dry run: not reading {uid}") + return None + run = client["tst"]["raw"][uid] + logger.info(f"Validating uid {run.start['uid']}") + return run + + +@task(retries=2, retry_delay_seconds=10) +def read_stream(run, stream): + stream_data = run[stream].read() + + +@flow +def read_all_streams(uid, beamline_acronym, dry_run=dry_run): logger = get_run_logger() api_key = Secret.load("tiled-tst-api-key").get() cl = from_profile("nsls2", api_key=api_key) - run = cl["tst"]["raw"][uid] - logger.info(f"Validating uid {run.start['uid']}") + run = read_run(cl, uid, dry_run) start_time = ttime.monotonic() - for stream in run: - logger.info(f"{stream}:") - stream_start_time = ttime.monotonic() - stream_data = run[stream].read() - stream_elapsed_time = ttime.monotonic() - stream_start_time - logger.info(f"{stream} elapsed_time = {stream_elapsed_time}") - logger.info(f"{stream} nbytes = {stream_data.nbytes:_}") + if dry_run: + logger.info(f"Dry run: not reading streams from uid {uid}") + else: + for stream in run: + logger.info(f"{stream}:") + stream_start_time = ttime.monotonic() + stream_data = read_stream(run, stream) + stream_elapsed_time = ttime.monotonic() - stream_start_time + logger.info(f"{stream} elapsed_time = {stream_elapsed_time}") + logger.info(f"{stream} nbytes = {stream_data.nbytes:_}") elapsed_time = ttime.monotonic() - start_time logger.info(f"{elapsed_time = }") @flow -def data_validation(uid): - read_all_streams(uid, beamline_acronym="tst") +def data_validation(uid, dry_run=dry_run): + read_all_streams(uid, beamline_acronym="tst", dry_run=dry_run) diff --git a/end_of_run_workflow.py b/end_of_run_workflow.py index f56291b..391dc66 100644 --- a/end_of_run_workflow.py +++ b/end_of_run_workflow.py @@ -5,16 +5,16 @@ @task -def log_completion(): +def log_completion(dry_run=dry_run): logger = get_run_logger() - logger.info("Complete") + logger.info(f"Complete! dry_run:{dry_run}") @flow -def end_of_run_workflow(stop_doc): +def end_of_run_workflow(stop_doc, dry_run=False): uid = stop_doc["run_start"] # hello_world() - data_validation(uid, return_state=True) - get_other_docs(uid) - # long_flow(iterations=100, sleep_length=10) - log_completion() + data_validation(uid, return_state=True, dry_run=dry_run) + get_other_docs(uid, dry_run=dry_run) + # long_flow(iterations=100, sleep_length=10, dry_run=dry_run) + log_completion(dry_run=dry_run) From c89e75b91324c2c95e2c3029af24c880e8268720 Mon Sep 17 00:00:00 2001 From: Jun Aishima Date: Fri, 20 Feb 2026 14:48:50 -0500 Subject: [PATCH 2/3] fix pre-commit issues * return data from read function --- data_validation.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/data_validation.py b/data_validation.py index 70ecc94..a5e1ae6 100644 --- a/data_validation.py +++ b/data_validation.py @@ -5,23 +5,23 @@ @task(retries=2, retry_delay_seconds=10) -def read_run(client, uid, dry_run=dry_run): +def read_run(client, uid, dry_run=False): logger = get_run_logger() if dry_run: logger.info(f"Dry run: not reading {uid}") return None - run = client["tst"]["raw"][uid] + run = client["tst"]["raw"][uid] logger.info(f"Validating uid {run.start['uid']}") return run @task(retries=2, retry_delay_seconds=10) def read_stream(run, stream): - stream_data = run[stream].read() + return run[stream].read() @flow -def read_all_streams(uid, beamline_acronym, dry_run=dry_run): +def read_all_streams(uid, beamline_acronym, dry_run=False): logger = get_run_logger() api_key = Secret.load("tiled-tst-api-key").get() cl = from_profile("nsls2", api_key=api_key) @@ -33,7 +33,7 @@ def read_all_streams(uid, beamline_acronym, dry_run=dry_run): for stream in run: logger.info(f"{stream}:") stream_start_time = ttime.monotonic() - stream_data = read_stream(run, stream) + stream_data = read_stream(run, stream) # noqa: F841 stream_elapsed_time = ttime.monotonic() - stream_start_time logger.info(f"{stream} elapsed_time = {stream_elapsed_time}") logger.info(f"{stream} nbytes = {stream_data.nbytes:_}") @@ -42,5 +42,5 @@ def read_all_streams(uid, beamline_acronym, dry_run=dry_run): @flow -def data_validation(uid, dry_run=dry_run): +def data_validation(uid, dry_run=False): read_all_streams(uid, beamline_acronym="tst", dry_run=dry_run) From 86f12632e0f3019c2005aa603fced60fde70efb4 Mon Sep 17 00:00:00 2001 From: Jun Aishima Date: Fri, 20 Feb 2026 14:50:55 -0500 Subject: [PATCH 3/3] fix pre-commit issues --- end_of_run_workflow.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/end_of_run_workflow.py b/end_of_run_workflow.py index 391dc66..60e8deb 100644 --- a/end_of_run_workflow.py +++ b/end_of_run_workflow.py @@ -5,7 +5,7 @@ @task -def log_completion(dry_run=dry_run): +def log_completion(dry_run=False): logger = get_run_logger() logger.info(f"Complete! dry_run:{dry_run}")