From e25db98b3c1918dd87195ede8b7dd45393741d95 Mon Sep 17 00:00:00 2001 From: Serge Klochkov <3175289+slvrtrn@users.noreply.github.com> Date: Mon, 22 Sep 2025 19:43:40 +0200 Subject: [PATCH 01/13] test: ephemeral columns are broken after #244 --- tests/it/rbwnat_smoke.rs | 81 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 81 insertions(+) diff --git a/tests/it/rbwnat_smoke.rs b/tests/it/rbwnat_smoke.rs index 0868efa1..cd9be532 100644 --- a/tests/it/rbwnat_smoke.rs +++ b/tests/it/rbwnat_smoke.rs @@ -1412,3 +1412,84 @@ async fn interval() { } ); } + +// See https://clickhouse.com/docs/sql-reference/statements/create/table#ephemeral +// +// Ignored cause: +// +// #### All struct fields: +// - id +// - unhexed +// #### All schema columns: +// - id: UInt64 +// - hexed: FixedString(4) +#[tokio::test] +#[ignore] +async fn ephemeral_columns() { + #[derive(Clone, Debug, Row, Serialize, PartialEq)] + struct DataInsert { + id: u64, + unhexed: String, + } + + #[derive(Clone, Debug, Row, Deserialize, PartialEq)] + struct DataSelect { + id: u64, + hexed: [u8; 4], + } + + let client = get_client(); + client + .query( + " + CREATE OR REPLACE TABLE test + ( + id UInt64, + unhexed String EPHEMERAL, + hexed FixedString(4) DEFAULT unhex(unhexed) + ) + ENGINE = MergeTree + ORDER BY id + ", + ) + .execute() + .await + .unwrap(); + + let rows_to_insert = vec![ + DataInsert { + id: 1, + unhexed: "41424344".to_string(), // "ABCD" in hex + }, + DataInsert { + id: 2, + unhexed: "31323334".to_string(), // "1234" in hex + }, + ]; + + let mut insert = client.insert::("test").await.unwrap(); + for row in rows_to_insert.into_iter() { + insert.write(&row).await.unwrap(); + } + insert.end().await.unwrap(); + + let rows = client + .query("SELECT ?fields FROM test ORDER BY () ASC") + .fetch_all::() + .await + .unwrap(); + + assert_eq!( + rows, + vec![ + DataSelect { + id: 1, + hexed: *b"ABCD", + }, + DataSelect { + id: 2, + hexed: *b"1234", + }, + ] + ); +} From 1cb183448509303fb8cec78532ce013acb968314 Mon Sep 17 00:00:00 2001 From: Serge Klochkov <3175289+slvrtrn@users.noreply.github.com> Date: Mon, 22 Sep 2025 20:49:46 +0200 Subject: [PATCH 02/13] test: add materialized and alias columns tests --- tests/it/rbwnat_smoke.rs | 219 ++++++++++++++++++++++++++++++++++++--- 1 file changed, 203 insertions(+), 16 deletions(-) diff --git a/tests/it/rbwnat_smoke.rs b/tests/it/rbwnat_smoke.rs index cd9be532..fb198fa3 100644 --- a/tests/it/rbwnat_smoke.rs +++ b/tests/it/rbwnat_smoke.rs @@ -1,6 +1,6 @@ use crate::decimals::*; use crate::geo_types::{LineString, MultiLineString, MultiPolygon, Point, Polygon, Ring}; -use crate::{create_simple_table, get_client, insert_and_select, SimpleRow}; +use crate::{create_simple_table, execute_statements, get_client, insert_and_select, SimpleRow}; use clickhouse::sql::Identifier; use clickhouse::Row; use fxhash::FxHashMap; @@ -1417,41 +1417,45 @@ async fn interval() { // // Ignored cause: // +// While processing struct DataInsert: database schema has no column named hexed. 
// #### All struct fields: // - id -// - unhexed +// - hexed // #### All schema columns: +// - raw: FixedString(3) // - id: UInt64 -// - hexed: FixedString(4) #[tokio::test] #[ignore] async fn ephemeral_columns() { + let table_name = "test_ephemeral_columns"; + #[derive(Clone, Debug, Row, Serialize, PartialEq)] struct DataInsert { id: u64, - unhexed: String, + hexed: String, } #[derive(Clone, Debug, Row, Deserialize, PartialEq)] struct DataSelect { id: u64, - hexed: [u8; 4], + raw: [u8; 3], } let client = get_client(); client .query( " - CREATE OR REPLACE TABLE test + CREATE OR REPLACE TABLE ? ( - id UInt64, - unhexed String EPHEMERAL, - hexed FixedString(4) DEFAULT unhex(unhexed) + id UInt64, + hexed String EPHEMERAL, + raw FixedString(3) DEFAULT unhex(hexed) ) ENGINE = MergeTree ORDER BY id ", ) + .bind(Identifier(table_name)) .execute() .await .unwrap(); @@ -1459,22 +1463,23 @@ async fn ephemeral_columns() { let rows_to_insert = vec![ DataInsert { id: 1, - unhexed: "41424344".to_string(), // "ABCD" in hex + hexed: "666F6F".to_string(), // "foo" in hex }, DataInsert { id: 2, - unhexed: "31323334".to_string(), // "1234" in hex + hexed: "626172".to_string(), // "bar" in hex }, ]; - let mut insert = client.insert::("test").await.unwrap(); + let mut insert = client.insert::(table_name).await.unwrap(); for row in rows_to_insert.into_iter() { insert.write(&row).await.unwrap(); } insert.end().await.unwrap(); let rows = client - .query("SELECT ?fields FROM test ORDER BY () ASC") + .query("SELECT ?fields FROM ? ORDER BY () ASC") + .bind(Identifier(table_name)) .fetch_all::() .await .unwrap(); @@ -1484,12 +1489,194 @@ async fn ephemeral_columns() { vec![ DataSelect { id: 1, - hexed: *b"ABCD", + raw: *b"foo", }, DataSelect { id: 2, - hexed: *b"1234", - }, + raw: *b"bar", + } ] ); } + +// See https://clickhouse.com/docs/sql-reference/statements/alter/column#materialize-column +// +// Ignored cause: +// +// While processing struct Data: database schema has 1 column(s), but the struct definition has 2 field(s). +// #### All struct fields: +// - x +// - s +// #### All schema columns: +// - x: Int64 +#[tokio::test] +#[ignore] +async fn materialized_columns() { + let table_name = "test_materialized_columns"; + + #[derive(Clone, Debug, Row, Serialize, Deserialize, PartialEq)] + struct Data { + x: i64, + s: String, + } + + let client = get_client(); + execute_statements( + &client, + &[ + &format!( + " + CREATE OR REPLACE TABLE {table_name} (x Int64) + ENGINE = MergeTree ORDER BY () PARTITION BY () + " + ), + &format!("INSERT INTO {table_name} SELECT * FROM system.numbers LIMIT 5"), + &format!("ALTER TABLE {table_name} ADD COLUMN s String MATERIALIZED toString(x)"), + &format!("ALTER TABLE {table_name} MATERIALIZE COLUMN s"), + ], + ) + .await; + + let rows = client + .query("SELECT ?fields FROM ? ORDER BY x ASC") + .bind(Identifier(table_name)) + .fetch_all::() + .await + .unwrap(); + + let expected_rows = (0..5) + .map(|x| Data { + x, + s: x.to_string(), + }) + .collect::>(); + assert_eq!(rows, expected_rows); + + let rows_to_insert = vec![ + Data { + x: 5, + s: "5".to_string(), + }, + Data { + x: 6, + s: "6".to_string(), + }, + ]; + + // fails on this insert + let mut insert = client.insert::(table_name).await.unwrap(); + for row in &rows_to_insert { + insert.write(row).await.unwrap(); + } + insert.end().await.unwrap(); + + let rows_after_insert = client + .query("SELECT ?fields FROM ? 
ORDER BY x ASC") + .bind(Identifier(table_name)) + .fetch_all::() + .await + .unwrap(); + + let expected_rows_after_insert = [&rows[..], &rows_to_insert[..]].concat(); + assert_eq!(rows_after_insert, expected_rows_after_insert); +} + +// See https://clickhouse.com/docs/sql-reference/statements/create/table#alias +#[tokio::test] +async fn alias_columns() { + let table_name = "test_alias_columns"; + + #[derive(Clone, Debug, Row, Deserialize, PartialEq)] + struct Data { + id: u64, + size_bytes: i64, + size: String, + } + + #[derive(Clone, Debug, Row, Serialize, PartialEq)] + struct DataInsert { + id: u64, + size_bytes: i64, + } + + let client = get_client(); + execute_statements( + &client, + &[ + &format!( + " + CREATE OR REPLACE TABLE {table_name} + ( + id UInt64, + size_bytes Int64, + size String ALIAS formatReadableSize(size_bytes) + ) + ENGINE = MergeTree + ORDER BY id; + ", + ), + &format!("INSERT INTO {table_name} VALUES (1, 4678899)"), + ], + ) + .await; + + let rows = client + .query("SELECT ?fields FROM ?") + .bind(Identifier(table_name)) + .fetch_all::() + .await + .unwrap(); + + let expected_rows = vec![Data { + id: 1, + size_bytes: 4678899, + size: "4.46 MiB".to_string(), + }]; + + assert_eq!(rows, expected_rows); + + let rows_to_insert = vec![ + DataInsert { + id: 2, + size_bytes: 123456, + }, + DataInsert { + id: 3, + size_bytes: 987654321, + }, + ]; + + // this insert fails + let mut insert = client.insert::(table_name).await.unwrap(); + for row in &rows_to_insert { + insert.write(row).await.unwrap(); + } + insert.end().await.unwrap(); + + let rows_after_insert = client + .query("SELECT ?fields FROM ? ORDER BY id ASC") + .bind(Identifier(table_name)) + .fetch_all::() + .await + .unwrap(); + + let expected_rows_after_insert = vec![ + Data { + id: 1, + size_bytes: 4678899, + size: "4.46 MiB".to_string(), + }, + Data { + id: 2, + size_bytes: 123456, + size: "120.56 KiB".to_string(), + }, + Data { + id: 3, + size_bytes: 987654321, + size: "941.90 MiB".to_string(), + }, + ]; + + assert_eq!(rows_after_insert, expected_rows_after_insert); +} From d642475b3edda02835924efd5b66bc857463bb6d Mon Sep 17 00:00:00 2001 From: Serge Klochkov <3175289+slvrtrn@users.noreply.github.com> Date: Mon, 22 Sep 2025 21:14:15 +0200 Subject: [PATCH 03/13] test: this insert does not fail --- tests/it/rbwnat_smoke.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/it/rbwnat_smoke.rs b/tests/it/rbwnat_smoke.rs index fb198fa3..8a295921 100644 --- a/tests/it/rbwnat_smoke.rs +++ b/tests/it/rbwnat_smoke.rs @@ -1646,7 +1646,6 @@ async fn alias_columns() { }, ]; - // this insert fails let mut insert = client.insert::(table_name).await.unwrap(); for row in &rows_to_insert { insert.write(row).await.unwrap(); From f93043da499cf85e00501da410f54ba68fc0aec1 Mon Sep 17 00:00:00 2001 From: Austin Bonander Date: Thu, 25 Sep 2025 09:30:52 -0700 Subject: [PATCH 04/13] feat: add `Client::clear_cached_metadata` --- src/insert.rs | 10 +++++++ src/lib.rs | 76 +++++++++++++++++++++++++++++---------------------- 2 files changed, 54 insertions(+), 32 deletions(-) diff --git a/src/insert.rs b/src/insert.rs index 8382b26a..065e18ac 100644 --- a/src/insert.rs +++ b/src/insert.rs @@ -34,6 +34,16 @@ const_assert!(BUFFER_SIZE.is_power_of_two()); // to use the whole buffer's capac /// Otherwise, the whole `INSERT` will be aborted. /// /// Rows are being sent progressively to spread network load. 
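///
/// A rough usage sketch (`MyRow`, `rows`, and the table name are placeholders):
///
/// ```ignore
/// // `MyRow` stands in for any type deriving `Row` + `Serialize`.
/// let mut insert = client.insert::<MyRow>("my_table").await?;
/// for row in rows {
///     insert.write(&row).await?; // buffered, flushed progressively
/// }
/// insert.end().await?; // finishes the INSERT; skipping this aborts it
/// ```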
+/// +/// # Note: Metadata is Cached +/// If [validation is enabled][Client::with_validation], +/// this helper will query the metadata for the target table to learn the column names and types. +/// +/// To avoid querying this metadata every time, it is cached within the [`Client`]. +/// +/// Any concurrent changes to the table schema may cause insert failures if the metadata +/// is no longer correct. For correct functioning, call [`Client::clear_cached_metadata()`] +/// after any changes to the current database schema. #[must_use] pub struct Insert { state: InsertState, diff --git a/src/lib.rs b/src/lib.rs index 54f1a5c2..118fb89e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -107,14 +107,9 @@ impl Default for Client { /// Cache for [`RowMetadata`] to avoid allocating it for the same struct more than once /// during the application lifecycle. Key: fully qualified table name (e.g. `database.table`). +#[derive(Default)] pub(crate) struct RowMetadataCache(RwLock>>); -impl Default for RowMetadataCache { - fn default() -> Self { - RowMetadataCache(RwLock::new(HashMap::default())) - } -} - impl Client { /// Creates a new client with a specified underlying HTTP client. /// @@ -157,6 +152,10 @@ impl Client { /// ``` pub fn with_database(mut self, database: impl Into) -> Self { self.database = Some(database.into()); + + // If we're looking at a different database, then our cached metadata is invalid. + self.row_metadata_cache = Default::default(); + self } @@ -387,6 +386,18 @@ impl Client { self } + /// Clear table metadata that was previously received and cached. + /// + /// [`Insert`] uses cached metadata when sending data with validation. + /// If the table schema changes, this metadata needs to re-fetched. + /// + /// This method clears the metadata cache, causing future insert queries to re-fetch metadata. + /// + /// This may need to wait to acquire a lock if a query is concurrently writing into the cache. + pub async fn clear_cached_metadata(&mut self) { + self.row_metadata_cache.0.write().await.clear(); + } + /// Used internally to check if the validation mode is enabled, /// as it takes into account the `test-util` feature flag. #[inline] @@ -422,34 +433,35 @@ impl Client { &self, table_name: &str, ) -> Result> { - let read_lock = self.row_metadata_cache.0.read().await; - match read_lock.get(table_name) { - Some(metadata) => Ok(metadata.clone()), - None => { - drop(read_lock); - // TODO: should it be moved to a cold function? - let mut write_lock = self.row_metadata_cache.0.write().await; - let db = match self.database { - Some(ref db) => db, - None => "default", - }; - let mut bytes_cursor = self - .query("SELECT * FROM ? LIMIT 0") - .bind(Identifier(table_name)) - // don't allow to override the client database set in the client instance - // with a `.with_option("database", "some_other_db")` call on the app side - .with_option("database", db) - .fetch_bytes("RowBinaryWithNamesAndTypes")?; - let mut buffer = Vec::::new(); - while let Some(chunk) = bytes_cursor.next().await? { - buffer.extend_from_slice(&chunk); - } - let columns = parse_rbwnat_columns_header(&mut buffer.as_slice())?; - let metadata = Arc::new(RowMetadata::new_for_insert::(columns)); - write_lock.insert(table_name.to_string(), metadata.clone()); - Ok(metadata) + { + let read_lock = self.row_metadata_cache.0.read().await; + + if let Some(metadata) = read_lock.get(table_name) { + return Ok(metadata.clone()); } } + + // TODO: should it be moved to a cold function? 
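        // Cache miss: fetch the metadata under the write lock. Another task may
        // have populated the entry between dropping the read lock above and
        // acquiring the write lock here; there is no re-check, so that case
        // simply re-fetches and overwrites the entry with equivalent metadata.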
+ let mut write_lock = self.row_metadata_cache.0.write().await; + let db = match self.database { + Some(ref db) => db, + None => "default", + }; + let mut bytes_cursor = self + .query("SELECT * FROM ? LIMIT 0") + .bind(Identifier(table_name)) + // don't allow to override the client database set in the client instance + // with a `.with_option("database", "some_other_db")` call on the app side + .with_option("database", db) + .fetch_bytes("RowBinaryWithNamesAndTypes")?; + let mut buffer = Vec::::new(); + while let Some(chunk) = bytes_cursor.next().await? { + buffer.extend_from_slice(&chunk); + } + let columns = parse_rbwnat_columns_header(&mut buffer.as_slice())?; + let metadata = Arc::new(RowMetadata::new_for_insert::(columns)); + write_lock.insert(table_name.to_string(), metadata.clone()); + Ok(metadata) } } From 0ebcdf19b649f03bc333a86a355f81e23f7db5c6 Mon Sep 17 00:00:00 2001 From: Austin Bonander Date: Thu, 25 Sep 2025 09:31:47 -0700 Subject: [PATCH 05/13] feat: add `Client::with_metadata_ttl()` --- src/lib.rs | 23 +++++++++++++++++++++-- src/row_metadata.rs | 7 +++++++ 2 files changed, 28 insertions(+), 2 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 118fb89e..8034c984 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -16,6 +16,7 @@ use crate::sql::Identifier; pub use clickhouse_derive::Row; use clickhouse_types::parse_rbwnat_columns_header; +use std::time::Duration; use std::{collections::HashMap, fmt::Display, sync::Arc}; use tokio::sync::RwLock; @@ -62,7 +63,7 @@ pub struct Client { products_info: Vec, validation: bool, row_metadata_cache: Arc, - + metadata_ttl: Option, #[cfg(feature = "test-util")] mocked: bool, } @@ -126,6 +127,7 @@ impl Client { products_info: Vec::default(), validation: true, row_metadata_cache: Arc::new(RowMetadataCache::default()), + metadata_ttl: Some(Duration::from_secs(60 * 60)), // 1 hour #[cfg(feature = "test-util")] mocked: false, } @@ -326,6 +328,17 @@ impl Client { self } + /// Set or clear the time-to-live for cached metadata. + /// + /// Any metadata older than this will be re-fetched. + /// + /// Set to `None` to cache metadata forever. The cache can still be manually cleared with + /// [`Client::clear_cached_metadata()`]. + pub fn with_metadata_ttl(mut self, ttl: impl Into>) -> Self { + self.metadata_ttl = ttl.into(); + self + } + /// Starts a new INSERT statement. /// /// # Validation @@ -437,7 +450,13 @@ impl Client { let read_lock = self.row_metadata_cache.0.read().await; if let Some(metadata) = read_lock.get(table_name) { - return Ok(metadata.clone()); + // FIXME: `Option::is_none_or` isn't available until 1.82 + if self + .metadata_ttl + .map_or(true, |ttl| metadata.received_at.elapsed() < ttl) + { + return Ok(metadata.clone()); + } } } diff --git a/src/row_metadata.rs b/src/row_metadata.rs index d27ed103..fd94160c 100644 --- a/src/row_metadata.rs +++ b/src/row_metadata.rs @@ -3,6 +3,7 @@ use crate::row::RowKind; use clickhouse_types::Column; use std::collections::HashMap; use std::fmt::Display; +use std::time::Instant; #[derive(Debug, PartialEq)] pub(crate) enum AccessType { @@ -22,6 +23,7 @@ pub(crate) struct RowMetadata { /// * For selects, it is defined in the same order as in the database schema. /// * For inserts, it is adjusted to the order of fields in the struct definition. 
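    ///
    /// (Illustrative: given schema columns `(a, b)` and a struct declared as
    /// `{ b, a }`, a select keeps `[a, b]` here, while an insert reorders it
    /// to `[b, a]` to match the struct.)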
pub(crate) columns: Vec, + /// This determines whether we can just use [`crate::rowbinary::de::RowBinarySeqAccess`] /// or a more sophisticated approach with [`crate::rowbinary::de::RowBinaryStructAsMapAccess`] /// to support structs defined with different fields order than in the schema. @@ -30,6 +32,9 @@ pub(crate) struct RowMetadata { /// on the shape of the data. In some cases, there is no noticeable difference, /// in others, it could be up to 2-3x slower. pub(crate) access_type: AccessType, + + /// The time this row metadata was received. Used for enforcing the TTL when cached. + pub(crate) received_at: Instant, } impl RowMetadata { @@ -118,6 +123,7 @@ impl RowMetadata { Self { columns, access_type, + received_at: Instant::now(), } } @@ -166,6 +172,7 @@ impl RowMetadata { Self { columns: result_columns, access_type: AccessType::WithSeqAccess, // ignored + received_at: Instant::now(), } } From 712485ebceabb40dd7affb5d5987d33a57d2d8fd Mon Sep 17 00:00:00 2001 From: Austin Bonander Date: Thu, 25 Sep 2025 09:33:24 -0700 Subject: [PATCH 06/13] Revert "feat: add `Client::with_metadata_ttl()`" This reverts commit b9d3bc1201f6073fb4ad56938d3c7aea6cec8118. --- src/lib.rs | 23 ++--------------------- src/row_metadata.rs | 7 ------- 2 files changed, 2 insertions(+), 28 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 8034c984..118fb89e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -16,7 +16,6 @@ use crate::sql::Identifier; pub use clickhouse_derive::Row; use clickhouse_types::parse_rbwnat_columns_header; -use std::time::Duration; use std::{collections::HashMap, fmt::Display, sync::Arc}; use tokio::sync::RwLock; @@ -63,7 +62,7 @@ pub struct Client { products_info: Vec, validation: bool, row_metadata_cache: Arc, - metadata_ttl: Option, + #[cfg(feature = "test-util")] mocked: bool, } @@ -127,7 +126,6 @@ impl Client { products_info: Vec::default(), validation: true, row_metadata_cache: Arc::new(RowMetadataCache::default()), - metadata_ttl: Some(Duration::from_secs(60 * 60)), // 1 hour #[cfg(feature = "test-util")] mocked: false, } @@ -328,17 +326,6 @@ impl Client { self } - /// Set or clear the time-to-live for cached metadata. - /// - /// Any metadata older than this will be re-fetched. - /// - /// Set to `None` to cache metadata forever. The cache can still be manually cleared with - /// [`Client::clear_cached_metadata()`]. - pub fn with_metadata_ttl(mut self, ttl: impl Into>) -> Self { - self.metadata_ttl = ttl.into(); - self - } - /// Starts a new INSERT statement. /// /// # Validation @@ -450,13 +437,7 @@ impl Client { let read_lock = self.row_metadata_cache.0.read().await; if let Some(metadata) = read_lock.get(table_name) { - // FIXME: `Option::is_none_or` isn't available until 1.82 - if self - .metadata_ttl - .map_or(true, |ttl| metadata.received_at.elapsed() < ttl) - { - return Ok(metadata.clone()); - } + return Ok(metadata.clone()); } } diff --git a/src/row_metadata.rs b/src/row_metadata.rs index fd94160c..d27ed103 100644 --- a/src/row_metadata.rs +++ b/src/row_metadata.rs @@ -3,7 +3,6 @@ use crate::row::RowKind; use clickhouse_types::Column; use std::collections::HashMap; use std::fmt::Display; -use std::time::Instant; #[derive(Debug, PartialEq)] pub(crate) enum AccessType { @@ -23,7 +22,6 @@ pub(crate) struct RowMetadata { /// * For selects, it is defined in the same order as in the database schema. /// * For inserts, it is adjusted to the order of fields in the struct definition. 
pub(crate) columns: Vec, - /// This determines whether we can just use [`crate::rowbinary::de::RowBinarySeqAccess`] /// or a more sophisticated approach with [`crate::rowbinary::de::RowBinaryStructAsMapAccess`] /// to support structs defined with different fields order than in the schema. @@ -32,9 +30,6 @@ pub(crate) struct RowMetadata { /// on the shape of the data. In some cases, there is no noticeable difference, /// in others, it could be up to 2-3x slower. pub(crate) access_type: AccessType, - - /// The time this row metadata was received. Used for enforcing the TTL when cached. - pub(crate) received_at: Instant, } impl RowMetadata { @@ -123,7 +118,6 @@ impl RowMetadata { Self { columns, access_type, - received_at: Instant::now(), } } @@ -172,7 +166,6 @@ impl RowMetadata { Self { columns: result_columns, access_type: AccessType::WithSeqAccess, // ignored - received_at: Instant::now(), } } From 6125f96437461480d3fa8215b0109238fb355073 Mon Sep 17 00:00:00 2001 From: Austin Bonander Date: Thu, 25 Sep 2025 11:57:25 -0700 Subject: [PATCH 07/13] feat: add test for `Client::clear_cached_metadata` --- docker-compose.yml | 3 +- src/lib.rs | 9 ++++- tests/it/insert.rs | 95 ++++++++++++++++++++++++++++++++++++++++++++++ tests/it/main.rs | 2 +- 4 files changed, 105 insertions(+), 4 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index cc309127..a7dc8456 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,7 +1,8 @@ name: clickhouse-rs services: clickhouse: - image: 'clickhouse/clickhouse-server:${CLICKHOUSE_VERSION-latest-alpine}' + # Note: use of a fully qualified url makes Podman happy without need for further configuration. + image: 'docker.io/clickhouse/clickhouse-server:${CLICKHOUSE_VERSION-latest-alpine}' container_name: 'clickhouse-rs-clickhouse-server' environment: CLICKHOUSE_SKIP_USER_SETUP: 1 diff --git a/src/lib.rs b/src/lib.rs index 118fb89e..3b361a85 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -145,6 +145,8 @@ impl Client { /// Specifies a database name. /// + /// + /// /// # Examples /// ``` /// # use clickhouse::Client; @@ -153,7 +155,7 @@ impl Client { pub fn with_database(mut self, database: impl Into) -> Self { self.database = Some(database.into()); - // If we're looking at a different database, then our cached metadata is invalid. + // Assume our cached metadata is invalid. self.row_metadata_cache = Default::default(); self @@ -392,9 +394,12 @@ impl Client { /// If the table schema changes, this metadata needs to re-fetched. /// /// This method clears the metadata cache, causing future insert queries to re-fetch metadata. + /// This applies to all cloned instances of this `Client` as well. /// /// This may need to wait to acquire a lock if a query is concurrently writing into the cache. - pub async fn clear_cached_metadata(&mut self) { + /// + /// Cancel-safe. 
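    ///
    /// A minimal sketch of the intended call pattern (`foo`, `baz`, and `Foo2`
    /// mirror the test added further down in this patch):
    ///
    /// ```ignore
    /// client.query("ALTER TABLE foo DROP COLUMN baz").execute().await?;
    /// client.clear_cached_metadata().await; // schema changed; drop stale entries
    /// let mut insert = client.insert::<Foo2>("foo").await?; // re-fetches metadata
    /// ```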
+ pub async fn clear_cached_metadata(&self) { self.row_metadata_cache.0.write().await.clear(); } diff --git a/tests/it/insert.rs b/tests/it/insert.rs index 68e569d4..39026368 100644 --- a/tests/it/insert.rs +++ b/tests/it/insert.rs @@ -1,6 +1,7 @@ use crate::{SimpleRow, create_simple_table, fetch_rows, flush_query_log}; use clickhouse::{Row, sql::Identifier}; use serde::{Deserialize, Serialize}; +use std::panic::AssertUnwindSafe; #[tokio::test] async fn keeps_client_options() { @@ -239,3 +240,97 @@ async fn insert_from_cursor() { ); assert_eq!(cursor.next().await.unwrap(), None); } + +#[tokio::test] +async fn clear_cached_metadata() { + #[derive(clickhouse::Row, serde::Serialize)] + struct Foo { + bar: i32, + baz: String, + } + + #[derive( + clickhouse::Row, + serde::Serialize, + serde::Deserialize, + PartialEq, + Eq, + Debug + )] + struct Foo2 { + bar: i32, + } + + let client = prepare_database!().with_validation(true); + + client + .query("CREATE TABLE foo(bar Int32, baz String) ENGINE = MergeTree PRIMARY KEY(bar)") + .execute() + .await + .unwrap(); + + let mut insert = client.insert::("foo").await.unwrap(); + + insert + .write(&Foo { + bar: 1, + baz: "Hello, world!".to_string(), + }) + .await + .unwrap(); + + insert.end().await.unwrap(); + + client + .query("ALTER TABLE foo DROP COLUMN baz") + .execute() + .await + .unwrap(); + + let mut insert = client.insert::("foo").await.unwrap(); + + insert + .write(&Foo { + bar: 2, + baz: "Hello, ClickHouse!".to_string(), + }) + .await + .unwrap(); + + dbg!( + insert + .end() + .await + .expect_err("Insert metadata is invalid; this should error!") + ); + + client.clear_cached_metadata().await; + + let write_invalid = AssertUnwindSafe(async { + let mut insert = client.insert::("foo").await.unwrap(); + + insert + .write(&Foo { + bar: 2, + baz: "Hello, ClickHouse!".to_string(), + }) + .await + .expect_err("`Foo` should no longer be valid for the table"); + }); + + assert_panic_msg!(write_invalid, ["1 columns", "2 fields", "bar", "baz"]); + + let mut insert = client.insert::("foo").await.unwrap(); + + insert.write(&Foo2 { bar: 3 }).await.unwrap(); + + insert.end().await.unwrap(); + + let rows = client + .query("SELECT * FROM foo") + .fetch_all::() + .await + .unwrap(); + + assert_eq!(*rows, [Foo2 { bar: 1 }, Foo2 { bar: 3 }]); +} diff --git a/tests/it/main.rs b/tests/it/main.rs index 4a5c38a3..486e12b5 100644 --- a/tests/it/main.rs +++ b/tests/it/main.rs @@ -78,7 +78,7 @@ macro_rules! assert_panic_msg { result.unwrap() ); let panic_msg = *result.unwrap_err().downcast::().unwrap(); - for &msg in $msg_parts { + for msg in $msg_parts { assert!( panic_msg.contains(msg), "panic message:\n{panic_msg}\ndid not contain the expected part:\n{msg}" From fbdd340bae73681ca5e742cec5052bd3a5c5d296 Mon Sep 17 00:00:00 2001 From: Austin Bonander Date: Thu, 25 Sep 2025 13:35:33 -0700 Subject: [PATCH 08/13] refactor: move `cache_row_metadata` test to `tests/it`, rearchitect invalidate on `with_url` --- src/lib.rs | 41 ++++++-------------- tests/it/insert.rs | 94 +++++++++++++++++++++++++++++++++++++++++++++- tests/it/main.rs | 30 +++++++++------ 3 files changed, 123 insertions(+), 42 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 3b361a85..9d927fc3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -133,6 +133,9 @@ impl Client { /// Specifies ClickHouse's url. Should point to HTTP endpoint. /// + /// Automatically [clears the metadata cache][Self::clear_cached_metadata] + /// for this instance only. 
+ /// /// # Examples /// ``` /// # use clickhouse::Client; @@ -140,12 +143,17 @@ impl Client { /// ``` pub fn with_url(mut self, url: impl Into) -> Self { self.url = url.into(); + + // Assume our cached metadata is invalid. + self.row_metadata_cache = Default::default(); + self } /// Specifies a database name. /// - /// + /// Automatically [clears the metadata cache][Self::clear_cached_metadata] + /// for this instance only. /// /// # Examples /// ``` @@ -390,11 +398,12 @@ impl Client { /// Clear table metadata that was previously received and cached. /// - /// [`Insert`] uses cached metadata when sending data with validation. + /// [`Insert`][crate::insert::Insert] uses cached metadata when sending data with validation. /// If the table schema changes, this metadata needs to re-fetched. /// /// This method clears the metadata cache, causing future insert queries to re-fetch metadata. - /// This applies to all cloned instances of this `Client` as well. + /// This applies to all cloned instances of this `Client` (using the same URL and database) + /// as well. /// /// This may need to wait to acquire a lock if a query is concurrently writing into the cache. /// @@ -660,32 +669,6 @@ mod client_tests { ); } - #[tokio::test] - async fn cache_row_metadata() { - let client = Client::default() - .with_url("http://localhost:8123") - .with_database("system"); - - let metadata = client - .get_row_metadata_for_insert::("roles") - .await - .unwrap(); - - assert_eq!(metadata.columns, SystemRolesRow::columns()); - assert_eq!(metadata.access_type, AccessType::WithSeqAccess); - - // we can now use a dummy client, cause the metadata is cached, - // and no calls to the database will be made - client - .with_url("whatever") - .get_row_metadata_for_insert::("roles") - .await - .unwrap(); - - assert_eq!(metadata.columns, SystemRolesRow::columns()); - assert_eq!(metadata.access_type, AccessType::WithSeqAccess); - } - #[test] fn it_does_follow_previous_configuration() { let client = Client::default().with_option("async_insert", "1"); diff --git a/tests/it/insert.rs b/tests/it/insert.rs index 39026368..232d8095 100644 --- a/tests/it/insert.rs +++ b/tests/it/insert.rs @@ -241,6 +241,98 @@ async fn insert_from_cursor() { assert_eq!(cursor.next().await.unwrap(), None); } +#[tokio::test] +async fn cache_row_metadata() { + #[derive(clickhouse::Row, serde::Serialize)] + struct Foo { + bar: i32, + baz: String, + } + + let db_name = test_database_name!(); + + let client = crate::_priv::prepare_database(&db_name) + .await + .with_validation(true); + + client + .query("CREATE TABLE foo(bar Int32, baz String) ENGINE = MergeTree PRIMARY KEY(bar)") + .execute() + .await + .unwrap(); + + // Ensure `system.query_log` is fully written + flush_query_log(&client).await; + + let select_query = "SELECT count() \ + FROM system.query_log \ + WHERE current_database = ? \ + AND query LIKE 'SELECT * FROM `foo` LIMIT 0%'"; + + let initial_count: u64 = client + .query(select_query) + .bind(&db_name) + .fetch_one() + .await + .unwrap(); + + let mut insert = client.insert::("foo").await.unwrap(); + + insert + .write(&Foo { + bar: 1, + baz: "Hello, world!".to_string(), + }) + .await + .unwrap(); + + insert.end().await.unwrap(); + + // Ensure `system.query_log` is fully written + flush_query_log(&client).await; + + let select_query = "SELECT count() \ + FROM system.query_log \ + WHERE current_database = ? 
\ + AND query LIKE 'SELECT * FROM `foo` LIMIT 0%'"; + + let after_insert: u64 = client + .query(select_query) + .bind(&db_name) + .fetch_one() + .await + .unwrap(); + + // If the database server has not been reset between test runs, `initial_count` will be nonzero. + // + // Instead, of asserting a specific value, we assert that the count has changed. + assert_ne!(after_insert, initial_count); + + let mut insert = client.insert::("foo").await.unwrap(); + + insert + .write(&Foo { + bar: 2, + baz: "Hello, ClickHouse!".to_string(), + }) + .await + .unwrap(); + + insert.end().await.unwrap(); + + flush_query_log(&client).await; + + let final_count: u64 = client + .query(select_query) + .bind(&db_name) + .fetch_one() + .await + .unwrap(); + + // Insert metadata is cached, so we should not have queried this table again. + assert_eq!(final_count, after_insert); +} + #[tokio::test] async fn clear_cached_metadata() { #[derive(clickhouse::Row, serde::Serialize)] @@ -327,7 +419,7 @@ async fn clear_cached_metadata() { insert.end().await.unwrap(); let rows = client - .query("SELECT * FROM foo") + .query("SELECT * FROM foo ORDER BY bar") .fetch_all::() .await .unwrap(); diff --git a/tests/it/main.rs b/tests/it/main.rs index 486e12b5..2d2a78ee 100644 --- a/tests/it/main.rs +++ b/tests/it/main.rs @@ -89,14 +89,16 @@ macro_rules! assert_panic_msg { macro_rules! prepare_database { () => { - crate::_priv::prepare_database({ + crate::_priv::prepare_database(&test_database_name!()).await + }; +} + +macro_rules! test_database_name { + () => { + crate::_priv::make_db_name({ fn f() {} - fn type_name_of_val(_: T) -> &'static str { - std::any::type_name::() - } - type_name_of_val(f) + std::any::type_name_of_val(&f) }) - .await }; } @@ -168,7 +170,12 @@ where } pub(crate) async fn flush_query_log(client: &Client) { - client.query("SYSTEM FLUSH LOGS").execute().await.unwrap(); + client + .query("SYSTEM FLUSH LOGS") + .with_option("wait_end_of_query", "1") + .execute() + .await + .unwrap(); } pub(crate) async fn execute_statements(client: &Client, statements: &[&str]) { @@ -274,14 +281,13 @@ mod _priv { use super::*; use std::time::SystemTime; - pub(crate) async fn prepare_database(fn_path: &str) -> Client { - let db_name = make_db_name(fn_path); + pub(crate) async fn prepare_database(db_name: &str) -> Client { let client = get_client(); client .query("DROP DATABASE IF EXISTS ?") .with_option("wait_end_of_query", "1") - .bind(Identifier(&db_name)) + .bind(Identifier(db_name)) .execute() .await .unwrap_or_else(|err| panic!("cannot drop db {db_name}, cause: {err}")); @@ -289,7 +295,7 @@ mod _priv { client .query("CREATE DATABASE ?") .with_option("wait_end_of_query", "1") - .bind(Identifier(&db_name)) + .bind(Identifier(db_name)) .execute() .await .unwrap_or_else(|err| panic!("cannot create db {db_name}, cause: {err}")); @@ -301,7 +307,7 @@ mod _priv { // `it::compression::lz4::{{closure}}::f` -> // - "local" env: `chrs__compression__lz4` // - "cloud" env: `chrs__compression__lz4__{unix_millis}` - fn make_db_name(fn_path: &str) -> String { + pub(crate) fn make_db_name(fn_path: &str) -> String { assert!(fn_path.starts_with("it::")); let mut iter = fn_path.split("::").skip(1); let module = iter.next().unwrap(); From 7a56e55ac84871addf2737f852f7a02467d24080 Mon Sep 17 00:00:00 2001 From: Austin Bonander Date: Mon, 29 Sep 2025 15:26:35 -0700 Subject: [PATCH 09/13] fix(insert): allow omission of MATERIALIZED, ALIAS and DEFAULT columns and don't error on EPHEMERAL columns --- src/insert.rs | 5 +- src/lib.rs | 95 
+++++++++++------ src/row_metadata.rs | 187 ++++++++++++++++++++++++---------- tests/it/insert.rs | 34 +++---- tests/it/rbwnat_smoke.rs | 67 ++++++------ tests/it/rbwnat_validation.rs | 9 +- 6 files changed, 259 insertions(+), 138 deletions(-) diff --git a/src/insert.rs b/src/insert.rs index 065e18ac..89ad7ac3 100644 --- a/src/insert.rs +++ b/src/insert.rs @@ -12,7 +12,6 @@ use bytes::{Bytes, BytesMut}; use clickhouse_types::put_rbwnat_columns_header; use hyper::{self, Request}; use replace_with::replace_with_or_abort; -use std::sync::Arc; use std::{future::Future, marker::PhantomData, mem, panic, pin::Pin, time::Duration}; use tokio::{ task::JoinHandle, @@ -48,7 +47,7 @@ const_assert!(BUFFER_SIZE.is_power_of_two()); // to use the whole buffer's capac pub struct Insert { state: InsertState, buffer: BytesMut, - row_metadata: Option>, + row_metadata: Option, #[cfg(feature = "lz4")] compression: Compression, send_timeout: Option, @@ -131,7 +130,7 @@ macro_rules! timeout { } impl Insert { - pub(crate) fn new(client: &Client, table: &str, row_metadata: Option>) -> Self + pub(crate) fn new(client: &Client, table: &str, row_metadata: Option) -> Self where T: Row, { diff --git a/src/lib.rs b/src/lib.rs index 9d927fc3..87e21a83 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -9,13 +9,13 @@ pub use self::{ row::{Row, RowOwned, RowRead, RowWrite}, }; use self::{error::Result, http_client::HttpClient}; -use crate::row_metadata::RowMetadata; -use crate::sql::Identifier; +use crate::row_metadata::{AccessType, ColumnDefaultKind, InsertMetadata, RowMetadata}; #[doc = include_str!("row_derive.md")] pub use clickhouse_derive::Row; -use clickhouse_types::parse_rbwnat_columns_header; +use clickhouse_types::{Column, DataTypeNode}; +use crate::_priv::row_insert_metadata_query; use std::{collections::HashMap, fmt::Display, sync::Arc}; use tokio::sync::RwLock; @@ -61,7 +61,7 @@ pub struct Client { headers: HashMap, products_info: Vec, validation: bool, - row_metadata_cache: Arc, + insert_metadata_cache: Arc, #[cfg(feature = "test-util")] mocked: bool, @@ -108,7 +108,7 @@ impl Default for Client { /// Cache for [`RowMetadata`] to avoid allocating it for the same struct more than once /// during the application lifecycle. Key: fully qualified table name (e.g. `database.table`). #[derive(Default)] -pub(crate) struct RowMetadataCache(RwLock>>); +pub(crate) struct InsertMetadataCache(RwLock>>); impl Client { /// Creates a new client with a specified underlying HTTP client. @@ -125,7 +125,7 @@ impl Client { headers: HashMap::new(), products_info: Vec::default(), validation: true, - row_metadata_cache: Arc::new(RowMetadataCache::default()), + insert_metadata_cache: Arc::new(InsertMetadataCache::default()), #[cfg(feature = "test-util")] mocked: false, } @@ -145,7 +145,7 @@ impl Client { self.url = url.into(); // Assume our cached metadata is invalid. - self.row_metadata_cache = Default::default(); + self.insert_metadata_cache = Default::default(); self } @@ -164,7 +164,7 @@ impl Client { self.database = Some(database.into()); // Assume our cached metadata is invalid. - self.row_metadata_cache = Default::default(); + self.insert_metadata_cache = Default::default(); self } @@ -356,8 +356,12 @@ impl Client { /// If `T` has unnamed fields, e.g. tuples. 
pub async fn insert(&self, table: &str) -> Result> { if self.get_validation() { - let metadata = self.get_row_metadata_for_insert::(table).await?; - return Ok(insert::Insert::new(self, table, Some(metadata))); + let metadata = self.get_insert_metadata(table).await?; + return Ok(insert::Insert::new( + self, + table, + Some(metadata.to_row::()), + )); } Ok(insert::Insert::new(self, table, None)) } @@ -409,7 +413,7 @@ impl Client { /// /// Cancel-safe. pub async fn clear_cached_metadata(&self) { - self.row_metadata_cache.0.write().await.clear(); + self.insert_metadata_cache.0.write().await.clear(); } /// Used internally to check if the validation mode is enabled, @@ -443,37 +447,51 @@ impl Client { self } - async fn get_row_metadata_for_insert( - &self, - table_name: &str, - ) -> Result> { + async fn get_insert_metadata(&self, table_name: &str) -> Result> { { - let read_lock = self.row_metadata_cache.0.read().await; + let read_lock = self.insert_metadata_cache.0.read().await; + // FIXME: `table_name` is not necessarily fully qualified here if let Some(metadata) = read_lock.get(table_name) { return Ok(metadata.clone()); } } // TODO: should it be moved to a cold function? - let mut write_lock = self.row_metadata_cache.0.write().await; + let mut write_lock = self.insert_metadata_cache.0.write().await; let db = match self.database { Some(ref db) => db, None => "default", }; - let mut bytes_cursor = self - .query("SELECT * FROM ? LIMIT 0") - .bind(Identifier(table_name)) - // don't allow to override the client database set in the client instance - // with a `.with_option("database", "some_other_db")` call on the app side - .with_option("database", db) - .fetch_bytes("RowBinaryWithNamesAndTypes")?; - let mut buffer = Vec::::new(); - while let Some(chunk) = bytes_cursor.next().await? { - buffer.extend_from_slice(&chunk); + + let mut columns_cursor = self + .query(&row_insert_metadata_query(db, table_name)) + .fetch::<(String, String, String)>()?; + + let mut columns = Vec::new(); + let mut column_default_kinds = Vec::new(); + let mut column_lookup = HashMap::new(); + + while let Some((name, type_, default_kind)) = columns_cursor.next().await? 
{ + let data_type = DataTypeNode::new(&type_)?; + let default_kind = default_kind.parse::()?; + + column_lookup.insert(name.clone(), columns.len()); + + columns.push(Column { name, data_type }); + + column_default_kinds.push(default_kind); } - let columns = parse_rbwnat_columns_header(&mut buffer.as_slice())?; - let metadata = Arc::new(RowMetadata::new_for_insert::(columns)); + + let metadata = Arc::new(InsertMetadata { + row_metadata: RowMetadata { + columns, + access_type: AccessType::WithSeqAccess, // ignored on insert + }, + column_default_kinds, + column_lookup, + }); + write_lock.insert(table_name.to_string(), metadata.clone()); Ok(metadata) } @@ -489,6 +507,25 @@ pub mod _priv { pub fn lz4_compress(uncompressed: &[u8]) -> super::Result { crate::compression::lz4::compress(uncompressed) } + + // Also needed by `it::insert::cache_row_metadata()` + pub fn row_insert_metadata_query(db: &str, table: &str) -> String { + let mut out = "SELECT \ + name, \ + type, \ + default_kind \ + FROM system.columns \ + WHERE database = " + .to_string(); + + crate::sql::escape::string(db, &mut out).unwrap(); + + out.push_str(" AND table = "); + + crate::sql::escape::string(table, &mut out).unwrap(); + + out + } } #[cfg(test)] diff --git a/src/row_metadata.rs b/src/row_metadata.rs index d27ed103..0b4e543a 100644 --- a/src/row_metadata.rs +++ b/src/row_metadata.rs @@ -1,8 +1,10 @@ use crate::Row; +use crate::error::Error; use crate::row::RowKind; use clickhouse_types::Column; use std::collections::HashMap; -use std::fmt::Display; +use std::fmt::{Display, Formatter}; +use std::str::FromStr; #[derive(Debug, PartialEq)] pub(crate) enum AccessType { @@ -32,6 +34,21 @@ pub(crate) struct RowMetadata { pub(crate) access_type: AccessType, } +pub(crate) struct InsertMetadata { + pub(crate) row_metadata: RowMetadata, + pub(crate) column_default_kinds: Vec, + pub(crate) column_lookup: HashMap, +} + +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +pub(crate) enum ColumnDefaultKind { + Unset, + Default, + Materialized, + Ephemeral, + Alias, +} + impl RowMetadata { pub(crate) fn new_for_cursor(columns: Vec) -> Self { let access_type = match T::KIND { @@ -98,11 +115,10 @@ impl RowMetadata { mapping.push(index); } else { panic!( - "While processing struct {}: database schema has a column {} \ + "While processing struct {}: database schema has a column {col} \ that was not found in the struct definition.\ \n#### All struct fields:\n{}\n#### All schema columns:\n{}", T::NAME, - col, join_panic_schema_hint(T::COLUMN_NAMES), join_panic_schema_hint(&columns), ); @@ -121,54 +137,6 @@ impl RowMetadata { } } - pub(crate) fn new_for_insert(columns: Vec) -> Self { - if T::KIND != RowKind::Struct { - panic!( - "SerializerRowMetadata can only be created for structs, \ - but got {:?} instead.\n#### All schema columns:\n{}", - T::KIND, - join_panic_schema_hint(&columns), - ); - } - if columns.len() != T::COLUMN_NAMES.len() { - panic!( - "While processing struct {}: database schema has {} columns, \ - but the struct definition has {} fields.\ - \n#### All struct fields:\n{}\n#### All schema columns:\n{}", - T::NAME, - columns.len(), - T::COLUMN_NAMES.len(), - join_panic_schema_hint(T::COLUMN_NAMES), - join_panic_schema_hint(&columns), - ); - } - - let mut result_columns: Vec = Vec::with_capacity(columns.len()); - let db_columns_lookup: HashMap<&str, &Column> = - columns.iter().map(|col| (col.name.as_str(), col)).collect(); - - for struct_column_name in T::COLUMN_NAMES { - match db_columns_lookup.get(*struct_column_name) { - Some(col) => 
result_columns.push((*col).clone()), - None => { - panic!( - "While processing struct {}: database schema has no column named {}.\ - \n#### All struct fields:\n{}\n#### All schema columns:\n{}", - T::NAME, - struct_column_name, - join_panic_schema_hint(T::COLUMN_NAMES), - join_panic_schema_hint(&db_columns_lookup.values().collect::>()), - ); - } - } - } - - Self { - columns: result_columns, - access_type: AccessType::WithSeqAccess, // ignored - } - } - /// Returns the index of the column in the database schema /// that corresponds to the field with the given index in the struct. /// @@ -199,11 +167,120 @@ impl RowMetadata { } } -fn join_panic_schema_hint(col: &[T]) -> String { - if col.is_empty() { - return String::default(); +impl FromStr for ColumnDefaultKind { + type Err = Error; + + fn from_str(s: &str) -> Result { + match s { + "" => Ok(Self::Unset), + "DEFAULT" => Ok(Self::Default), + "MATERIALIZED" => Ok(Self::Materialized), + "EPHEMERAL" => Ok(Self::Ephemeral), + "ALIAS" => Ok(Self::Alias), + other => Err(Error::Other( + format!("unknown column default_kind {other}").into(), + )), + } + } +} + +impl ColumnDefaultKind { + pub(crate) fn is_immutable(self) -> bool { + matches!(self, Self::Materialized | Self::Alias) + } + + pub(crate) fn has_default(self) -> bool { + matches!(self, Self::Default | Self::Materialized | Self::Alias) + } + + pub(crate) fn to_str(self) -> &'static str { + match self { + ColumnDefaultKind::Unset => "", + ColumnDefaultKind::Default => "DEFAULT", + ColumnDefaultKind::Materialized => "MATERIALIZED", + ColumnDefaultKind::Ephemeral => "EPHEMERAL", + ColumnDefaultKind::Alias => "ALIAS", + } + } +} + +impl Display for ColumnDefaultKind { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.write_str(self.to_str()) + } +} + +impl InsertMetadata { + pub(crate) fn to_row(&self) -> RowMetadata { + if T::KIND != RowKind::Struct { + panic!( + "SerializerRowMetadata can only be created for structs, \ + but got {:?} instead.\n#### All schema columns:\n{}", + T::KIND, + join_panic_schema_hint(&self.row_metadata.columns), + ); + } + + let mut result_columns: Vec = Vec::with_capacity(T::COLUMN_COUNT); + let mut set_columns: Vec = vec![false; self.row_metadata.columns.len()]; + + for struct_column_name in T::COLUMN_NAMES { + match self.column_lookup.get(*struct_column_name) { + Some(&col) => { + if self.column_default_kinds[col].is_immutable() { + panic!( + "While processing struct {}: column {struct_column_name} is immutable (declared as `{}`)", + T::NAME, + self.column_default_kinds[col], + ); + } + + // TODO: what should happen if a column is mentioned multiple times? 
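                    // Mark this schema column as covered by a struct field so the
                    // missing-column check further down does not flag it.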
+ set_columns[col] = true; + + result_columns.push(self.row_metadata.columns[col].clone()) + } + None => { + panic!( + "While processing struct {}: database schema has no column named {struct_column_name}.\ + \n#### All struct fields:\n{}\n#### All schema columns:\n{}", + T::NAME, + join_panic_schema_hint(T::COLUMN_NAMES), + join_panic_schema_hint(&self.row_metadata.columns), + ); + } + } + } + + let missing_columns = set_columns.iter().enumerate().filter_map(|(col, &is_set)| { + if is_set || self.column_default_kinds[col].has_default() { + return None; + } + + Some(&self.row_metadata.columns[col]) + }); + + let missing_columns_hint = join_panic_schema_hint(missing_columns); + + if !missing_columns_hint.is_empty() { + panic!( + "While processing struct {}: the following non-default columns are missing:\n{missing_columns_hint}\ + \n#### All struct fields:\n{}\n#### All schema columns:\n{}", + T::NAME, + join_panic_schema_hint(T::COLUMN_NAMES), + join_panic_schema_hint(&self.row_metadata.columns), + ) + } + + RowMetadata { + columns: result_columns, + access_type: AccessType::WithSeqAccess, // ignored + } } - col.iter() +} + +fn join_panic_schema_hint(col: impl IntoIterator) -> String { + col.into_iter() .map(|c| format!("- {c}")) .collect::>() .join("\n") diff --git a/tests/it/insert.rs b/tests/it/insert.rs index 232d8095..4f43e7c4 100644 --- a/tests/it/insert.rs +++ b/tests/it/insert.rs @@ -250,6 +250,7 @@ async fn cache_row_metadata() { } let db_name = test_database_name!(); + let table_name = "foo"; let client = crate::_priv::prepare_database(&db_name) .await @@ -264,19 +265,21 @@ async fn cache_row_metadata() { // Ensure `system.query_log` is fully written flush_query_log(&client).await; - let select_query = "SELECT count() \ - FROM system.query_log \ - WHERE current_database = ? \ - AND query LIKE 'SELECT * FROM `foo` LIMIT 0%'"; + let count_query = "SELECT count() FROM system.query_log WHERE query LIKE ? || '%'"; + + let row_insert_metadata_query = + clickhouse::_priv::row_insert_metadata_query(&db_name, table_name); + + println!("row_insert_metadata_query: {row_insert_metadata_query:?}"); let initial_count: u64 = client - .query(select_query) - .bind(&db_name) + .query(count_query) + .bind(&row_insert_metadata_query) .fetch_one() .await .unwrap(); - let mut insert = client.insert::("foo").await.unwrap(); + let mut insert = client.insert::(table_name).await.unwrap(); insert .write(&Foo { @@ -291,14 +294,9 @@ async fn cache_row_metadata() { // Ensure `system.query_log` is fully written flush_query_log(&client).await; - let select_query = "SELECT count() \ - FROM system.query_log \ - WHERE current_database = ? \ - AND query LIKE 'SELECT * FROM `foo` LIMIT 0%'"; - let after_insert: u64 = client - .query(select_query) - .bind(&db_name) + .query(count_query) + .bind(&row_insert_metadata_query) .fetch_one() .await .unwrap(); @@ -308,7 +306,7 @@ async fn cache_row_metadata() { // Instead, of asserting a specific value, we assert that the count has changed. 
assert_ne!(after_insert, initial_count); - let mut insert = client.insert::("foo").await.unwrap(); + let mut insert = client.insert::(table_name).await.unwrap(); insert .write(&Foo { @@ -323,8 +321,8 @@ async fn cache_row_metadata() { flush_query_log(&client).await; let final_count: u64 = client - .query(select_query) - .bind(&db_name) + .query(count_query) + .bind(&row_insert_metadata_query) .fetch_one() .await .unwrap(); @@ -410,7 +408,7 @@ async fn clear_cached_metadata() { .expect_err("`Foo` should no longer be valid for the table"); }); - assert_panic_msg!(write_invalid, ["1 columns", "2 fields", "bar", "baz"]); + assert_panic_msg!(write_invalid, ["bar", "baz"]); let mut insert = client.insert::("foo").await.unwrap(); diff --git a/tests/it/rbwnat_smoke.rs b/tests/it/rbwnat_smoke.rs index a2ec71ad..50653380 100644 --- a/tests/it/rbwnat_smoke.rs +++ b/tests/it/rbwnat_smoke.rs @@ -9,6 +9,7 @@ use linked_hash_map::LinkedHashMap; use serde::{Deserialize, Serialize}; use serde_repr::{Deserialize_repr, Serialize_repr}; use std::collections::HashMap; +use std::panic::AssertUnwindSafe; use std::str::FromStr; #[tokio::test] @@ -1414,18 +1415,7 @@ async fn interval() { } // See https://clickhouse.com/docs/sql-reference/statements/create/table#ephemeral -// -// Ignored cause: -// -// While processing struct DataInsert: database schema has no column named hexed. -// #### All struct fields: -// - id -// - hexed -// #### All schema columns: -// - raw: FixedString(3) -// - id: UInt64 #[tokio::test] -#[ignore] async fn ephemeral_columns() { let table_name = "test_ephemeral_columns"; @@ -1441,7 +1431,7 @@ async fn ephemeral_columns() { raw: [u8; 3], } - let client = get_client(); + let client = prepare_database!(); client .query( " @@ -1510,17 +1500,22 @@ async fn ephemeral_columns() { // #### All schema columns: // - x: Int64 #[tokio::test] -#[ignore] async fn materialized_columns() { let table_name = "test_materialized_columns"; #[derive(Clone, Debug, Row, Serialize, Deserialize, PartialEq)] struct Data { x: i64, + } + + #[derive(Clone, Debug, Row, Serialize, Deserialize, PartialEq)] + struct DataWithMaterialized { + x: i64, + // MATERIALIZED columns cannot be inserted into s: String, } - let client = get_client(); + let client = prepare_database!(); execute_statements( &client, &[ @@ -1540,44 +1535,52 @@ async fn materialized_columns() { let rows = client .query("SELECT ?fields FROM ? ORDER BY x ASC") .bind(Identifier(table_name)) - .fetch_all::() + .fetch_all::() .await .unwrap(); let expected_rows = (0..5) - .map(|x| Data { + .map(|x| DataWithMaterialized { x, s: x.to_string(), }) .collect::>(); assert_eq!(rows, expected_rows); - let rows_to_insert = vec![ - Data { - x: 5, - s: "5".to_string(), - }, - Data { - x: 6, - s: "6".to_string(), - }, - ]; + let insert_data = AssertUnwindSafe(async { + let _ = client + .insert::(table_name) + .await + .unwrap(); + }); + + assert_panic_msg!( + insert_data, + ["column s is immutable (declared as `MATERIALIZED`)"] + ); + + let rows_to_insert = (5..10).map(|x| Data { x }); - // fails on this insert let mut insert = client.insert::(table_name).await.unwrap(); - for row in &rows_to_insert { - insert.write(row).await.unwrap(); + for row in rows_to_insert { + insert.write(&row).await.unwrap(); } insert.end().await.unwrap(); let rows_after_insert = client .query("SELECT ?fields FROM ? 
ORDER BY x ASC") .bind(Identifier(table_name)) - .fetch_all::() + .fetch_all::() .await .unwrap(); - let expected_rows_after_insert = [&rows[..], &rows_to_insert[..]].concat(); + let expected_rows_after_insert = (0..10) + .map(|x| DataWithMaterialized { + x, + s: x.to_string(), + }) + .collect::>(); + assert_eq!(rows_after_insert, expected_rows_after_insert); } @@ -1599,7 +1602,7 @@ async fn alias_columns() { size_bytes: i64, } - let client = get_client(); + let client = prepare_database!(); execute_statements( &client, &[ diff --git a/tests/it/rbwnat_validation.rs b/tests/it/rbwnat_validation.rs index b90f87d1..44996eb0 100644 --- a/tests/it/rbwnat_validation.rs +++ b/tests/it/rbwnat_validation.rs @@ -531,7 +531,14 @@ async fn issue_109_1() { let unwind = std::panic::AssertUnwindSafe(async { let _ = client.insert::("issue_109").await.unwrap(); }); - assert_panic_msg!(unwind, &["Data", "4 columns", "3 fields"]); + + assert_panic_msg!( + unwind, + &[ + "the following non-default columns are missing", + "en_id: String" + ] + ); } #[tokio::test] From 072773da30f3d60e6382379c36c7fd2199b7c52c Mon Sep 17 00:00:00 2001 From: Austin Bonander Date: Wed, 1 Oct 2025 12:08:33 -0700 Subject: [PATCH 10/13] fix(mock): don't break existing usage of `Client::with_url` --- src/lib.rs | 10 +++++++++- src/test/mock.rs | 31 ++++++++++++++++++++++++++++--- tests/it/mock.rs | 15 +++++++++++++++ 3 files changed, 52 insertions(+), 4 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 87e21a83..082dc351 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -144,6 +144,14 @@ impl Client { pub fn with_url(mut self, url: impl Into) -> Self { self.url = url.into(); + // `with_mock()` didn't exist previously, so to not break existing usages, + // we need to be able to detect a mocked server using nothing but the URL. + #[cfg(feature = "test-util")] + if let Some(url) = test::Mock::mocked_url_to_real(&self.url) { + self.url = url; + self.mocked = true; + } + // Assume our cached metadata is invalid. self.insert_metadata_cache = Default::default(); @@ -442,7 +450,7 @@ impl Client { /// which is pointless in that kind of tests. #[cfg(feature = "test-util")] pub fn with_mock(mut self, mock: &test::Mock) -> Self { - self.url = mock.url().to_string(); + self.url = mock.real_url().to_string(); self.mocked = true; self } diff --git a/src/test/mock.rs b/src/test/mock.rs index 48e13cb5..5804c5c4 100644 --- a/src/test/mock.rs +++ b/src/test/mock.rs @@ -15,9 +15,22 @@ use tokio::{net::TcpListener, task::AbortHandle}; use super::{Handler, HandlerFn}; +/// URL using a special hostname that `Client` can use to detect a mocked server. +/// +/// This is to avoid breaking existing usages of the mock API which just call +/// `client.with_url(mock.url)`. +/// +/// This domain should not resolve otherwise. The `.test` top-level domain +/// is reserved and cannot be registered on the open Internet. +const MOCKED_BASE_URL: &'static str = "http://mocked.clickhouse.test"; + +/// The real base URL where the mocked server is listening. +const REAL_BASE_URL: &'static str = "http://127.0.0.1"; + /// A mock server for testing. 
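///
/// A rough usage sketch (mirrors the `client_with_url` test in this change;
/// `expected_rows` is a placeholder):
///
/// ```ignore
/// let mock = test::Mock::new();
/// // `with_url(mock.url())` is auto-detected as mocked; `with_mock(&mock)` also works.
/// let client = Client::default().with_url(mock.url());
/// mock.add(test::handlers::provide(expected_rows.clone()));
/// ```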
pub struct Mock { - url: String, + mock_url: String, + pub(crate) real_url: String, shared: Arc>, non_exhaustive: bool, server_handle: AbortHandle, @@ -51,7 +64,8 @@ impl Mock { let server_handle = tokio::spawn(server(listener, shared.clone())); Self { - url: format!("http://{addr}"), + mock_url: format!("{MOCKED_BASE_URL}:{}", addr.port()), + real_url: format!("{REAL_BASE_URL}:{}", addr.port()), non_exhaustive: false, server_handle: server_handle.abort_handle(), shared, @@ -62,7 +76,18 @@ impl Mock { /// /// [`Client`]: crate::Client::with_url pub fn url(&self) -> &str { - &self.url + &self.mock_url + } + + pub(crate) fn real_url(&self) -> &str { + &self.real_url + } + + /// Returns `Some` if `url` was a mocked URL and converted to real, `None` if already real. + pub(crate) fn mocked_url_to_real(url: &str) -> Option { + url.strip_prefix(MOCKED_BASE_URL) + // rest = ":{port}" + .map(|rest| format!("{REAL_BASE_URL}{rest}")) } /// Adds a handler to the test server for the next request. diff --git a/tests/it/mock.rs b/tests/it/mock.rs index 00866f06..f4ab37c2 100644 --- a/tests/it/mock.rs +++ b/tests/it/mock.rs @@ -25,3 +25,18 @@ async fn provide() { tokio::time::advance(Duration::from_secs(100_000)).await; test_provide().await; } + +#[tokio::test] +async fn client_with_url() { + let mock = test::Mock::new(); + + // Existing usages before `with_mock()` was introduced should not silently break. + let client = Client::default().with_url(mock.url()); + let expected = vec![SimpleRow::new(1, "one"), SimpleRow::new(2, "two")]; + + // FIXME: &expected is not allowed due to new trait bounds + mock.add(test::handlers::provide(expected.clone())); + + let actual = crate::fetch_rows::(&client, "doesn't matter").await; + assert_eq!(actual, expected); +} From e75290f220529172e51e7b4a6105dac482b5e6e6 Mon Sep 17 00:00:00 2001 From: Austin Bonander Date: Wed, 1 Oct 2025 13:35:50 -0700 Subject: [PATCH 11/13] doc(client): note that validation is forced off when mocking --- src/lib.rs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/lib.rs b/src/lib.rs index 082dc351..97fc795b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -403,6 +403,15 @@ impl Client { /// in your specific use case. Additionally, writing smoke tests to ensure that /// the row types match the ClickHouse schema is highly recommended, /// if you plan to disable validation in your application. + /// + /// # Note: Mocking + /// When using [`test::Mock`] with the `test-util` feature, validation is forced off. + /// + /// This applies either when using [`Client::with_mock()`], or [`Client::with_url()`] + /// with a URL from [`test::Mock::url()`]. + /// + /// As of writing, the mocking facilities are unable to generate the `RowBinaryWithNamesAndTypes` + /// header required for validation to function. 
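    ///
    /// For example (a sketch; the URL is a placeholder), validation can still be
    /// disabled explicitly for a real server:
    ///
    /// ```ignore
    /// let client = Client::default()
    ///     .with_url("http://localhost:8123")
    ///     .with_validation(false); // opt out of schema checks
    /// ```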
pub fn with_validation(mut self, enabled: bool) -> Self { self.validation = enabled; self From 2e57c00a57e50cac4165c019e31a7f494f83058d Mon Sep 17 00:00:00 2001 From: Austin Bonander Date: Wed, 1 Oct 2025 13:43:31 -0700 Subject: [PATCH 12/13] fix: clippy, docs --- .github/workflows/ci.yml | 6 +++--- src/rowbinary/de.rs | 2 +- src/test/mock.rs | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index c570fcc2..ee26211a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -59,9 +59,9 @@ jobs: - run: rustup override set nightly - run: rustup show active-toolchain -v # Serde 1.0.227 fails to build with `--cfg docsrs`, only pass it to our own packages - - run: cargo rustdoc -p clickhouse-derive -- -D warnings --cfg docsrs - - run: cargo rustdoc -p clickhouse-types -- -D warnings --cfg docsrs - - run: cargo rustdoc -p clickhouse -- -D warnings --cfg docsrs + - run: cargo rustdoc -p clickhouse-derive --all-features -- -D warnings --cfg docsrs + - run: cargo rustdoc -p clickhouse-types --all-features -- -D warnings --cfg docsrs + - run: cargo rustdoc -p clickhouse --all-features -- -D warnings --cfg docsrs test: runs-on: ubuntu-latest diff --git a/src/rowbinary/de.rs b/src/rowbinary/de.rs index 2df55748..cb7d3529 100644 --- a/src/rowbinary/de.rs +++ b/src/rowbinary/de.rs @@ -213,7 +213,7 @@ where /// This is used to deserialize identifiers for either: /// - `Variant` data type - /// - [`RowBinaryStructAsMapAccess`] field. + /// - out-of-order struct fields using [`MapAccess`]. #[inline(always)] fn deserialize_identifier>(self, visitor: V) -> Result { ensure_size(&mut self.input, size_of::())?; diff --git a/src/test/mock.rs b/src/test/mock.rs index 5804c5c4..299b8de0 100644 --- a/src/test/mock.rs +++ b/src/test/mock.rs @@ -22,10 +22,10 @@ use super::{Handler, HandlerFn}; /// /// This domain should not resolve otherwise. The `.test` top-level domain /// is reserved and cannot be registered on the open Internet. -const MOCKED_BASE_URL: &'static str = "http://mocked.clickhouse.test"; +const MOCKED_BASE_URL: &str = "http://mocked.clickhouse.test"; /// The real base URL where the mocked server is listening. -const REAL_BASE_URL: &'static str = "http://127.0.0.1"; +const REAL_BASE_URL: &str = "http://127.0.0.1"; /// A mock server for testing. pub struct Mock { From a3932a682a0fd03934dfead05cd3195399f236f2 Mon Sep 17 00:00:00 2001 From: Serge Klochkov <3175289+slvrtrn@users.noreply.github.com> Date: Fri, 3 Oct 2025 01:53:30 +0200 Subject: [PATCH 13/13] fix: update comment in rbwnat_smoke.rs --- tests/it/rbwnat_smoke.rs | 9 --------- 1 file changed, 9 deletions(-) diff --git a/tests/it/rbwnat_smoke.rs b/tests/it/rbwnat_smoke.rs index 50653380..77ae3005 100644 --- a/tests/it/rbwnat_smoke.rs +++ b/tests/it/rbwnat_smoke.rs @@ -1490,15 +1490,6 @@ async fn ephemeral_columns() { } // See https://clickhouse.com/docs/sql-reference/statements/alter/column#materialize-column -// -// Ignored cause: -// -// While processing struct Data: database schema has 1 column(s), but the struct definition has 2 field(s). -// #### All struct fields: -// - x -// - s -// #### All schema columns: -// - x: Int64 #[tokio::test] async fn materialized_columns() { let table_name = "test_materialized_columns";