diff --git a/data_validation.py b/data_validation.py index 60ad578..a5e1ae6 100644 --- a/data_validation.py +++ b/data_validation.py @@ -5,24 +5,42 @@ @task(retries=2, retry_delay_seconds=10) -def read_all_streams(uid, beamline_acronym): +def read_run(client, uid, dry_run=False): + logger = get_run_logger() + if dry_run: + logger.info(f"Dry run: not reading {uid}") + return None + run = client["tst"]["raw"][uid] + logger.info(f"Validating uid {run.start['uid']}") + return run + + +@task(retries=2, retry_delay_seconds=10) +def read_stream(run, stream): + return run[stream].read() + + +@flow +def read_all_streams(uid, beamline_acronym, dry_run=False): logger = get_run_logger() api_key = Secret.load("tiled-tst-api-key").get() cl = from_profile("nsls2", api_key=api_key) - run = cl["tst"]["raw"][uid] - logger.info(f"Validating uid {run.start['uid']}") + run = read_run(cl, uid, dry_run) start_time = ttime.monotonic() - for stream in run: - logger.info(f"{stream}:") - stream_start_time = ttime.monotonic() - stream_data = run[stream].read() - stream_elapsed_time = ttime.monotonic() - stream_start_time - logger.info(f"{stream} elapsed_time = {stream_elapsed_time}") - logger.info(f"{stream} nbytes = {stream_data.nbytes:_}") + if dry_run: + logger.info(f"Dry run: not reading streams from uid {uid}") + else: + for stream in run: + logger.info(f"{stream}:") + stream_start_time = ttime.monotonic() + stream_data = read_stream(run, stream) # noqa: F841 + stream_elapsed_time = ttime.monotonic() - stream_start_time + logger.info(f"{stream} elapsed_time = {stream_elapsed_time}") + logger.info(f"{stream} nbytes = {stream_data.nbytes:_}") elapsed_time = ttime.monotonic() - start_time logger.info(f"{elapsed_time = }") @flow -def data_validation(uid): - read_all_streams(uid, beamline_acronym="tst") +def data_validation(uid, dry_run=False): + read_all_streams(uid, beamline_acronym="tst", dry_run=dry_run) diff --git a/end_of_run_workflow.py b/end_of_run_workflow.py index f56291b..60e8deb 100644 --- a/end_of_run_workflow.py +++ b/end_of_run_workflow.py @@ -5,16 +5,16 @@ @task -def log_completion(): +def log_completion(dry_run=False): logger = get_run_logger() - logger.info("Complete") + logger.info(f"Complete! dry_run:{dry_run}") @flow -def end_of_run_workflow(stop_doc): +def end_of_run_workflow(stop_doc, dry_run=False): uid = stop_doc["run_start"] # hello_world() - data_validation(uid, return_state=True) - get_other_docs(uid) - # long_flow(iterations=100, sleep_length=10) - log_completion() + data_validation(uid, return_state=True, dry_run=dry_run) + get_other_docs(uid, dry_run=dry_run) + # long_flow(iterations=100, sleep_length=10, dry_run=dry_run) + log_completion(dry_run=dry_run)