From c2327a5428ffa3d9896002c42dbb548528df9f43 Mon Sep 17 00:00:00 2001 From: peopleig Date: Fri, 6 Feb 2026 18:20:23 +0530 Subject: [PATCH] Add snafu error handling --- Cargo.lock | 26 +++++ Cargo.toml | 1 + crates/api/Cargo.toml | 1 + crates/api/src/error.rs | 45 +++++++++ crates/api/src/lib.rs | 72 +++++++++++--- crates/defs/src/error.rs | 26 ----- crates/grpc/Cargo.toml | 1 + crates/grpc/src/errors.rs | 151 ++++++++++++++++++++++++++++-- crates/grpc/src/service.rs | 9 +- crates/http/src/handler.rs | 70 ++++++++++---- crates/index/Cargo.toml | 1 + crates/index/src/error.rs | 48 ++++++++++ crates/index/src/flat.rs | 14 ++- crates/index/src/hnsw/index.rs | 21 +++-- crates/index/src/hnsw/search.rs | 12 +-- crates/index/src/kd_tree/index.rs | 18 ++-- crates/index/src/kd_tree/tests.rs | 13 ++- crates/index/src/lib.rs | 12 +-- crates/server/Cargo.toml | 1 + crates/server/src/config.rs | 69 ++++++++------ crates/storage/Cargo.toml | 1 + crates/storage/src/error.rs | 32 +++++++ crates/storage/src/in_memory.rs | 19 ++-- crates/storage/src/lib.rs | 25 ++--- crates/storage/src/rocks_db.rs | 86 ++++++++++------- 25 files changed, 579 insertions(+), 195 deletions(-) create mode 100644 crates/api/src/error.rs create mode 100644 crates/index/src/error.rs create mode 100644 crates/storage/src/error.rs diff --git a/Cargo.lock b/Cargo.lock index 7b9a6a4..e19cfa0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -44,6 +44,7 @@ version = "0.1.0" dependencies = [ "defs", "index", + "snafu", "storage", "tempfile", "uuid", @@ -569,6 +570,7 @@ dependencies = [ "index", "prost", "prost-types", + "snafu", "storage", "tempfile", "tokio", @@ -891,6 +893,7 @@ version = "0.1.0" dependencies = [ "defs", "rand", + "snafu", "uuid", ] @@ -1810,6 +1813,7 @@ dependencies = [ "prost", "serde", "serde_json", + "snafu", "storage", "tokio", "tokio-stream", @@ -1877,6 +1881,27 @@ version = "1.15.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03" +[[package]] +name = "snafu" +version = "0.8.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e84b3f4eacbf3a1ce05eac6763b4d629d60cbc94d632e4092c54ade71f1e1a2" +dependencies = [ + "snafu-derive", +] + +[[package]] +name = "snafu-derive" +version = "0.8.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1c97747dbf44bb1ca44a561ece23508e99cb592e862f22222dcf42f51d1e451" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "socket2" version = "0.6.1" @@ -1916,6 +1941,7 @@ dependencies = [ "bincode", "defs", "rocksdb", + "snafu", "tempfile", "uuid", ] diff --git a/Cargo.toml b/Cargo.toml index 2958714..76c1c7f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,6 +32,7 @@ reqwest = { version = "0.12", features = ["json", "blocking", "multipart"] } rocksdb = "0.21.0" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0.145" +snafu = "0.8.9" tempfile = "3.23.0" tokio = { version = "1.47.1", features = ["full"] } tokio-stream = "0.1.17" diff --git a/crates/api/Cargo.toml b/crates/api/Cargo.toml index 8ade9d8..287f94c 100644 --- a/crates/api/Cargo.toml +++ b/crates/api/Cargo.toml @@ -9,6 +9,7 @@ license.workspace = true [dependencies] defs.workspace = true index.workspace = true +snafu.workspace = true storage.workspace = true tempfile.workspace = true uuid.workspace = true diff --git a/crates/api/src/error.rs b/crates/api/src/error.rs new file mode 100644 index 0000000..622e58b --- /dev/null +++ b/crates/api/src/error.rs @@ -0,0 +1,45 @@ +use defs::{Dimension, PointId}; +use snafu::prelude::*; + +#[derive(Debug, Snafu)] +#[snafu(visibility(pub))] +pub enum ApiError { + #[snafu(display("Vector dimension mismatch: expected {expected}, got {got}"))] + DimensionMismatch { expected: Dimension, got: Dimension }, + + #[snafu(display("Failed to acquire lock on vector index"))] + LockError, + + #[snafu(display("Storage error: {source}"))] + Storage { + source: storage::error::StorageError, + }, + + #[snafu(display("Index error: {source}"))] + Index { source: index::error::IndexError }, + + #[snafu(display("Point {id} not found"))] + PointNotFound { id: PointId }, + + #[snafu(display("Invalid search limit: {limit}"))] + InvalidSearchLimit { limit: usize }, + + #[snafu(display("Failed to initialize database: {reason}"))] + InitializationFailed { reason: String }, +} + +pub type Result = std::result::Result; + +// Automatic conversion from StorageError to ApiError +impl From for ApiError { + fn from(source: storage::error::StorageError) -> Self { + ApiError::Storage { source } + } +} + +// Automatic conversion from IndexError to ApiError +impl From for ApiError { + fn from(source: index::error::IndexError) -> Self { + ApiError::Index { source } + } +} diff --git a/crates/api/src/lib.rs b/crates/api/src/lib.rs index 6ca6849..16e9dab 100644 --- a/crates/api/src/lib.rs +++ b/crates/api/src/lib.rs @@ -1,4 +1,4 @@ -use defs::{DbError, Dimension, IndexedVector, Similarity}; +use defs::{Dimension, IndexedVector, Similarity}; use defs::{DenseVector, Payload, Point, PointId}; use index::hnsw::HnswIndex; @@ -13,6 +13,9 @@ use storage::{StorageEngine, StorageType, VectorPage}; use uuid::Uuid; +pub mod error; +pub use error::{ApiError, Result}; + // static NEXT_ID: AtomicU64 = AtomicU64::new(1); // fn generate_point_id() -> u64 { @@ -43,9 +46,12 @@ impl VectorDb { } //TODO: Make this an atomic operation - pub fn insert(&self, vector: DenseVector, payload: Payload) -> Result { + pub fn insert(&self, vector: DenseVector, payload: Payload) -> Result { if vector.len() != self.dimension { - return Err(DbError::DimensionMismatch); + return Err(ApiError::DimensionMismatch { + expected: self.dimension, + got: vector.len(), + }); } // Generate a new point id let point_id = generate_point_id(); @@ -53,7 +59,7 @@ impl VectorDb { .insert_point(point_id, Some(vector.clone()), Some(payload))?; // Get write lock on the index - let mut index = self.index.write().map_err(|_| DbError::LockError)?; + let mut index = self.index.write().map_err(|_| ApiError::LockError)?; index.insert(IndexedVector { vector, id: point_id, @@ -63,16 +69,16 @@ impl VectorDb { } //TODO: Make this an atomic operation - pub fn delete(&self, id: PointId) -> Result { + pub fn delete(&self, id: PointId) -> Result { // Remove from storage self.storage.delete_point(id)?; // Remove from index - let mut index = self.index.write().map_err(|_| DbError::LockError)?; + let mut index = self.index.write().map_err(|_| ApiError::LockError)?; let point_found = index.delete(id)?; Ok(point_found) } - pub fn get(&self, id: PointId) -> Result, DbError> { + pub fn get(&self, id: PointId) -> Result> { // Search for the Point with given id in storage let payload = self.storage.get_payload(id)?; let vector = self.storage.get_vector(id)?; @@ -92,9 +98,22 @@ impl VectorDb { query: DenseVector, similarity: Similarity, limit: usize, - ) -> Result, DbError> { + ) -> Result> { + // Validate search limit + if limit == 0 { + return Err(ApiError::InvalidSearchLimit { limit }); + } + + // Validate query dimension + if query.len() != self.dimension { + return Err(ApiError::DimensionMismatch { + expected: self.dimension, + got: query.len(), + }); + } + // Use vector index to find similar vectors - let index = self.index.read().map_err(|_| DbError::LockError)?; + let index = self.index.read().map_err(|_| ApiError::LockError)?; //TODO: Add feat of returning similarity scores in the search let vectors = index.search(query, similarity, limit)?; @@ -102,18 +121,19 @@ impl VectorDb { Ok(vectors) } - pub fn list(&self, offset: PointId, limit: usize) -> Result, DbError> { - self.storage.list_vectors(offset, limit) + pub fn list(&self, offset: PointId, limit: usize) -> Result> { + let page = self.storage.list_vectors(offset, limit)?; + Ok(page) } // populates the current index with vectors from the storage - pub fn build_index(&self) -> Result { + pub fn build_index(&self) -> Result { // start from the minimal UUID and fetch in bounded batches and insert let mut offset = Uuid::nil(); let page_size: usize = 1000; let mut inserted: usize = 0; - let mut index = self.index.write().map_err(|_| DbError::LockError)?; + let mut index = self.index.write().map_err(|_| ApiError::LockError)?; while let Some((batch, next_offset)) = self.storage.list_vectors(offset, page_size)? { if batch.is_empty() || next_offset == offset { @@ -141,7 +161,7 @@ pub struct DbConfig { pub similarity: Similarity, } -pub fn init_api(config: DbConfig) -> Result { +pub fn init_api(config: DbConfig) -> Result { // Initialize the storage engine let storage = match config.storage_type { StorageType::RocksDb => Arc::new(RocksDbStorage::new(config.data_path)?), @@ -230,7 +250,13 @@ mod tests { // Insert vector of dimension 2 != 3 let res2 = db.insert(v2, payload); assert!(res2.is_err()); - assert_eq!(res2.unwrap_err(), DbError::DimensionMismatch); + match res2.unwrap_err() { + ApiError::DimensionMismatch { expected, got } => { + assert_eq!(expected, 3); + assert_eq!(got, 2); + } + other => panic!("Expected DimensionMismatch, got: {:?}", other), + } } #[test] @@ -312,6 +338,22 @@ mod tests { assert_eq!(results.len(), 3); } + #[test] + fn test_search_zero_limit() { + let (db, _temp_dir) = create_test_db(); + + let query = vec![1.0, 2.0, 3.0]; + let result = db.search(query, Similarity::Cosine, 0); + + assert!(result.is_err()); + match result.unwrap_err() { + ApiError::InvalidSearchLimit { limit } => { + assert_eq!(limit, 0); + } + other => panic!("Expected InvalidSearchLimit, got: {:?}", other), + } + } + #[test] fn test_empty_database() { let (db, _temp_dir) = create_test_db(); diff --git a/crates/defs/src/error.rs b/crates/defs/src/error.rs index 89116ce..13f0b60 100644 --- a/crates/defs/src/error.rs +++ b/crates/defs/src/error.rs @@ -1,22 +1,5 @@ use std::io; -use crate::{Dimension, PointId}; -#[derive(Debug, PartialEq, Eq)] -pub enum DbError { - ParseError, - StorageError(String), - SerializationError(String), - DeserializationError, - IndexError(String), - LockError, - IndexInitError, //TODO: Change this - UnsupportedSimilarity, - DimensionMismatch, - InvalidDimension { expected: Dimension, got: Dimension }, - PointAlreadyExists { id: PointId }, - PointNotFound { id: PointId }, -} - #[derive(Debug)] pub enum ServerError { Bind(io::Error), @@ -25,17 +8,8 @@ pub enum ServerError { #[derive(Debug)] pub enum AppError { - DbError(DbError), ServerError(ServerError), } -impl std::fmt::Display for DbError { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - write!(f, "{:?}", self) - } -} - -impl std::error::Error for DbError {} - // Error type for server pub type BoxError = Box; diff --git a/crates/grpc/Cargo.toml b/crates/grpc/Cargo.toml index e0bf61e..3ec390a 100644 --- a/crates/grpc/Cargo.toml +++ b/crates/grpc/Cargo.toml @@ -12,6 +12,7 @@ defs.workspace = true index.workspace = true prost.workspace = true prost-types.workspace = true +snafu.workspace = true storage.workspace = true tempfile.workspace = true tokio.workspace = true diff --git a/crates/grpc/src/errors.rs b/crates/grpc/src/errors.rs index c7342bb..ce8fd38 100644 --- a/crates/grpc/src/errors.rs +++ b/crates/grpc/src/errors.rs @@ -1,14 +1,147 @@ -#[derive(Debug)] -pub enum ConfigError { - MissingRequiredEnvVar(String), - InvalidDimension, - InvalidDataPath, +use snafu::prelude::*; +use tonic::Status; + +#[derive(Debug, Snafu)] +#[snafu(visibility(pub))] +pub enum GrpcError { + #[snafu(display("Missing required environment variable: {var}"))] + MissingRequiredEnvVar { var: String }, + + #[snafu(display("Invalid dimension value: expected {expected}, got {got}"))] + InvalidDimension { expected: usize, got: usize }, + + #[snafu(display("Invalid data path: {source}"))] + InvalidDataPath { source: std::io::Error }, + + #[snafu(display("IO error: {source}"))] + Io { source: std::io::Error }, + + #[snafu(display("Not found: {message}"))] + NotFound { message: String }, + + #[snafu(display("Invalid argument: {message}"))] + InvalidArgument { message: String }, + + #[snafu(display("Already exists: {message}"))] + AlreadyExists { message: String }, + + #[snafu(display("Failed precondition: {message}"))] + FailedPrecondition { message: String }, + + #[snafu(display("Internal error: {message}"))] + Internal { message: String }, } -impl std::error::Error for ConfigError {} +pub type Result = std::result::Result; + +impl From for GrpcError { + fn from(e: api::error::ApiError) -> Self { + use api::error::ApiError; + match e { + ApiError::DimensionMismatch { expected, got } => GrpcError::InvalidArgument { + message: format!("dimension mismatch: expected {}, got {}", expected, got), + }, + ApiError::LockError => GrpcError::Internal { + message: "failed to acquire lock on index".to_string(), + }, + ApiError::Storage { source } => GrpcError::Internal { + message: format!("storage error: {:?}", source), + }, + ApiError::Index { source } => GrpcError::Internal { + message: format!("index error: {:?}", source), + }, + ApiError::PointNotFound { id } => GrpcError::NotFound { + message: format!("point not found: {}", id), + }, + ApiError::InvalidSearchLimit { limit } => GrpcError::InvalidArgument { + message: format!("invalid search limit: {}", limit), + }, + ApiError::InitializationFailed { reason } => GrpcError::Internal { message: reason }, + } + } +} + +impl From for GrpcError { + fn from(e: index::error::IndexError) -> Self { + use index::error::IndexError; + match e { + IndexError::DimensionMismatch { expected, got } => GrpcError::InvalidArgument { + message: format!("dimension mismatch: expected {}, got {}", expected, got), + }, + IndexError::NotInitialized => GrpcError::FailedPrecondition { + message: "index not initialized".to_string(), + }, + IndexError::UnsupportedSimilarity { metric } => GrpcError::InvalidArgument { + message: format!("unsupported similarity metric: {}", metric), + }, + IndexError::PointNotFound { id } => GrpcError::NotFound { + message: format!("point not found: {}", id), + }, + IndexError::PointAlreadyExists { id } => GrpcError::AlreadyExists { + message: format!("point already exists: {}", id), + }, + IndexError::HnswError { message } => GrpcError::Internal { message }, + IndexError::InvalidParameter { parameter, reason } => GrpcError::InvalidArgument { + message: format!("invalid parameter {}: {}", parameter, reason), + }, + IndexError::SearchFailed { reason } => GrpcError::Internal { message: reason }, + IndexError::EmptyIndex => GrpcError::FailedPrecondition { + message: "index is empty".to_string(), + }, + IndexError::InvalidSearchLimit { limit } => GrpcError::InvalidArgument { + message: format!("invalid search limit: {}", limit), + }, + } + } +} + +impl From for GrpcError { + fn from(e: storage::error::StorageError) -> Self { + use storage::error::StorageError; + match e { + StorageError::RocksDbOpen { path, source: _ } => GrpcError::Internal { + message: format!("failed to open storage at {}", path), + }, + StorageError::RocksDbRead { id, source: _ } => GrpcError::Internal { + message: format!("failed to read point {} from storage", id), + }, + StorageError::RocksDbWrite { id, source: _ } => GrpcError::Internal { + message: format!("failed to write point {} to storage", id), + }, + StorageError::RocksDbDelete { id, source: _ } => GrpcError::Internal { + message: format!("failed to delete point {} from storage", id), + }, + StorageError::RocksDbIteration { source: _ } => GrpcError::Internal { + message: "failed to iterate over storage".to_string(), + }, + StorageError::Serialization { id, source: _ } => GrpcError::Internal { + message: format!("failed to serialize point {}", id), + }, + StorageError::Deserialization { id, source: _ } => GrpcError::Internal { + message: format!("failed to deserialize point {}", id), + }, + } + } +} -impl std::fmt::Display for ConfigError { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - write!(f, "{:?}", self) +// Convert our grpc-local error enum into a tonic::Status so service handlers +// can return that directly to clients. +impl From for Status { + fn from(e: GrpcError) -> Self { + match e { + GrpcError::MissingRequiredEnvVar { var } => { + Status::failed_precondition(format!("missing required env var: {}", var)) + } + GrpcError::InvalidDimension { .. } => Status::invalid_argument(e.to_string()), + GrpcError::InvalidDataPath { source } => { + Status::invalid_argument(format!("invalid data path: {}", source)) + } + GrpcError::Io { source } => Status::internal(format!("io error: {}", source)), + GrpcError::NotFound { message } => Status::not_found(message), + GrpcError::InvalidArgument { message } => Status::invalid_argument(message), + GrpcError::AlreadyExists { message } => Status::already_exists(message), + GrpcError::FailedPrecondition { message } => Status::failed_precondition(message), + GrpcError::Internal { message } => Status::internal(message), + } } } diff --git a/crates/grpc/src/service.rs b/crates/grpc/src/service.rs index 1efab3b..16fcc6d 100644 --- a/crates/grpc/src/service.rs +++ b/crates/grpc/src/service.rs @@ -62,8 +62,7 @@ impl VectorDb for VectorDBService { }, ); - let res = - point_id.map_err(|e| Status::internal(format!("failed to insert vector: {:?}", e)))?; + let res = point_id.map_err(|e| Status::from(crate::errors::GrpcError::from(e)))?; Ok(Response::new(PointId { id: Some(Uuid { @@ -81,7 +80,7 @@ impl VectorDb for VectorDBService { let point_opt = self .vector_db .get(UuidCrate::from_str(&point_id).unwrap()) - .map_err(|e| Status::aborted(format!("point not found {:?}", e)))?; + .map_err(|e| Status::from(crate::errors::GrpcError::from(e)))?; // return error if not found let point = point_opt.ok_or(Status::not_found(format!("point not found: {}", point_id)))?; @@ -131,7 +130,7 @@ impl VectorDb for VectorDBService { let result_point_ids = self .vector_db .search(query_vect.values, *similarity, limit as usize) - .map_err(|_| Status::internal("Internal server error"))?; + .map_err(|e| Status::from(crate::errors::GrpcError::from(e)))?; // create a mapped vector of PointIds let result = result_point_ids @@ -164,7 +163,7 @@ impl VectorDb for VectorDBService { Err(Status::not_found("Point not found")) } } - Err(_) => Err(Status::internal("Error deleting point")), + Err(e) => Err(Status::from(crate::errors::GrpcError::from(e))), } } } diff --git a/crates/http/src/handler.rs b/crates/http/src/handler.rs index 3b2cab0..ee49e46 100644 --- a/crates/http/src/handler.rs +++ b/crates/http/src/handler.rs @@ -1,10 +1,13 @@ +use api::error::ApiError; use axum::{ Json, extract::{Path, State}, http::StatusCode, }; use defs::{DenseVector, Payload, Point, PointId, Similarity}; +use index::error::IndexError; use serde::{Deserialize, Serialize}; +use storage::error::StorageError; use tracing::error; use crate::AppState; @@ -37,13 +40,9 @@ pub async fn insert_point_handler( let response = InsertResponse { point_id }; Ok((StatusCode::CREATED, Json(response))) } - Err(e) => { error!("Failed to insert point: {:?}", e); - Err(( - StatusCode::INTERNAL_SERVER_ERROR, - "Failed to insert point".to_string(), - )) + Err(api_error_to_response(&e)) } } } @@ -55,13 +54,9 @@ pub async fn get_point_handler( match app_state.db.get(point_id) { Ok(Some(point)) => Ok(Json(point)), Ok(None) => Err((StatusCode::NOT_FOUND, "Point not found".to_string())), - Err(e) => { error!("Failed to get point {}: {:?}", point_id, e); - Err(( - StatusCode::INTERNAL_SERVER_ERROR, - "Database error".to_string(), - )) + Err(api_error_to_response(&e)) } } } @@ -72,13 +67,9 @@ pub async fn delete_point_handler( ) -> Result { match app_state.db.delete(point_id) { Ok(_) => Ok(StatusCode::NO_CONTENT), - Err(e) => { error!("Failed to delete point {}: {:?}", point_id, e); - Err(( - StatusCode::INTERNAL_SERVER_ERROR, - "Database error".to_string(), - )) + Err(api_error_to_response(&e)) } } } @@ -109,10 +100,51 @@ pub async fn search_points_handler( } Err(e) => { error!("Failed to search points: {:?}", e); - Err(( - StatusCode::INTERNAL_SERVER_ERROR, - "Database error during search".to_string(), - )) + Err(api_error_to_response(&e)) + } + } +} + +/// Map `ApiError` into an HTTP `(StatusCode, String)` response. +fn api_error_to_response(err: &ApiError) -> (StatusCode, String) { + match err { + ApiError::PointNotFound { .. } => (StatusCode::NOT_FOUND, err.to_string()), + ApiError::DimensionMismatch { .. } => (StatusCode::BAD_REQUEST, err.to_string()), + ApiError::InvalidSearchLimit { .. } => (StatusCode::BAD_REQUEST, err.to_string()), + ApiError::LockError => ( + StatusCode::SERVICE_UNAVAILABLE, + "Server is busy, try again".to_string(), + ), + ApiError::InitializationFailed { .. } => { + (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()) } + ApiError::Storage { source } => match source { + // storage errors are internal IO/serialization issues + StorageError::RocksDbOpen { .. } + | StorageError::RocksDbRead { .. } + | StorageError::RocksDbWrite { .. } + | StorageError::RocksDbDelete { .. } + | StorageError::RocksDbIteration { .. } + | StorageError::Serialization { .. } + | StorageError::Deserialization { .. } => { + (StatusCode::INTERNAL_SERVER_ERROR, source.to_string()) + } + }, + ApiError::Index { source } => match source { + IndexError::PointNotFound { .. } => (StatusCode::NOT_FOUND, source.to_string()), + IndexError::PointAlreadyExists { .. } => (StatusCode::CONFLICT, source.to_string()), + IndexError::InvalidSearchLimit { .. } => (StatusCode::BAD_REQUEST, source.to_string()), + IndexError::DimensionMismatch { .. } => (StatusCode::BAD_REQUEST, source.to_string()), + IndexError::UnsupportedSimilarity { .. } => { + (StatusCode::BAD_REQUEST, source.to_string()) + } + IndexError::InvalidParameter { .. } => (StatusCode::BAD_REQUEST, source.to_string()), + IndexError::NotInitialized | IndexError::EmptyIndex => { + (StatusCode::FAILED_DEPENDENCY, source.to_string()) + } + IndexError::HnswError { .. } | IndexError::SearchFailed { .. } => { + (StatusCode::INTERNAL_SERVER_ERROR, source.to_string()) + } + }, } } diff --git a/crates/index/Cargo.toml b/crates/index/Cargo.toml index 8fe2733..fda6a82 100644 --- a/crates/index/Cargo.toml +++ b/crates/index/Cargo.toml @@ -9,4 +9,5 @@ license.workspace = true [dependencies] defs.workspace = true rand.workspace = true +snafu.workspace = true uuid.workspace = true diff --git a/crates/index/src/error.rs b/crates/index/src/error.rs new file mode 100644 index 0000000..e36755c --- /dev/null +++ b/crates/index/src/error.rs @@ -0,0 +1,48 @@ +use defs::{Dimension, PointId}; +use snafu::prelude::*; + +#[derive(Debug, Snafu)] +#[snafu(visibility(pub))] +pub enum IndexError { + /// Dimension mismatch when inserting vectors into index + #[snafu(display("Vector dimension mismatch in index: expected {expected}, got {got}"))] + DimensionMismatch { expected: Dimension, got: Dimension }, + + /// Index not initialized + #[snafu(display("Index has not been initialized"))] + NotInitialized, + + /// Unsupported similarity metric + #[snafu(display("Unsupported similarity metric: {metric}"))] + UnsupportedSimilarity { metric: String }, + + /// Point not found in index + #[snafu(display("Point {id} not found in index"))] + PointNotFound { id: PointId }, + + /// Point already exists in index + #[snafu(display("Point {id} already exists in index"))] + PointAlreadyExists { id: PointId }, + + /// HNSW-specific errors + #[snafu(display("HNSW index error: {message}"))] + HnswError { message: String }, + + /// Invalid index parameter + #[snafu(display("Invalid index parameter '{parameter}': {reason}"))] + InvalidParameter { parameter: String, reason: String }, + + /// Search failed + #[snafu(display("Search operation failed: {reason}"))] + SearchFailed { reason: String }, + + /// Index is empty + #[snafu(display("Cannot perform operation on empty index"))] + EmptyIndex, + + /// Invalid search limit + #[snafu(display("Invalid search limit: {limit}"))] + InvalidSearchLimit { limit: usize }, +} + +pub type Result = std::result::Result; diff --git a/crates/index/src/flat.rs b/crates/index/src/flat.rs index c0910e3..80b46da 100644 --- a/crates/index/src/flat.rs +++ b/crates/index/src/flat.rs @@ -1,6 +1,6 @@ -use defs::{DbError, DenseVector, DistanceOrderedVector, IndexedVector, PointId, Similarity}; +use defs::{DenseVector, DistanceOrderedVector, IndexedVector, PointId, Similarity}; -use crate::{VectorIndex, distance}; +use crate::{IndexError, Result, VectorIndex, distance}; pub struct FlatIndex { index: Vec, @@ -23,12 +23,12 @@ impl Default for FlatIndex { } impl VectorIndex for FlatIndex { - fn insert(&mut self, vector: IndexedVector) -> Result<(), DbError> { + fn insert(&mut self, vector: IndexedVector) -> Result<()> { self.index.push(vector); Ok(()) } - fn delete(&mut self, point_id: PointId) -> Result { + fn delete(&mut self, point_id: PointId) -> Result { if let Some(pos) = self.index.iter().position(|vector| vector.id == point_id) { self.index.remove(pos); Ok(true) @@ -42,7 +42,11 @@ impl VectorIndex for FlatIndex { query_vector: DenseVector, similarity: Similarity, k: usize, - ) -> Result, DbError> { + ) -> Result> { + // Validate search limit + if k == 0 { + return Err(IndexError::InvalidSearchLimit { limit: k }); + } let scores = self .index .iter() diff --git a/crates/index/src/hnsw/index.rs b/crates/index/src/hnsw/index.rs index c781d7e..d676ddc 100644 --- a/crates/index/src/hnsw/index.rs +++ b/crates/index/src/hnsw/index.rs @@ -1,9 +1,10 @@ use std::collections::HashMap; -use defs::{DbError, DenseVector, Dimension, IndexedVector, PointId, Similarity}; +use defs::{DenseVector, Dimension, IndexedVector, PointId, Similarity}; use uuid::Uuid; use crate::VectorIndex; +use crate::{IndexError, Result}; use super::types::{HnswStats, LevelGenerator, Node, PointIndexation}; use std::cmp::{max, min}; @@ -55,8 +56,8 @@ impl HnswIndex { /// Returns a slice of the stored vector for the given PointId. /// TODO: integrate this cache with an in-memory store backed by RocksDB; on cache miss, /// fetch from storage, populate the cache, and return a stable slice. - pub(super) fn get_vec(&self, id: PointId) -> Result<&Vec, DbError> { - self.cache.get(&id).ok_or(DbError::PointNotFound { id }) + pub(super) fn get_vec(&self, id: PointId) -> Result<&Vec> { + self.cache.get(&id).ok_or(IndexError::PointNotFound { id }) } } @@ -67,14 +68,14 @@ impl VectorIndex for HnswIndex { /// - greedy descend from current entry to l+1 to get a pivot /// - for each level down to 0: ef-construction, diversity pruning, bidirectional connect with caps /// - if l is above current max level, update the entry point - fn insert(&mut self, vector: IndexedVector) -> Result<(), DbError> { + fn insert(&mut self, vector: IndexedVector) -> Result<()> { if self.index.nodes.contains_key(&vector.id) { - return Err(DbError::PointAlreadyExists { id: vector.id }); + return Err(IndexError::PointAlreadyExists { id: vector.id }); } let dim = vector.vector.len(); if dim != self.data_dimension { - return Err(DbError::InvalidDimension { + return Err(IndexError::DimensionMismatch { expected: self.data_dimension, got: dim, }); @@ -152,7 +153,7 @@ impl VectorIndex for HnswIndex { /// - mark node as deleted and clear its cached vector /// - traversals skip deleted nodes /// - if entry point was deleted, move it to the highest-level non-deleted node (or None) - fn delete(&mut self, point_id: PointId) -> Result { + fn delete(&mut self, point_id: PointId) -> Result { if let Some(node) = self.index.nodes.get_mut(&point_id) { if node.deleted { return Ok(false); @@ -177,13 +178,13 @@ impl VectorIndex for HnswIndex { mut query: DenseVector, _similarity: Similarity, k: usize, - ) -> Result, DbError> { + ) -> Result> { if k == 0 { return Ok(Vec::new()); } if query.len() != self.data_dimension { - return Err(DbError::InvalidDimension { + return Err(IndexError::DimensionMismatch { expected: self.data_dimension, got: query.len(), }); @@ -219,7 +220,7 @@ impl VectorIndex for HnswIndex { impl HnswIndex { /// Full rebuild from surviving (non-deleted) vectors currently in-memory. /// Gathers all non-deleted vectors from the cache, clears the graph, and reinserts. - pub fn rebuild_full(&mut self) -> Result<(), DbError> { + pub fn rebuild_full(&mut self) -> Result<()> { let ids: Vec = self .index .nodes diff --git a/crates/index/src/hnsw/search.rs b/crates/index/src/hnsw/search.rs index 5911cc4..9b87783 100644 --- a/crates/index/src/hnsw/search.rs +++ b/crates/index/src/hnsw/search.rs @@ -2,9 +2,9 @@ use std::cmp::Reverse; use std::collections::BinaryHeap; use std::collections::HashSet; -use defs::DbError; use defs::{OrdF32, PointId}; +use crate::Result; use crate::distance; use super::index::HnswIndex; @@ -19,7 +19,7 @@ impl HnswIndex { ep: PointId, level: usize, query: &[f32], - ) -> Result { + ) -> Result { let mut current = ep; loop { let cur_vec = self.get_vec(current)?; @@ -72,7 +72,7 @@ impl HnswIndex { level: usize, query: &[f32], ef_construction: usize, - ) -> Result, DbError> { + ) -> Result> { let mut visited: HashSet = HashSet::new(); let mut candidates: BinaryHeap<(Reverse, PointId)> = BinaryHeap::new(); @@ -155,7 +155,7 @@ impl HnswIndex { &self, candidates: &[(PointId, f32)], m: usize, - ) -> Result, DbError> { + ) -> Result> { if candidates.is_empty() || m == 0 { return Ok(Vec::new()); } @@ -192,7 +192,7 @@ impl HnswIndex { neighbors: &[PointId], level: usize, m: usize, - ) -> Result<(), DbError> { + ) -> Result<()> { self.merge_and_prune(p, level, neighbors, m)?; for &n in neighbors { @@ -244,7 +244,7 @@ impl HnswIndex { level: usize, to_add: &[PointId], cap: usize, - ) -> Result<(), DbError> { + ) -> Result<()> { self.ensure_level(center, level); let mut merged: Vec = { diff --git a/crates/index/src/kd_tree/index.rs b/crates/index/src/kd_tree/index.rs index a188f01..bf4ed7a 100644 --- a/crates/index/src/kd_tree/index.rs +++ b/crates/index/src/kd_tree/index.rs @@ -1,7 +1,7 @@ // use super::helpers::{collect_active_vectors, is_unbalanced, should_rebuild_global}; use super::types::{KDTreeNode, Neighbor}; -use crate::{VectorIndex, distance}; -use defs::{DbError, DenseVector, IndexedVector, OrdF32, PointId, Similarity}; +use crate::{IndexError, Result, VectorIndex, distance}; +use defs::{DenseVector, IndexedVector, OrdF32, PointId, Similarity}; use std::{ cmp::Ordering, collections::{BinaryHeap, HashSet}, @@ -32,9 +32,9 @@ impl KDTree { } // Builds the vector index from provided vectors, there should atleast be single vector for dim calculation - pub fn build(mut vectors: Vec) -> Result { + pub fn build(mut vectors: Vec) -> Result { if vectors.is_empty() { - Err(DbError::IndexInitError) + Err(IndexError::NotInitialized) } else { let dim = vectors[0].vector.len(); @@ -407,12 +407,12 @@ impl KDTree { } impl VectorIndex for KDTree { - fn insert(&mut self, vector: IndexedVector) -> Result<(), DbError> { + fn insert(&mut self, vector: IndexedVector) -> Result<()> { self.insert_point(vector); Ok(()) } - fn delete(&mut self, point_id: PointId) -> Result { + fn delete(&mut self, point_id: PointId) -> Result { Ok(self.delete_point(&point_id)) } @@ -421,9 +421,11 @@ impl VectorIndex for KDTree { query_vector: DenseVector, similarity: Similarity, k: usize, - ) -> Result, DbError> { + ) -> Result> { if matches!(similarity, Similarity::Cosine | Similarity::Hamming) { - return Err(DbError::UnsupportedSimilarity); + return Err(IndexError::UnsupportedSimilarity { + metric: format!("{:?}", similarity), + }); } let results = self.search_top_k(query_vector, k, similarity); diff --git a/crates/index/src/kd_tree/tests.rs b/crates/index/src/kd_tree/tests.rs index faefae5..d48b308 100644 --- a/crates/index/src/kd_tree/tests.rs +++ b/crates/index/src/kd_tree/tests.rs @@ -1,8 +1,9 @@ use super::index::KDTree; +use crate::IndexError; use crate::VectorIndex; use crate::distance; use crate::flat::FlatIndex; -use defs::{DbError, IndexedVector, Similarity}; +use defs::{IndexedVector, Similarity}; use std::collections::HashSet; use uuid::Uuid; @@ -222,7 +223,10 @@ fn test_search_unsupported_similarity_cosine() { let tree = KDTree::build(vectors).unwrap(); let result = tree.search(vec![1.0, 2.0], Similarity::Cosine, 1); - assert!(matches!(result, Err(DbError::UnsupportedSimilarity))); + assert!(matches!( + result, + Err(IndexError::UnsupportedSimilarity { .. }) + )); } #[test] @@ -231,7 +235,10 @@ fn test_search_unsupported_similarity_hamming() { let tree = KDTree::build(vectors).unwrap(); let result = tree.search(vec![1.0, 2.0], Similarity::Hamming, 1); - assert!(matches!(result, Err(DbError::UnsupportedSimilarity))); + assert!(matches!( + result, + Err(IndexError::UnsupportedSimilarity { .. }) + )); } #[test] diff --git a/crates/index/src/lib.rs b/crates/index/src/lib.rs index 4e59ced..3725573 100644 --- a/crates/index/src/lib.rs +++ b/crates/index/src/lib.rs @@ -1,23 +1,23 @@ -use defs::{DbError, DenseVector, IndexedVector, PointId, Similarity}; +use defs::{DenseVector, IndexedVector, PointId, Similarity}; +pub use error::{IndexError, Result}; +pub mod error; pub mod flat; pub mod hnsw; pub mod kd_tree; pub trait VectorIndex: Send + Sync { - fn insert(&mut self, vector: IndexedVector) -> Result<(), DbError>; + fn insert(&mut self, vector: IndexedVector) -> Result<()>; // Returns true if point id existed and is deleted, else returns false - fn delete(&mut self, point_id: PointId) -> Result; + fn delete(&mut self, point_id: PointId) -> Result; fn search( &self, query_vector: DenseVector, similarity: Similarity, k: usize, - ) -> Result, DbError>; // Return a Vec of ids of closest vectors (length max k) - - // fn build() -> Result<(), DbError>; move this to impl for dyn compatibility + ) -> Result>; // Return a Vec of ids of closest vectors (length max k) } /// Distance function to get the distance between two vectors (taken from old version) diff --git a/crates/server/Cargo.toml b/crates/server/Cargo.toml index 0dcf527..c3157e7 100644 --- a/crates/server/Cargo.toml +++ b/crates/server/Cargo.toml @@ -17,6 +17,7 @@ index.workspace = true prost.workspace = true serde.workspace = true serde_json.workspace = true +snafu.workspace = true storage.workspace = true tokio.workspace = true tokio-stream.workspace = true diff --git a/crates/server/src/config.rs b/crates/server/src/config.rs index 6394430..e593afa 100644 --- a/crates/server/src/config.rs +++ b/crates/server/src/config.rs @@ -2,6 +2,7 @@ use api::DbConfig; use defs::Similarity; use dotenv::dotenv; use index::IndexType; +use snafu::prelude::*; use std::env; use std::fs; use std::net::SocketAddr; @@ -22,33 +23,29 @@ pub struct ServerConfig { pub disable_http: bool, } -#[derive(Debug)] +#[derive(Debug, Snafu)] +#[snafu(visibility(pub))] pub enum ConfigError { - MissingRequiredEnvVar(String), + #[snafu(display("Missing required environment variable: {var}"))] + MissingRequiredEnvVar { var: String }, + + #[snafu(display("Invalid dimension value"))] InvalidDimension, - InvalidDataPath, - InvalidAddress(String), - IoError(String), -} -impl std::fmt::Display for ConfigError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - ConfigError::MissingRequiredEnvVar(var) => { - write!(f, "Missing required environment variable: {}", var) - } - ConfigError::InvalidDimension => write!(f, "Invalid dimension value"), - ConfigError::InvalidDataPath => write!(f, "Invalid data path"), - ConfigError::InvalidAddress(addr) => write!(f, "Invalid address: {}", addr), - ConfigError::IoError(err) => write!(f, "IO error: {}", err), - } - } + #[snafu(display("Invalid data path: {source}"))] + InvalidDataPath { source: std::io::Error }, + + #[snafu(display("IO error: {source}"))] + IoError { source: std::io::Error }, + + #[snafu(display("Invalid address: {addr}"))] + InvalidAddress { addr: String }, } -impl std::error::Error for ConfigError {} +pub type Result = std::result::Result; impl ServerConfig { - pub fn load_config() -> Result { + pub fn load_config() -> Result { dotenv().ok(); // HTTP server configuration @@ -71,9 +68,12 @@ impl ServerConfig { }) .unwrap_or_else(|_| DEFAULT_HTTP_PORT.to_string()); - let http_addr: SocketAddr = format!("{}:{}", http_host, http_port) - .parse() - .map_err(|_| ConfigError::InvalidAddress(format!("{}:{}", http_host, http_port)))?; + let http_addr: SocketAddr = + format!("{}:{}", http_host, http_port) + .parse() + .map_err(|_| ConfigError::InvalidAddress { + addr: format!("{}:{}", http_host, http_port), + })?; // gRPC server configuration let grpc_host = env::var("GRPC_HOST") @@ -95,13 +95,18 @@ impl ServerConfig { }) .unwrap_or_else(|_| DEFAULT_GRPC_PORT.to_string()); - let grpc_addr: SocketAddr = format!("{}:{}", grpc_host, grpc_port) - .parse() - .map_err(|_| ConfigError::InvalidAddress(format!("{}:{}", grpc_host, grpc_port)))?; + let grpc_addr: SocketAddr = + format!("{}:{}", grpc_host, grpc_port) + .parse() + .map_err(|_| ConfigError::InvalidAddress { + addr: format!("{}:{}", grpc_host, grpc_port), + })?; // gRPC root password (required) - let grpc_root_password = env::var("GRPC_ROOT_PASSWORD") - .map_err(|_| ConfigError::MissingRequiredEnvVar("GRPC_ROOT_PASSWORD".to_string()))?; + let grpc_root_password = + env::var("GRPC_ROOT_PASSWORD").map_err(|_| ConfigError::MissingRequiredEnvVar { + var: "GRPC_ROOT_PASSWORD".to_string(), + })?; // Storage type let storage_type_str = env::var("STORAGE_TYPE") @@ -136,18 +141,20 @@ impl ServerConfig { // Dimension (required) let dimension: usize = env::var("DIMENSION") - .map_err(|_| ConfigError::MissingRequiredEnvVar("DIMENSION".to_string()))? + .map_err(|_| ConfigError::MissingRequiredEnvVar { + var: "DIMENSION".to_string(), + })? .parse() .map_err(|_| ConfigError::InvalidDimension)?; // Data path let data_path: PathBuf = if let Ok(data_path_str) = env::var("DATA_PATH") { let path = PathBuf::from(data_path_str); - fs::create_dir_all(&path).map_err(|_| ConfigError::InvalidDataPath)?; + fs::create_dir_all(&path).map_err(|e| ConfigError::InvalidDataPath { source: e })?; path } else { let tempbuf = env::temp_dir().join("vectordb"); - fs::create_dir_all(&tempbuf).map_err(|e| ConfigError::IoError(e.to_string()))?; + fs::create_dir_all(&tempbuf).map_err(|e| ConfigError::IoError { source: e })?; event!( Level::WARN, "DATA_PATH not specified, using temporary directory: {:?}", diff --git a/crates/storage/Cargo.toml b/crates/storage/Cargo.toml index c786373..b04c0ae 100644 --- a/crates/storage/Cargo.toml +++ b/crates/storage/Cargo.toml @@ -10,5 +10,6 @@ license.workspace = true bincode.workspace = true defs.workspace = true rocksdb.workspace = true +snafu.workspace = true tempfile.workspace = true uuid.workspace = true diff --git a/crates/storage/src/error.rs b/crates/storage/src/error.rs new file mode 100644 index 0000000..ba1772b --- /dev/null +++ b/crates/storage/src/error.rs @@ -0,0 +1,32 @@ +use defs::PointId; +use snafu::prelude::*; + +#[derive(Debug, Snafu)] +#[snafu(visibility(pub))] +pub enum StorageError { + #[snafu(display("Failed to open RocksDB at path '{path}': {source}"))] + RocksDbOpen { + path: String, + source: rocksdb::Error, + }, + + #[snafu(display("Failed to read point {id} from storage: {source}"))] + RocksDbRead { id: PointId, source: rocksdb::Error }, + + #[snafu(display("Failed to write point {id} to storage: {source}"))] + RocksDbWrite { id: PointId, source: rocksdb::Error }, + + #[snafu(display("Failed to delete point {id} from storage: {source}"))] + RocksDbDelete { id: PointId, source: rocksdb::Error }, + + #[snafu(display("Failed to iterate over storage: {source}"))] + RocksDbIteration { source: rocksdb::Error }, + + #[snafu(display("Failed to serialize point {id}: {source}"))] + Serialization { id: PointId, source: bincode::Error }, + + #[snafu(display("Failed to deserialize point {id}: {source}"))] + Deserialization { id: PointId, source: bincode::Error }, +} + +pub type Result = std::result::Result; diff --git a/crates/storage/src/in_memory.rs b/crates/storage/src/in_memory.rs index 5190082..1f3fd36 100644 --- a/crates/storage/src/in_memory.rs +++ b/crates/storage/src/in_memory.rs @@ -1,5 +1,6 @@ +use crate::error::StorageError; use crate::{StorageEngine, VectorPage}; -use defs::{DbError, DenseVector, Payload, PointId}; +use defs::{DenseVector, Payload, PointId}; pub struct MemoryStorage { // define here how MemoryStorage will be defined @@ -23,22 +24,26 @@ impl StorageEngine for MemoryStorage { _id: PointId, _vector: Option, _payload: Option, - ) -> Result<(), DbError> { + ) -> Result<(), StorageError> { Ok(()) } - fn contains_point(&self, _id: PointId) -> Result { + fn contains_point(&self, _id: PointId) -> Result { Ok(true) } - fn delete_point(&self, _id: PointId) -> Result<(), DbError> { + fn delete_point(&self, _id: PointId) -> Result<(), StorageError> { Ok(()) } - fn get_payload(&self, _id: PointId) -> Result, DbError> { + fn get_payload(&self, _id: PointId) -> Result, StorageError> { Ok(None) } - fn get_vector(&self, _id: PointId) -> Result, DbError> { + fn get_vector(&self, _id: PointId) -> Result, StorageError> { Ok(None) } - fn list_vectors(&self, _offset: PointId, _limit: usize) -> Result, DbError> { + fn list_vectors( + &self, + _offset: PointId, + _limit: usize, + ) -> Result, StorageError> { Ok(None) } } diff --git a/crates/storage/src/lib.rs b/crates/storage/src/lib.rs index f7c067e..8419687 100644 --- a/crates/storage/src/lib.rs +++ b/crates/storage/src/lib.rs @@ -1,4 +1,4 @@ -use defs::{DbError, DenseVector, Payload, PointId}; +use defs::{DenseVector, Payload, PointId}; use std::path::PathBuf; use std::sync::Arc; @@ -6,23 +6,26 @@ use crate::rocks_db::RocksDbStorage; pub type VectorPage = (Vec<(PointId, DenseVector)>, PointId); +pub mod error; +pub mod in_memory; +pub mod rocks_db; + +pub use error::{Result, StorageError}; + pub trait StorageEngine: Send + Sync { fn insert_point( &self, id: PointId, vector: Option, payload: Option, - ) -> Result<(), DbError>; - fn get_vector(&self, id: PointId) -> Result, DbError>; - fn get_payload(&self, id: PointId) -> Result, DbError>; - fn delete_point(&self, id: PointId) -> Result<(), DbError>; - fn contains_point(&self, id: PointId) -> Result; - fn list_vectors(&self, offset: PointId, limit: usize) -> Result, DbError>; + ) -> Result<()>; + fn get_vector(&self, id: PointId) -> Result>; + fn get_payload(&self, id: PointId) -> Result>; + fn delete_point(&self, id: PointId) -> Result<()>; + fn contains_point(&self, id: PointId) -> Result; + fn list_vectors(&self, offset: PointId, limit: usize) -> Result>; } -pub mod in_memory; -pub mod rocks_db; - #[derive(Debug, Clone, Copy)] pub enum StorageType { InMemory, @@ -32,7 +35,7 @@ pub enum StorageType { pub fn create_storage_engine( storage_type: StorageType, path: impl Into, -) -> Result, DbError> { +) -> Result> { match storage_type { StorageType::InMemory => Ok(Arc::new(in_memory::MemoryStorage::new())), StorageType::RocksDb => match RocksDbStorage::new(path) { diff --git a/crates/storage/src/rocks_db.rs b/crates/storage/src/rocks_db.rs index c85f7a7..4c68290 100644 --- a/crates/storage/src/rocks_db.rs +++ b/crates/storage/src/rocks_db.rs @@ -1,9 +1,11 @@ // Rewrite needed +use crate::error::{self, StorageError}; use crate::{StorageEngine, VectorPage}; use bincode::{deserialize, serialize}; -use defs::{DbError, DenseVector, Payload, Point, PointId}; -use rocksdb::{DB, Error, Options}; +use defs::{DenseVector, Payload, Point, PointId}; +use rocksdb::{DB, Options}; +use snafu::ResultExt; use std::path::PathBuf; //TODO: Implement RocksDbStorage with necessary fields and implementations @@ -13,14 +15,9 @@ pub struct RocksDbStorage { pub db: DB, } -pub enum RocksDBStorageError { - RocksDBError(Error), - SerializationError, -} - impl RocksDbStorage { // Creates new db or switches to existing db - pub fn new(path: impl Into) -> Result { + pub fn new(path: impl Into) -> Result { // Initialize a db at the given location let mut options = Options::default(); @@ -31,9 +28,10 @@ impl RocksDbStorage { options.create_if_missing(true); let converted_path = path.into(); + let path_str = converted_path.display().to_string(); let db = DB::open(&options, converted_path.clone()) - .map_err(|e| DbError::StorageError(e.into_string()))?; + .context(error::RocksDbOpenSnafu { path: path_str })?; Ok(RocksDbStorage { path: converted_path, @@ -52,28 +50,30 @@ impl StorageEngine for RocksDbStorage { id: PointId, vector: Option, payload: Option, - ) -> Result<(), DbError> { + ) -> Result<(), StorageError> { let key = id.to_string(); let point = Point { id, vector, payload, }; - let value = serialize(&point).map_err(|e| DbError::SerializationError(e.to_string()))?; - match self.db.put(key.as_bytes(), value.as_slice()) { - Ok(_) => Ok(()), - Err(e) => Err(DbError::StorageError(e.into_string())), - } + let value = serialize(&point).context(error::SerializationSnafu { id })?; + + self.db + .put(key.as_bytes(), value.as_slice()) + .context(error::RocksDbWriteSnafu { id })?; + + Ok(()) } - fn contains_point(&self, id: PointId) -> Result { + fn contains_point(&self, id: PointId) -> Result { // Efficient lookup inspired from https://github.com/facebook/rocksdb/issues/11586#issuecomment-1890429488 let key = id.to_string(); if self.db.key_may_exist(key.clone()) { let key_exist = self .db .get(key) - .map_err(|e| DbError::StorageError(e.into_string()))? + .context(error::RocksDbReadSnafu { id })? .is_some(); Ok(key_exist) } else { @@ -81,48 +81,46 @@ impl StorageEngine for RocksDbStorage { } } - fn delete_point(&self, id: PointId) -> Result<(), DbError> { + fn delete_point(&self, id: PointId) -> Result<(), StorageError> { let key = id.to_string(); self.db .delete(key) - .map_err(|e| DbError::StorageError(e.into_string()))?; + .context(error::RocksDbDeleteSnafu { id })?; Ok(()) } - fn get_payload(&self, id: PointId) -> Result, DbError> { + fn get_payload(&self, id: PointId) -> Result, StorageError> { let key = id.to_string(); - let Some(value_serialized) = self - .db - .get(key) - .map_err(|e| DbError::StorageError(e.into_string()))? + let Some(value_serialized) = self.db.get(key).context(error::RocksDbReadSnafu { id })? else { return Ok(None); // This should not return error but rather give None }; let value = - deserialize::(&value_serialized).map_err(|_| DbError::DeserializationError)?; + deserialize::(&value_serialized).context(error::DeserializationSnafu { id })?; Ok(value.payload) } - fn get_vector(&self, id: PointId) -> Result, DbError> { + fn get_vector(&self, id: PointId) -> Result, StorageError> { let key = id.to_string(); - let Some(value_serialized) = self - .db - .get(key) - .map_err(|e| DbError::StorageError(e.into_string()))? + let Some(value_serialized) = self.db.get(key).context(error::RocksDbReadSnafu { id })? else { return Ok(None); // This should not return error but rather give None }; let value = - deserialize::(&value_serialized).map_err(|_| DbError::DeserializationError)?; + deserialize::(&value_serialized).context(error::DeserializationSnafu { id })?; Ok(value.vector) } - fn list_vectors(&self, offset: PointId, limit: usize) -> Result, DbError> { + fn list_vectors( + &self, + offset: PointId, + limit: usize, + ) -> Result, StorageError> { if limit < 1 { return Ok(None); } @@ -135,8 +133,9 @@ impl StorageEngine for RocksDbStorage { let mut last_id = offset; for item in iter { - let (_, v) = item.map_err(|e| DbError::StorageError(e.into_string()))?; - let point: Point = deserialize(&v).map_err(|_| DbError::DeserializationError)?; + let (_, v) = item.context(error::RocksDbIterationSnafu)?; + let point: Point = + deserialize(&v).context(error::DeserializationSnafu { id: offset })?; if point.id <= offset { continue; @@ -264,4 +263,23 @@ mod tests { assert_eq!(db.get_payload(id).unwrap(), None); } + #[test] + fn test_error_context_preservation() { + // Test that the error chain is preserved + let result = RocksDbStorage::new("/proc/invalid-path"); + + if let Err(err) = result { + // The Display implementation should show both the context and source + let err_string = format!("{}", err); + println!("Full error message: {}", err_string); + + // Should contain our custom context + assert!(err_string.contains("Failed to open RocksDB")); + assert!(err_string.contains("/proc/invalid-path")); + + // The error should also be debuggable + let debug_string = format!("{:?}", err); + println!("Debug format: {}", debug_string); + } + } }