data_validation.py (42 changes: 30 additions & 12 deletions)

```diff
@@ -5,24 +5,42 @@
 
 
 @task(retries=2, retry_delay_seconds=10)
-def read_all_streams(uid, beamline_acronym):
+def read_run(client, uid, dry_run=False):
     logger = get_run_logger()
+    if dry_run:
+        logger.info(f"Dry run: not reading {uid}")
+        return None
+    run = client["tst"]["raw"][uid]
+    logger.info(f"Validating uid {run.start['uid']}")
+    return run
+
+
+@task(retries=2, retry_delay_seconds=10)
+def read_stream(run, stream):
+    return run[stream].read()
+
+
+@flow
+def read_all_streams(uid, beamline_acronym, dry_run=False):
+    logger = get_run_logger()
     api_key = Secret.load("tiled-tst-api-key").get()
     cl = from_profile("nsls2", api_key=api_key)
-    run = cl["tst"]["raw"][uid]
-    logger.info(f"Validating uid {run.start['uid']}")
+    run = read_run(cl, uid, dry_run)
     start_time = ttime.monotonic()
-    for stream in run:
-        logger.info(f"{stream}:")
-        stream_start_time = ttime.monotonic()
-        stream_data = run[stream].read()
-        stream_elapsed_time = ttime.monotonic() - stream_start_time
-        logger.info(f"{stream} elapsed_time = {stream_elapsed_time}")
-        logger.info(f"{stream} nbytes = {stream_data.nbytes:_}")
+    if dry_run:
+        logger.info(f"Dry run: not reading streams from uid {uid}")
+    else:
+        for stream in run:
+            logger.info(f"{stream}:")
+            stream_start_time = ttime.monotonic()
+            stream_data = read_stream(run, stream)  # noqa: F841
+            stream_elapsed_time = ttime.monotonic() - stream_start_time
+            logger.info(f"{stream} elapsed_time = {stream_elapsed_time}")
+            logger.info(f"{stream} nbytes = {stream_data.nbytes:_}")
     elapsed_time = ttime.monotonic() - start_time
     logger.info(f"{elapsed_time = }")
 
 
 @flow
-def data_validation(uid):
-    read_all_streams(uid, beamline_acronym="tst")
+def data_validation(uid, dry_run=False):
+    read_all_streams(uid, beamline_acronym="tst", dry_run=dry_run)
```

Review comment (Contributor), on the data_validation flow:

> It feels a bit weird to call a flow (read_all_streams) from within a flow (data_validation) when everything is related to doing data validation. Maybe we can consider moving whatever is in read_all_streams into data_validation so we only have one flow?
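
If the reviewer's suggestion were adopted, the module could collapse to a single flow that calls only tasks. Below is a minimal sketch of that shape, reusing the read_run and read_stream task bodies from the diff above and assuming the module's existing imports (prefect, prefect.blocks.system.Secret, tiled.client.from_profile); the unused beamline_acronym parameter is dropped for brevity, and none of this is part of the PR:

```python
import time as ttime

from prefect import flow, get_run_logger, task
from prefect.blocks.system import Secret
from tiled.client import from_profile


@task(retries=2, retry_delay_seconds=10)
def read_run(client, uid, dry_run=False):
    # Same task as in the diff above: look up the run, or skip it on a dry run.
    logger = get_run_logger()
    if dry_run:
        logger.info(f"Dry run: not reading {uid}")
        return None
    run = client["tst"]["raw"][uid]
    logger.info(f"Validating uid {run.start['uid']}")
    return run


@task(retries=2, retry_delay_seconds=10)
def read_stream(run, stream):
    # Same task as in the diff above: read one stream's data.
    return run[stream].read()


@flow
def data_validation(uid, dry_run=False):
    # Body of read_all_streams folded in, so the module exposes one flow.
    logger = get_run_logger()
    api_key = Secret.load("tiled-tst-api-key").get()
    cl = from_profile("nsls2", api_key=api_key)
    run = read_run(cl, uid, dry_run)
    start_time = ttime.monotonic()
    if dry_run:
        logger.info(f"Dry run: not reading streams from uid {uid}")
    else:
        for stream in run:
            stream_data = read_stream(run, stream)
            logger.info(f"{stream} nbytes = {stream_data.nbytes:_}")
    logger.info(f"elapsed_time = {ttime.monotonic() - start_time}")
```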
end_of_run_workflow.py (14 changes: 7 additions & 7 deletions)

```diff
@@ -5,16 +5,16 @@
 
 
 @task
-def log_completion():
+def log_completion(dry_run=False):
     logger = get_run_logger()
-    logger.info("Complete")
+    logger.info(f"Complete! dry_run:{dry_run}")
 
 
 @flow
-def end_of_run_workflow(stop_doc):
+def end_of_run_workflow(stop_doc, dry_run=False):
     uid = stop_doc["run_start"]
     # hello_world()
-    data_validation(uid, return_state=True)
-    get_other_docs(uid)
-    # long_flow(iterations=100, sleep_length=10)
-    log_completion()
+    data_validation(uid, return_state=True, dry_run=dry_run)
+    get_other_docs(uid, dry_run=dry_run)
+    # long_flow(iterations=100, sleep_length=10, dry_run=dry_run)
+    log_completion(dry_run=dry_run)
```

Review comment (Contributor), suggested change on the new log message (add a space after the colon):

```suggestion
    logger.info(f"Complete! dry_run: {dry_run}")
```
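
For reference, one way the dry-run path could be smoke-tested locally. The uid below is invented, but the dict shape matches how the flow reads stop_doc["run_start"] above; in practice this flow is presumably triggered by a Prefect deployment at the end of a run rather than called by hand:

```python
# Hypothetical local smoke test; the "example-uid" value is made up.
from end_of_run_workflow import end_of_run_workflow

if __name__ == "__main__":
    stop_doc = {"run_start": "example-uid"}  # hypothetical uid
    # dry_run=True propagates through data_validation, get_other_docs,
    # and log_completion, so the heavy data reads are skipped.
    end_of_run_workflow(stop_doc, dry_run=True)
```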