diff --git a/crates/sqllogictest/src/engine/datafusion.rs b/crates/sqllogictest/src/engine/datafusion.rs index e3402dfa97..e66dc890d1 100644 --- a/crates/sqllogictest/src/engine/datafusion.rs +++ b/crates/sqllogictest/src/engine/datafusion.rs @@ -27,9 +27,8 @@ use iceberg::spec::{NestedField, PrimitiveType, Schema, Transform, Type, Unbound use iceberg::{Catalog, CatalogBuilder, NamespaceIdent, TableCreation}; use iceberg_datafusion::IcebergCatalogProvider; use indicatif::ProgressBar; -use toml::Table as TomlTable; -use crate::engine::{EngineRunner, run_slt_with_runner}; +use crate::engine::{CatalogConfig, EngineRunner, run_slt_with_runner}; use crate::error::Result; pub struct DataFusionEngine { @@ -59,12 +58,15 @@ impl EngineRunner for DataFusionEngine { } impl DataFusionEngine { - pub async fn new(config: TomlTable) -> Result { + pub async fn new(catalog_config: Option) -> Result { let session_config = SessionConfig::new() .with_target_partitions(4) .with_information_schema(true); let ctx = SessionContext::new_with_config(session_config); - ctx.register_catalog("default", Self::create_catalog(&config).await?); + ctx.register_catalog( + "default", + Self::create_catalog(catalog_config.as_ref()).await?, + ); Ok(Self { test_data_path: PathBuf::from("testdata"), @@ -72,9 +74,11 @@ impl DataFusionEngine { }) } - async fn create_catalog(_: &TomlTable) -> anyhow::Result> { - // TODO: support dynamic catalog configuration - // See: https://github.com/apache/iceberg-rust/issues/1780 + async fn create_catalog( + _catalog_config: Option<&CatalogConfig>, + ) -> anyhow::Result> { + // TODO: Use catalog_config to load different catalog types via iceberg-catalog-loader + // See: https://github.com/apache/iceberg-rust/issues/1780 let catalog = MemoryCatalogBuilder::default() .load( "memory", diff --git a/crates/sqllogictest/src/engine/mod.rs b/crates/sqllogictest/src/engine/mod.rs index 724359fbe5..9b7c4b0063 100644 --- a/crates/sqllogictest/src/engine/mod.rs +++ b/crates/sqllogictest/src/engine/mod.rs @@ -17,29 +17,45 @@ mod datafusion; +use std::collections::HashMap; use std::path::Path; use anyhow::anyhow; +use serde::Deserialize; use sqllogictest::{AsyncDB, MakeConnection, Runner, parse_file}; -use toml::Table as TomlTable; use crate::engine::datafusion::DataFusionEngine; use crate::error::{Error, Result}; -const TYPE_DATAFUSION: &str = "datafusion"; +/// Configuration for the catalog used by an engine +#[derive(Debug, Clone, Deserialize)] +pub struct CatalogConfig { + /// Catalog type: "memory", "rest", "glue", "hms", "s3tables", "sql" + #[serde(rename = "type")] + pub catalog_type: String, + /// Catalog properties passed to the catalog loader + #[serde(default)] + pub props: HashMap, +} + +/// Engine configuration as a tagged enum +#[derive(Debug, Clone, Deserialize)] +#[serde(tag = "type", rename_all = "lowercase")] +pub enum EngineConfig { + Datafusion { + #[serde(default)] + catalog: Option, + }, +} #[async_trait::async_trait] pub trait EngineRunner: Send { async fn run_slt_file(&mut self, path: &Path) -> Result<()>; } -pub async fn load_engine_runner( - engine_type: &str, - cfg: TomlTable, -) -> Result> { - match engine_type { - TYPE_DATAFUSION => Ok(Box::new(DataFusionEngine::new(cfg).await?)), - _ => Err(anyhow::anyhow!("Unsupported engine type: {engine_type}").into()), +pub async fn load_engine_runner(config: EngineConfig) -> Result> { + match config { + EngineConfig::Datafusion { catalog } => Ok(Box::new(DataFusionEngine::new(catalog).await?)), } } @@ -65,29 +81,63 @@ where #[cfg(test)] mod tests { - use crate::engine::{TYPE_DATAFUSION, load_engine_runner}; + use crate::engine::{CatalogConfig, EngineConfig, load_engine_runner}; - #[tokio::test] - async fn test_engine_invalid_type() { + #[test] + fn test_deserialize_engine_config() { + let input = r#"type = "datafusion""#; + + let config: EngineConfig = toml::from_str(input).unwrap(); + assert!(matches!(config, EngineConfig::Datafusion { catalog: None })); + } + + #[test] + fn test_deserialize_engine_config_with_catalog() { + let input = r#" + type = "datafusion" + + [catalog] + type = "rest" + + [catalog.props] + uri = "http://localhost:8181" + "#; + + let config: EngineConfig = toml::from_str(input).unwrap(); + match config { + EngineConfig::Datafusion { catalog: Some(cat) } => { + assert_eq!(cat.catalog_type, "rest"); + assert_eq!( + cat.props.get("uri"), + Some(&"http://localhost:8181".to_string()) + ); + } + _ => panic!("Expected Datafusion with catalog"), + } + } + + #[test] + fn test_deserialize_catalog_config() { let input = r#" - [engines] - random = { type = "random_engine", url = "http://localhost:8181" } + type = "memory" + + [props] + warehouse = "file:///tmp/warehouse" "#; - let tbl = toml::from_str(input).unwrap(); - let result = load_engine_runner("random_engine", tbl).await; - assert!(result.is_err()); + let config: CatalogConfig = toml::from_str(input).unwrap(); + assert_eq!(config.catalog_type, "memory"); + assert_eq!( + config.props.get("warehouse"), + Some(&"file:///tmp/warehouse".to_string()) + ); } #[tokio::test] async fn test_load_datafusion() { - let input = r#" - [engines] - df = { type = "datafusion" } - "#; - let tbl = toml::from_str(input).unwrap(); - let result = load_engine_runner(TYPE_DATAFUSION, tbl).await; + let config = EngineConfig::Datafusion { catalog: None }; + let result = load_engine_runner(config).await; assert!(result.is_ok()); } } diff --git a/crates/sqllogictest/src/schedule.rs b/crates/sqllogictest/src/schedule.rs index 7c13ad4d12..25728a2968 100644 --- a/crates/sqllogictest/src/schedule.rs +++ b/crates/sqllogictest/src/schedule.rs @@ -21,10 +21,18 @@ use std::path::{Path, PathBuf}; use anyhow::{Context, anyhow}; use serde::{Deserialize, Serialize}; -use toml::{Table as TomlTable, Value}; use tracing::info; -use crate::engine::{EngineRunner, load_engine_runner}; +use crate::engine::{EngineConfig, EngineRunner, load_engine_runner}; + +/// Raw configuration parsed from the schedule TOML file +#[derive(Debug, Clone, Deserialize)] +pub struct ScheduleConfig { + /// Engine name to engine configuration + pub engines: HashMap, + /// List of test steps to run + pub steps: Vec, +} pub struct Schedule { /// Engine names to engine instances @@ -59,15 +67,27 @@ impl Schedule { pub async fn from_file>(path: P) -> anyhow::Result { let path_str = path.as_ref().to_string_lossy().to_string(); let content = read_to_string(path)?; - let toml_value = content.parse::()?; - let toml_table = toml_value - .as_table() - .ok_or_else(|| anyhow!("Schedule file must be a TOML table"))?; - let engines = Schedule::parse_engines(toml_table).await?; - let steps = Schedule::parse_steps(toml_table)?; + let config: ScheduleConfig = toml::from_str(&content) + .with_context(|| format!("Failed to parse schedule file: {path_str}"))?; - Ok(Self::new(engines, steps, path_str)) + let engines = Self::instantiate_engines(config.engines).await?; + + Ok(Self::new(engines, config.steps, path_str)) + } + + /// Instantiate engine runners from their configurations + async fn instantiate_engines( + configs: HashMap, + ) -> anyhow::Result>> { + let mut engines = HashMap::new(); + + for (name, config) in configs { + let engine = load_engine_runner(config).await?; + engines.insert(name, engine); + } + + Ok(engines) } pub async fn run(mut self) -> anyhow::Result<()> { @@ -105,103 +125,131 @@ impl Schedule { } Ok(()) } +} - async fn parse_engines( - table: &TomlTable, - ) -> anyhow::Result>> { - let engines_tbl = table - .get("engines") - .with_context(|| "Schedule file must have an 'engines' table")? - .as_table() - .ok_or_else(|| anyhow!("'engines' must be a table"))?; - - let mut engines = HashMap::new(); - - for (name, engine_val) in engines_tbl { - let cfg_tbl = engine_val - .as_table() - .ok_or_else(|| anyhow!("Config of engine '{name}' is not a table"))? - .clone(); - - let engine_type = cfg_tbl - .get("type") - .ok_or_else(|| anyhow::anyhow!("Engine {name} doesn't have a 'type' field"))? - .as_str() - .ok_or_else(|| anyhow::anyhow!("Engine {name} type must be a string"))?; - - let engine = load_engine_runner(engine_type, cfg_tbl.clone()).await?; - - if engines.insert(name.clone(), engine).is_some() { - return Err(anyhow!("Duplicate engine '{name}'")); - } - } +#[cfg(test)] +mod tests { + use crate::engine::EngineConfig; + use crate::schedule::ScheduleConfig; - Ok(engines) - } + #[test] + fn test_deserialize_schedule_config() { + let input = r#" + [engines] + df = { type = "datafusion" } - fn parse_steps(table: &TomlTable) -> anyhow::Result> { - let steps_val = table - .get("steps") - .with_context(|| "Schedule file must have a 'steps' array")?; + [[steps]] + engine = "df" + slt = "test.slt" + "#; - let steps: Vec = steps_val - .clone() - .try_into() - .with_context(|| "Failed to deserialize steps")?; + let config: ScheduleConfig = toml::from_str(input).unwrap(); - Ok(steps) + assert_eq!(config.engines.len(), 1); + assert!(config.engines.contains_key("df")); + assert!(matches!(config.engines["df"], EngineConfig::Datafusion { + catalog: None + })); + assert_eq!(config.steps.len(), 1); + assert_eq!(config.steps[0].engine, "df"); + assert_eq!(config.steps[0].slt, "test.slt"); } -} - -#[cfg(test)] -mod tests { - use toml::Table as TomlTable; - - use crate::schedule::Schedule; #[test] - fn test_parse_steps() { + fn test_deserialize_multiple_steps() { let input = r#" + [engines] + datafusion = { type = "datafusion" } + [[steps]] engine = "datafusion" slt = "test.slt" [[steps]] - engine = "spark" + engine = "datafusion" slt = "test2.slt" "#; - let tbl: TomlTable = toml::from_str(input).unwrap(); - let steps = Schedule::parse_steps(&tbl).unwrap(); + let config: ScheduleConfig = toml::from_str(input).unwrap(); - assert_eq!(steps.len(), 2); - assert_eq!(steps[0].engine, "datafusion"); - assert_eq!(steps[0].slt, "test.slt"); - assert_eq!(steps[1].engine, "spark"); - assert_eq!(steps[1].slt, "test2.slt"); + assert_eq!(config.steps.len(), 2); + assert_eq!(config.steps[0].engine, "datafusion"); + assert_eq!(config.steps[0].slt, "test.slt"); + assert_eq!(config.steps[1].engine, "datafusion"); + assert_eq!(config.steps[1].slt, "test2.slt"); } #[test] - fn test_parse_steps_empty() { + fn test_deserialize_with_catalog_config() { let input = r#" + [engines.df] + type = "datafusion" + + [engines.df.catalog] + type = "rest" + + [engines.df.catalog.props] + uri = "http://localhost:8181" + [[steps]] + engine = "df" + slt = "test.slt" "#; - let tbl: TomlTable = toml::from_str(input).unwrap(); - let steps = Schedule::parse_steps(&tbl); + let config: ScheduleConfig = toml::from_str(input).unwrap(); - assert!(steps.is_err()); + match &config.engines["df"] { + EngineConfig::Datafusion { catalog: Some(cat) } => { + assert_eq!(cat.catalog_type, "rest"); + assert_eq!( + cat.props.get("uri"), + Some(&"http://localhost:8181".to_string()) + ); + } + _ => panic!("Expected Datafusion with catalog config"), + } } - #[tokio::test] - async fn test_parse_engines_invalid_table() { - let toml_content = r#" - engines = "not_a_table" + #[test] + fn test_deserialize_missing_engine_type() { + let input = r#" + [engines] + df = { } + + [[steps]] + engine = "df" + slt = "test.slt" "#; - let table: TomlTable = toml::from_str(toml_content).unwrap(); - let result = Schedule::parse_engines(&table).await; + let result: Result = toml::from_str(input); + assert!(result.is_err()); + } + + #[test] + fn test_deserialize_invalid_engine_type() { + let input = r#" + [engines] + df = { type = "unknown_engine" } + + [[steps]] + engine = "df" + slt = "test.slt" + "#; + + let result: Result = toml::from_str(input); + assert!(result.is_err()); + } + + #[test] + fn test_deserialize_missing_step_fields() { + let input = r#" + [engines] + df = { type = "datafusion" } + + [[steps]] + "#; + let result: Result = toml::from_str(input); assert!(result.is_err()); } }