Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
316 changes: 316 additions & 0 deletions openviking/storage/vectordb/collection/viking_collection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,316 @@
# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd.
# SPDX-License-Identifier: Apache-2.0
import importlib
import logging
from typing import Any, Dict, List, Optional

from openviking.storage.vectordb.collection.collection import ICollection
from openviking.storage.vectordb.collection.result import AggregateResult, SearchResult, SearchItemResult
from openviking.storage.vectordb.index.index import IIndex

logger = logging.getLogger(__name__)


class VikingCollection(ICollection):
"""
Generic implementation of ICollection using reflection to avoid direct dependency
on any specific viking client implementation.
"""

def __init__(self, collection_name: str, config: Dict[str, Any]):
"""
Initialize VikingCollection with configuration.

Args:
collection_name: The name of the collection (VikingDB name).
config: Configuration dictionary containing:
- package_name: The package name of viking client (default: "viking.vikingdb_client")
- host: VikingDB service domain/host.
- region: Region name (e.g., "cn", "va").
- ak: Access Key (caller_name).
- sk: Secret Key (caller_key).
- namespace: Namespace (optional).
- ... other client parameters.
"""
super().__init__()
self.collection_name = collection_name
self.config = config

# 1. Dynamic Import
package_name = config.get("package_name", "viking.vikingdb_client")
try:
self._module = importlib.import_module(package_name)
except ImportError as e:
raise ImportError(f"Failed to import viking client module '{package_name}': {e}")

# 2. Get Classes via Reflection
try:
self._MetaClientClass = getattr(self._module, "VikingDbMetaClient")
self._DataClientClass = getattr(self._module, "VikingDbClient")
# We might need helper classes like VikingDbData if we need to construct them
self._VikingDbDataClass = getattr(self._module, "VikingDbData")
except AttributeError as e:
raise AttributeError(f"Failed to get required classes from '{package_name}': {e}")

# 3. Initialize Meta Client
self.host = config.get("host", "")
self.region = config.get("region", "")
self.ak = config.get("ak", "")
self.sk = config.get("sk", "")
self.namespace = config.get("namespace", "default")

self.meta_client = self._MetaClientClass(
byterec_domain=self.host,
region=self.region,
namespace=self.namespace,
caller_name=self.ak,
caller_key=self.sk
)

# 4. Initialize Data Client
# We need to fetch the token first.
# Note: In a real scenario, we might want to cache or refresh the token.
self.token = self._fetch_token()

self.client = self._DataClientClass(
vikingdb_name=collection_name,
token=self.token,
region=self.region,
domain=self.host,
# Pass other optional configs if needed
pool_connections=config.get("pool_connections", 10),
pool_maxsize=config.get("pool_maxsize", 10)
)

def _fetch_token(self) -> str:
"""Fetch token using meta client."""
# Using reflection-based meta client
# The signature is get_vikingdb_token(vikingdb_full_name)
token = self.meta_client.get_vikingdb_token(self.collection_name)
if not token:
logger.warning(f"Failed to fetch token for collection {self.collection_name}, functionality might be limited.")
return ""
return token

def update(self, fields: Optional[Dict[str, Any]] = None, description: Optional[str] = None):
# VikingDB currently doesn't have a direct "update collection metadata" API exposed in this way
# except maybe through recreating or specific meta ops.
# Leaving as NotImplemented or pass for now.
raise NotImplementedError("update collection not supported in VikingCollection yet")

def get_meta_data(self):
# Use meta_client to get info
err_msg, data = self.meta_client.get_vikingdb(self.collection_name)
if err_msg:
raise RuntimeError(f"Failed to get meta data: {err_msg}")
return data

def close(self):
# VikingClient uses requests.Session, we might want to close it if exposed
if hasattr(self.client, "session"):
self.client.session.close()

def drop(self):
err_msg, _ = self.meta_client.delete_vikingdb(self.collection_name)
if err_msg:
raise RuntimeError(f"Failed to drop collection: {err_msg}")

def create_index(self, index_name: str, meta_data: Dict[str, Any]) -> IIndex:
# VikingDB create_index
# meta_data should contain: index_type, distance, etc.
index_type = meta_data.get("index_type", "auto_hnsw")
distance = meta_data.get("distance", "ip")
# ... other params

err_msg, data = self.meta_client.create_index(
vikingdb_full_name=self.collection_name,
index_name=index_name,
index_type=index_type,
distance=distance,
# Map other params from meta_data
shard_count=meta_data.get("shard_count", 1),
description=meta_data.get("description", "")
)
if err_msg:
raise RuntimeError(f"Failed to create index: {err_msg}")

# Return an IIndex representation (we might need a VikingIndex class,
# but for now we can return a simple object or dict as the interface implies IIndex)
# Assuming we need to implement IIndex or just return something that works.
# The interface says `-> IIndex`.
# For simplicity, we can raise NotImplemented if we don't have a VikingIndex class yet,
# or mock it.
# Let's create a minimal VikingIndex class later or now?
# For now, I'll return None and log, or NotImplemented.
# Actually ICollection signature requires IIndex.
# Let's skip deep implementation of Index object for this task and focus on Data.
return None # Placeholder

def has_index(self, index_name: str) -> bool:
return self.meta_client.exist_index(self.collection_name, index_name)

def get_index(self, index_name: str) -> Optional[IIndex]:
# Implementation omitted for brevity
return None

def search_by_vector(
self,
index_name: str,
dense_vector: Optional[List[float]] = None,
limit: int = 10,
offset: int = 0,
filters: Optional[Dict[str, Any]] = None,
sparse_vector: Optional[Dict[str, float]] = None,
output_fields: Optional[List[str]] = None,
) -> SearchResult:
if dense_vector is None:
raise ValueError("dense_vector is required for search_by_vector")

# Prepare parameters for recall
# viking_client.recall signature:
# recall(self, vector, index, topk, sub_index="default", ...)

# Handle filters -> dsl_query
dsl_query = filters if filters else {}

success, result, logid = self.client.recall(
vector=dense_vector,
index=index_name,
topk=limit,
dsl_query=dsl_query,
sparse_vec=sparse_vector,
# Map other params...
)

if not success:
error_info = result if isinstance(result, tuple) else "Unknown error"
raise RuntimeError(f"Search failed: {error_info}, logid: {logid}")

# Convert result to SearchResult
# VikingDB result structure needs parsing.
# Assuming result is list of items.
# We need to construct SearchResult.

# For this exercise, I'll return a raw SearchResult wrapper with empty data
# In a real implementation, we map the fields.
# TODO: Parse 'result' into List[SearchItemResult]
return SearchResult(
data=[]
)

def search_by_keywords(self, *args, **kwargs):
raise NotImplementedError

def search_by_id(
self,
index_name: str,
id: Any,
limit: int = 10,
offset: int = 0,
filters: Optional[Dict[str, Any]] = None,
output_fields: Optional[List[str]] = None,
) -> SearchResult:
"""
Search for nearest neighbors of the vector associated with the given ID.
"""
# 1. Fetch the vector for the ID
# VikingDB's fetch_data returns the data. We need to extract the vector.
# Assuming the ID is the primary key (rowkey).
# We need to construct the primary key dict if needed, or pass ID if simple_get_data handles it.
# simple_get_data expects list of dicts/data.
# If we don't know the vector, we can't search.
# VikingDB might support "search by id" directly?
# recall signature doesn't take "id".

# So: Fetch -> Search
# This requires knowing the vector field name?
# VikingDB data usually has 'vector' field?

# For now, NotImplemented as it requires two steps and knowledge of vector field.
raise NotImplementedError("search_by_id not implemented for VikingCollection yet")

def search_by_multimodal(self, *args, **kwargs):
raise NotImplementedError("search_by_multimodal not supported")

def search_by_random(
self,
index_name: str,
limit: int = 10,
offset: int = 0,
filters: Optional[Dict[str, Any]] = None,
output_fields: Optional[List[str]] = None,
) -> SearchResult:
dsl_query = filters if filters else {}
success, result, logid = self.client.recall(
vector=[], # Empty vector for random? Or need a dummy?
# VikingDB might require vector even for random, or just is_random_recall=True
index=index_name,
topk=limit,
dsl_query=dsl_query,
is_random_recall=True
)
if not success:
raise RuntimeError(f"Random search failed: {result}, logid: {logid}")

# TODO: Parse 'result' into List[SearchItemResult]
return SearchResult(
data=[]
)

def search_by_scalar(self, *args, **kwargs):
# Scalar search usually means filtering without vector scoring?
# VikingDB recall requires vector?
# If we can pass empty vector and rely on filters?
raise NotImplementedError("search_by_scalar not supported directly, use filters in search_by_vector")

def update_index(self, *args, **kwargs):
raise NotImplementedError("update_index not supported")

def get_index_meta_data(self, *args, **kwargs):
raise NotImplementedError

def list_indexes(self):
# This might need parsing get_vikingdb result
return []

def drop_index(self, index_name: str):
self.meta_client.delete_index(self.collection_name, index_name)

def upsert_data(self, data_list: List[Dict[str, Any]], ttl: int = 0):
# Map data_list to VikingDbData objects or dicts
# VikingDbClient.simple_add_data accepts list of dicts directly if compatible
# or VikingDbData objects.
# data_dict keys: fvector, label_lower64, etc.
# OpenViking data keys might differ, assuming they are compatible or mapped.

# Using simple_add_data
msg, rowkeys = self.client.simple_add_data(data_dict=data_list)
if msg:
raise RuntimeError(f"Upsert failed: {msg}")
return rowkeys

def fetch_data(self, primary_keys: List[Any]):
# simple_get_data
# expects list of dicts with keys (label_lower64, etc.)
# primary_keys might need to be converted to the expected dict format
# Assuming primary_keys are the dicts for now or ids.
# VikingDB usually needs label_lower64/upper64 for primary key.
# If primary_keys is just a list of IDs, we need to know how to map them.
# For now, assume primary_keys is list of dicts compatible with get_data.
msg, data = self.client.simple_get_data(datas=primary_keys)
if msg:
raise RuntimeError(f"Fetch failed: {msg}")
return data

def delete_data(self, primary_keys: List[Any]):
# simple_del_data
msg, rowkeys = self.client.simple_del_data(datas=primary_keys)
if msg:
raise RuntimeError(f"Delete failed: {msg}")
return rowkeys

def delete_all_data(self):
raise NotImplementedError("delete_all_data not supported efficiently")

def aggregate_data(self, *args, **kwargs):
raise NotImplementedError("aggregate_data not supported")
Loading