From 931c4cb22e62ba3c2d34a8c27cb30f7240bd9f02 Mon Sep 17 00:00:00 2001 From: chenzihong-gavin Date: Mon, 12 Jan 2026 23:00:11 +0800 Subject: [PATCH 1/3] perf: reduce system bubble time --- .../aggregated_config.yaml | 3 +- .../models/partitioner/ece_partitioner.py | 8 +- .../operators/build_kg/build_kg_service.py | 20 +++-- graphgen/operators/build_kg/build_mm_kg.py | 4 +- graphgen/operators/build_kg/build_text_kg.py | 4 +- .../operators/evaluate/evaluate_service.py | 12 +-- .../operators/partition/partition_service.py | 7 +- graphgen/operators/quiz/quiz_service.py | 74 ++++++++----------- 8 files changed, 71 insertions(+), 61 deletions(-) diff --git a/examples/generate/generate_aggregated_qa/aggregated_config.yaml b/examples/generate/generate_aggregated_qa/aggregated_config.yaml index 4599db50..c72339ca 100644 --- a/examples/generate/generate_aggregated_qa/aggregated_config.yaml +++ b/examples/generate/generate_aggregated_qa/aggregated_config.yaml @@ -34,7 +34,7 @@ nodes: - id: quiz op_name: quiz - type: aggregate + type: map_batch dependencies: - build_kg execution_params: @@ -42,7 +42,6 @@ nodes: batch_size: 128 params: quiz_samples: 2 # number of quiz samples to generate - concurrency_limit: 200 - id: judge op_name: judge diff --git a/graphgen/models/partitioner/ece_partitioner.py b/graphgen/models/partitioner/ece_partitioner.py index c2611be3..46765fd7 100644 --- a/graphgen/models/partitioner/ece_partitioner.py +++ b/graphgen/models/partitioner/ece_partitioner.py @@ -1,3 +1,4 @@ +import math import random from collections import deque from typing import Any, Dict, Iterable, List, Optional, Set, Tuple @@ -34,17 +35,18 @@ def _sort_units(units: list, edge_sampling: str) -> list: :param edge_sampling: edge sampling strategy (random, min_loss, max_loss) :return: sorted units """ + default_loss = -math.log(0.1) if edge_sampling == "random": random.shuffle(units) elif edge_sampling == "min_loss": units = sorted( units, - key=lambda x: x[-1]["loss"], + key=lambda x: x[-1].get("loss", default_loss), ) elif edge_sampling == "max_loss": units = sorted( units, - key=lambda x: x[-1]["loss"], + key=lambda x: x[-1].get("loss", default_loss), reverse=True, ) else: @@ -142,7 +144,7 @@ def _add_unit(u): return Community( id=seed_unit[1], nodes=list(community_nodes.keys()), - edges=[tuple(sorted(e)) for e in community_edges] + edges=[tuple(sorted(e)) for e in community_edges], ) for unit in tqdm(all_units, desc="ECE partition"): diff --git a/graphgen/operators/build_kg/build_kg_service.py b/graphgen/operators/build_kg/build_kg_service.py index 532cfc79..afeeefd7 100644 --- a/graphgen/operators/build_kg/build_kg_service.py +++ b/graphgen/operators/build_kg/build_kg_service.py @@ -28,10 +28,10 @@ def process(self, batch: pd.DataFrame) -> pd.DataFrame: docs = [Chunk.from_dict(doc["_chunk_id"], doc) for doc in docs] # consume the chunks and build kg - self.build_kg(docs) - return pd.DataFrame([{"status": "kg_building_completed"}]) + nodes, edges = self.build_kg(docs) + return pd.DataFrame([{"nodes": nodes, "edges": edges}]) - def build_kg(self, chunks: List[Chunk]) -> None: + def build_kg(self, chunks: List[Chunk]) -> tuple: """ Build knowledge graph (KG) and merge into kg_instance """ @@ -42,24 +42,34 @@ def build_kg(self, chunks: List[Chunk]) -> None: if chunk.type in ("image", "video", "table", "formula") ] + nodes = {} + edges = {} + if len(text_chunks) == 0: logger.info("All text chunks are already in the storage") else: logger.info("[Text Entity and Relation Extraction] processing ...") - build_text_kg( + text_nodes, text_edges = build_text_kg( llm_client=self.llm_client, kg_instance=self.graph_storage, chunks=text_chunks, max_loop=self.max_loop, ) + nodes.update(text_nodes) + edges.update(text_edges) if len(mm_chunks) == 0: logger.info("All multi-modal chunks are already in the storage") else: logger.info("[Multi-modal Entity and Relation Extraction] processing ...") - build_mm_kg( + mm_nodes, mm_edges = build_mm_kg( llm_client=self.llm_client, kg_instance=self.graph_storage, chunks=mm_chunks, ) + nodes.update(mm_nodes) + edges.update(mm_edges) self.graph_storage.index_done_callback() + logger.info("Knowledge graph building completed.") + + return nodes, edges diff --git a/graphgen/operators/build_kg/build_mm_kg.py b/graphgen/operators/build_kg/build_mm_kg.py index ee0459ea..84bf12a6 100644 --- a/graphgen/operators/build_kg/build_mm_kg.py +++ b/graphgen/operators/build_kg/build_mm_kg.py @@ -12,7 +12,7 @@ def build_mm_kg( llm_client: BaseLLMWrapper, kg_instance: BaseGraphStorage, chunks: List[Chunk], -): +) -> tuple: """ Build multi-modal KG and merge into kg_instance :param llm_client: Synthesizer LLM model to extract entities and relationships @@ -48,3 +48,5 @@ def build_mm_kg( list(edges.items()), desc="Inserting relationships into storage", ) + + return nodes, edges diff --git a/graphgen/operators/build_kg/build_text_kg.py b/graphgen/operators/build_kg/build_text_kg.py index b599e5c2..f44fcae8 100644 --- a/graphgen/operators/build_kg/build_text_kg.py +++ b/graphgen/operators/build_kg/build_text_kg.py @@ -13,7 +13,7 @@ def build_text_kg( kg_instance: BaseGraphStorage, chunks: List[Chunk], max_loop: int = 3, -): +) -> tuple: """ :param llm_client: Synthesizer LLM model to extract entities and relationships :param kg_instance @@ -50,3 +50,5 @@ def build_text_kg( list(edges.items()), desc="Inserting relationships into storage", ) + + return nodes, edges diff --git a/graphgen/operators/evaluate/evaluate_service.py b/graphgen/operators/evaluate/evaluate_service.py index b0875d7f..80586c1d 100644 --- a/graphgen/operators/evaluate/evaluate_service.py +++ b/graphgen/operators/evaluate/evaluate_service.py @@ -95,10 +95,10 @@ async def _process_single_qa(self, item: dict[str, Any]) -> dict[str, Any]: answer=str(item.get("answer", "")), ) if not qa_pair.question or not qa_pair.answer: - self.logger.error("Empty question or answer, skipping.") + logger.error("Empty question or answer, skipping.") return {} except Exception as e: - self.logger.error("Error in QAPair creation: %s", str(e)) + logger.error("Error in QAPair creation: %s", str(e)) return {} for metric, evaluator in self.qa_evaluators.items(): @@ -110,7 +110,7 @@ async def _process_single_qa(self, item: dict[str, Any]) -> dict[str, Any]: else: item[metric] = float(score) except Exception as e: - self.logger.error("Error in %s evaluation: %s", metric, str(e)) + logger.error("Error in %s evaluation: %s", metric, str(e)) item[metric] = None return item @@ -136,7 +136,7 @@ def transform_messages_format(items: list[dict]) -> list[dict]: return [] if not self.qa_evaluators: - self.logger.warning("No QA evaluators initialized, skipping QA evaluation") + logger.warning("No QA evaluators initialized, skipping QA evaluation") return [] items = transform_messages_format(items) @@ -155,11 +155,11 @@ def _evaluate_kg(self) -> Dict[str, Any]: for metric, evaluator in self.kg_evaluators.items(): try: - self.logger.info("Running %s evaluation...", metric) + logger.info("Running %s evaluation...", metric) score = evaluator.evaluate() results[metric] = score except Exception as e: - self.logger.error("Error in %s evaluation: %s", metric, str(e)) + logger.error("Error in %s evaluation: %s", metric, str(e)) results[metric] = {"error": str(e)} return results diff --git a/graphgen/operators/partition/partition_service.py b/graphgen/operators/partition/partition_service.py index 2289fec6..8985e5b8 100644 --- a/graphgen/operators/partition/partition_service.py +++ b/graphgen/operators/partition/partition_service.py @@ -79,9 +79,13 @@ def partition(self) -> Iterable[pd.DataFrame]: else: raise ValueError(f"Unsupported partition method: {method}") - communities = partitioner.partition(g=self.kg_instance, **method_params) + communities: Iterable = partitioner.partition( + g=self.kg_instance, **method_params + ) + count = 0 for community in communities: + count += 1 batch = partitioner.community2batch(community, g=self.kg_instance) batch = self._attach_additional_data_to_node(batch) @@ -91,6 +95,7 @@ def partition(self) -> Iterable[pd.DataFrame]: "edges": [batch[1]], } ) + logger.info("Total communities partitioned: %d", count) def _pre_tokenize(self) -> None: """Pre-tokenize all nodes and edges to add token length information.""" diff --git a/graphgen/operators/quiz/quiz_service.py b/graphgen/operators/quiz/quiz_service.py index a6aeb7be..ca3b4bfc 100644 --- a/graphgen/operators/quiz/quiz_service.py +++ b/graphgen/operators/quiz/quiz_service.py @@ -15,7 +15,6 @@ def __init__( graph_backend: str = "kuzu", kv_backend: str = "rocksdb", quiz_samples: int = 1, - concurrency_limit: int = 200, ): super().__init__(working_dir=working_dir, op_name="quiz_service") self.quiz_samples = quiz_samples @@ -28,21 +27,16 @@ def __init__( backend=kv_backend, working_dir=working_dir, namespace="quiz" ) self.generator = QuizGenerator(self.llm_client) - self.concurrency_limit = concurrency_limit def process(self, batch: pd.DataFrame) -> Iterable[pd.DataFrame]: - # this operator does not consume any batch data - # but for compatibility we keep the interface - _ = batch.to_dict(orient="records") + data = batch.to_dict(orient="records") self.graph_storage.reload() - yield from self.quiz() + return self.quiz(data) async def _process_single_quiz(self, item: tuple) -> dict | None: # if quiz in quiz_storage exists already, directly get it index, desc = item _quiz_id = compute_dict_hash({"index": index, "description": desc}) - if self.quiz_storage.get_by_id(_quiz_id): - return None tasks = [] for i in range(self.quiz_samples): @@ -68,47 +62,43 @@ async def _process_single_quiz(self, item: tuple) -> dict | None: logger.error("Error when quizzing description %s: %s", item, e) return None - def quiz(self) -> Iterable[pd.DataFrame]: + def quiz(self, batch) -> Iterable[pd.DataFrame]: """ Get all nodes and edges and quiz their descriptions using QuizGenerator. """ - edges = self.graph_storage.get_all_edges() - nodes = self.graph_storage.get_all_nodes() - items = [] - for edge in edges: - edge_data = edge[2] - desc = edge_data["description"] - items.append(((edge[0], edge[1]), desc)) + for item in batch: + nodes = item.get("nodes", []) + edges = item.get("edges", []) - for node in nodes: - node_data = node[1] - desc = node_data["description"] - items.append((node[0], desc)) + for node_id, node_data in nodes.items(): + node_data = node_data[0] + desc = node_data["description"] + items.append((node_id, desc)) + for edge_key, edge_data in edges.items(): + edge_data = edge_data[0] + desc = edge_data["description"] + items.append((edge_key, desc)) logger.info("Total descriptions to quiz: %d", len(items)) - for i in range(0, len(items), self.concurrency_limit): - batch_items = items[i : i + self.concurrency_limit] - batch_results = run_concurrent( - self._process_single_quiz, - batch_items, - desc=f"Quizzing descriptions ({i} / {i + len(batch_items)})", - unit="description", - ) + results = run_concurrent( + self._process_single_quiz, + items, + desc=f"Quizzing batch of {len(items)} descriptions", + unit="description", + ) + valid_results = [res for res in results if res] - final_results = [] - for new_result in batch_results: - if new_result: - self.quiz_storage.upsert( - { - new_result["_quiz_id"]: { - "description": new_result["description"], - "quizzes": new_result["quizzes"], - } - } - ) - final_results.append(new_result) - self.quiz_storage.index_done_callback() - yield pd.DataFrame(final_results) + for res in valid_results: + self.quiz_storage.upsert( + { + res["_quiz_id"]: { + "description": res["description"], + "quizzes": res["quizzes"], + } + } + ) + self.quiz_storage.index_done_callback() + return pd.DataFrame(valid_results) From a3a3c411a60d5c4c7983a8f40613a846841c39a5 Mon Sep 17 00:00:00 2001 From: chenzihong-gavin Date: Tue, 13 Jan 2026 17:36:52 +0800 Subject: [PATCH 2/3] feat: make build_kg return list of nodes & edges --- .../models/kg_builder/light_rag_kg_builder.py | 14 ++++++++++++-- .../operators/build_kg/build_kg_service.py | 17 ++++++++++------- graphgen/operators/build_kg/build_mm_kg.py | 4 ++-- graphgen/operators/build_kg/build_text_kg.py | 4 ++-- graphgen/operators/quiz/quiz_service.py | 18 ++++++++---------- 5 files changed, 34 insertions(+), 23 deletions(-) diff --git a/graphgen/models/kg_builder/light_rag_kg_builder.py b/graphgen/models/kg_builder/light_rag_kg_builder.py index a6185f44..c3e7345c 100644 --- a/graphgen/models/kg_builder/light_rag_kg_builder.py +++ b/graphgen/models/kg_builder/light_rag_kg_builder.py @@ -99,7 +99,7 @@ async def merge_nodes( self, node_data: tuple[str, List[dict]], kg_instance: BaseGraphStorage, - ) -> None: + ) -> dict: entity_name, node_data = node_data entity_types = [] source_ids = [] @@ -131,16 +131,18 @@ async def merge_nodes( node_data = { "entity_type": entity_type, + "entity_name": entity_name, "description": description, "source_id": source_id, } kg_instance.upsert_node(entity_name, node_data=node_data) + return node_data async def merge_edges( self, edges_data: tuple[Tuple[str, str], List[dict]], kg_instance: BaseGraphStorage, - ) -> None: + ) -> dict: (src_id, tgt_id), edge_data = edges_data source_ids = [] @@ -175,11 +177,19 @@ async def merge_edges( f"({src_id}, {tgt_id})", description ) + edge_data = { + "src_id": src_id, + "tgt_id": tgt_id, + "description": description, + "source_id": source_id, # for traceability + } + kg_instance.upsert_edge( src_id, tgt_id, edge_data={"source_id": source_id, "description": description}, ) + return edge_data async def _handle_kg_summary( self, diff --git a/graphgen/operators/build_kg/build_kg_service.py b/graphgen/operators/build_kg/build_kg_service.py index afeeefd7..b4155dde 100644 --- a/graphgen/operators/build_kg/build_kg_service.py +++ b/graphgen/operators/build_kg/build_kg_service.py @@ -29,7 +29,10 @@ def process(self, batch: pd.DataFrame) -> pd.DataFrame: # consume the chunks and build kg nodes, edges = self.build_kg(docs) - return pd.DataFrame([{"nodes": nodes, "edges": edges}]) + return pd.DataFrame( + [{"node": node, "edge": []} for node in nodes] + + [{"node": [], "edge": edge} for edge in edges] + ) def build_kg(self, chunks: List[Chunk]) -> tuple: """ @@ -42,8 +45,8 @@ def build_kg(self, chunks: List[Chunk]) -> tuple: if chunk.type in ("image", "video", "table", "formula") ] - nodes = {} - edges = {} + nodes = [] + edges = [] if len(text_chunks) == 0: logger.info("All text chunks are already in the storage") @@ -55,8 +58,8 @@ def build_kg(self, chunks: List[Chunk]) -> tuple: chunks=text_chunks, max_loop=self.max_loop, ) - nodes.update(text_nodes) - edges.update(text_edges) + nodes += text_nodes + edges += text_edges if len(mm_chunks) == 0: logger.info("All multi-modal chunks are already in the storage") else: @@ -66,8 +69,8 @@ def build_kg(self, chunks: List[Chunk]) -> tuple: kg_instance=self.graph_storage, chunks=mm_chunks, ) - nodes.update(mm_nodes) - edges.update(mm_edges) + nodes += mm_nodes + edges += mm_edges self.graph_storage.index_done_callback() logger.info("Knowledge graph building completed.") diff --git a/graphgen/operators/build_kg/build_mm_kg.py b/graphgen/operators/build_kg/build_mm_kg.py index 84bf12a6..e98c5428 100644 --- a/graphgen/operators/build_kg/build_mm_kg.py +++ b/graphgen/operators/build_kg/build_mm_kg.py @@ -37,13 +37,13 @@ def build_mm_kg( for k, v in e.items(): edges[tuple(sorted(k))].extend(v) - run_concurrent( + nodes = run_concurrent( lambda kv: mm_builder.merge_nodes(kv, kg_instance=kg_instance), list(nodes.items()), desc="Inserting entities into storage", ) - run_concurrent( + edges = run_concurrent( lambda kv: mm_builder.merge_edges(kv, kg_instance=kg_instance), list(edges.items()), desc="Inserting relationships into storage", diff --git a/graphgen/operators/build_kg/build_text_kg.py b/graphgen/operators/build_kg/build_text_kg.py index f44fcae8..2a7e1b03 100644 --- a/graphgen/operators/build_kg/build_text_kg.py +++ b/graphgen/operators/build_kg/build_text_kg.py @@ -39,13 +39,13 @@ def build_text_kg( for k, v in e.items(): edges[tuple(sorted(k))].extend(v) - run_concurrent( + nodes = run_concurrent( lambda kv: kg_builder.merge_nodes(kv, kg_instance=kg_instance), list(nodes.items()), desc="Inserting entities into storage", ) - run_concurrent( + edges = run_concurrent( lambda kv: kg_builder.merge_edges(kv, kg_instance=kg_instance), list(edges.items()), desc="Inserting relationships into storage", diff --git a/graphgen/operators/quiz/quiz_service.py b/graphgen/operators/quiz/quiz_service.py index ca3b4bfc..cabd4f77 100644 --- a/graphgen/operators/quiz/quiz_service.py +++ b/graphgen/operators/quiz/quiz_service.py @@ -1,5 +1,3 @@ -from collections.abc import Iterable - import pandas as pd from graphgen.bases import BaseGraphStorage, BaseKVStorage, BaseLLMWrapper, BaseOperator @@ -28,7 +26,7 @@ def __init__( ) self.generator = QuizGenerator(self.llm_client) - def process(self, batch: pd.DataFrame) -> Iterable[pd.DataFrame]: + def process(self, batch: pd.DataFrame) -> pd.DataFrame: data = batch.to_dict(orient="records") self.graph_storage.reload() return self.quiz(data) @@ -62,22 +60,22 @@ async def _process_single_quiz(self, item: tuple) -> dict | None: logger.error("Error when quizzing description %s: %s", item, e) return None - def quiz(self, batch) -> Iterable[pd.DataFrame]: + def quiz(self, batch) -> pd.DataFrame: """ Get all nodes and edges and quiz their descriptions using QuizGenerator. """ items = [] for item in batch: - nodes = item.get("nodes", []) - edges = item.get("edges", []) + node_data = item.get("node", []) + edge_data = item.get("edge", []) - for node_id, node_data in nodes.items(): - node_data = node_data[0] + if node_data: + node_id = node_data["entity_name"] desc = node_data["description"] items.append((node_id, desc)) - for edge_key, edge_data in edges.items(): - edge_data = edge_data[0] + if edge_data: + edge_key = (edge_data["src_id"], edge_data["tgt_id"]) desc = edge_data["description"] items.append((edge_key, desc)) From 38da2d055d40461cc82b978ba802fdd53abc01c0 Mon Sep 17 00:00:00 2001 From: chenzihong-gavin Date: Tue, 13 Jan 2026 22:23:28 +0800 Subject: [PATCH 3/3] chore: upgrade ray to 2.53.0 --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index b0eb3966..2b51bc07 100644 --- a/requirements.txt +++ b/requirements.txt @@ -22,7 +22,7 @@ trafilatura aiohttp socksio pydantic -ray==2.52.1 +ray==2.53.0 pyarrow leidenalg