34 changes: 23 additions & 11 deletions data_validation.py
@@ -1,28 +1,40 @@
 from prefect import task, flow, get_run_logger
 from prefect.blocks.system import Secret
+from bluesky_tiled_plugins.writing.validator import validate
 import time as ttime
 from tiled.client import from_profile
 
 
-@task(retries=2, retry_delay_seconds=10)
-def read_all_streams(uid, beamline_acronym):
+@task
+def check_stream(run):
     logger = get_run_logger()
-    api_key = Secret.load("tiled-smi-api-key", _sync=True).get()
-    tiled_client = from_profile("nsls2", api_key=api_key)
-    run = tiled_client[beamline_acronym]["raw"][uid]
-    logger.info(f"Validating uid {uid}")
     start_time = ttime.monotonic()
     for stream in run:
         logger.info(f"{stream}:")
         stream_start_time = ttime.monotonic()
         stream_data = run[stream].read()
         stream_elapsed_time = ttime.monotonic() - stream_start_time
         logger.info(f"{stream} elapsed_time = {stream_elapsed_time}")
         logger.info(f"{stream} nbytes = {stream_data.nbytes:_}")
     elapsed_time = ttime.monotonic() - start_time
     logger.info(f"{elapsed_time = }")
 
 
+@task(retries=2, retry_delay_seconds=10)
+def validate_local(run_client):
+    logger = get_run_logger()
+    validate(run_client, fix_errors=True, try_reading=True, raise_on_error=True)
 
 @flow
-def data_validation(uid):
-    read_all_streams(uid, beamline_acronym="smi")
+def data_validation(uid, beamline_acronym="smi"):
Contributor:
We may still need to keep the old validation task that uses the /smi/raw catalog while the data is still being written to Mongo (and add the new validation of the /smi/migration catalog). It doesn't cost us much (correct me if I'm wrong), but for completeness I think it makes sense to keep it.
We can remove the old function once Mongo is turned off and /migration is renamed to /raw.

Contributor Author:
So you want to keep the task that reads all of the data as a check for /smi/raw? We should talk about this more: that change would remove one of the major advantages of this PR, namely that we no longer read all of the data, which could be the main reason for the prefect-worker2 CPU usage issues.
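
For reference, if a /smi/raw check is kept, it would not have to read the data at all. Below is a minimal sketch of a metadata-only check; the task name, the required_streams default, and the choice of what to log are assumptions for illustration, not part of this PR:

from prefect import task, get_run_logger


@task
def check_stream_lightweight(run, required_streams=("primary",)):
    # Sketch only: confirm the expected streams exist and log their metadata
    # without calling .read(), so no array data is pulled from the server.
    logger = get_run_logger()
    stream_names = list(run)
    logger.info(f"Found streams: {stream_names}")
    missing = [name for name in required_streams if name not in stream_names]
    if missing:
        raise ValueError(f"Missing expected streams: {missing}")
    for stream in stream_names:
        logger.info(f"{stream} metadata keys: {list(run[stream].metadata)}")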

Contributor:
Ah okay, if improving performance was the main reason, maybe just remove it then. I don't know if anyone paid any attention to this reading failing.
Validating /migration (what you've added) is important though; it fixes any inconsistencies in structures to make the data readable.

+    logger = get_run_logger()
+    api_key = Secret.load("tiled-smi-api-key", _sync=True).get()
+    tiled_client = from_profile("nsls2", api_key=api_key)
+    run_client = tiled_client[beamline_acronym]["migration"][uid]
+    run_client_raw = tiled_client[beamline_acronym]["raw"][uid]
+    logger.info(f"Launching tasks to check streams and validate uid {uid}")
+    start_time = ttime.monotonic()
+    check_stream_task = check_stream.submit(run_client_raw)
+    validate_task = validate_local.submit(run_client)
+    logger.info("Waiting for tasks to complete")
+    check_stream_task.result()
+    validate_task.result()
+    elapsed_time = ttime.monotonic() - start_time
+    logger.info(f"Finished checking and validating data; total {elapsed_time = }")