Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ reqwest = { version = "0.12", features = ["json", "blocking", "multipart"] }
rocksdb = "0.21.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0.145"
snafu = "0.8.9"
tempfile = "3.23.0"
tokio = { version = "1.47.1", features = ["full"] }
tokio-stream = "0.1.17"
Expand Down
1 change: 1 addition & 0 deletions crates/api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ license.workspace = true
[dependencies]
defs.workspace = true
index.workspace = true
snafu.workspace = true
storage.workspace = true
tempfile.workspace = true
uuid.workspace = true
45 changes: 45 additions & 0 deletions crates/api/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
use defs::{Dimension, PointId};
use snafu::prelude::*;

/// Error type returned by this crate's public API.
///
/// The `Display` text of each variant comes from its `#[snafu(display(...))]`
/// attribute. `#[snafu(visibility(pub))]` makes the context selectors that
/// snafu generates for the variants public.
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum ApiError {
/// A vector's dimension (`got`) differs from the required dimension (`expected`).
#[snafu(display("Vector dimension mismatch: expected {expected}, got {got}"))]
DimensionMismatch { expected: Dimension, got: Dimension },

/// The lock guarding the vector index could not be acquired.
/// Carries no further detail about the cause.
#[snafu(display("Failed to acquire lock on vector index"))]
LockError,

/// Wraps an error coming from the storage layer. The wrapped error is kept
/// in `source` and its text is appended to the display message.
#[snafu(display("Storage error: {source}"))]
Storage {
source: storage::error::StorageError,
},

/// Wraps an error coming from the index layer. The wrapped error is kept
/// in `source` and its text is appended to the display message.
#[snafu(display("Index error: {source}"))]
Index { source: index::error::IndexError },

/// No point with the given `id` exists.
#[snafu(display("Point {id} not found"))]
PointNotFound { id: PointId },

/// The requested search `limit` is not acceptable; the rejected value is
/// carried so it can be reported back to the caller.
#[snafu(display("Invalid search limit: {limit}"))]
InvalidSearchLimit { limit: usize },

/// Database initialization failed; `reason` is a free-form description.
#[snafu(display("Failed to initialize database: {reason}"))]
InitializationFailed { reason: String },
}

/// Crate-wide result alias: the error type defaults to [`ApiError`], but a
/// different `E` can still be supplied explicitly.
pub type Result<T, E = ApiError> = std::result::Result<T, E>;

// Automatic conversion from StorageError to ApiError
impl From<storage::error::StorageError> for ApiError {
fn from(source: storage::error::StorageError) -> Self {
ApiError::Storage { source }
}
}

// Automatic conversion from IndexError to ApiError
impl From<index::error::IndexError> for ApiError {
fn from(source: index::error::IndexError) -> Self {
ApiError::Index { source }
}
}
72 changes: 57 additions & 15 deletions crates/api/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use defs::{DbError, Dimension, IndexedVector, Similarity};
use defs::{Dimension, IndexedVector, Similarity};

use defs::{DenseVector, Payload, Point, PointId};
use index::hnsw::HnswIndex;
Expand All @@ -13,6 +13,9 @@ use storage::{StorageEngine, StorageType, VectorPage};

use uuid::Uuid;

pub mod error;
pub use error::{ApiError, Result};

// static NEXT_ID: AtomicU64 = AtomicU64::new(1);

// fn generate_point_id() -> u64 {
Expand Down Expand Up @@ -43,17 +46,20 @@ impl VectorDb {
}

//TODO: Make this an atomic operation
pub fn insert(&self, vector: DenseVector, payload: Payload) -> Result<PointId, DbError> {
pub fn insert(&self, vector: DenseVector, payload: Payload) -> Result<PointId> {
if vector.len() != self.dimension {
return Err(DbError::DimensionMismatch);
return Err(ApiError::DimensionMismatch {
expected: self.dimension,
got: vector.len(),
});
}
// Generate a new point id
let point_id = generate_point_id();
self.storage
.insert_point(point_id, Some(vector.clone()), Some(payload))?;

// Get write lock on the index
let mut index = self.index.write().map_err(|_| DbError::LockError)?;
let mut index = self.index.write().map_err(|_| ApiError::LockError)?;
index.insert(IndexedVector {
vector,
id: point_id,
Expand All @@ -63,16 +69,16 @@ impl VectorDb {
}

//TODO: Make this an atomic operation
pub fn delete(&self, id: PointId) -> Result<bool, DbError> {
pub fn delete(&self, id: PointId) -> Result<bool> {
// Remove from storage
self.storage.delete_point(id)?;
// Remove from index
let mut index = self.index.write().map_err(|_| DbError::LockError)?;
let mut index = self.index.write().map_err(|_| ApiError::LockError)?;
let point_found = index.delete(id)?;
Ok(point_found)
}

pub fn get(&self, id: PointId) -> Result<Option<Point>, DbError> {
pub fn get(&self, id: PointId) -> Result<Option<Point>> {
// Search for the Point with given id in storage
let payload = self.storage.get_payload(id)?;
let vector = self.storage.get_vector(id)?;
Expand All @@ -92,28 +98,42 @@ impl VectorDb {
query: DenseVector,
similarity: Similarity,
limit: usize,
) -> Result<Vec<PointId>, DbError> {
) -> Result<Vec<PointId>> {
// Validate search limit
if limit == 0 {
return Err(ApiError::InvalidSearchLimit { limit });
}

// Validate query dimension
if query.len() != self.dimension {
return Err(ApiError::DimensionMismatch {
expected: self.dimension,
got: query.len(),
});
}

// Use vector index to find similar vectors
let index = self.index.read().map_err(|_| DbError::LockError)?;
let index = self.index.read().map_err(|_| ApiError::LockError)?;

//TODO: Add feat of returning similarity scores in the search
let vectors = index.search(query, similarity, limit)?;

Ok(vectors)
}

pub fn list(&self, offset: PointId, limit: usize) -> Result<Option<VectorPage>, DbError> {
self.storage.list_vectors(offset, limit)
pub fn list(&self, offset: PointId, limit: usize) -> Result<Option<VectorPage>> {
let page = self.storage.list_vectors(offset, limit)?;
Ok(page)
}

// populates the current index with vectors from the storage
pub fn build_index(&self) -> Result<usize, DbError> {
pub fn build_index(&self) -> Result<usize> {
// start from the minimal UUID and fetch in bounded batches and insert
let mut offset = Uuid::nil();
let page_size: usize = 1000;
let mut inserted: usize = 0;

let mut index = self.index.write().map_err(|_| DbError::LockError)?;
let mut index = self.index.write().map_err(|_| ApiError::LockError)?;

while let Some((batch, next_offset)) = self.storage.list_vectors(offset, page_size)? {
if batch.is_empty() || next_offset == offset {
Expand Down Expand Up @@ -141,7 +161,7 @@ pub struct DbConfig {
pub similarity: Similarity,
}

pub fn init_api(config: DbConfig) -> Result<VectorDb, DbError> {
pub fn init_api(config: DbConfig) -> Result<VectorDb> {
// Initialize the storage engine
let storage = match config.storage_type {
StorageType::RocksDb => Arc::new(RocksDbStorage::new(config.data_path)?),
Expand Down Expand Up @@ -230,7 +250,13 @@ mod tests {
// Insert vector of dimension 2 != 3
let res2 = db.insert(v2, payload);
assert!(res2.is_err());
assert_eq!(res2.unwrap_err(), DbError::DimensionMismatch);
match res2.unwrap_err() {
ApiError::DimensionMismatch { expected, got } => {
assert_eq!(expected, 3);
assert_eq!(got, 2);
}
other => panic!("Expected DimensionMismatch, got: {:?}", other),
}
}

#[test]
Expand Down Expand Up @@ -312,6 +338,22 @@ mod tests {
assert_eq!(results.len(), 3);
}

/// A search with `limit == 0` must be rejected with
/// `ApiError::InvalidSearchLimit` carrying the offending limit.
#[test]
fn test_search_zero_limit() {
    let (db, _temp_dir) = create_test_db();

    let outcome = db.search(vec![1.0, 2.0, 3.0], Similarity::Cosine, 0);
    assert!(outcome.is_err());

    // Borrow the error so it is still available for the failure message.
    let err = outcome.unwrap_err();
    if let ApiError::InvalidSearchLimit { limit } = &err {
        assert_eq!(*limit, 0);
    } else {
        panic!("Expected InvalidSearchLimit, got: {:?}", err);
    }
}

#[test]
fn test_empty_database() {
let (db, _temp_dir) = create_test_db();
Expand Down
26 changes: 0 additions & 26 deletions crates/defs/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,5 @@
use std::io;

use crate::{Dimension, PointId};
#[derive(Debug, PartialEq, Eq)]
pub enum DbError {
ParseError,
StorageError(String),
SerializationError(String),
DeserializationError,
IndexError(String),
LockError,
IndexInitError, //TODO: Change this
UnsupportedSimilarity,
DimensionMismatch,
InvalidDimension { expected: Dimension, got: Dimension },
PointAlreadyExists { id: PointId },
PointNotFound { id: PointId },
}

#[derive(Debug)]
pub enum ServerError {
Bind(io::Error),
Expand All @@ -25,17 +8,8 @@ pub enum ServerError {

#[derive(Debug)]
pub enum AppError {
DbError(DbError),
ServerError(ServerError),
}

impl std::fmt::Display for DbError {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "{:?}", self)
}
}

impl std::error::Error for DbError {}

// Error type for server
pub type BoxError = Box<dyn std::error::Error + Send + Sync>;
1 change: 1 addition & 0 deletions crates/grpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ defs.workspace = true
index.workspace = true
prost.workspace = true
prost-types.workspace = true
snafu.workspace = true
storage.workspace = true
tempfile.workspace = true
tokio.workspace = true
Expand Down
Loading
Loading