From b6350df5ddea84e8365841728ffb7e14ce5eb165 Mon Sep 17 00:00:00 2001 From: Joseph Hanrahan Date: Wed, 8 Nov 2023 23:36:54 -0500 Subject: [PATCH 01/37] implemented timepix end-run workflow --- processing.py | 33 ++++++++++++++++++++++++++++++--- 1 file changed, 30 insertions(+), 3 deletions(-) diff --git a/processing.py b/processing.py index a3cc853..4d213db 100644 --- a/processing.py +++ b/processing.py @@ -6,6 +6,27 @@ tiled_cilent_sandbox = tiled_client["sandbox"] tiled_cilent_processed = tiled_client["processed"] +@task +def get_df_uncent(run): + sid = run['start']['scan_id'] + raw_file_paths = extract_fpaths_from_sid(sid) + for file in raw_file_paths: + if (os.path.exists(file)): + yield raw_to_sorted_df(file) + +@flow +def insert_to_tiled(container, run): + structure = None + node = None + num_img = run['descriptors'][0]['configuration']['tpx3']['data']['tpx3_cam_num_images'] + + for partition_num, df in enumerate(get_df_uncent(run)): + if (structure == None): + structure = TableStructure.from_pandas(df) + structure.npartitions = num_img + node = container.new("table", structure=structure, metadata={"raw": run['start']['uid']}) + + node.write_partition(df, partition_num) @task def process_run(ref): @@ -21,11 +42,15 @@ def process_run(ref): logger = get_run_logger() # Grab the BlueSky run run = tiled_client_chx[ref] + # Grab the full uid for logging purposes full_uid = run.start["uid"] logger.info(f"{full_uid = }") logger.info("Do something with this uid") - # Do some additional processing or call otehr python processing functions + + client = from_uri('https://tiled.nsls2.bnl.gov') + container = client['chx']['processed'] + insert_to_tiled(container, run) @flow @@ -38,5 +63,7 @@ def processing_flow(ref): ref : int, str reference to BlueSky. It can be scan_id, uid or index """ - - process_run(ref) + run = tiled_client_chx[ref] + + if (run['start']['detectors'][0] == 'tpx3'): + process_run(ref) From 73221d719adf675ed18b20117ea02e3c2a038b5d Mon Sep 17 00:00:00 2001 From: JGoodrichBNL <103213447+JGoodrichBNL@users.noreply.github.com> Date: Mon, 13 Nov 2023 13:29:24 -0500 Subject: [PATCH 02/37] Update processing.py Fixed missing imports from tpx3awkward --- processing.py | 1 + 1 file changed, 1 insertion(+) diff --git a/processing.py b/processing.py index 4d213db..0a913ed 100644 --- a/processing.py +++ b/processing.py @@ -1,5 +1,6 @@ from prefect import flow, task, get_run_logger from tiled.client import from_profile +from tpx3awkward import extract_fpaths_from_sid, raw_to_sorted_df tiled_client = from_profile("nsls2")["chx"] tiled_client_chx = tiled_client["raw"] From c90a3de220bc94b759fdde0c301bc6c8e3399ee1 Mon Sep 17 00:00:00 2001 From: Joseph Hanrahan Date: Thu, 16 Nov 2023 15:30:47 -0500 Subject: [PATCH 03/37] made Abby's changes --- processing.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/processing.py b/processing.py index 4d213db..40913aa 100644 --- a/processing.py +++ b/processing.py @@ -6,7 +6,6 @@ tiled_cilent_sandbox = tiled_client["sandbox"] tiled_cilent_processed = tiled_client["processed"] -@task def get_df_uncent(run): sid = run['start']['scan_id'] raw_file_paths = extract_fpaths_from_sid(sid) @@ -14,7 +13,6 @@ def get_df_uncent(run): if (os.path.exists(file)): yield raw_to_sorted_df(file) -@flow def insert_to_tiled(container, run): structure = None node = None @@ -48,9 +46,7 @@ def process_run(ref): logger.info(f"{full_uid = }") logger.info("Do something with this uid") - client = from_uri('https://tiled.nsls2.bnl.gov') - container = 
client['chx']['processed'] - insert_to_tiled(container, run) + insert_to_tiled(tiled_client_processed, run) @flow From 231c1a47c20042ea32d64209296a9c0b5c0333f9 Mon Sep 17 00:00:00 2001 From: Joseph Hanrahan Date: Thu, 16 Nov 2023 18:06:51 -0500 Subject: [PATCH 04/37] typo fix --- processing.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/processing.py b/processing.py index 526a0a7..67aad9e 100644 --- a/processing.py +++ b/processing.py @@ -1,11 +1,10 @@ from prefect import flow, task, get_run_logger from tiled.client import from_profile -from tpx3awkward import extract_fpaths_from_sid, raw_to_sorted_df tiled_client = from_profile("nsls2")["chx"] tiled_client_chx = tiled_client["raw"] -tiled_cilent_sandbox = tiled_client["sandbox"] -tiled_cilent_processed = tiled_client["processed"] +tiled_client_sandbox = tiled_client["sandbox"] +tiled_client_processed = tiled_client["processed"] def get_df_uncent(run): sid = run['start']['scan_id'] From 5b927265840714b812e7ca26228879b55ee264d0 Mon Sep 17 00:00:00 2001 From: JGoodrichBNL <103213447+JGoodrichBNL@users.noreply.github.com> Date: Fri, 17 Nov 2023 11:54:35 -0500 Subject: [PATCH 05/37] Update processing.py --- processing.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processing.py b/processing.py index 67aad9e..e00179e 100644 --- a/processing.py +++ b/processing.py @@ -61,5 +61,5 @@ def processing_flow(ref): """ run = tiled_client_chx[ref] - if (run['start']['detectors'][0] == 'tpx3'): + if (run.start['detectors'][0] == 'tpx3'): process_run(ref) From cebc6fbe661f13638d5c74b439e467e826d8c955 Mon Sep 17 00:00:00 2001 From: JGoodrichBNL <103213447+JGoodrichBNL@users.noreply.github.com> Date: Fri, 17 Nov 2023 14:47:50 -0500 Subject: [PATCH 06/37] Update processing.py --- processing.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/processing.py b/processing.py index e00179e..52e1ff9 100644 --- a/processing.py +++ b/processing.py @@ -1,5 +1,6 @@ from prefect import flow, task, get_run_logger from tiled.client import from_profile +from tpx3awkward import extract_fpaths_from_sid, raw_to_sorted_df tiled_client = from_profile("nsls2")["chx"] tiled_client_chx = tiled_client["raw"] @@ -16,7 +17,7 @@ def get_df_uncent(run): def insert_to_tiled(container, run): structure = None node = None - num_img = run['descriptors'][0]['configuration']['tpx3']['data']['tpx3_cam_num_images'] + num_img = run['primary'].metadata['descriptors'][0]['configuration']['tpx3']['data']['tpx3_cam_num_images'] for partition_num, df in enumerate(get_df_uncent(run)): if (structure == None): From f33d1577134f2e4e0f7759a8ae9ac1389b0a9bd9 Mon Sep 17 00:00:00 2001 From: JGoodrichBNL <103213447+JGoodrichBNL@users.noreply.github.com> Date: Fri, 17 Nov 2023 14:53:37 -0500 Subject: [PATCH 07/37] Update processing.py --- processing.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/processing.py b/processing.py index 52e1ff9..3dcdd02 100644 --- a/processing.py +++ b/processing.py @@ -8,7 +8,7 @@ tiled_client_processed = tiled_client["processed"] def get_df_uncent(run): - sid = run['start']['scan_id'] + sid = run.start['scan_id'] raw_file_paths = extract_fpaths_from_sid(sid) for file in raw_file_paths: if (os.path.exists(file)): @@ -23,7 +23,7 @@ def insert_to_tiled(container, run): if (structure == None): structure = TableStructure.from_pandas(df) structure.npartitions = num_img - node = container.new("table", structure=structure, metadata={"raw": run['start']['uid']}) + node = container.new("table", 
structure=structure, metadata={"raw": run.start['uid']}) node.write_partition(df, partition_num) From 798ac44c4fe31bba69532b2e9e1f83de0d6e8e9f Mon Sep 17 00:00:00 2001 From: Justin Goodrich Date: Tue, 21 Nov 2023 15:14:08 -0500 Subject: [PATCH 08/37] Added tpx3awkward utils. --- processing.py | 4 +- tpx3utils.py | 894 ++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 896 insertions(+), 2 deletions(-) create mode 100644 tpx3utils.py diff --git a/processing.py b/processing.py index 3dcdd02..af6c74e 100644 --- a/processing.py +++ b/processing.py @@ -1,6 +1,6 @@ from prefect import flow, task, get_run_logger from tiled.client import from_profile -from tpx3awkward import extract_fpaths_from_sid, raw_to_sorted_df +from tpx3utils import raw_to_sorted_df tiled_client = from_profile("nsls2")["chx"] tiled_client_chx = tiled_client["raw"] @@ -9,7 +9,7 @@ def get_df_uncent(run): sid = run.start['scan_id'] - raw_file_paths = extract_fpaths_from_sid(sid) + raw_file_paths = list(run.table()["tpx3_files_raw_filepaths"].to_numpy()[0]) for file in raw_file_paths: if (os.path.exists(file)): yield raw_to_sorted_df(file) diff --git a/tpx3utils.py b/tpx3utils.py new file mode 100644 index 0000000..01bdccc --- /dev/null +++ b/tpx3utils.py @@ -0,0 +1,894 @@ +import os +import numpy as np +from pathlib import Path +from numpy.typing import NDArray +from typing import TypeVar, Union, Dict, Set, List +import numba +import pandas as pd +from scipy.spatial import KDTree +import concurrent.futures +import multiprocessing +import time +from tqdm import tqdm +#from pyCHX.chx_packages import db, get_sid_filenames +import gc + +IA = NDArray[np.uint64] +UnSigned = TypeVar("UnSigned", IA, np.uint64) + + +def raw_as_numpy(fpath: Union[str, Path]) -> IA: + """ + Read raw tpx3 data file as a numpy array. + + Each entry is read as a uint8 (64bit unsigned-integer) + + Parameters + ---------- + + """ + with open(fpath, "rb") as fin: + return np.frombuffer(fin.read(), dtype=" UnSigned: + return v >> np.uint64(shift) & np.uint64(2**width - 1) + + +@numba.jit(nopython=True) +def is_packet_header(v: UnSigned) -> UnSigned: + return get_block(v, 32, 0) == 861425748 + + +@numba.jit(nopython=True) +def classify_array(data: IA) -> NDArray[np.uint8]: + """ + Create an array the same size as the data classifying 64bit uint by type. + + 0: an unknown type (!!) + 1: packet header (id'd via TPX3 magic number) + 2: photon event (id'd via 0xB upper nibble) + 3: TDC timstamp (id'd via 0x6 upper nibble) + 4: global timestap (id'd via 0x4 upper nibble) + 5: "command" data (id'd via 0x7 upper nibble) + 6: frame driven data (id'd via 0xA upper nibble) (??) + """ + output = np.zeros_like(data, dtype="> np.uint(60) + # probably a better way to do this, but brute force! 
+ output[~is_header & (nibble == 0xB)] = 2 + output[~is_header & (nibble == 0x6)] = 3 + output[~is_header & (nibble == 0x4)] = 4 + output[~is_header & (nibble == 0x7)] = 5 + output[~is_header & (nibble == 0xA)] = 6 + + return output + + +@numba.jit(nopython=True) +def _shift_xy(chip, row, col): + # TODO sort out if this needs to be paremeterized + out = np.zeros(2, "u4") + if chip == 0: + out[0] = row + out[1] = col + np.uint(256) + elif chip == 1: + out[0] = np.uint(511) - row + out[1] = np.uint(511) - col + elif chip == 2: + out[0] = np.uint(511) - row + out[1] = np.uint(255) - col + elif chip == 3: + out[0] = row + out[1] = col + else: + # TODO sort out how to get the chip number in here and make numba happy + raise RuntimeError("Unknown chip id") + return out + + +@numba.jit(nopython=True) +def _ingest_raw_data(data: IA): + types = np.zeros_like(data, dtype="> np.uint(60) + # probably a better way to do this, but brute force! + types[~is_header & (nibble == 0xB)] = 2 + types[~is_header & (nibble == 0x6)] = 3 + types[~is_header & (nibble == 0x4)] = 4 + types[~is_header & (nibble == 0x7)] = 5 + + # sort out how many photons we have + total_photons = np.sum(types == 2) + + # allocate the return arrays + x = np.zeros(total_photons, dtype="u2") + y = np.zeros(total_photons, dtype="u2") + pix_addr = np.zeros(total_photons, dtype="u2") + ToA = np.zeros(total_photons, dtype="u2") + ToT = np.zeros(total_photons, dtype="u4") + FToA = np.zeros(total_photons, dtype="u2") + SPIDR = np.zeros(total_photons, dtype="u2") + chip_number = np.zeros(total_photons, dtype="u1") + basetime = np.zeros(total_photons, dtype="u8") + timestamp = np.zeros(total_photons, dtype="u8") + + photon_offset = 0 + chip = np.uint16(0) + expected_msg_count = np.uint16(0) + msg_run_count = np.uint(0) + + heartbeat_lsb = np.uint64(0) + heartbeat_msb = np.uint64(0) + heartbeat_time = np.uint64(0) + # loop over the packet headers (can not vectorize this with numpy) + for j in range(len(data)): + msg = data[j] + typ = types[j] + if typ == 1: + # 1: packet header (id'd via TPX3 magic number) + if expected_msg_count != msg_run_count: + print("missing messages!", msg) + # extract scalar information from the header + + # "number of pixels in chunk" is given in bytes not words + # and means all words in the chunk, not just "photons" + expected_msg_count = get_block(msg, 16, 48) // 8 + # what chip we are on + chip = np.uint8(get_block(msg, 8, 32)) + msg_run_count = 0 + elif typ == 2 or typ == 6: + # 2: photon event (id'd via 0xB upper nibble) + # 6: frame driven data (id'd via 0xA upper nibble) (??) 
+ + # | + + # pixAddr is 16 bits + # these names and math are adapted from c++ code + l_pix_addr = pix_addr[photon_offset] = (msg >> np.uint(44)) & np.uint(0xFFFF) + # This is laid out 16ibts which are 2 interleaved 8 bit unsigned ints + # CCCCCCCRRRRRRCRR + # |dcol ||spix|^|| + # | 7 || 6 |1|2 + # + # The high 7 bits of the column + # '1111111000000000' + dcol = (l_pix_addr & np.uint(0xFE00)) >> np.uint(8) + # The high 6 bits of the row + # '0000000111111000' + spix = (l_pix_addr & np.uint(0x01F8)) >> np.uint(1) + rowcol = _shift_xy( + chip, + # add the low 2 bits of the row + # '0000000000000011' + spix + (l_pix_addr & np.uint(0x3)), + # add the low 1 bit of the column + # '0000000000000100' + dcol + ((l_pix_addr & np.uint(0x4)) >> np.uint(2)), + ) + col = x[photon_offset] = rowcol[1] + y[photon_offset] = rowcol[0] + # ToA is 14 bits + ToA[photon_offset] = (msg >> np.uint(30)) & np.uint(0x3FFF) + # ToT is 10 bits + # report in ns + ToT[photon_offset] = ((msg >> np.uint(20)) & np.uint(0x3FF)) * 25 + # FToA is 4 bits + l_FToA = FToA[photon_offset] = (msg >> np.uint(16)) & np.uint(0xF) + # SPIDR time is 16 bits + SPIDR[photon_offset] = msg & np.uint(0xFFFF) + # chip number (this is a constant) + chip_number[photon_offset] = chip + # heartbeat time + basetime[photon_offset] = heartbeat_time + + ToA_coarse = (SPIDR[photon_offset] << np.uint(14)) | ToA[photon_offset] + pixelbits = int((ToA_coarse >> np.uint(28)) & np.uint(0x3)) + heartbeat_time_bits = int((heartbeat_time >> np.uint(28)) & np.uint(0x3)) + diff = heartbeat_time_bits - pixelbits + if diff == 1 or diff == -3: + heartbeat_time -= np.uint(0x10000000) + elif diff == -1 or diff == 3: + heartbeat_time += np.uint(0x10000000) + globaltime = (heartbeat_time & np.uint(0xFFFFC0000000)) | (ToA_coarse & np.uint(0x3FFFFFFF)) + + timestamp[photon_offset] = (globaltime << np.uint(12)) - (l_FToA << np.uint(8)) + # correct for phase shift + phase = np.uint((col / 2) % 16) + if phase == 0: + timestamp[photon_offset] += 16 << 8 + else: + timestamp[photon_offset] += phase << 8 + + photon_offset += 1 + msg_run_count += 1 + elif typ == 3: + # 3: TDC timstamp (id'd via 0x6 upper nibble) + # TODO: handle these! + msg_run_count += 1 + elif typ == 4: + # 4: global timestap (id'd via 0x4 upper nibble) + subheader = (msg >> np.uint(56)) & np.uint(0x0F) + if subheader == 0x4: + # timer lsb, 32 bits of time + heartbeat_lsb = (msg >> np.uint(16)) & np.uint(0xFFFFFFFF) + elif subheader == 0x5: + # timer msb + + time_msg = (msg >> np.uint(16)) & np.uint(0xFFFF) + heartbeat_msb = time_msg << np.uint(32) + # TODO the c++ code has large jump detection, do not understand why + heartbeat_time = heartbeat_msb | heartbeat_lsb + else: + raise Exception("unknown header") + + msg_run_count += 1 + elif typ == 5: + # 5: "command" data (id'd via 0x7 upper nibble) + # TODO handle this! + msg_run_count += 1 + else: + raise Exception("Not supported") + + return x, y, pix_addr, ToA, ToT, FToA, SPIDR, chip_number, basetime, timestamp + + +def ingest_raw_data(data: IA) -> Dict[str, NDArray]: + """ + Parse values out of raw timepix3 data stream. 
+ + Parameters + ---------- + data : NDArray[np.unint64] + The stream of raw data from the timepix3 + + Returns + ------- + Dict[str, NDArray] + Keys of x, y, pix_addr, ToA, ToT, FToA, SPIDR, chip_number + """ + return { + k.strip(): v + for k, v in zip( + "x, y, pix_addr, ToA, ToT, FToA, SPIDR, chip_number, basetime, timestamp".split(","), + _ingest_raw_data(data), + ) + } + +# ^-- tom wrote +# v-- justin wrote +""" +Some basic functions that help take the initial output of ingest_raw_data and finish the processing. +""" + + +def raw_to_sorted_df(fpath: Union[str, Path]) -> pd.DataFrame: + """ + Parses a .tpx3 file and returns the raw data after timesorting. + + Parameters + ---------- + fpath: Union[str, Path] + The path to the .tpx3 data to be processed. + + Returns + ------- + pd.DataFrame + DataFrame of raw events from the .tpx3 file. + """ + raw_df = pd.DataFrame(ingest_raw_data(raw_as_numpy(fpath))) + return raw_df.sort_values("timestamp").reset_index(drop=True) + + +def condense_raw_df(df: pd.DataFrame) -> pd.DataFrame: + """ + Condenses the raw dataframe with only key information necesary for the analysis. Returns a dataframe with timestamp (renamed to t), x, y, and ToT. + + Parameters + ---------- + df : pd.DataFrame + DataFrame generated using raw_to_sorted_df(). + + Returns + ------- + pd.DataFrame + Dataframe condensed to only contain pertinent information for analysis. + """ + cdf = df[["timestamp", "x", "y", "ToT"]] + cdf = cdf.rename( + columns={"timestamp": "t"} + ) # obviously not necessary, just easier to type 't' a lot than 'timestamp' + return cdf + + +def drop_zero_tot(df: pd.DataFrame) -> pd.DataFrame: + """ + Removes events which don't have positive ToT. Necessary step before clustering. + + Parameters + ---------- + df : pd.DataFrame + DataFrame to have ToT filtered. + + Returns + ------- + pd.DataFrame + df with only the events with ToT > 0 + """ + fdf = df[df["ToT"] > 0] + return fdf + + +""" +Functions to help perform clustering and centroiding on raw data. +""" +TIMESTAMP_VALUE = ((1e-9) / 4096) * 25 +MICROSECOND = 10 ** (-6) + +# We have had decent success with these values, but do not know for sure if they are optimal. +DEFAULT_CLUSTER_RADIUS = 2 +DEFAULT_CLUSTER_TW_MICROSECONDS = 0.5 + +DEFAULT_CLUSTER_TW = int(DEFAULT_CLUSTER_TW_MICROSECONDS * MICROSECOND / TIMESTAMP_VALUE) + + +def neighbor_set_from_df( + df: pd.DataFrame, tw: int = DEFAULT_CLUSTER_TW, radius: int = DEFAULT_CLUSTER_RADIUS +) -> tuple[np.ndarray, Set[tuple[int]]]: + """ + Uses scipy.spatial's KDTree to cluster raw input data. Requires a time window for clustering adjacent pixels and the total search radius. + + Parameters + ---------- + df : pd.DataFrame + DataFrame with the raw data (after timesorting and ToT filtering). + tw : int + The time window to be considered "coincident" for clustering purposes + radius : int + The search radius, using Euclidean distance of x, y, timestamp/tw + + Returns + ------- + np.ndarray + Numpy representation of the raw events being used in the clustering. + Set[tuple[int]] + An set of tuples of the indices of the clustered events. The outer set is each cluster, and the inner tuples are the events in each cluster. 
+ """ + events = np.array( + df[["t", "x", "y", "ToT", "t"]].values + ) # first three columns are for search radius of KDTree + events[:, 0] = np.floor_divide(events[:, 0], tw) # bin by the time window + tree = KDTree(events[:, :3]) # generate KDTree based off the coordinates + neighbors = tree.query_ball_tree( + tree, radius + ) # compare tree against itself to find neighbors within the search radius + clusters = set(tuple(n) for n in neighbors) # turn the list of lists into a set of tuples + return events, clusters + + +def cluster_stats( + clusters: Set[tuple[int]] +) -> tuple[int]: + """ + Determines basic information about cluster information, such as the number of clusters and size of the largest cluster. + + Parameters + ---------- + clusters : Set[tuple[int]] + The set of tuples of clusters from neighbor_set_from_df() + + Returns + ------- + int + The total number of clusters + int + The number of events in the largest cluster + """ + num_clusters = len(clusters) + max_cluster = max(map(len, clusters)) + return num_clusters, max_cluster + + +def create_cluster_arr( + clusters: Set[tuple[int]], num_clusters: int, max_cluster: int +) -> np.ndarray: # is there a better way to do this? + """ + Converts the clusters from a set of tuples of indices to an 2D numpy array format which can be efficiently iterated through with Numba. + + Parameters + ---------- + clusters : Set[tuple[int]] + The set of tuples of clusters from neighbor_set_from_df() + num_clusters : int + The total number of clusters + max_cluster : int + The number of events in the largest cluster + + Returns + ------- + np.ndarray + The cluster data now in a 2D numpy array. + """ + cluster_arr = np.full( + (num_clusters, max_cluster), -1, dtype=np.int64 + ) # fill with -1; these will be passed later + for cluster_num, cluster in enumerate(clusters): + for event_num, event in enumerate(cluster): + cluster_arr[cluster_num, event_num] = event + return cluster_arr + + +@numba.jit(nopython=True) +def cluster_arr_to_cent( + cluster_arr: np.ndarray, events: np.ndarray, num_clusters: int, max_cluster: int +) -> tuple[np.ndarray]: + """ + Performs the centroiding of a group of clusters using Numba. Note I originally attempted to unpack the clusters using list comprehensions, but this approach is significantly faster. + + Parameters + ---------- + clusters : Set[tuple[int]] + The set of tuples of clusters from neighbor_set_from_df() + num_clusters : int + The total number of clusters + max_cluster : int + The number of events in the largest clust + + Returns + ------- + tuple[np.ndarray] + t, xc, yc, ToT_max, ToT_sum, and n (number of events) in each cluster. 
+ """ + t = np.zeros(num_clusters, dtype="uint64") + xc = np.zeros(num_clusters, dtype="float32") + yc = np.zeros(num_clusters, dtype="float32") + ToT_max = np.zeros(num_clusters, dtype="uint16") + ToT_sum = np.zeros(num_clusters, dtype="uint16") + n = np.zeros(num_clusters, dtype="ubyte") + + for cluster_id in range(num_clusters): + _ToT_max = np.ushort(0) + for event_num in range(max_cluster): + event = cluster_arr[cluster_id, event_num] + if event > -1: # if we have an event here + if events[event, 3] > _ToT_max: # find the max ToT, assign, use that time + _ToT_max = events[event, 3] + t[cluster_id] = events[event, 4] + ToT_max[cluster_id] = _ToT_max + xc[cluster_id] += events[event, 1] * events[event, 3] # x and y centroids by time over threshold + yc[cluster_id] += events[event, 2] * events[event, 3] + ToT_sum[cluster_id] += events[event, 3] # calcuate sum + n[cluster_id] += np.ubyte(1) # number of events in cluster + else: + break + xc[cluster_id] /= ToT_sum[cluster_id] # normalize + yc[cluster_id] /= ToT_sum[cluster_id] + + return t, xc, yc, ToT_max, ToT_sum, n + + +def ingest_cent_data( + data: np.ndarray +) -> Dict[str, np.ndarray]: + """ + Performs the centroiding of a group of clusters. + + Parameters + ---------- + data : np.ndarray + The stream of cluster data from cluster_arr_to_cent() + + Returns + ------- + Dict[str, np.ndarray] + Keys of t, xc, yc, ToT_max, ToT_sum, and n (number of events) in each cluster. + """ + return { + k.strip(): v + for k, v in zip( + "t, xc, yc, ToT_max, ToT_sum, n".split(","), + data, + ) + } + + +def cent_to_numpy( + cluster_arr: np.ndarray, events: int, num_clusters: int, max_cluster: int +) -> Dict[str, np.ndarray]: + """ + Wrapper function to perform ingest_cent_data(cluster_arr_to_cent()) + + Parameters + ---------- + cluster_arr : np.ndarray + The array of cluster events from create_cluster_arr() + events : int + Number of photon events + num_clusters : int + The total number of clusters + max_cluster : int + The number of events in the largest clust + + Returns + ------- + Dict[str, np.ndarray] + Keys of t, xc, yc, ToT_max, ToT_sum, and n (number of events) in each cluster. + """ + return ingest_cent_data(cluster_arr_to_cent(cluster_arr, events, num_clusters, max_cluster)) + + +def cent_to_df( + cd_np: Dict[str, np.ndarray] +) -> pd.DataFrame: + """ + Returns the centroided dataframe from the zipped inputs. + + Parameters + ---------- + cd_np : Dict[str, np.ndarray] + Dictionary of the clustered data. + + Returns + ------- + pd.DataFrame + Time sorted dataframe of the centroids. + """ + cent_df = pd.DataFrame(cd_np) + return cent_df.sort_values("t").reset_index(drop=True) + + +def raw_df_to_cluster_df( + raw_df: pd.DataFrame, tw: int = DEFAULT_CLUSTER_TW, radius: int = DEFAULT_CLUSTER_RADIUS +) -> pd.DataFrame: + """ + Uses functions defined herein to take Dataframe of raw data and return dataframe of clustered data. + + Parameters + ---------- + raw_df : pd.DataFrame + Pandas DataFrame of the raw data + tw : int + The time window to be considered "coincident" for clustering purposes + radius : int + The search radius, using Euclidean distance of x, y, timestamp/tw + + Returns + ------- + pd.DataFrame + Pandas DataFrame of the centroided data. 
+ """ + filt_cond_raw_df = drop_zero_tot(condense_raw_df(raw_df)) + events, clusters = neighbor_set_from_df(filt_cond_raw_df, tw, radius) + num_clusters, max_cluster = cluster_stats(clusters) + cluster_arr = create_cluster_arr(clusters, num_clusters, max_cluster) + return cent_to_df(cent_to_numpy(cluster_arr, events, num_clusters, max_cluster)) + + +def add_centroid_cols( + df: pd.DataFrame, gap: bool = True +) -> pd.DataFrame: + """ + Calculates centroid positions to the nearest pixel and the timestamp in nanoseconds. + + Parameters + ---------- + df : pd.DataFrame + Input centroided dataframe + gap : bool = True + Determines whether to implement large gap correction by adding 2 empty pixels offsets + + Returns + ------- + pd.DataFrame + Originally dataframe with new columns x, y, and t_ns added. + """ + if gap: + df.loc[df['xc'] >= 255.5, 'xc'] += 2 + df.loc[df['yc'] >= 255.5, 'yc'] += 2 + df["x"] = np.round(df["xc"]).astype(np.uint16) + df["y"] = np.round(df["yc"]).astype(np.uint16) + df["t_ns"] = df["t"] / 4096 * 25 + + return df + + +""" +A bunch of functions to help process multiple related .tpx3 files into Pandas dataframes stored in .h5 files. +""" +RAW_H5_SUFFIX = "" +CENT_H5_SUFFIX = "_cent" +CONCAT_H5_SUFFIX = "_cent" + + +def extract_fpaths_from_sid( + sid: int +) -> List[str]: + """ + Extract file paths from a given sid. + + Parameters + ---------- + sid : int + Short ID of a BlueSky scan + + Returns + ------- + List[str] + Filepaths of the written .tpx3, as recorded in Tiled + """ + return list(db[sid].table()["tpx3_files_raw_filepaths"].to_numpy()[0]) + + +def extract_uid_from_fpaths( + fpaths: List[str] +) -> str: + """ + Extract scan unique ID from file paths. + + Parameters + ---------- + fpaths : List[str] + List of the filepaths. + + Returns + ------- + str + String of the first file's unique ID. + + """ + return os.path.basename(fpaths[0])[:23] + + +def extract_dir_from_fpaths( + fpaths: List[str] +) -> str: + """ + Extract directory from file paths. + + Parameters + ---------- + fpaths : List[str] + List of the filepaths. + + Returns + ------- + str + String of the first file's directory. + + """ + return os.path.dirname(fpaths[0]) + + +def extract_uid_from_sid( + sid: int +) -> str: + """ + Extract user ID from a given sid. + + Parameters + ---------- + sid : int + + Returns + ------- + str + String of the short ID's corresponding unique ID. + + """ + return extract_uid_from_fpaths(extract_fpaths_from_sid(sid)) + + +def convert_file( + fpath: Union[str, Path], time_window_microsecond: float = DEFAULT_CLUSTER_TW_MICROSECONDS, radius: int = DEFAULT_CLUSTER_RADIUS, print_details: bool = False +): + """ + Convert a .tpx3 file into raw and centroided Pandas dataframes, which are stored in .h5 files. + + Parameters + ---------- + fpath : Union[str, Path] + .tpx3 file path + time_window_microsecond : float = DEFAULT_CLUSTER_TW_MICROSECONDS + The time window, in microseconds, to perform centroiding + radius : int = DEFAULT_CLUSTER_RADIUS + The radius, in pixels, to perform centroiding + print_details : bool = False + Boolean toggle about whether to print detailed data. 
+ """ + fname, ext = os.path.splitext(fpath) + dfname = "{}{}.h5".format(fname, RAW_H5_SUFFIX) + dfcname = "{}{}.h5".format(fname, CONCAT_H5_SUFFIX) + + if ext == ".tpx3" and os.path.exists(fpath): + file_size = os.path.getsize(fpath) + have_df = os.path.exists(dfname) + have_dfc = os.path.exists(dfcname) + + if have_df and have_dfc: + print("-> {} exists, skipping.".format(dfname)) + else: + print("-> Processing {}, size: {:.1f} MB".format(fpath, file_size/1000000)) + time_window = time_window_microsecond * 1e-6 + time_stamp_conversion = 6.1e-12 + timedif = int(time_window / time_stamp_conversion) + + if print_details: + print("Loading {} data into dataframe...".format(fpath)) + df = raw_to_sorted_df(fpath) + num_events = df.shape[0] + + if print_details: + print("Loading {} complete. {} events found. Saving to: {}".format(fpath, num_events, dfname)) + df.to_hdf(dfname, key='df', mode='w') + + if print_details: + print("Saving {} complete. Beginning clustering...".format(dfname)) + df_c = raw_df_to_cluster_df(df, timedif, radius) + num_clusters = df_c.shape[0] + + if print_details: + print("Clustering {} complete. {} clusters found. Saving to {}".format(fpath, num_clusters, dfcname)) + df_c.to_hdf(dfcname, key='df', mode='w') + print("Saving {} complete. Moving onto next file.".format(dfcname)) + else: + print("File not found. Moving onto next file.") + + +def convert_tpx3_parallel( + fpaths: Union[str, Path], num_workers: int = None +): + """ + Convert a list of .tpx3 files in a parallel processing pool. + + Parameters + ---------- + fpaths : Union[str, Path] + .tpx3 file paths to convert in a parallel processing pool. + num_workers : int = None + Number of parallel workers to employ. + """ + if num_workers == None: + num_cores = multiprocessing.cpu_count() + max_workers = num_cores-1 + else: + max_workers = num_workers + + with multiprocessing.Pool(processes=max_workers) as pool: + pool.map(convert_file, fpaths) + + print("Parallel conversion complete") + + +def convert_tpx3_st(fpaths: Union[str, Path]): + """ + Convert a list of .tpx3 files in a single thread. + + Parameters + ---------- + fpaths : Union[str, Path] + .tpx3 file paths to convert in a single thread. + """ + for file in fpaths: + convert_file(file) + + +def get_cent_files( + uid: str, dir_name: Union[str, Path] +) -> List[str]: + """ + Gets a list of the centroided .h5 files from a given uid, sorted by sequence number. + + Parameters + ---------- + uid : str + The unique ID of the scan of which we want to get the files. + + dir_name : Union[str, path] + Directory to look in for the files. + + Returns + ------- + List[str] + List of the centroided file paths. + """ + cent_files = [ + os.path.join(dir_name, file) + for file in os.listdir(dir_name) + if file.endswith("{}.h5".format(CENT_H5_SUFFIX)) and str(uid) in file and len(os.path.basename(file)) == 44 + ] + + cent_files.sort(key=lambda f: int(os.path.splitext(os.path.basename(f))[0].split("_")[-2])) + return cent_files + + +def concat_cent_files( + cfpaths: List[Union[str, Path]] +): + """ + Concatenates several centroided files together. + + Parameters + ---------- + cfpaths : List[str, Path] + List of the centroided .h5 files to concatenate together. 
+ """ + dir_name = os.path.dirname(cfpaths[0]) + uid = extract_uid_from_fpaths(cfpaths) + + dfs = [pd.read_hdf(fpath, key='df') for fpath in tqdm(cfpaths)] + combined_df = pd.concat(dfs).reset_index(drop=True) + + save_path = os.path.join(dir_name, "{}{}.h5".format(uid, CONCAT_H5_SUFFIX)) + combined_df.to_hdf(save_path, key='df', mode='w') + + print("-> Saving complete.") + + +def get_con_cent_file( + sid: int +) -> str: + """ + Gets the location of the concatenated centroid files of a given sid. + + Parameters + ---------- + sid : int + Short ID of whichto get the centroided file path + + Returns + ------- + str + Path of the centroided file. + """ + fpaths = extract_fpaths_from_sid(sid) + uid = extract_uid_from_fpaths(fpaths) + dir_name = extract_dir_from_fpaths(fpaths) + cfpath = os.path.join(dir_name, "{}{}.h5".format(uid, CONCAT_H5_SUFFIX)) + + if os.path.exists(cfpath): + return cfpath + else: + print("-> Warning: {} does not exist".format(cfpath)) + return None + + +def convert_sids( + sids: List[int] +): + """ + Convert given sids by converting each .tpx3 file and then concatenating results together into a master dataframe. + + Parameters + ---------- + sids : List[int] + List of BlueSky scans' short IDs to convert. + """ + + for sid in sids: + print("\n\n---> Beginning sid: {} <---\n".format(sid)) + + tpx3fpaths = extract_fpaths_from_sid(sid) + dir_name = extract_dir_from_fpaths(tpx3fpaths) + num_tpx = len(tpx3fpaths) + uid = extract_uid_from_fpaths(tpx3fpaths) + + convert_tpx3_parallel(tpx3fpaths, num_workers=16) + centfpaths = get_cent_files(uid, dir_name) + num_cent = len(centfpaths) + + if num_tpx == num_cent: + print("---> Conversion numbers match") + concat_cent_files(centfpaths) + else: + print("---> Warning: conversion mismatch: tpx3={}, cent={}".format(num_tpx, num_cent)) + + print("---> Done with {}!".format(sid)) + gc.collect() From 91b34b10fdd49c36813fb36c19a8b5871c5d8513 Mon Sep 17 00:00:00 2001 From: Justin Goodrich Date: Tue, 21 Nov 2023 16:24:45 -0500 Subject: [PATCH 09/37] Fixing filepaths --- processing.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/processing.py b/processing.py index af6c74e..06ad16c 100644 --- a/processing.py +++ b/processing.py @@ -9,7 +9,7 @@ def get_df_uncent(run): sid = run.start['scan_id'] - raw_file_paths = list(run.table()["tpx3_files_raw_filepaths"].to_numpy()[0]) + raw_file_paths = list(run['primary']['data']['tpx3_files_raw_filepaths'].compute()[0]) for file in raw_file_paths: if (os.path.exists(file)): yield raw_to_sorted_df(file) @@ -47,7 +47,7 @@ def process_run(ref): logger.info(f"{full_uid = }") logger.info("Do something with this uid") - insert_to_tiled(tiled_client_processed, run) + insert_to_tiled(tiled_client_sandbox, run) @flow From 94b51b86593bf11beb2fea559fd21ff0835b2541 Mon Sep 17 00:00:00 2001 From: Justin Goodrich Date: Tue, 21 Nov 2023 16:26:16 -0500 Subject: [PATCH 10/37] Fixing filepaths --- processing.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processing.py b/processing.py index 06ad16c..ea38f7e 100644 --- a/processing.py +++ b/processing.py @@ -9,7 +9,7 @@ def get_df_uncent(run): sid = run.start['scan_id'] - raw_file_paths = list(run['primary']['data']['tpx3_files_raw_filepaths'].compute()[0]) + raw_file_paths = list(run['primary']['data']['tpx3_files_raw_filepaths'][0]) for file in raw_file_paths: if (os.path.exists(file)): yield raw_to_sorted_df(file) From 212b4711d8a89f2ebcaaa22a78e27aead8ef0344 Mon Sep 17 00:00:00 2001 From: JGoodrichBNL 
<103213447+JGoodrichBNL@users.noreply.github.com> Date: Tue, 28 Nov 2023 13:14:59 -0500 Subject: [PATCH 11/37] Update processing.py added import of os --- processing.py | 1 + 1 file changed, 1 insertion(+) diff --git a/processing.py b/processing.py index ea38f7e..ca77abd 100644 --- a/processing.py +++ b/processing.py @@ -1,6 +1,7 @@ from prefect import flow, task, get_run_logger from tiled.client import from_profile from tpx3utils import raw_to_sorted_df +import os tiled_client = from_profile("nsls2")["chx"] tiled_client_chx = tiled_client["raw"] From 628f165e3956432fc21d4b58ddc4465e0b1475fd Mon Sep 17 00:00:00 2001 From: Joseph Hanrahan Date: Wed, 29 Nov 2023 22:44:38 -0500 Subject: [PATCH 12/37] imported Tiled TableStructure --- processing.py | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/processing.py b/processing.py index ca77abd..9fd5686 100644 --- a/processing.py +++ b/processing.py @@ -1,7 +1,6 @@ from prefect import flow, task, get_run_logger from tiled.client import from_profile -from tpx3utils import raw_to_sorted_df -import os +from tiled.structures.table import TableStructure tiled_client = from_profile("nsls2")["chx"] tiled_client_chx = tiled_client["raw"] @@ -9,8 +8,8 @@ tiled_client_processed = tiled_client["processed"] def get_df_uncent(run): - sid = run.start['scan_id'] - raw_file_paths = list(run['primary']['data']['tpx3_files_raw_filepaths'][0]) + sid = run['start']['scan_id'] + raw_file_paths = extract_fpaths_from_sid(sid) for file in raw_file_paths: if (os.path.exists(file)): yield raw_to_sorted_df(file) @@ -18,13 +17,13 @@ def get_df_uncent(run): def insert_to_tiled(container, run): structure = None node = None - num_img = run['primary'].metadata['descriptors'][0]['configuration']['tpx3']['data']['tpx3_cam_num_images'] + num_img = run['descriptors'][0]['configuration']['tpx3']['data']['tpx3_cam_num_images'] for partition_num, df in enumerate(get_df_uncent(run)): if (structure == None): structure = TableStructure.from_pandas(df) structure.npartitions = num_img - node = container.new("table", structure=structure, metadata={"raw": run.start['uid']}) + node = container.new("table", structure=structure, metadata={"raw": run['start']['uid']}) node.write_partition(df, partition_num) @@ -48,7 +47,7 @@ def process_run(ref): logger.info(f"{full_uid = }") logger.info("Do something with this uid") - insert_to_tiled(tiled_client_sandbox, run) + insert_to_tiled(tiled_client_processed, run) @flow @@ -63,5 +62,5 @@ def processing_flow(ref): """ run = tiled_client_chx[ref] - if (run.start['detectors'][0] == 'tpx3'): + if (run['start']['detectors'][0] == 'tpx3'): process_run(ref) From 4482339ed505264b9700f766d66686d4dc95ac2b Mon Sep 17 00:00:00 2001 From: Joseph Hanrahan Date: Wed, 29 Nov 2023 23:47:24 -0500 Subject: [PATCH 13/37] commented out tpx3 check --- processing.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/processing.py b/processing.py index 9fd5686..0f47d18 100644 --- a/processing.py +++ b/processing.py @@ -62,5 +62,6 @@ def processing_flow(ref): """ run = tiled_client_chx[ref] - if (run['start']['detectors'][0] == 'tpx3'): - process_run(ref) + process_run(ref) + # if (run['start']['detectors'][0] == 'tpx3'): + # process_run(ref) From 51a89524f2238f72a3542e7e7e8448747bbadd62 Mon Sep 17 00:00:00 2001 From: Joseph Hanrahan Date: Thu, 30 Nov 2023 00:02:58 -0500 Subject: [PATCH 14/37] modified detector check --- processing.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/processing.py 
b/processing.py index 0f47d18..4cc61cf 100644 --- a/processing.py +++ b/processing.py @@ -62,6 +62,6 @@ def processing_flow(ref): """ run = tiled_client_chx[ref] - process_run(ref) - # if (run['start']['detectors'][0] == 'tpx3'): - # process_run(ref) + # process_run(ref) + if (run.start['detectors'][0] == 'tpx3'): + process_run(ref) From e0d148de523c2c03e05d8af2b45c37498991d7fc Mon Sep 17 00:00:00 2001 From: Joseph Hanrahan Date: Thu, 30 Nov 2023 00:08:52 -0500 Subject: [PATCH 15/37] edited run object query --- processing.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/processing.py b/processing.py index 4cc61cf..3c254ab 100644 --- a/processing.py +++ b/processing.py @@ -17,7 +17,7 @@ def get_df_uncent(run): def insert_to_tiled(container, run): structure = None node = None - num_img = run['descriptors'][0]['configuration']['tpx3']['data']['tpx3_cam_num_images'] + num_img = run.descriptors[0]['configuration']['tpx3']['data']['tpx3_cam_num_images'] for partition_num, df in enumerate(get_df_uncent(run)): if (structure == None): @@ -62,6 +62,5 @@ def processing_flow(ref): """ run = tiled_client_chx[ref] - # process_run(ref) if (run.start['detectors'][0] == 'tpx3'): process_run(ref) From e298a803214913e2675044339096c95fda47bd91 Mon Sep 17 00:00:00 2001 From: Joseph Hanrahan Date: Thu, 30 Nov 2023 00:19:46 -0500 Subject: [PATCH 16/37] modified run object query --- processing.py | 2 +- tpx3utils.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/processing.py b/processing.py index 3c254ab..d1984cf 100644 --- a/processing.py +++ b/processing.py @@ -17,7 +17,7 @@ def get_df_uncent(run): def insert_to_tiled(container, run): structure = None node = None - num_img = run.descriptors[0]['configuration']['tpx3']['data']['tpx3_cam_num_images'] + num_img = run.metadata['descriptors'][0]['configuration']['tpx3']['data']['tpx3_cam_num_images'] for partition_num, df in enumerate(get_df_uncent(run)): if (structure == None): diff --git a/tpx3utils.py b/tpx3utils.py index 01bdccc..5e24788 100644 --- a/tpx3utils.py +++ b/tpx3utils.py @@ -10,7 +10,7 @@ import multiprocessing import time from tqdm import tqdm -#from pyCHX.chx_packages import db, get_sid_filenames +# from pyCHX.chx_packages import db, get_sid_filenames import gc IA = NDArray[np.uint64] From b1c17bf2acaff19fd895486846db70a4071d2e66 Mon Sep 17 00:00:00 2001 From: Joseph Hanrahan Date: Thu, 30 Nov 2023 17:39:28 -0500 Subject: [PATCH 17/37] modified run object query pt3 --- processing.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/processing.py b/processing.py index d1984cf..b493a0b 100644 --- a/processing.py +++ b/processing.py @@ -17,7 +17,8 @@ def get_df_uncent(run): def insert_to_tiled(container, run): structure = None node = None - num_img = run.metadata['descriptors'][0]['configuration']['tpx3']['data']['tpx3_cam_num_images'] + # num_img = run.metadata['descriptors'][0]['configuration']['tpx3']['data']['tpx3_cam_num_images'] + num_img = run['primary'].metadata['descriptors'][0]['configuration']['tpx3']['data']['tpx3_cam_num_images'] for partition_num, df in enumerate(get_df_uncent(run)): if (structure == None): From 1fa593802279033c14acdc5afb6ec9b6b7183e1c Mon Sep 17 00:00:00 2001 From: Joseph Hanrahan Date: Thu, 30 Nov 2023 17:42:11 -0500 Subject: [PATCH 18/37] modified run object query pt4 --- processing.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/processing.py b/processing.py index b493a0b..04a169e 100644 --- a/processing.py +++ b/processing.py @@ 
-8,7 +8,7 @@ tiled_client_processed = tiled_client["processed"] def get_df_uncent(run): - sid = run['start']['scan_id'] + sid = run.start['scan_id'] raw_file_paths = extract_fpaths_from_sid(sid) for file in raw_file_paths: if (os.path.exists(file)): @@ -17,7 +17,6 @@ def get_df_uncent(run): def insert_to_tiled(container, run): structure = None node = None - # num_img = run.metadata['descriptors'][0]['configuration']['tpx3']['data']['tpx3_cam_num_images'] num_img = run['primary'].metadata['descriptors'][0]['configuration']['tpx3']['data']['tpx3_cam_num_images'] for partition_num, df in enumerate(get_df_uncent(run)): From 335063da4236cfa1974e363c528536d12e32b981 Mon Sep 17 00:00:00 2001 From: Joseph Hanrahan Date: Thu, 30 Nov 2023 17:47:04 -0500 Subject: [PATCH 19/37] imported tpx3utils function --- processing.py | 1 + 1 file changed, 1 insertion(+) diff --git a/processing.py b/processing.py index 04a169e..f8d021a 100644 --- a/processing.py +++ b/processing.py @@ -1,6 +1,7 @@ from prefect import flow, task, get_run_logger from tiled.client import from_profile from tiled.structures.table import TableStructure +from tpx3utils import extract_fpaths_from_sid tiled_client = from_profile("nsls2")["chx"] tiled_client_chx = tiled_client["raw"] From a152d509c6db586670f79c56fe5e418a985162a9 Mon Sep 17 00:00:00 2001 From: Joseph Hanrahan Date: Thu, 30 Nov 2023 19:47:10 -0500 Subject: [PATCH 20/37] made extract_fpaths function take in a run object --- processing.py | 4 ++-- tpx3utils.py | 6 ++---- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/processing.py b/processing.py index f8d021a..d3788c0 100644 --- a/processing.py +++ b/processing.py @@ -9,8 +9,8 @@ tiled_client_processed = tiled_client["processed"] def get_df_uncent(run): - sid = run.start['scan_id'] - raw_file_paths = extract_fpaths_from_sid(sid) + # sid = run.start['scan_id'] + raw_file_paths = extract_fpaths_from_sid(run) for file in raw_file_paths: if (os.path.exists(file)): yield raw_to_sorted_df(file) diff --git a/tpx3utils.py b/tpx3utils.py index 5e24788..c0323f2 100644 --- a/tpx3utils.py +++ b/tpx3utils.py @@ -608,9 +608,7 @@ def add_centroid_cols( CONCAT_H5_SUFFIX = "_cent" -def extract_fpaths_from_sid( - sid: int -) -> List[str]: +def extract_fpaths_from_sid(run) """ Extract file paths from a given sid. @@ -624,7 +622,7 @@ def extract_fpaths_from_sid( List[str] Filepaths of the written .tpx3, as recorded in Tiled """ - return list(db[sid].table()["tpx3_files_raw_filepaths"].to_numpy()[0]) + return list(run.table()["tpx3_files_raw_filepaths"].to_numpy()[0]) def extract_uid_from_fpaths( From 8ce4ec62ed85c482c51bc1bfe277b86da0839e15 Mon Sep 17 00:00:00 2001 From: Joseph Hanrahan Date: Thu, 30 Nov 2023 19:53:14 -0500 Subject: [PATCH 21/37] typo fix --- tpx3utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tpx3utils.py b/tpx3utils.py index c0323f2..d7ddcdf 100644 --- a/tpx3utils.py +++ b/tpx3utils.py @@ -608,7 +608,7 @@ def add_centroid_cols( CONCAT_H5_SUFFIX = "_cent" -def extract_fpaths_from_sid(run) +def extract_fpaths_from_sid(run): """ Extract file paths from a given sid. 
From e3a698a29a7951bef039478e45102d2e2e69a955 Mon Sep 17 00:00:00 2001 From: Joseph Hanrahan Date: Thu, 30 Nov 2023 20:19:20 -0500 Subject: [PATCH 22/37] added log message --- processing.py | 4 +++- tpx3utils.py | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/processing.py b/processing.py index d3788c0..53f5068 100644 --- a/processing.py +++ b/processing.py @@ -38,7 +38,9 @@ def process_run(ref): ref : int, str reference to BlueSky. It can be scan_id, uid or index """ - + + print("REFERENCE: ") + print(ref) logger = get_run_logger() # Grab the BlueSky run run = tiled_client_chx[ref] diff --git a/tpx3utils.py b/tpx3utils.py index d7ddcdf..52ab3a7 100644 --- a/tpx3utils.py +++ b/tpx3utils.py @@ -622,7 +622,7 @@ def extract_fpaths_from_sid(run): List[str] Filepaths of the written .tpx3, as recorded in Tiled """ - return list(run.table()["tpx3_files_raw_filepaths"].to_numpy()[0]) + return list(run["tpx3_files_raw_filepaths"].to_numpy()[0]) def extract_uid_from_fpaths( From 6279059f6fd8edf1b848bc9cae561fa10190d69d Mon Sep 17 00:00:00 2001 From: Joseph Hanrahan Date: Tue, 5 Dec 2023 12:09:59 -0500 Subject: [PATCH 23/37] changed query for gathering raw filepaths --- processing.py | 5 ++--- tpx3utils.py | 4 ++-- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/processing.py b/processing.py index 53f5068..b97f903 100644 --- a/processing.py +++ b/processing.py @@ -9,8 +9,7 @@ tiled_client_processed = tiled_client["processed"] def get_df_uncent(run): - # sid = run.start['scan_id'] - raw_file_paths = extract_fpaths_from_sid(run) + raw_file_paths = run['primary']['data']['tpx3_files_raw_filepaths'].compute()[0] for file in raw_file_paths: if (os.path.exists(file)): yield raw_to_sorted_df(file) @@ -18,7 +17,7 @@ def get_df_uncent(run): def insert_to_tiled(container, run): structure = None node = None - num_img = run['primary'].metadata['descriptors'][0]['configuration']['tpx3']['data']['tpx3_cam_num_images'] + num_img = run['primary'].metadata['descrsiptors'][0]['configuration']['tpx3']['data']['tpx3_cam_num_images'] for partition_num, df in enumerate(get_df_uncent(run)): if (structure == None): diff --git a/tpx3utils.py b/tpx3utils.py index 52ab3a7..254291e 100644 --- a/tpx3utils.py +++ b/tpx3utils.py @@ -608,7 +608,7 @@ def add_centroid_cols( CONCAT_H5_SUFFIX = "_cent" -def extract_fpaths_from_sid(run): +def extract_fpaths_from_sid(run) """ Extract file paths from a given sid. @@ -622,7 +622,7 @@ def extract_fpaths_from_sid(run): List[str] Filepaths of the written .tpx3, as recorded in Tiled """ - return list(run["tpx3_files_raw_filepaths"].to_numpy()[0]) + return list(db[sid].table()["tpx3_files_raw_filepaths"].to_numpy()[0]) def extract_uid_from_fpaths( From 775fa7e2857950cab70359bac84b7d763ac06b65 Mon Sep 17 00:00:00 2001 From: Joseph Hanrahan Date: Tue, 5 Dec 2023 12:15:16 -0500 Subject: [PATCH 24/37] fixed syntax error --- tpx3utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tpx3utils.py b/tpx3utils.py index 254291e..c0e7c70 100644 --- a/tpx3utils.py +++ b/tpx3utils.py @@ -608,7 +608,7 @@ def add_centroid_cols( CONCAT_H5_SUFFIX = "_cent" -def extract_fpaths_from_sid(run) +def extract_fpaths_from_sid(run): """ Extract file paths from a given sid. 
From e596ef09c7651f4e8a5f25f6d5fe69c897d08570 Mon Sep 17 00:00:00 2001 From: Joseph Hanrahan Date: Tue, 5 Dec 2023 12:18:23 -0500 Subject: [PATCH 25/37] commented out sandbox tiled connection --- processing.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processing.py b/processing.py index b97f903..445347b 100644 --- a/processing.py +++ b/processing.py @@ -5,7 +5,7 @@ tiled_client = from_profile("nsls2")["chx"] tiled_client_chx = tiled_client["raw"] -tiled_client_sandbox = tiled_client["sandbox"] +# tiled_client_sandbox = tiled_client["sandbox"] tiled_client_processed = tiled_client["processed"] def get_df_uncent(run): From a6b482d7796c1b084902dff3b37cb91774e3b66d Mon Sep 17 00:00:00 2001 From: Joseph Hanrahan Date: Tue, 5 Dec 2023 12:26:09 -0500 Subject: [PATCH 26/37] fixed syntax error --- processing.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processing.py b/processing.py index 445347b..1261761 100644 --- a/processing.py +++ b/processing.py @@ -17,7 +17,7 @@ def get_df_uncent(run): def insert_to_tiled(container, run): structure = None node = None - num_img = run['primary'].metadata['descrsiptors'][0]['configuration']['tpx3']['data']['tpx3_cam_num_images'] + num_img = run['primary'].metadata['descriptors'][0]['configuration']['tpx3']['data']['tpx3_cam_num_images'] for partition_num, df in enumerate(get_df_uncent(run)): if (structure == None): From 5f8d3f59975cb4cceaa4c5fff8ec79187e00f258 Mon Sep 17 00:00:00 2001 From: Joseph Hanrahan Date: Tue, 5 Dec 2023 12:28:41 -0500 Subject: [PATCH 27/37] removed compute() attribute for file path gathering --- processing.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processing.py b/processing.py index 1261761..a360cd9 100644 --- a/processing.py +++ b/processing.py @@ -9,7 +9,7 @@ tiled_client_processed = tiled_client["processed"] def get_df_uncent(run): - raw_file_paths = run['primary']['data']['tpx3_files_raw_filepaths'].compute()[0] + raw_file_paths = run['primary']['data']['tpx3_files_raw_filepaths'][0] for file in raw_file_paths: if (os.path.exists(file)): yield raw_to_sorted_df(file) From a8234729b5d7ab23b3abdfaa44a01f8be1925083 Mon Sep 17 00:00:00 2001 From: Joseph Hanrahan Date: Tue, 5 Dec 2023 12:30:49 -0500 Subject: [PATCH 28/37] imported os --- processing.py | 1 + 1 file changed, 1 insertion(+) diff --git a/processing.py b/processing.py index a360cd9..5245568 100644 --- a/processing.py +++ b/processing.py @@ -2,6 +2,7 @@ from tiled.client import from_profile from tiled.structures.table import TableStructure from tpx3utils import extract_fpaths_from_sid +import os tiled_client = from_profile("nsls2")["chx"] tiled_client_chx = tiled_client["raw"] From e3c406f8d213c074e85566ca381c2cd2c2c67045 Mon Sep 17 00:00:00 2001 From: Joseph Hanrahan Date: Tue, 5 Dec 2023 12:32:37 -0500 Subject: [PATCH 29/37] imported function from tpx3util --- processing.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processing.py b/processing.py index 5245568..de0bcd1 100644 --- a/processing.py +++ b/processing.py @@ -1,7 +1,7 @@ from prefect import flow, task, get_run_logger from tiled.client import from_profile from tiled.structures.table import TableStructure -from tpx3utils import extract_fpaths_from_sid +from tpx3utils import extract_fpaths_from_sid, raw_to_sorted_df import os tiled_client = from_profile("nsls2")["chx"] From ecfd18e758357dca9fef55fa5cad766cc32515c6 Mon Sep 17 00:00:00 2001 From: Joseph Hanrahan Date: Tue, 5 Dec 2023 12:36:05 -0500 Subject: [PATCH 30/37] fixed 
run object query --- processing.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processing.py b/processing.py index de0bcd1..9c6901f 100644 --- a/processing.py +++ b/processing.py @@ -24,7 +24,7 @@ def insert_to_tiled(container, run): if (structure == None): structure = TableStructure.from_pandas(df) structure.npartitions = num_img - node = container.new("table", structure=structure, metadata={"raw": run['start']['uid']}) + node = container.new("table", structure=structure, metadata={"raw": run.start['uid']}) node.write_partition(df, partition_num) From 6d16c290bd5ceaff4513b54d94d877541c013cb1 Mon Sep 17 00:00:00 2001 From: Joseph Hanrahan Date: Tue, 5 Dec 2023 13:17:05 -0500 Subject: [PATCH 31/37] added sid to processed metadata --- processing.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processing.py b/processing.py index 9c6901f..89873c0 100644 --- a/processing.py +++ b/processing.py @@ -24,7 +24,7 @@ def insert_to_tiled(container, run): if (structure == None): structure = TableStructure.from_pandas(df) structure.npartitions = num_img - node = container.new("table", structure=structure, metadata={"raw": run.start['uid']}) + node = container.new("table", structure=structure, key=run.start['uid'], metadata={"raw_uid": run.start['uid'], "raw_sid": run.start['scan_id']}) node.write_partition(df, partition_num) From 80260ff88bdc0c32faf222bae07aa432c627286b Mon Sep 17 00:00:00 2001 From: Joseph Hanrahan Date: Tue, 5 Dec 2023 14:09:32 -0500 Subject: [PATCH 32/37] split container into raw and cent --- processing.py | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/processing.py b/processing.py index 89873c0..50c550e 100644 --- a/processing.py +++ b/processing.py @@ -19,14 +19,26 @@ def insert_to_tiled(container, run): structure = None node = None num_img = run['primary'].metadata['descriptors'][0]['configuration']['tpx3']['data']['tpx3_cam_num_images'] + + node = container.create(key=run.start['uid'], metadata={"raw_uid": run.start['uid'], "raw_sid": run.start['scan_id']}) + raw_node = None + cent_node = None for partition_num, df in enumerate(get_df_uncent(run)): if (structure == None): structure = TableStructure.from_pandas(df) structure.npartitions = num_img - node = container.new("table", structure=structure, key=run.start['uid'], metadata={"raw_uid": run.start['uid'], "raw_sid": run.start['scan_id']}) + # structure2 = TableStructure.from_pandas(df) + # structure2.npartitions = num_img + # structs = [structure1, structure2] + cent_node = node.new("table", structure=structure, key="cent") + raw_node = node.new("table", structure=structure, key="raw") + # node = container.new("table", structure=structure, key=run.start['uid'], metadata={"raw_uid": run.start['uid'], "raw_sid": run.start['scan_id']}) + + raw_node.write_partition(df, partition_num) + cent_node.write_partition(df, partition_num) - node.write_partition(df, partition_num) + @task def process_run(ref): From 6e70db1fa8d57669f3d0ee95712fd578bc0425ed Mon Sep 17 00:00:00 2001 From: Joseph Hanrahan Date: Tue, 5 Dec 2023 14:11:01 -0500 Subject: [PATCH 33/37] syntax fix --- processing.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processing.py b/processing.py index 50c550e..9bbe122 100644 --- a/processing.py +++ b/processing.py @@ -20,7 +20,7 @@ def insert_to_tiled(container, run): node = None num_img = run['primary'].metadata['descriptors'][0]['configuration']['tpx3']['data']['tpx3_cam_num_images'] - node = 
container.create(key=run.start['uid'], metadata={"raw_uid": run.start['uid'], "raw_sid": run.start['scan_id']}) + node = container.create_container(key=run.start['uid'], metadata={"raw_uid": run.start['uid'], "raw_sid": run.start['scan_id']}) raw_node = None cent_node = None From 6edc55f0e84b9db11eb7c8127e8a8cf275a5bdfb Mon Sep 17 00:00:00 2001 From: Joseph Hanrahan Date: Tue, 5 Dec 2023 15:10:50 -0500 Subject: [PATCH 34/37] added multiprocessing --- processing.py | 46 ++++++++++++++++++++++++++++------------------ 1 file changed, 28 insertions(+), 18 deletions(-) diff --git a/processing.py b/processing.py index 9bbe122..9bcc05a 100644 --- a/processing.py +++ b/processing.py @@ -14,30 +14,40 @@ def get_df_uncent(run): for file in raw_file_paths: if (os.path.exists(file)): yield raw_to_sorted_df(file) + +def process_file(args): + file_path = args[0] + partition_num = args[1] + + if (os.path.exists(file)): + df = raw_to_sorted_df(file) + node.write_partition(df, partition_num) + + def insert_to_tiled(container, run): - structure = None - node = None num_img = run['primary'].metadata['descriptors'][0]['configuration']['tpx3']['data']['tpx3_cam_num_images'] - - node = container.create_container(key=run.start['uid'], metadata={"raw_uid": run.start['uid'], "raw_sid": run.start['scan_id']}) - raw_node = None - cent_node = None + raw_file_paths = run['primary']['data']['tpx3_files_raw_filepaths'][0] - for partition_num, df in enumerate(get_df_uncent(run)): - if (structure == None): - structure = TableStructure.from_pandas(df) - structure.npartitions = num_img - # structure2 = TableStructure.from_pandas(df) - # structure2.npartitions = num_img - # structs = [structure1, structure2] - cent_node = node.new("table", structure=structure, key="cent") - raw_node = node.new("table", structure=structure, key="raw") - # node = container.new("table", structure=structure, key=run.start['uid'], metadata={"raw_uid": run.start['uid'], "raw_sid": run.start['scan_id']}) + struct_df = raw_to_sorted_df(raw_file_paths[0]) + structure = TableStructure.from_pandas(struct_df) + structure.npartitions = num_img + global node = container.new("table", structure=structure, key=run.start['uid'], metadata={"raw_uid": run.start['uid'], "raw_sid": run.start['scan_id']}) + + args = [] + for i in range(0, len(raw_file_paths)): + args.append([raw_file_paths[i], i]) + + with multiprocessing.Pool(processes=max_workers) as pool: + pool.map(process_file, args) - raw_node.write_partition(df, partition_num) - cent_node.write_partition(df, partition_num) +# for partition_num, df in enumerate(get_df_uncent(run)): +# if (structure == None): +# structure = TableStructure.from_pandas(df) +# structure.npartitions = num_img +# node = container.new("table", structure=structure, key=run.start['uid'], metadata={"raw_uid": run.start['uid'], "raw_sid": run.start['scan_id']}) +# node.write_partition(df, partition_num) @task From e6a6385885cb05f904ea1d3e385b01033be4ff70 Mon Sep 17 00:00:00 2001 From: Joseph Hanrahan Date: Tue, 5 Dec 2023 15:18:06 -0500 Subject: [PATCH 35/37] modified global variable node --- processing.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/processing.py b/processing.py index 9bcc05a..ff59633 100644 --- a/processing.py +++ b/processing.py @@ -4,6 +4,8 @@ from tpx3utils import extract_fpaths_from_sid, raw_to_sorted_df import os +node = None + tiled_client = from_profile("nsls2")["chx"] tiled_client_chx = tiled_client["raw"] # tiled_client_sandbox = tiled_client["sandbox"] @@ -32,7 +34,7 @@ def 
insert_to_tiled(container, run): struct_df = raw_to_sorted_df(raw_file_paths[0]) structure = TableStructure.from_pandas(struct_df) structure.npartitions = num_img - global node = container.new("table", structure=structure, key=run.start['uid'], metadata={"raw_uid": run.start['uid'], "raw_sid": run.start['scan_id']}) + node = container.new("table", structure=structure, key=run.start['uid'], metadata={"raw_uid": run.start['uid'], "raw_sid": run.start['scan_id']}) args = [] for i in range(0, len(raw_file_paths)): From 27c22ccde775ec4b769187b0a0f06ba747233797 Mon Sep 17 00:00:00 2001 From: Joseph Hanrahan Date: Tue, 5 Dec 2023 15:21:38 -0500 Subject: [PATCH 36/37] imported multiprocessing --- processing.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/processing.py b/processing.py index ff59633..9c2158d 100644 --- a/processing.py +++ b/processing.py @@ -3,6 +3,7 @@ from tiled.structures.table import TableStructure from tpx3utils import extract_fpaths_from_sid, raw_to_sorted_df import os +import multiprocessing node = None @@ -40,6 +41,13 @@ def insert_to_tiled(container, run): for i in range(0, len(raw_file_paths)): args.append([raw_file_paths[i], i]) + if num_workers == False: + num_cores = multiprocessing.cpu_count() + max_workers = num_cores-1 + else: + max_workers = num_workers + + with multiprocessing.Pool(processes=max_workers) as pool: pool.map(process_file, args) From 06b5fafb0c5e72c06ea952cf47840b8ba22ddae1 Mon Sep 17 00:00:00 2001 From: Joseph Hanrahan Date: Tue, 5 Dec 2023 15:25:09 -0500 Subject: [PATCH 37/37] number of workers fix --- processing.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/processing.py b/processing.py index 9c2158d..c082d0f 100644 --- a/processing.py +++ b/processing.py @@ -41,13 +41,9 @@ def insert_to_tiled(container, run): for i in range(0, len(raw_file_paths)): args.append([raw_file_paths[i], i]) - if num_workers == False: - num_cores = multiprocessing.cpu_count() - max_workers = num_cores-1 - else: - max_workers = num_workers + num_cores = multiprocessing.cpu_count() + max_workers = num_cores-1 - with multiprocessing.Pool(processes=max_workers) as pool: pool.map(process_file, args)
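
Editor's note: for context, a minimal usage sketch of the tpx3utils.py helpers introduced in PATCH 08, run outside the Prefect flow. This is an illustration only, not part of the patch series; the .tpx3 path is a hypothetical placeholder, and the call sequence simply chains the functions as they are defined in tpx3utils.py above (raw parse and time-sort, then clustering/centroiding with the default time window and radius, then the nearest-pixel and nanosecond columns).

    # Hypothetical usage of the tpx3utils.py helpers from PATCH 08.
    # The file path is an assumption for illustration; any readable .tpx3 file works.
    from tpx3utils import raw_to_sorted_df, raw_df_to_cluster_df, add_centroid_cols

    fpath = "/tmp/example_000000.tpx3"        # hypothetical raw Timepix3 file

    raw_df = raw_to_sorted_df(fpath)          # parse the raw stream and time-sort events
    cent_df = raw_df_to_cluster_df(raw_df)    # cluster and centroid with default window/radius
    cent_df = add_centroid_cols(cent_df)      # add nearest-pixel x/y and t_ns columns

    print(cent_df[["t_ns", "x", "y", "ToT_sum", "n"]].head())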