From 76c1dd571d58da769974f5221223c9003f109f47 Mon Sep 17 00:00:00 2001 From: Akhilesh Halageri Date: Thu, 29 May 2025 14:53:58 +0000 Subject: [PATCH 01/11] feat: stitching support in v3 --- pychunkedgraph/graph/chunkedgraph.py | 3 +- pychunkedgraph/graph/operation.py | 73 ++++++++++++-------- pychunkedgraph/ingest/create/atomic_layer.py | 16 +++-- pychunkedgraph/ingest/create/parent_layer.py | 4 +- pychunkedgraph/ingest/ran_agglomeration.py | 6 +- requirements.in | 3 +- 6 files changed, 64 insertions(+), 41 deletions(-) diff --git a/pychunkedgraph/graph/chunkedgraph.py b/pychunkedgraph/graph/chunkedgraph.py index 38a408e92..1d0d5a8d1 100644 --- a/pychunkedgraph/graph/chunkedgraph.py +++ b/pychunkedgraph/graph/chunkedgraph.py @@ -1,5 +1,4 @@ # pylint: disable=invalid-name, missing-docstring, too-many-lines, import-outside-toplevel, unsupported-binary-operation - import time import typing import datetime @@ -810,6 +809,7 @@ def add_edges( sink_coords: typing.Sequence[int] = None, allow_same_segment_merge: typing.Optional[bool] = False, do_sanity_check: typing.Optional[bool] = True, + stitch_mode: typing.Optional[bool] = False, ) -> operation.GraphEditOperation.Result: """ Adds an edge to the chunkedgraph @@ -827,6 +827,7 @@ def add_edges( sink_coords=sink_coords, allow_same_segment_merge=allow_same_segment_merge, do_sanity_check=do_sanity_check, + stitch_mode=stitch_mode, ).execute() def remove_edges( diff --git a/pychunkedgraph/graph/operation.py b/pychunkedgraph/graph/operation.py index 8ff29b476..3747ea862 100644 --- a/pychunkedgraph/graph/operation.py +++ b/pychunkedgraph/graph/operation.py @@ -563,6 +563,7 @@ class MergeOperation(GraphEditOperation): "bbox_offset", "allow_same_segment_merge", "do_sanity_check", + "stitch_mode", ] def __init__( @@ -577,6 +578,7 @@ def __init__( affinities: Optional[Sequence[np.float32]] = None, allow_same_segment_merge: Optional[bool] = False, do_sanity_check: Optional[bool] = True, + stitch_mode: bool = False, ) -> None: super().__init__( cg, user_id=user_id, source_coords=source_coords, sink_coords=sink_coords @@ -585,6 +587,7 @@ def __init__( self.bbox_offset = np.atleast_1d(bbox_offset).astype(basetypes.COORDINATES) self.allow_same_segment_merge = allow_same_segment_merge self.do_sanity_check = do_sanity_check + self.stitch_mode = stitch_mode self.affinities = None if affinities is not None: @@ -615,34 +618,43 @@ def _apply( ) ) if len(root_ids) < 2 and not self.allow_same_segment_merge: - raise PreconditionError("Supervoxels must belong to different objects.") - bbox = get_bbox(self.source_coords, self.sink_coords, self.bbox_offset) - with TimeIt("subgraph", self.cg.graph_id, operation_id): - edges = self.cg.get_subgraph( - root_ids, - bbox=bbox, - bbox_is_coordinate=True, - edges_only=True, + raise PreconditionError( + "Supervoxels must belong to different objects." + f" Tried to merge {self.added_edges.ravel()}," + f" which all belong to {tuple(root_ids)[0]}." ) - if self.allow_same_segment_merge: - inactive_edges = types.empty_2d - else: - with TimeIt("preprocess", self.cg.graph_id, operation_id): - inactive_edges = edits.merge_preprocess( - self.cg, - subgraph_edges=edges, - supervoxels=self.added_edges.ravel(), - parent_ts=self.parent_ts, + atomic_edges = self.added_edges + fake_edge_rows = [] + if not self.stitch_mode: + bbox = get_bbox(self.source_coords, self.sink_coords, self.bbox_offset) + with TimeIt("subgraph", self.cg.graph_id, operation_id): + edges = self.cg.get_subgraph( + root_ids, + bbox=bbox, + bbox_is_coordinate=True, + edges_only=True, ) - atomic_edges, fake_edge_rows = edits.check_fake_edges( - self.cg, - atomic_edges=self.added_edges, - inactive_edges=inactive_edges, - time_stamp=timestamp, - parent_ts=self.parent_ts, - ) + if self.allow_same_segment_merge: + inactive_edges = types.empty_2d + else: + with TimeIt("preprocess", self.cg.graph_id, operation_id): + inactive_edges = edits.merge_preprocess( + self.cg, + subgraph_edges=edges, + supervoxels=self.added_edges.ravel(), + parent_ts=self.parent_ts, + ) + + atomic_edges, fake_edge_rows = edits.check_fake_edges( + self.cg, + atomic_edges=self.added_edges, + inactive_edges=inactive_edges, + time_stamp=timestamp, + parent_ts=self.parent_ts, + ) + with TimeIt("add_edges", self.cg.graph_id, operation_id): new_roots, new_l2_ids, new_entries = edits.add_edges( self.cg, @@ -652,6 +664,7 @@ def _apply( parent_ts=self.parent_ts, allow_same_segment_merge=self.allow_same_segment_merge, do_sanity_check=self.do_sanity_check, + stitch_mode=self.stitch_mode, ) return new_roots, new_l2_ids, fake_edge_rows + new_entries @@ -874,12 +887,14 @@ def __init__( "try placing the points further apart." ) - ids = np.concatenate([self.source_ids, self.sink_ids]) + ids = np.concatenate([self.source_ids, self.sink_ids]).astype(basetypes.NODE_ID) layers = self.cg.get_chunk_layers(ids) assert np.sum(layers) == layers.size, "IDs must be supervoxels." def _update_root_ids(self) -> np.ndarray: - sink_and_source_ids = np.concatenate((self.source_ids, self.sink_ids)) + sink_and_source_ids = np.concatenate((self.source_ids, self.sink_ids)).astype( + basetypes.NODE_ID + ) root_ids = np.unique( self.cg.get_roots( sink_and_source_ids, assert_roots=True, time_stamp=self.parent_ts @@ -895,7 +910,9 @@ def _apply( # Verify that sink and source are from the same root object root_ids = set( self.cg.get_roots( - np.concatenate([self.source_ids, self.sink_ids]), + np.concatenate([self.source_ids, self.sink_ids]).astype( + basetypes.NODE_ID + ), assert_roots=True, time_stamp=self.parent_ts, ) @@ -916,7 +933,7 @@ def _apply( edges = reduce(lambda x, y: x + y, edges_tuple, Edges([], [])) supervoxels = np.concatenate( [agg.supervoxels for agg in l2id_agglomeration_d.values()] - ) + ).astype(basetypes.NODE_ID) mask0 = np.isin(edges.node_ids1, supervoxels) mask1 = np.isin(edges.node_ids2, supervoxels) edges = edges[mask0 & mask1] diff --git a/pychunkedgraph/ingest/create/atomic_layer.py b/pychunkedgraph/ingest/create/atomic_layer.py index 0a7aae728..e235d36d4 100644 --- a/pychunkedgraph/ingest/create/atomic_layer.py +++ b/pychunkedgraph/ingest/create/atomic_layer.py @@ -68,8 +68,10 @@ def _get_chunk_nodes_and_edges(chunk_edges_d: dict, isolated_ids: Sequence[int]) in-chunk edges and nodes_ids """ isolated_nodes_self_edges = np.vstack([isolated_ids, isolated_ids]).T - node_ids = [isolated_ids] - edge_ids = [isolated_nodes_self_edges] + node_ids = [isolated_ids] if len(isolated_ids) != 0 else [] + edge_ids = ( + [isolated_nodes_self_edges] if len(isolated_nodes_self_edges) != 0 else [] + ) for edge_type in EDGE_TYPES: edges = chunk_edges_d[edge_type] node_ids.append(edges.node_ids1) @@ -77,9 +79,9 @@ def _get_chunk_nodes_and_edges(chunk_edges_d: dict, isolated_ids: Sequence[int]) node_ids.append(edges.node_ids2) edge_ids.append(edges.get_pairs()) - chunk_node_ids = np.unique(np.concatenate(node_ids)) + chunk_node_ids = np.unique(np.concatenate(node_ids).astype(basetypes.NODE_ID)) edge_ids.append(np.vstack([chunk_node_ids, chunk_node_ids]).T) - return (chunk_node_ids, np.concatenate(edge_ids)) + return (chunk_node_ids, np.concatenate(edge_ids).astype(basetypes.NODE_ID)) def _get_remapping(chunk_edges_d: dict): @@ -116,7 +118,7 @@ def _process_component( r_key = serializers.serialize_uint64(node_id) nodes.append(cg.client.mutate_row(r_key, val_dict, time_stamp=time_stamp)) - chunk_out_edges = np.concatenate(chunk_out_edges) + chunk_out_edges = np.concatenate(chunk_out_edges).astype(basetypes.NODE_ID) cce_layers = cg.get_cross_chunk_edges_layer(chunk_out_edges) u_cce_layers = np.unique(cce_layers) @@ -147,5 +149,7 @@ def _get_outgoing_edges(node_id, chunk_edges_d, sparse_indices, remapping): ] row_ids = row_ids[column_ids == 0] # edges that this node is part of - chunk_out_edges = np.concatenate([chunk_out_edges, edges[row_ids]]) + chunk_out_edges = np.concatenate([chunk_out_edges, edges[row_ids]]).astype( + basetypes.NODE_ID + ) return chunk_out_edges diff --git a/pychunkedgraph/ingest/create/parent_layer.py b/pychunkedgraph/ingest/create/parent_layer.py index 90b24d26a..dfdb48dac 100644 --- a/pychunkedgraph/ingest/create/parent_layer.py +++ b/pychunkedgraph/ingest/create/parent_layer.py @@ -73,7 +73,7 @@ def _read_children_chunks( children_ids = [types.empty_1d] for child_coord in children_coords: children_ids.append(_read_chunk([], cg, layer_id - 1, child_coord)) - return np.concatenate(children_ids) + return np.concatenate(children_ids).astype(basetypes.NODE_ID) with mp.Manager() as manager: children_ids_shared = manager.list() @@ -92,7 +92,7 @@ def _read_children_chunks( multi_args, n_threads=min(len(multi_args), mp.cpu_count()), ) - return np.concatenate(children_ids_shared) + return np.concatenate(children_ids_shared).astype(basetypes.NODE_ID) def _read_chunk_helper(args): diff --git a/pychunkedgraph/ingest/ran_agglomeration.py b/pychunkedgraph/ingest/ran_agglomeration.py index a0ca42d54..d726ba4a5 100644 --- a/pychunkedgraph/ingest/ran_agglomeration.py +++ b/pychunkedgraph/ingest/ran_agglomeration.py @@ -314,7 +314,9 @@ def get_active_edges(edges_d, mapping): if edge_type == EDGE_TYPES.in_chunk: pseudo_isolated_ids.append(edges.node_ids2) - return chunk_edges_active, np.unique(np.concatenate(pseudo_isolated_ids)) + return chunk_edges_active, np.unique( + np.concatenate(pseudo_isolated_ids).astype(basetypes.NODE_ID) + ) def define_active_edges(edge_dict, mapping) -> Union[Dict, np.ndarray]: @@ -380,7 +382,7 @@ def read_raw_agglomeration_data(imanager: IngestionManager, chunk_coord: np.ndar edges_list = _read_agg_files(filenames, chunk_ids, path) G = nx.Graph() - G.add_edges_from(np.concatenate(edges_list)) + G.add_edges_from(np.concatenate(edges_list).astype(basetypes.NODE_ID)) mapping = {} components = list(nx.connected_components(G)) for i_cc, cc in enumerate(components): diff --git a/requirements.in b/requirements.in index 4bd56780b..ecb02e17b 100644 --- a/requirements.in +++ b/requirements.in @@ -15,7 +15,6 @@ rq>2 pyyaml cachetools werkzeug -tensorstore # PyPI only: cloud-files>=6.0.0 @@ -26,7 +25,7 @@ zmesh>=1.7.0 fastremap>=1.14.0 task-queue>=2.14.0 messagingclient -dracopy>=1.3.0 +dracopy>=1.5.0 datastoreflex>=0.5.0 zstandard>=0.23.0 From bdb49d7f766bb62da9d83c061e267313df110f96 Mon Sep 17 00:00:00 2001 From: Akhilesh Halageri Date: Thu, 29 May 2025 15:05:01 +0000 Subject: [PATCH 02/11] fix: add tensorstore to req --- requirements.in | 1 + 1 file changed, 1 insertion(+) diff --git a/requirements.in b/requirements.in index ecb02e17b..0ae856c87 100644 --- a/requirements.in +++ b/requirements.in @@ -15,6 +15,7 @@ rq>2 pyyaml cachetools werkzeug +tensorstore # PyPI only: cloud-files>=6.0.0 From f9356fc56bf5313cd1a82471e2690defc27c7ee1 Mon Sep 17 00:00:00 2001 From: Dodam Ih Date: Tue, 17 Jun 2025 16:58:12 -0700 Subject: [PATCH 03/11] debug: sanity check in add_atomic_chunk --- pychunkedgraph/ingest/create/atomic_layer.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pychunkedgraph/ingest/create/atomic_layer.py b/pychunkedgraph/ingest/create/atomic_layer.py index e235d36d4..eb8d9b43b 100644 --- a/pychunkedgraph/ingest/create/atomic_layer.py +++ b/pychunkedgraph/ingest/create/atomic_layer.py @@ -36,6 +36,8 @@ def add_atomic_chunk( chunk_ids = cg.get_chunk_ids_from_node_ids(chunk_node_ids) assert len(np.unique(chunk_ids)) == 1 + for chunk_id in chunk_ids: + assert not cg.range_read_chunk(cg.get_parent_chunk_id(chunk_id)) graph, _, _, unique_ids = build_gt_graph(chunk_edge_ids, make_directed=True) ccs = connected_components(graph) From 599d6060cf1158bd0a669c4e170f17a607bfd131 Mon Sep 17 00:00:00 2001 From: Akhilesh Halageri Date: Mon, 19 Jan 2026 04:10:51 +0000 Subject: [PATCH 04/11] fix(ingest): remove empty parent chunk assert --- pychunkedgraph/ingest/create/atomic_layer.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/pychunkedgraph/ingest/create/atomic_layer.py b/pychunkedgraph/ingest/create/atomic_layer.py index eb8d9b43b..e235d36d4 100644 --- a/pychunkedgraph/ingest/create/atomic_layer.py +++ b/pychunkedgraph/ingest/create/atomic_layer.py @@ -36,8 +36,6 @@ def add_atomic_chunk( chunk_ids = cg.get_chunk_ids_from_node_ids(chunk_node_ids) assert len(np.unique(chunk_ids)) == 1 - for chunk_id in chunk_ids: - assert not cg.range_read_chunk(cg.get_parent_chunk_id(chunk_id)) graph, _, _, unique_ids = build_gt_graph(chunk_edge_ids, make_directed=True) ccs = connected_components(graph) From f0416263beaa82b9fca475be36cfded0d35e209a Mon Sep 17 00:00:00 2001 From: Akhilesh Halageri Date: Sat, 24 Jan 2026 03:06:09 +0000 Subject: [PATCH 05/11] feat(stitching): don't use locks if stitch_mode True --- pychunkedgraph/graph/operation.py | 1 + 1 file changed, 1 insertion(+) diff --git a/pychunkedgraph/graph/operation.py b/pychunkedgraph/graph/operation.py index 3747ea862..a59ddba94 100644 --- a/pychunkedgraph/graph/operation.py +++ b/pychunkedgraph/graph/operation.py @@ -420,6 +420,7 @@ def execute( op_type = "merge" if is_merge else "split" self.parent_ts = parent_ts root_ids = self._update_root_ids() + self.privileged_mode = self.privileged_mode or (is_merge and self.stitch_mode) with locks.RootLock( self.cg, root_ids, From d9c66d26dbb5d728efc0d48534b4b0bee218dab6 Mon Sep 17 00:00:00 2001 From: Dodam Ih Date: Fri, 23 Jan 2026 23:14:46 -0800 Subject: [PATCH 06/11] hotfix: exponential range search for root ID assignment --- pychunkedgraph/graph/edits.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/pychunkedgraph/graph/edits.py b/pychunkedgraph/graph/edits.py index 25f31dd02..9bc0181da 100644 --- a/pychunkedgraph/graph/edits.py +++ b/pychunkedgraph/graph/edits.py @@ -223,6 +223,7 @@ def add_edges( edges, l2_cross_edges_d = _analyze_affected_edges( cg, atomic_edges, parent_ts=parent_ts ) + l2ids = np.unique(edges) if not allow_same_segment_merge and not stitch_mode: roots = cg.get_roots(l2ids, assert_roots=True, time_stamp=parent_ts) @@ -230,6 +231,7 @@ def add_edges( new_old_id_d = defaultdict(set) old_new_id_d = defaultdict(set) + old_hierarchy_d = _init_old_hierarchy(cg, l2ids, parent_ts=parent_ts) atomic_children_d = cg.get_children(l2ids) cross_edges_d = merge_cross_edge_dicts( @@ -550,6 +552,7 @@ def _update_neighbor_cx_edges( """ updated_counterparts = {} newid_cx_edges_d = cg.get_cross_chunk_edges(new_ids, time_stamp=parent_ts) + node_map = {} for k, v in old_new_id.items(): if len(v) == 1: @@ -572,11 +575,13 @@ def _update_neighbor_cx_edges( cg, new_id, node_map, cp_layers, all_cx_edges_d, descendants_d ) updated_counterparts.update(result) + updated_entries = [] for node, val_dict in updated_counterparts.items(): rowkey = serialize_uint64(node) row = cg.client.mutate_row(rowkey, val_dict, time_stamp=time_stamp) updated_entries.append(row) + return updated_entries @@ -649,6 +654,7 @@ def _get_layer_node_ids( # get their parents, then children of those parents old_parents = self.cg.get_parents(old_ids, time_stamp=self._last_ts) siblings = self.cg.get_children(np.unique(old_parents), flatten=True) + # replace old identities with new IDs mask = np.isin(siblings, old_ids) node_ids = [flip_ids(self._old_new_id_d, old_ids), siblings[~mask], new_ids] @@ -933,4 +939,5 @@ def create_new_entries(self) -> List: time_stamp=self._time_stamp, ) ) + self._update_root_id_lineage() From ac596b2b4263e3773526a9560a88f925689e8108 Mon Sep 17 00:00:00 2001 From: Dodam Ih Date: Fri, 23 Jan 2026 23:55:59 -0800 Subject: [PATCH 07/11] hotfix: explicit uint64 typing to avoid float64 rounding --- pychunkedgraph/graph/edges/utils.py | 4 +++- pychunkedgraph/graph/segmenthistory.py | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/pychunkedgraph/graph/edges/utils.py b/pychunkedgraph/graph/edges/utils.py index f79debf94..08a7d1ae6 100644 --- a/pychunkedgraph/graph/edges/utils.py +++ b/pychunkedgraph/graph/edges/utils.py @@ -70,7 +70,9 @@ def merge_cross_edge_dicts(x_edges_d1: Dict, x_edges_d2: Dict) -> Dict: Combines two cross chunk dictionaries of form {node_id: {layer id : edge list}}. """ - node_ids = np.unique(list(x_edges_d1.keys()) + list(x_edges_d2.keys())) + node_ids = np.unique( + np.array(list(x_edges_d1.keys()) + list(x_edges_d2.keys()), dtype=basetypes.NODE_ID) + ) result_d = {} for node_id in node_ids: cross_edge_ds = [x_edges_d1.get(node_id, {}), x_edges_d2.get(node_id, {})] diff --git a/pychunkedgraph/graph/segmenthistory.py b/pychunkedgraph/graph/segmenthistory.py index bc4422490..de85ded55 100644 --- a/pychunkedgraph/graph/segmenthistory.py +++ b/pychunkedgraph/graph/segmenthistory.py @@ -78,7 +78,7 @@ def operation_id_root_id_dict(self): @property def operation_ids(self): - return np.array(list(self.operation_id_root_id_dict.keys())) + return np.array(list(self.operation_id_root_id_dict.keys()), dtype=basetypes.OPERATION_ID) @property def _log_rows(self): From 62540c0039012cba5317a7e70ed1e788b69d88fd Mon Sep 17 00:00:00 2001 From: Dodam Ih Date: Sat, 24 Jan 2026 01:26:12 -0800 Subject: [PATCH 08/11] temp: profiling --- pychunkedgraph/graph/edits.py | 76 +++++++++++++++++++------------ pychunkedgraph/graph/operation.py | 59 +++++++++++++----------- 2 files changed, 79 insertions(+), 56 deletions(-) diff --git a/pychunkedgraph/graph/edits.py b/pychunkedgraph/graph/edits.py index 9bc0181da..4abf7ef2e 100644 --- a/pychunkedgraph/graph/edits.py +++ b/pychunkedgraph/graph/edits.py @@ -7,6 +7,7 @@ from typing import Iterable from typing import Set from collections import defaultdict +from contextlib import contextmanager import fastremap import numpy as np @@ -66,8 +67,11 @@ def _analyze_affected_edges( Also returns new cross edges dicts for nodes crossing chunk boundary. """ + profiler = get_profiler() + supervoxels = np.unique(atomic_edges) - parents = cg.get_parents(supervoxels, time_stamp=parent_ts) + with profiler.profile("analyze_get_parents"): + parents = cg.get_parents(supervoxels, time_stamp=parent_ts) sv_parent_d = dict(zip(supervoxels.tolist(), parents)) edge_layers = cg.get_cross_chunk_edges_layer(atomic_edges) parent_edges = [ @@ -220,17 +224,23 @@ def add_edges( stitch_mode: bool = False, do_sanity_check: bool = True, ): - edges, l2_cross_edges_d = _analyze_affected_edges( - cg, atomic_edges, parent_ts=parent_ts - ) + profiler = get_profiler() + profiler.reset() # Reset for fresh profiling - l2ids = np.unique(edges) - if not allow_same_segment_merge and not stitch_mode: - roots = cg.get_roots(l2ids, assert_roots=True, time_stamp=parent_ts) - assert np.unique(roots).size >= 2, "L2 IDs must belong to different roots." + with profiler.profile("add_edges"): + with profiler.profile("analyze_affected_edges"): + edges, l2_cross_edges_d = _analyze_affected_edges( + cg, atomic_edges, parent_ts=parent_ts + ) - new_old_id_d = defaultdict(set) - old_new_id_d = defaultdict(set) + l2ids = np.unique(edges) + if not allow_same_segment_merge and not stitch_mode: + with profiler.profile("validate_roots"): + roots = cg.get_roots(l2ids, assert_roots=True, time_stamp=parent_ts) + assert np.unique(roots).size >= 2, "L2 IDs must belong to different roots." + + new_old_id_d = defaultdict(set) + old_new_id_d = defaultdict(set) old_hierarchy_d = _init_old_hierarchy(cg, l2ids, parent_ts=parent_ts) atomic_children_d = cg.get_children(l2ids) @@ -269,19 +279,20 @@ def add_edges( cg.cache.children_cache[new_id] = merged_children cache_utils.update(cg.cache.parents_cache, merged_children, new_id) - # update cross chunk edges by replacing old_ids with new - # this can be done only after all new IDs have been created - for new_id, cc_indices in zip(new_l2_ids, components): - l2ids_ = graph_ids[cc_indices] - new_cx_edges_d = {} - cx_edges = [cross_edges_d[l2id] for l2id in l2ids_] - cx_edges_d = concatenate_cross_edge_dicts(cx_edges, unique=True) - temp_map = {k: next(iter(v)) for k, v in old_new_id_d.items()} - for layer, edges in cx_edges_d.items(): - edges = fastremap.remap(edges, temp_map, preserve_missing_labels=True) - new_cx_edges_d[layer] = edges - assert np.all(edges[:, 0] == new_id) - cg.cache.cross_chunk_edges_cache[new_id] = new_cx_edges_d + # update cross chunk edges by replacing old_ids with new + # this can be done only after all new IDs have been created + with profiler.profile("update_cross_edges"): + for new_id, cc_indices in zip(new_l2_ids, components): + l2ids_ = graph_ids[cc_indices] + new_cx_edges_d = {} + cx_edges = [cross_edges_d[l2id] for l2id in l2ids_] + cx_edges_d = concatenate_cross_edge_dicts(cx_edges, unique=True) + temp_map = {k: next(iter(v)) for k, v in old_new_id_d.items()} + for layer, edges in cx_edges_d.items(): + edges = fastremap.remap(edges, temp_map, preserve_missing_labels=True) + new_cx_edges_d[layer] = edges + assert np.all(edges[:, 0] == new_id) + cg.cache.cross_chunk_edges_cache[new_id] = new_cx_edges_d profiler = get_profiler() profiler.reset() @@ -550,8 +561,11 @@ def _update_neighbor_cx_edges( and then write to storage to consolidate the mutations. Returns mutations to updated counterparts/partner nodes. """ + profiler = get_profiler() updated_counterparts = {} - newid_cx_edges_d = cg.get_cross_chunk_edges(new_ids, time_stamp=parent_ts) + + with profiler.profile("neighbor_get_cross_chunk_edges"): + newid_cx_edges_d = cg.get_cross_chunk_edges(new_ids, time_stamp=parent_ts) node_map = {} for k, v in old_new_id.items(): @@ -576,11 +590,12 @@ def _update_neighbor_cx_edges( ) updated_counterparts.update(result) - updated_entries = [] - for node, val_dict in updated_counterparts.items(): - rowkey = serialize_uint64(node) - row = cg.client.mutate_row(rowkey, val_dict, time_stamp=time_stamp) - updated_entries.append(row) + with profiler.profile("neighbor_create_mutations"): + updated_entries = [] + for node, val_dict in updated_counterparts.items(): + rowkey = serialize_uint64(node) + row = cg.client.mutate_row(rowkey, val_dict, time_stamp=time_stamp) + updated_entries.append(row) return updated_entries @@ -940,4 +955,5 @@ def create_new_entries(self) -> List: ) ) - self._update_root_id_lineage() + with self._profiler.profile("update_root_id_lineage"): + self._update_root_id_lineage() diff --git a/pychunkedgraph/graph/operation.py b/pychunkedgraph/graph/operation.py index a59ddba94..4bafe2bf5 100644 --- a/pychunkedgraph/graph/operation.py +++ b/pychunkedgraph/graph/operation.py @@ -25,6 +25,7 @@ from . import attributes from .edges import Edges from .edges.utils import get_edges_status +from .edits import get_profiler from .utils import basetypes from .utils import serializers from .cache import CacheService @@ -613,11 +614,14 @@ def _update_root_ids(self) -> np.ndarray: def _apply( self, *, operation_id, timestamp ) -> Tuple[np.ndarray, np.ndarray, List["bigtable.row.Row"]]: - root_ids = set( - self.cg.get_roots( - self.added_edges.ravel(), assert_roots=True, time_stamp=self.parent_ts + profiler = get_profiler() + + with profiler.profile("merge_apply_get_roots"): + root_ids = set( + self.cg.get_roots( + self.added_edges.ravel(), assert_roots=True, time_stamp=self.parent_ts + ) ) - ) if len(root_ids) < 2 and not self.allow_same_segment_merge: raise PreconditionError( "Supervoxels must belong to different objects." @@ -629,32 +633,35 @@ def _apply( fake_edge_rows = [] if not self.stitch_mode: bbox = get_bbox(self.source_coords, self.sink_coords, self.bbox_offset) - with TimeIt("subgraph", self.cg.graph_id, operation_id): - edges = self.cg.get_subgraph( - root_ids, - bbox=bbox, - bbox_is_coordinate=True, - edges_only=True, - ) + with profiler.profile("get_subgraph"): + with TimeIt("subgraph", self.cg.graph_id, operation_id): + edges = self.cg.get_subgraph( + root_ids, + bbox=bbox, + bbox_is_coordinate=True, + edges_only=True, + ) if self.allow_same_segment_merge: inactive_edges = types.empty_2d else: - with TimeIt("preprocess", self.cg.graph_id, operation_id): - inactive_edges = edits.merge_preprocess( - self.cg, - subgraph_edges=edges, - supervoxels=self.added_edges.ravel(), - parent_ts=self.parent_ts, - ) - - atomic_edges, fake_edge_rows = edits.check_fake_edges( - self.cg, - atomic_edges=self.added_edges, - inactive_edges=inactive_edges, - time_stamp=timestamp, - parent_ts=self.parent_ts, - ) + with profiler.profile("merge_preprocess"): + with TimeIt("preprocess", self.cg.graph_id, operation_id): + inactive_edges = edits.merge_preprocess( + self.cg, + subgraph_edges=edges, + supervoxels=self.added_edges.ravel(), + parent_ts=self.parent_ts, + ) + + with profiler.profile("check_fake_edges"): + atomic_edges, fake_edge_rows = edits.check_fake_edges( + self.cg, + atomic_edges=self.added_edges, + inactive_edges=inactive_edges, + time_stamp=timestamp, + parent_ts=self.parent_ts, + ) with TimeIt("add_edges", self.cg.graph_id, operation_id): new_roots, new_l2_ids, new_entries = edits.add_edges( From c2cb99d3df80fca726bc265aae83e2c4878354bd Mon Sep 17 00:00:00 2001 From: Dodam Ih Date: Sun, 25 Jan 2026 15:56:13 -0800 Subject: [PATCH 09/11] fix: filter children array during meshing --- pychunkedgraph/meshing/meshgen_utils.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/pychunkedgraph/meshing/meshgen_utils.py b/pychunkedgraph/meshing/meshgen_utils.py index 711c09322..43e6f5c3a 100644 --- a/pychunkedgraph/meshing/meshgen_utils.py +++ b/pychunkedgraph/meshing/meshgen_utils.py @@ -129,7 +129,13 @@ def recursive_helper(cur_node_ids): only_child_mask = np.array( [len(children_for_node) == 1 for children_for_node in children_array] ) - only_children = children_array[only_child_mask].astype(np.uint64).ravel() + # Extract children from object array - each filtered element is a 1-element array + filtered_children = children_array[only_child_mask] + only_children = ( + np.concatenate(filtered_children).astype(np.uint64) + if filtered_children.size + else np.array([], dtype=np.uint64) + ) if np.any(only_child_mask): temp_array = cur_node_ids[stop_layer_mask] temp_array[only_child_mask] = recursive_helper(only_children) From f307df2850a17d62d0562214589dfa904a992725 Mon Sep 17 00:00:00 2001 From: Akhilesh Halageri Date: Wed, 28 Jan 2026 01:55:28 +0000 Subject: [PATCH 10/11] fix(tests): match shape for np isin --- pychunkedgraph/graph/cutting.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pychunkedgraph/graph/cutting.py b/pychunkedgraph/graph/cutting.py index a2fca8023..cc1240710 100644 --- a/pychunkedgraph/graph/cutting.py +++ b/pychunkedgraph/graph/cutting.py @@ -395,8 +395,8 @@ def _remap_cut_edge_set(self, cut_edge_set): remapped_cutset = np.array(remapped_cutset, dtype=np.uint64) - remapped_cutset_flattened_view = remapped_cutset.view(dtype="u8,u8") - edges_flattened_view = self.cg_edges.view(dtype="u8,u8") + remapped_cutset_flattened_view = remapped_cutset.view(dtype="u8,u8").ravel() + edges_flattened_view = self.cg_edges.view(dtype="u8,u8").ravel() cutset_mask = np.isin(remapped_cutset_flattened_view, edges_flattened_view).ravel() From 0f71a5ea33b9f047454fced834b3900240afc555 Mon Sep 17 00:00:00 2001 From: Akhilesh Halageri Date: Fri, 13 Feb 2026 20:14:57 +0000 Subject: [PATCH 11/11] keeps edits and edges identical to pcgv3 --- pychunkedgraph/graph/edits.py | 83 +++++++++++++---------------------- 1 file changed, 30 insertions(+), 53 deletions(-) diff --git a/pychunkedgraph/graph/edits.py b/pychunkedgraph/graph/edits.py index 4abf7ef2e..25f31dd02 100644 --- a/pychunkedgraph/graph/edits.py +++ b/pychunkedgraph/graph/edits.py @@ -7,7 +7,6 @@ from typing import Iterable from typing import Set from collections import defaultdict -from contextlib import contextmanager import fastremap import numpy as np @@ -67,11 +66,8 @@ def _analyze_affected_edges( Also returns new cross edges dicts for nodes crossing chunk boundary. """ - profiler = get_profiler() - supervoxels = np.unique(atomic_edges) - with profiler.profile("analyze_get_parents"): - parents = cg.get_parents(supervoxels, time_stamp=parent_ts) + parents = cg.get_parents(supervoxels, time_stamp=parent_ts) sv_parent_d = dict(zip(supervoxels.tolist(), parents)) edge_layers = cg.get_cross_chunk_edges_layer(atomic_edges) parent_edges = [ @@ -224,24 +220,16 @@ def add_edges( stitch_mode: bool = False, do_sanity_check: bool = True, ): - profiler = get_profiler() - profiler.reset() # Reset for fresh profiling - - with profiler.profile("add_edges"): - with profiler.profile("analyze_affected_edges"): - edges, l2_cross_edges_d = _analyze_affected_edges( - cg, atomic_edges, parent_ts=parent_ts - ) - - l2ids = np.unique(edges) - if not allow_same_segment_merge and not stitch_mode: - with profiler.profile("validate_roots"): - roots = cg.get_roots(l2ids, assert_roots=True, time_stamp=parent_ts) - assert np.unique(roots).size >= 2, "L2 IDs must belong to different roots." - - new_old_id_d = defaultdict(set) - old_new_id_d = defaultdict(set) + edges, l2_cross_edges_d = _analyze_affected_edges( + cg, atomic_edges, parent_ts=parent_ts + ) + l2ids = np.unique(edges) + if not allow_same_segment_merge and not stitch_mode: + roots = cg.get_roots(l2ids, assert_roots=True, time_stamp=parent_ts) + assert np.unique(roots).size >= 2, "L2 IDs must belong to different roots." + new_old_id_d = defaultdict(set) + old_new_id_d = defaultdict(set) old_hierarchy_d = _init_old_hierarchy(cg, l2ids, parent_ts=parent_ts) atomic_children_d = cg.get_children(l2ids) cross_edges_d = merge_cross_edge_dicts( @@ -279,20 +267,19 @@ def add_edges( cg.cache.children_cache[new_id] = merged_children cache_utils.update(cg.cache.parents_cache, merged_children, new_id) - # update cross chunk edges by replacing old_ids with new - # this can be done only after all new IDs have been created - with profiler.profile("update_cross_edges"): - for new_id, cc_indices in zip(new_l2_ids, components): - l2ids_ = graph_ids[cc_indices] - new_cx_edges_d = {} - cx_edges = [cross_edges_d[l2id] for l2id in l2ids_] - cx_edges_d = concatenate_cross_edge_dicts(cx_edges, unique=True) - temp_map = {k: next(iter(v)) for k, v in old_new_id_d.items()} - for layer, edges in cx_edges_d.items(): - edges = fastremap.remap(edges, temp_map, preserve_missing_labels=True) - new_cx_edges_d[layer] = edges - assert np.all(edges[:, 0] == new_id) - cg.cache.cross_chunk_edges_cache[new_id] = new_cx_edges_d + # update cross chunk edges by replacing old_ids with new + # this can be done only after all new IDs have been created + for new_id, cc_indices in zip(new_l2_ids, components): + l2ids_ = graph_ids[cc_indices] + new_cx_edges_d = {} + cx_edges = [cross_edges_d[l2id] for l2id in l2ids_] + cx_edges_d = concatenate_cross_edge_dicts(cx_edges, unique=True) + temp_map = {k: next(iter(v)) for k, v in old_new_id_d.items()} + for layer, edges in cx_edges_d.items(): + edges = fastremap.remap(edges, temp_map, preserve_missing_labels=True) + new_cx_edges_d[layer] = edges + assert np.all(edges[:, 0] == new_id) + cg.cache.cross_chunk_edges_cache[new_id] = new_cx_edges_d profiler = get_profiler() profiler.reset() @@ -561,12 +548,8 @@ def _update_neighbor_cx_edges( and then write to storage to consolidate the mutations. Returns mutations to updated counterparts/partner nodes. """ - profiler = get_profiler() updated_counterparts = {} - - with profiler.profile("neighbor_get_cross_chunk_edges"): - newid_cx_edges_d = cg.get_cross_chunk_edges(new_ids, time_stamp=parent_ts) - + newid_cx_edges_d = cg.get_cross_chunk_edges(new_ids, time_stamp=parent_ts) node_map = {} for k, v in old_new_id.items(): if len(v) == 1: @@ -589,14 +572,11 @@ def _update_neighbor_cx_edges( cg, new_id, node_map, cp_layers, all_cx_edges_d, descendants_d ) updated_counterparts.update(result) - - with profiler.profile("neighbor_create_mutations"): - updated_entries = [] - for node, val_dict in updated_counterparts.items(): - rowkey = serialize_uint64(node) - row = cg.client.mutate_row(rowkey, val_dict, time_stamp=time_stamp) - updated_entries.append(row) - + updated_entries = [] + for node, val_dict in updated_counterparts.items(): + rowkey = serialize_uint64(node) + row = cg.client.mutate_row(rowkey, val_dict, time_stamp=time_stamp) + updated_entries.append(row) return updated_entries @@ -669,7 +649,6 @@ def _get_layer_node_ids( # get their parents, then children of those parents old_parents = self.cg.get_parents(old_ids, time_stamp=self._last_ts) siblings = self.cg.get_children(np.unique(old_parents), flatten=True) - # replace old identities with new IDs mask = np.isin(siblings, old_ids) node_ids = [flip_ids(self._old_new_id_d, old_ids), siblings[~mask], new_ids] @@ -954,6 +933,4 @@ def create_new_entries(self) -> List: time_stamp=self._time_stamp, ) ) - - with self._profiler.profile("update_root_id_lineage"): - self._update_root_id_lineage() + self._update_root_id_lineage()