diff --git a/Cargo.lock b/Cargo.lock index e31fcd41..18fb113e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7000,6 +7000,18 @@ dependencies = [ "syn 2.0.111", ] +[[package]] +name = "serde_dynamo" +version = "4.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "873a97c3f7a67dd042bceb47d056d288424b82d4c66b0a25e1a3b34675620951" +dependencies = [ + "aws-sdk-dynamodb", + "base64 0.21.7", + "serde", + "serde_core", +] + [[package]] name = "serde_json" version = "1.0.145" @@ -7319,6 +7331,7 @@ dependencies = [ "error-stack-trace", "mockall", "serde", + "serde_dynamo", "serde_json", "snafu", "uuid", diff --git a/Cargo.toml b/Cargo.toml index 8bb6b676..b096a7f4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -67,6 +67,7 @@ regex = "1.11.1" serde = { version = "1.0.228", features = ["derive", "alloc"] } serde_json = { version = "1.0.145", features = ["raw_value"]} serde_yaml = "0.9" +serde_dynamo = { version = "4.3.0", features = ["aws-sdk-dynamodb+1"] } snafu = { version = "0.8.5", features = ["futures"] } tikv-jemallocator = { version = "0.6.0" } strum = { version = "0.27.2", features = ["derive"] } diff --git a/crates/executor/src/query_task_result.rs b/crates/executor/src/query_task_result.rs index d4d5b502..6dc724b8 100644 --- a/crates/executor/src/query_task_result.rs +++ b/crates/executor/src/query_task_result.rs @@ -5,6 +5,7 @@ use super::models::QueryResult; use super::query_types::ExecutionStatus; use super::snowflake_error::SnowflakeError; use snafu::ResultExt; +#[cfg(feature = "state-store-query")] use state_store::QueryMetric; use tokio::task::JoinError; use uuid::Uuid; diff --git a/crates/state-store/Cargo.toml b/crates/state-store/Cargo.toml index c779b304..65f5a7db 100644 --- a/crates/state-store/Cargo.toml +++ b/crates/state-store/Cargo.toml @@ -14,6 +14,7 @@ aws-credential-types = { workspace = true } aws-sdk-dynamodb = { workspace = true } serde = { workspace = true } serde_json = {workspace = true} +serde_dynamo = { workspace = true } snafu = { workspace = true } chrono = { workspace = true } mockall = { workspace = true } diff --git a/crates/state-store/src/error.rs b/crates/state-store/src/error.rs index 43b4baae..8abb7ac4 100644 --- a/crates/state-store/src/error.rs +++ b/crates/state-store/src/error.rs @@ -4,6 +4,7 @@ use aws_sdk_dynamodb::operation::delete_item::DeleteItemError; use aws_sdk_dynamodb::operation::get_item::GetItemError; use aws_sdk_dynamodb::operation::put_item::PutItemError; use aws_sdk_dynamodb::operation::query::QueryError; +use serde_dynamo::Error as SerdeDynamoError; use snafu::{Location, Snafu}; pub type Result = std::result::Result; @@ -57,6 +58,20 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + #[snafu(display("Failed to serialize DynamoDB item: {error}"))] + FailedToSerializeDynamo { + #[snafu(source)] + error: SerdeDynamoError, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Failed to deserialize DynamoDB item: {error}"))] + FailedToDeserializeDynamo { + #[snafu(source)] + error: SerdeDynamoError, + #[snafu(implicit)] + location: Location, + }, #[snafu(display("item not found"))] NotFound, #[snafu(display("data attribute missing from DynamoDB item"))] diff --git a/crates/state-store/src/state_store_dynamo.rs b/crates/state-store/src/state_store_dynamo.rs index 7b24d5e4..7899878c 100644 --- a/crates/state-store/src/state_store_dynamo.rs +++ b/crates/state-store/src/state_store_dynamo.rs @@ -3,19 +3,20 @@ use crate::config::DynamoDbConfig; use crate::error::Result; use crate::error::{ DynamoDbDeleteItemSnafu, DynamoDbGetItemSnafu, DynamoDbPutItemSnafu, DynamoDbQueryOutputSnafu, - Error, FailedToParseJsonSnafu, + Error, FailedToDeserializeDynamoSnafu, FailedToSerializeDynamoSnafu, }; use crate::models::{Query, SessionRecord}; use aws_sdk_dynamodb::{Client, types::AttributeValue}; use chrono::{DateTime, Utc}; +use serde::Serialize; use serde::de::DeserializeOwned; +use serde_dynamo::{from_item, to_item}; use snafu::ResultExt; use std::collections::HashMap; const PK: &str = "PK"; const SK: &str = "SK"; const ENTITY: &str = "Entity"; -const DATA: &str = "Data"; const QUERY_ID: &str = "query_id"; const REQUEST_ID: &str = "request_id"; const SESSION_ID: &str = "session_id"; @@ -145,10 +146,7 @@ impl StateStore for DynamoDbStateStore { item.insert(PK.to_string(), AttributeValue::S(key.clone())); item.insert(SK.to_string(), AttributeValue::S(key)); item.insert(ENTITY.to_string(), AttributeValue::S(session.entity())); - item.insert( - DATA.to_string(), - AttributeValue::S(serde_json::to_string(&session).context(FailedToParseJsonSnafu)?), - ); + item.extend(model_attributes(&session)?); if let Some(ttl) = session.ttl_seconds { item.insert("ttl".to_string(), AttributeValue::N(ttl.to_string())); @@ -180,7 +178,7 @@ impl StateStore for DynamoDbStateStore { .item .ok_or(Error::NotFound)?; - deserialize_data(item) + deserialize_item(item) } /// Delete a session by id. @@ -209,10 +207,7 @@ impl StateStore for DynamoDbStateStore { item.insert(PK.to_string(), AttributeValue::S(pk)); item.insert(SK.to_string(), AttributeValue::S(sk)); item.insert(ENTITY.to_string(), AttributeValue::S(query.entity())); - item.insert( - DATA.to_string(), - AttributeValue::S(serde_json::to_string(&query).context(FailedToParseJsonSnafu)?), - ); + item.extend(model_attributes(query)?); item.insert( QUERY_ID.to_string(), AttributeValue::S(query.query_id.to_string()), @@ -241,12 +236,12 @@ impl StateStore for DynamoDbStateStore { async fn get_query(&self, query_id: &str) -> Result { let item = self.query_item_by_query_id(query_id).await?; - deserialize_data(item) + deserialize_item(item) } async fn get_query_by_request_id(&self, request_id: &str) -> Result { let item = self.query_item_by_request_id(request_id).await?; - deserialize_data(item) + deserialize_item(item) } async fn get_queries_by_session_id(&self, session_id: &str) -> Result> { @@ -274,13 +269,16 @@ impl StateStore for DynamoDbStateStore { } } -fn deserialize_data(mut item: HashMap) -> Result { - let data = item - .remove(DATA) - .and_then(|attr| attr.as_s().ok().map(std::string::ToString::to_string)) - .ok_or(Error::MissingData)?; +fn model_attributes(value: &T) -> Result> { + to_item(value).context(FailedToSerializeDynamoSnafu) +} - serde_json::from_str(&data).context(FailedToParseJsonSnafu) +fn deserialize_item(mut item: HashMap) -> Result { + item.remove(PK); + item.remove(SK); + item.remove(ENTITY); + item.remove("ttl"); + from_item(item).context(FailedToDeserializeDynamoSnafu) } fn deserialize_items( @@ -288,7 +286,7 @@ fn deserialize_items( ) -> Result> { items .into_iter() - .map(deserialize_data) + .map(deserialize_item) .collect::>>() }