From 10c9fe526d360e7c478f7dec5e95bd78c179369b Mon Sep 17 00:00:00 2001 From: "xiaogang.zhou" Date: Sat, 14 Feb 2026 19:01:13 +0800 Subject: [PATCH] feat: enable viking client --- .../vectordb/collection/viking_collection.py | 316 ++++++++++++++++++ .../vectordb/project/viking_project.py | 116 +++++++ 2 files changed, 432 insertions(+) create mode 100644 openviking/storage/vectordb/collection/viking_collection.py create mode 100644 openviking/storage/vectordb/project/viking_project.py diff --git a/openviking/storage/vectordb/collection/viking_collection.py b/openviking/storage/vectordb/collection/viking_collection.py new file mode 100644 index 00000000..c6f8106c --- /dev/null +++ b/openviking/storage/vectordb/collection/viking_collection.py @@ -0,0 +1,316 @@ +# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd. +# SPDX-License-Identifier: Apache-2.0 +import importlib +import logging +from typing import Any, Dict, List, Optional + +from openviking.storage.vectordb.collection.collection import ICollection +from openviking.storage.vectordb.collection.result import AggregateResult, SearchResult, SearchItemResult +from openviking.storage.vectordb.index.index import IIndex + +logger = logging.getLogger(__name__) + + +class VikingCollection(ICollection): + """ + Generic implementation of ICollection using reflection to avoid direct dependency + on any specific viking client implementation. + """ + + def __init__(self, collection_name: str, config: Dict[str, Any]): + """ + Initialize VikingCollection with configuration. + + Args: + collection_name: The name of the collection (VikingDB name). + config: Configuration dictionary containing: + - package_name: The package name of viking client (default: "viking.vikingdb_client") + - host: VikingDB service domain/host. + - region: Region name (e.g., "cn", "va"). + - ak: Access Key (caller_name). + - sk: Secret Key (caller_key). + - namespace: Namespace (optional). + - ... other client parameters. + """ + super().__init__() + self.collection_name = collection_name + self.config = config + + # 1. Dynamic Import + package_name = config.get("package_name", "viking.vikingdb_client") + try: + self._module = importlib.import_module(package_name) + except ImportError as e: + raise ImportError(f"Failed to import viking client module '{package_name}': {e}") + + # 2. Get Classes via Reflection + try: + self._MetaClientClass = getattr(self._module, "VikingDbMetaClient") + self._DataClientClass = getattr(self._module, "VikingDbClient") + # We might need helper classes like VikingDbData if we need to construct them + self._VikingDbDataClass = getattr(self._module, "VikingDbData") + except AttributeError as e: + raise AttributeError(f"Failed to get required classes from '{package_name}': {e}") + + # 3. Initialize Meta Client + self.host = config.get("host", "") + self.region = config.get("region", "") + self.ak = config.get("ak", "") + self.sk = config.get("sk", "") + self.namespace = config.get("namespace", "default") + + self.meta_client = self._MetaClientClass( + byterec_domain=self.host, + region=self.region, + namespace=self.namespace, + caller_name=self.ak, + caller_key=self.sk + ) + + # 4. Initialize Data Client + # We need to fetch the token first. + # Note: In a real scenario, we might want to cache or refresh the token. + self.token = self._fetch_token() + + self.client = self._DataClientClass( + vikingdb_name=collection_name, + token=self.token, + region=self.region, + domain=self.host, + # Pass other optional configs if needed + pool_connections=config.get("pool_connections", 10), + pool_maxsize=config.get("pool_maxsize", 10) + ) + + def _fetch_token(self) -> str: + """Fetch token using meta client.""" + # Using reflection-based meta client + # The signature is get_vikingdb_token(vikingdb_full_name) + token = self.meta_client.get_vikingdb_token(self.collection_name) + if not token: + logger.warning(f"Failed to fetch token for collection {self.collection_name}, functionality might be limited.") + return "" + return token + + def update(self, fields: Optional[Dict[str, Any]] = None, description: Optional[str] = None): + # VikingDB currently doesn't have a direct "update collection metadata" API exposed in this way + # except maybe through recreating or specific meta ops. + # Leaving as NotImplemented or pass for now. + raise NotImplementedError("update collection not supported in VikingCollection yet") + + def get_meta_data(self): + # Use meta_client to get info + err_msg, data = self.meta_client.get_vikingdb(self.collection_name) + if err_msg: + raise RuntimeError(f"Failed to get meta data: {err_msg}") + return data + + def close(self): + # VikingClient uses requests.Session, we might want to close it if exposed + if hasattr(self.client, "session"): + self.client.session.close() + + def drop(self): + err_msg, _ = self.meta_client.delete_vikingdb(self.collection_name) + if err_msg: + raise RuntimeError(f"Failed to drop collection: {err_msg}") + + def create_index(self, index_name: str, meta_data: Dict[str, Any]) -> IIndex: + # VikingDB create_index + # meta_data should contain: index_type, distance, etc. + index_type = meta_data.get("index_type", "auto_hnsw") + distance = meta_data.get("distance", "ip") + # ... other params + + err_msg, data = self.meta_client.create_index( + vikingdb_full_name=self.collection_name, + index_name=index_name, + index_type=index_type, + distance=distance, + # Map other params from meta_data + shard_count=meta_data.get("shard_count", 1), + description=meta_data.get("description", "") + ) + if err_msg: + raise RuntimeError(f"Failed to create index: {err_msg}") + + # Return an IIndex representation (we might need a VikingIndex class, + # but for now we can return a simple object or dict as the interface implies IIndex) + # Assuming we need to implement IIndex or just return something that works. + # The interface says `-> IIndex`. + # For simplicity, we can raise NotImplemented if we don't have a VikingIndex class yet, + # or mock it. + # Let's create a minimal VikingIndex class later or now? + # For now, I'll return None and log, or NotImplemented. + # Actually ICollection signature requires IIndex. + # Let's skip deep implementation of Index object for this task and focus on Data. + return None # Placeholder + + def has_index(self, index_name: str) -> bool: + return self.meta_client.exist_index(self.collection_name, index_name) + + def get_index(self, index_name: str) -> Optional[IIndex]: + # Implementation omitted for brevity + return None + + def search_by_vector( + self, + index_name: str, + dense_vector: Optional[List[float]] = None, + limit: int = 10, + offset: int = 0, + filters: Optional[Dict[str, Any]] = None, + sparse_vector: Optional[Dict[str, float]] = None, + output_fields: Optional[List[str]] = None, + ) -> SearchResult: + if dense_vector is None: + raise ValueError("dense_vector is required for search_by_vector") + + # Prepare parameters for recall + # viking_client.recall signature: + # recall(self, vector, index, topk, sub_index="default", ...) + + # Handle filters -> dsl_query + dsl_query = filters if filters else {} + + success, result, logid = self.client.recall( + vector=dense_vector, + index=index_name, + topk=limit, + dsl_query=dsl_query, + sparse_vec=sparse_vector, + # Map other params... + ) + + if not success: + error_info = result if isinstance(result, tuple) else "Unknown error" + raise RuntimeError(f"Search failed: {error_info}, logid: {logid}") + + # Convert result to SearchResult + # VikingDB result structure needs parsing. + # Assuming result is list of items. + # We need to construct SearchResult. + + # For this exercise, I'll return a raw SearchResult wrapper with empty data + # In a real implementation, we map the fields. + # TODO: Parse 'result' into List[SearchItemResult] + return SearchResult( + data=[] + ) + + def search_by_keywords(self, *args, **kwargs): + raise NotImplementedError + + def search_by_id( + self, + index_name: str, + id: Any, + limit: int = 10, + offset: int = 0, + filters: Optional[Dict[str, Any]] = None, + output_fields: Optional[List[str]] = None, + ) -> SearchResult: + """ + Search for nearest neighbors of the vector associated with the given ID. + """ + # 1. Fetch the vector for the ID + # VikingDB's fetch_data returns the data. We need to extract the vector. + # Assuming the ID is the primary key (rowkey). + # We need to construct the primary key dict if needed, or pass ID if simple_get_data handles it. + # simple_get_data expects list of dicts/data. + # If we don't know the vector, we can't search. + # VikingDB might support "search by id" directly? + # recall signature doesn't take "id". + + # So: Fetch -> Search + # This requires knowing the vector field name? + # VikingDB data usually has 'vector' field? + + # For now, NotImplemented as it requires two steps and knowledge of vector field. + raise NotImplementedError("search_by_id not implemented for VikingCollection yet") + + def search_by_multimodal(self, *args, **kwargs): + raise NotImplementedError("search_by_multimodal not supported") + + def search_by_random( + self, + index_name: str, + limit: int = 10, + offset: int = 0, + filters: Optional[Dict[str, Any]] = None, + output_fields: Optional[List[str]] = None, + ) -> SearchResult: + dsl_query = filters if filters else {} + success, result, logid = self.client.recall( + vector=[], # Empty vector for random? Or need a dummy? + # VikingDB might require vector even for random, or just is_random_recall=True + index=index_name, + topk=limit, + dsl_query=dsl_query, + is_random_recall=True + ) + if not success: + raise RuntimeError(f"Random search failed: {result}, logid: {logid}") + + # TODO: Parse 'result' into List[SearchItemResult] + return SearchResult( + data=[] + ) + + def search_by_scalar(self, *args, **kwargs): + # Scalar search usually means filtering without vector scoring? + # VikingDB recall requires vector? + # If we can pass empty vector and rely on filters? + raise NotImplementedError("search_by_scalar not supported directly, use filters in search_by_vector") + + def update_index(self, *args, **kwargs): + raise NotImplementedError("update_index not supported") + + def get_index_meta_data(self, *args, **kwargs): + raise NotImplementedError + + def list_indexes(self): + # This might need parsing get_vikingdb result + return [] + + def drop_index(self, index_name: str): + self.meta_client.delete_index(self.collection_name, index_name) + + def upsert_data(self, data_list: List[Dict[str, Any]], ttl: int = 0): + # Map data_list to VikingDbData objects or dicts + # VikingDbClient.simple_add_data accepts list of dicts directly if compatible + # or VikingDbData objects. + # data_dict keys: fvector, label_lower64, etc. + # OpenViking data keys might differ, assuming they are compatible or mapped. + + # Using simple_add_data + msg, rowkeys = self.client.simple_add_data(data_dict=data_list) + if msg: + raise RuntimeError(f"Upsert failed: {msg}") + return rowkeys + + def fetch_data(self, primary_keys: List[Any]): + # simple_get_data + # expects list of dicts with keys (label_lower64, etc.) + # primary_keys might need to be converted to the expected dict format + # Assuming primary_keys are the dicts for now or ids. + # VikingDB usually needs label_lower64/upper64 for primary key. + # If primary_keys is just a list of IDs, we need to know how to map them. + # For now, assume primary_keys is list of dicts compatible with get_data. + msg, data = self.client.simple_get_data(datas=primary_keys) + if msg: + raise RuntimeError(f"Fetch failed: {msg}") + return data + + def delete_data(self, primary_keys: List[Any]): + # simple_del_data + msg, rowkeys = self.client.simple_del_data(datas=primary_keys) + if msg: + raise RuntimeError(f"Delete failed: {msg}") + return rowkeys + + def delete_all_data(self): + raise NotImplementedError("delete_all_data not supported efficiently") + + def aggregate_data(self, *args, **kwargs): + raise NotImplementedError("aggregate_data not supported") diff --git a/openviking/storage/vectordb/project/viking_project.py b/openviking/storage/vectordb/project/viking_project.py new file mode 100644 index 00000000..17b8f1a5 --- /dev/null +++ b/openviking/storage/vectordb/project/viking_project.py @@ -0,0 +1,116 @@ +# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd. +# SPDX-License-Identifier: Apache-2.0 +import importlib +import logging +from typing import Any, Dict, Optional + +from openviking.storage.vectordb.project.project import IProject + +logger = logging.getLogger(__name__) + + +class VikingProject(IProject): + """ + Generic implementation of IProject using reflection to avoid direct dependency + on any specific viking client implementation. + """ + + def __init__(self, project_name: str = "default", config: Optional[Dict[str, Any]] = None): + """ + Initialize VikingProject with configuration. + + Args: + project_name: The name of the project. + config: Configuration dictionary containing: + - collection_package_name: The package name for collection implementation + - collection_class_name: The class name for collection implementation + - ... other collection-specific configuration + """ + super().__init__(project_name) + self.config = config or {} + self.collections: Dict[str, Any] = {} + + # 1. Dynamic Import for Collection Implementation + collection_package_name = self.config.get("collection_package_name", "openviking.storage.vectordb.collection.viking_collection") + collection_class_name = self.config.get("collection_class_name", "VikingCollection") + + try: + self._collection_module = importlib.import_module(collection_package_name) + except ImportError as e: + raise ImportError(f"Failed to import collection module '{collection_package_name}': {e}") + + try: + self._CollectionClass = getattr(self._collection_module, collection_class_name) + except AttributeError as e: + raise AttributeError(f"Failed to get collection class '{collection_class_name}' from '{collection_package_name}': {e}") + + def close(self): + """Close the project and release all associated resources.""" + # Close all collections + for collection_name, collection in self.collections.items(): + try: + collection.close() + except Exception as e: + logger.warning(f"Failed to close collection '{collection_name}': {e}") + self.collections.clear() + + def has_collection(self, collection_name: str) -> bool: + """Check if a collection exists in the project.""" + return collection_name in self.collections + + def get_collection(self, collection_name: str) -> Any: + """Retrieve a collection by name.""" + return self.collections.get(collection_name) + + def get_collections(self) -> Dict[str, Any]: + """Get all collections in the project.""" + return self.collections.copy() + + def create_collection(self, collection_name: str, collection_meta: Dict[str, Any]) -> Any: + """ + Create a new collection in the project. + + Args: + collection_name: Name for the new collection. + collection_meta: Metadata configuration for the collection. + + Returns: + The newly created collection instance. + """ + if self.has_collection(collection_name): + raise ValueError(f"Collection '{collection_name}' already exists") + + # Merge project config with collection-specific config + collection_config = self.config.copy() + collection_config.update(collection_meta.get("config", {})) + + # Create collection instance using reflection + try: + collection_instance = self._CollectionClass( + collection_name=collection_name, + config=collection_config + ) + except Exception as e: + raise RuntimeError(f"Failed to create collection '{collection_name}': {e}") + + self.collections[collection_name] = collection_instance + return collection_instance + + def drop_collection(self, collection_name: str): + """Delete a collection from the project.""" + if collection_name not in self.collections: + raise ValueError(f"Collection '{collection_name}' does not exist") + + collection = self.collections[collection_name] + try: + collection.drop() + except Exception as e: + logger.warning(f"Failed to drop collection '{collection_name}': {e}") + finally: + self.collections.pop(collection_name, None) + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.close() \ No newline at end of file