diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index c570fcc2..ee26211a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -59,9 +59,9 @@ jobs: - run: rustup override set nightly - run: rustup show active-toolchain -v # Serde 1.0.227 fails to build with `--cfg docsrs`, only pass it to our own packages - - run: cargo rustdoc -p clickhouse-derive -- -D warnings --cfg docsrs - - run: cargo rustdoc -p clickhouse-types -- -D warnings --cfg docsrs - - run: cargo rustdoc -p clickhouse -- -D warnings --cfg docsrs + - run: cargo rustdoc -p clickhouse-derive --all-features -- -D warnings --cfg docsrs + - run: cargo rustdoc -p clickhouse-types --all-features -- -D warnings --cfg docsrs + - run: cargo rustdoc -p clickhouse --all-features -- -D warnings --cfg docsrs test: runs-on: ubuntu-latest diff --git a/docker-compose.yml b/docker-compose.yml index cc309127..a7dc8456 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,7 +1,8 @@ name: clickhouse-rs services: clickhouse: - image: 'clickhouse/clickhouse-server:${CLICKHOUSE_VERSION-latest-alpine}' + # Note: use of a fully qualified url makes Podman happy without need for further configuration. + image: 'docker.io/clickhouse/clickhouse-server:${CLICKHOUSE_VERSION-latest-alpine}' container_name: 'clickhouse-rs-clickhouse-server' environment: CLICKHOUSE_SKIP_USER_SETUP: 1 diff --git a/src/insert.rs b/src/insert.rs index 8382b26a..89ad7ac3 100644 --- a/src/insert.rs +++ b/src/insert.rs @@ -12,7 +12,6 @@ use bytes::{Bytes, BytesMut}; use clickhouse_types::put_rbwnat_columns_header; use hyper::{self, Request}; use replace_with::replace_with_or_abort; -use std::sync::Arc; use std::{future::Future, marker::PhantomData, mem, panic, pin::Pin, time::Duration}; use tokio::{ task::JoinHandle, @@ -34,11 +33,21 @@ const_assert!(BUFFER_SIZE.is_power_of_two()); // to use the whole buffer's capac /// Otherwise, the whole `INSERT` will be aborted. 
/// /// Rows are being sent progressively to spread network load. +/// +/// # Note: Metadata is Cached +/// If [validation is enabled][Client::with_validation], +/// this helper will query the metadata for the target table to learn the column names and types. +/// +/// To avoid querying this metadata every time, it is cached within the [`Client`]. +/// +/// Any concurrent changes to the table schema may cause insert failures if the metadata +/// is no longer correct. For correct functioning, call [`Client::clear_cached_metadata()`] +/// after any changes to the current database schema. #[must_use] pub struct Insert { state: InsertState, buffer: BytesMut, - row_metadata: Option>, + row_metadata: Option, #[cfg(feature = "lz4")] compression: Compression, send_timeout: Option, @@ -121,7 +130,7 @@ macro_rules! timeout { } impl Insert { - pub(crate) fn new(client: &Client, table: &str, row_metadata: Option>) -> Self + pub(crate) fn new(client: &Client, table: &str, row_metadata: Option) -> Self where T: Row, { diff --git a/src/lib.rs b/src/lib.rs index 54f1a5c2..97fc795b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -9,13 +9,13 @@ pub use self::{ row::{Row, RowOwned, RowRead, RowWrite}, }; use self::{error::Result, http_client::HttpClient}; -use crate::row_metadata::RowMetadata; -use crate::sql::Identifier; +use crate::row_metadata::{AccessType, ColumnDefaultKind, InsertMetadata, RowMetadata}; #[doc = include_str!("row_derive.md")] pub use clickhouse_derive::Row; -use clickhouse_types::parse_rbwnat_columns_header; +use clickhouse_types::{Column, DataTypeNode}; +use crate::_priv::row_insert_metadata_query; use std::{collections::HashMap, fmt::Display, sync::Arc}; use tokio::sync::RwLock; @@ -61,7 +61,7 @@ pub struct Client { headers: HashMap, products_info: Vec, validation: bool, - row_metadata_cache: Arc, + insert_metadata_cache: Arc, #[cfg(feature = "test-util")] mocked: bool, @@ -107,13 +107,8 @@ impl Default for Client { /// Cache for [`RowMetadata`] to avoid allocating 
it for the same struct more than once /// during the application lifecycle. Key: fully qualified table name (e.g. `database.table`). -pub(crate) struct RowMetadataCache(RwLock>>); - -impl Default for RowMetadataCache { - fn default() -> Self { - RowMetadataCache(RwLock::new(HashMap::default())) - } -} +#[derive(Default)] +pub(crate) struct InsertMetadataCache(RwLock>>); impl Client { /// Creates a new client with a specified underlying HTTP client. @@ -130,7 +125,7 @@ impl Client { headers: HashMap::new(), products_info: Vec::default(), validation: true, - row_metadata_cache: Arc::new(RowMetadataCache::default()), + insert_metadata_cache: Arc::new(InsertMetadataCache::default()), #[cfg(feature = "test-util")] mocked: false, } @@ -138,6 +133,9 @@ impl Client { /// Specifies ClickHouse's url. Should point to HTTP endpoint. /// + /// Automatically [clears the metadata cache][Self::clear_cached_metadata] + /// for this instance only. + /// /// # Examples /// ``` /// # use clickhouse::Client; @@ -145,11 +143,26 @@ impl Client { /// ``` pub fn with_url(mut self, url: impl Into) -> Self { self.url = url.into(); + + // `with_mock()` didn't exist previously, so to not break existing usages, + // we need to be able to detect a mocked server using nothing but the URL. + #[cfg(feature = "test-util")] + if let Some(url) = test::Mock::mocked_url_to_real(&self.url) { + self.url = url; + self.mocked = true; + } + + // Assume our cached metadata is invalid. + self.insert_metadata_cache = Default::default(); + self } /// Specifies a database name. /// + /// Automatically [clears the metadata cache][Self::clear_cached_metadata] + /// for this instance only. + /// /// # Examples /// ``` /// # use clickhouse::Client; @@ -157,6 +170,10 @@ impl Client { /// ``` pub fn with_database(mut self, database: impl Into) -> Self { self.database = Some(database.into()); + + // Assume our cached metadata is invalid. 
+ self.insert_metadata_cache = Default::default(); + self } @@ -347,8 +364,12 @@ impl Client { /// If `T` has unnamed fields, e.g. tuples. pub async fn insert(&self, table: &str) -> Result> { if self.get_validation() { - let metadata = self.get_row_metadata_for_insert::(table).await?; - return Ok(insert::Insert::new(self, table, Some(metadata))); + let metadata = self.get_insert_metadata(table).await?; + return Ok(insert::Insert::new( + self, + table, + Some(metadata.to_row::()), + )); } Ok(insert::Insert::new(self, table, None)) } @@ -382,11 +403,36 @@ impl Client { /// in your specific use case. Additionally, writing smoke tests to ensure that /// the row types match the ClickHouse schema is highly recommended, /// if you plan to disable validation in your application. + /// + /// # Note: Mocking + /// When using [`test::Mock`] with the `test-util` feature, validation is forced off. + /// + /// This applies either when using [`Client::with_mock()`], or [`Client::with_url()`] + /// with a URL from [`test::Mock::url()`]. + /// + /// As of writing, the mocking facilities are unable to generate the `RowBinaryWithNamesAndTypes` + /// header required for validation to function. pub fn with_validation(mut self, enabled: bool) -> Self { self.validation = enabled; self } + /// Clear table metadata that was previously received and cached. + /// + /// [`Insert`][crate::insert::Insert] uses cached metadata when sending data with validation. + /// If the table schema changes, this metadata needs to be re-fetched. + /// + /// This method clears the metadata cache, causing future insert queries to re-fetch metadata. + /// This applies to all cloned instances of this `Client` (using the same URL and database) + /// as well. + /// + /// This may need to wait to acquire a lock if a query is concurrently writing into the cache. + /// + /// Cancel-safe. 
+ pub async fn clear_cached_metadata(&self) { + self.insert_metadata_cache.0.write().await.clear(); + } + /// Used internally to check if the validation mode is enabled, /// as it takes into account the `test-util` feature flag. #[inline] @@ -413,43 +459,58 @@ impl Client { /// which is pointless in that kind of tests. #[cfg(feature = "test-util")] pub fn with_mock(mut self, mock: &test::Mock) -> Self { - self.url = mock.url().to_string(); + self.url = mock.real_url().to_string(); self.mocked = true; self } - async fn get_row_metadata_for_insert( - &self, - table_name: &str, - ) -> Result> { - let read_lock = self.row_metadata_cache.0.read().await; - match read_lock.get(table_name) { - Some(metadata) => Ok(metadata.clone()), - None => { - drop(read_lock); - // TODO: should it be moved to a cold function? - let mut write_lock = self.row_metadata_cache.0.write().await; - let db = match self.database { - Some(ref db) => db, - None => "default", - }; - let mut bytes_cursor = self - .query("SELECT * FROM ? LIMIT 0") - .bind(Identifier(table_name)) - // don't allow to override the client database set in the client instance - // with a `.with_option("database", "some_other_db")` call on the app side - .with_option("database", db) - .fetch_bytes("RowBinaryWithNamesAndTypes")?; - let mut buffer = Vec::::new(); - while let Some(chunk) = bytes_cursor.next().await? { - buffer.extend_from_slice(&chunk); - } - let columns = parse_rbwnat_columns_header(&mut buffer.as_slice())?; - let metadata = Arc::new(RowMetadata::new_for_insert::(columns)); - write_lock.insert(table_name.to_string(), metadata.clone()); - Ok(metadata) + async fn get_insert_metadata(&self, table_name: &str) -> Result> { + { + let read_lock = self.insert_metadata_cache.0.read().await; + + // FIXME: `table_name` is not necessarily fully qualified here + if let Some(metadata) = read_lock.get(table_name) { + return Ok(metadata.clone()); } } + + // TODO: should it be moved to a cold function? 
+ let mut write_lock = self.insert_metadata_cache.0.write().await; + let db = match self.database { + Some(ref db) => db, + None => "default", + }; + + let mut columns_cursor = self + .query(&row_insert_metadata_query(db, table_name)) + .fetch::<(String, String, String)>()?; + + let mut columns = Vec::new(); + let mut column_default_kinds = Vec::new(); + let mut column_lookup = HashMap::new(); + + while let Some((name, type_, default_kind)) = columns_cursor.next().await? { + let data_type = DataTypeNode::new(&type_)?; + let default_kind = default_kind.parse::()?; + + column_lookup.insert(name.clone(), columns.len()); + + columns.push(Column { name, data_type }); + + column_default_kinds.push(default_kind); + } + + let metadata = Arc::new(InsertMetadata { + row_metadata: RowMetadata { + columns, + access_type: AccessType::WithSeqAccess, // ignored on insert + }, + column_default_kinds, + column_lookup, + }); + + write_lock.insert(table_name.to_string(), metadata.clone()); + Ok(metadata) } } @@ -463,6 +524,25 @@ pub mod _priv { pub fn lz4_compress(uncompressed: &[u8]) -> super::Result { crate::compression::lz4::compress(uncompressed) } + + // Also needed by `it::insert::cache_row_metadata()` + pub fn row_insert_metadata_query(db: &str, table: &str) -> String { + let mut out = "SELECT \ + name, \ + type, \ + default_kind \ + FROM system.columns \ + WHERE database = " + .to_string(); + + crate::sql::escape::string(db, &mut out).unwrap(); + + out.push_str(" AND table = "); + + crate::sql::escape::string(table, &mut out).unwrap(); + + out + } } #[cfg(test)] @@ -643,32 +723,6 @@ mod client_tests { ); } - #[tokio::test] - async fn cache_row_metadata() { - let client = Client::default() - .with_url("http://localhost:8123") - .with_database("system"); - - let metadata = client - .get_row_metadata_for_insert::("roles") - .await - .unwrap(); - - assert_eq!(metadata.columns, SystemRolesRow::columns()); - assert_eq!(metadata.access_type, AccessType::WithSeqAccess); - - // we can 
now use a dummy client, cause the metadata is cached, - // and no calls to the database will be made - client - .with_url("whatever") - .get_row_metadata_for_insert::("roles") - .await - .unwrap(); - - assert_eq!(metadata.columns, SystemRolesRow::columns()); - assert_eq!(metadata.access_type, AccessType::WithSeqAccess); - } - #[test] fn it_does_follow_previous_configuration() { let client = Client::default().with_option("async_insert", "1"); diff --git a/src/row_metadata.rs b/src/row_metadata.rs index d27ed103..0b4e543a 100644 --- a/src/row_metadata.rs +++ b/src/row_metadata.rs @@ -1,8 +1,10 @@ use crate::Row; +use crate::error::Error; use crate::row::RowKind; use clickhouse_types::Column; use std::collections::HashMap; -use std::fmt::Display; +use std::fmt::{Display, Formatter}; +use std::str::FromStr; #[derive(Debug, PartialEq)] pub(crate) enum AccessType { @@ -32,6 +34,21 @@ pub(crate) struct RowMetadata { pub(crate) access_type: AccessType, } +pub(crate) struct InsertMetadata { + pub(crate) row_metadata: RowMetadata, + pub(crate) column_default_kinds: Vec, + pub(crate) column_lookup: HashMap, +} + +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +pub(crate) enum ColumnDefaultKind { + Unset, + Default, + Materialized, + Ephemeral, + Alias, +} + impl RowMetadata { pub(crate) fn new_for_cursor(columns: Vec) -> Self { let access_type = match T::KIND { @@ -98,11 +115,10 @@ impl RowMetadata { mapping.push(index); } else { panic!( - "While processing struct {}: database schema has a column {} \ + "While processing struct {}: database schema has a column {col} \ that was not found in the struct definition.\ \n#### All struct fields:\n{}\n#### All schema columns:\n{}", T::NAME, - col, join_panic_schema_hint(T::COLUMN_NAMES), join_panic_schema_hint(&columns), ); @@ -121,54 +137,6 @@ impl RowMetadata { } } - pub(crate) fn new_for_insert(columns: Vec) -> Self { - if T::KIND != RowKind::Struct { - panic!( - "SerializerRowMetadata can only be created for structs, \ - but got 
{:?} instead.\n#### All schema columns:\n{}", - T::KIND, - join_panic_schema_hint(&columns), - ); - } - if columns.len() != T::COLUMN_NAMES.len() { - panic!( - "While processing struct {}: database schema has {} columns, \ - but the struct definition has {} fields.\ - \n#### All struct fields:\n{}\n#### All schema columns:\n{}", - T::NAME, - columns.len(), - T::COLUMN_NAMES.len(), - join_panic_schema_hint(T::COLUMN_NAMES), - join_panic_schema_hint(&columns), - ); - } - - let mut result_columns: Vec = Vec::with_capacity(columns.len()); - let db_columns_lookup: HashMap<&str, &Column> = - columns.iter().map(|col| (col.name.as_str(), col)).collect(); - - for struct_column_name in T::COLUMN_NAMES { - match db_columns_lookup.get(*struct_column_name) { - Some(col) => result_columns.push((*col).clone()), - None => { - panic!( - "While processing struct {}: database schema has no column named {}.\ - \n#### All struct fields:\n{}\n#### All schema columns:\n{}", - T::NAME, - struct_column_name, - join_panic_schema_hint(T::COLUMN_NAMES), - join_panic_schema_hint(&db_columns_lookup.values().collect::>()), - ); - } - } - } - - Self { - columns: result_columns, - access_type: AccessType::WithSeqAccess, // ignored - } - } - /// Returns the index of the column in the database schema /// that corresponds to the field with the given index in the struct. 
/// @@ -199,11 +167,120 @@ impl RowMetadata { } } -fn join_panic_schema_hint(col: &[T]) -> String { - if col.is_empty() { - return String::default(); +impl FromStr for ColumnDefaultKind { + type Err = Error; + + fn from_str(s: &str) -> Result { + match s { + "" => Ok(Self::Unset), + "DEFAULT" => Ok(Self::Default), + "MATERIALIZED" => Ok(Self::Materialized), + "EPHEMERAL" => Ok(Self::Ephemeral), + "ALIAS" => Ok(Self::Alias), + other => Err(Error::Other( + format!("unknown column default_kind {other}").into(), + )), + } + } +} + +impl ColumnDefaultKind { + pub(crate) fn is_immutable(self) -> bool { + matches!(self, Self::Materialized | Self::Alias) + } + + pub(crate) fn has_default(self) -> bool { + matches!(self, Self::Default | Self::Materialized | Self::Alias) + } + + pub(crate) fn to_str(self) -> &'static str { + match self { + ColumnDefaultKind::Unset => "", + ColumnDefaultKind::Default => "DEFAULT", + ColumnDefaultKind::Materialized => "MATERIALIZED", + ColumnDefaultKind::Ephemeral => "EPHEMERAL", + ColumnDefaultKind::Alias => "ALIAS", + } + } +} + +impl Display for ColumnDefaultKind { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.write_str(self.to_str()) + } +} + +impl InsertMetadata { + pub(crate) fn to_row(&self) -> RowMetadata { + if T::KIND != RowKind::Struct { + panic!( + "SerializerRowMetadata can only be created for structs, \ + but got {:?} instead.\n#### All schema columns:\n{}", + T::KIND, + join_panic_schema_hint(&self.row_metadata.columns), + ); + } + + let mut result_columns: Vec = Vec::with_capacity(T::COLUMN_COUNT); + let mut set_columns: Vec = vec![false; self.row_metadata.columns.len()]; + + for struct_column_name in T::COLUMN_NAMES { + match self.column_lookup.get(*struct_column_name) { + Some(&col) => { + if self.column_default_kinds[col].is_immutable() { + panic!( + "While processing struct {}: column {struct_column_name} is immutable (declared as `{}`)", + T::NAME, + self.column_default_kinds[col], + ); + } + + // TODO: 
what should happen if a column is mentioned multiple times? + set_columns[col] = true; + + result_columns.push(self.row_metadata.columns[col].clone()) + } + None => { + panic!( + "While processing struct {}: database schema has no column named {struct_column_name}.\ + \n#### All struct fields:\n{}\n#### All schema columns:\n{}", + T::NAME, + join_panic_schema_hint(T::COLUMN_NAMES), + join_panic_schema_hint(&self.row_metadata.columns), + ); + } + } + } + + let missing_columns = set_columns.iter().enumerate().filter_map(|(col, &is_set)| { + if is_set || self.column_default_kinds[col].has_default() { + return None; + } + + Some(&self.row_metadata.columns[col]) + }); + + let missing_columns_hint = join_panic_schema_hint(missing_columns); + + if !missing_columns_hint.is_empty() { + panic!( + "While processing struct {}: the following non-default columns are missing:\n{missing_columns_hint}\ + \n#### All struct fields:\n{}\n#### All schema columns:\n{}", + T::NAME, + join_panic_schema_hint(T::COLUMN_NAMES), + join_panic_schema_hint(&self.row_metadata.columns), + ) + } + + RowMetadata { + columns: result_columns, + access_type: AccessType::WithSeqAccess, // ignored + } } - col.iter() +} + +fn join_panic_schema_hint(col: impl IntoIterator) -> String { + col.into_iter() .map(|c| format!("- {c}")) .collect::>() .join("\n") diff --git a/src/rowbinary/de.rs b/src/rowbinary/de.rs index 2df55748..cb7d3529 100644 --- a/src/rowbinary/de.rs +++ b/src/rowbinary/de.rs @@ -213,7 +213,7 @@ where /// This is used to deserialize identifiers for either: /// - `Variant` data type - /// - [`RowBinaryStructAsMapAccess`] field. + /// - out-of-order struct fields using [`MapAccess`]. 
#[inline(always)] fn deserialize_identifier>(self, visitor: V) -> Result { ensure_size(&mut self.input, size_of::())?; diff --git a/src/test/mock.rs b/src/test/mock.rs index 48e13cb5..299b8de0 100644 --- a/src/test/mock.rs +++ b/src/test/mock.rs @@ -15,9 +15,22 @@ use tokio::{net::TcpListener, task::AbortHandle}; use super::{Handler, HandlerFn}; +/// URL using a special hostname that `Client` can use to detect a mocked server. +/// +/// This is to avoid breaking existing usages of the mock API which just call +/// `client.with_url(mock.url)`. +/// +/// This domain should not resolve otherwise. The `.test` top-level domain +/// is reserved and cannot be registered on the open Internet. +const MOCKED_BASE_URL: &str = "http://mocked.clickhouse.test"; + +/// The real base URL where the mocked server is listening. +const REAL_BASE_URL: &str = "http://127.0.0.1"; + /// A mock server for testing. pub struct Mock { - url: String, + mock_url: String, + pub(crate) real_url: String, shared: Arc>, non_exhaustive: bool, server_handle: AbortHandle, @@ -51,7 +64,8 @@ impl Mock { let server_handle = tokio::spawn(server(listener, shared.clone())); Self { - url: format!("http://{addr}"), + mock_url: format!("{MOCKED_BASE_URL}:{}", addr.port()), + real_url: format!("{REAL_BASE_URL}:{}", addr.port()), non_exhaustive: false, server_handle: server_handle.abort_handle(), shared, @@ -62,7 +76,18 @@ impl Mock { /// /// [`Client`]: crate::Client::with_url pub fn url(&self) -> &str { - &self.url + &self.mock_url + } + + pub(crate) fn real_url(&self) -> &str { + &self.real_url + } + + /// Returns `Some` if `url` was a mocked URL and converted to real, `None` if already real. + pub(crate) fn mocked_url_to_real(url: &str) -> Option { + url.strip_prefix(MOCKED_BASE_URL) + // rest = ":{port}" + .map(|rest| format!("{REAL_BASE_URL}{rest}")) } /// Adds a handler to the test server for the next request. 
diff --git a/tests/it/insert.rs b/tests/it/insert.rs index 68e569d4..4f43e7c4 100644 --- a/tests/it/insert.rs +++ b/tests/it/insert.rs @@ -1,6 +1,7 @@ use crate::{SimpleRow, create_simple_table, fetch_rows, flush_query_log}; use clickhouse::{Row, sql::Identifier}; use serde::{Deserialize, Serialize}; +use std::panic::AssertUnwindSafe; #[tokio::test] async fn keeps_client_options() { @@ -239,3 +240,187 @@ async fn insert_from_cursor() { ); assert_eq!(cursor.next().await.unwrap(), None); } + +#[tokio::test] +async fn cache_row_metadata() { + #[derive(clickhouse::Row, serde::Serialize)] + struct Foo { + bar: i32, + baz: String, + } + + let db_name = test_database_name!(); + let table_name = "foo"; + + let client = crate::_priv::prepare_database(&db_name) + .await + .with_validation(true); + + client + .query("CREATE TABLE foo(bar Int32, baz String) ENGINE = MergeTree PRIMARY KEY(bar)") + .execute() + .await + .unwrap(); + + // Ensure `system.query_log` is fully written + flush_query_log(&client).await; + + let count_query = "SELECT count() FROM system.query_log WHERE query LIKE ? || '%'"; + + let row_insert_metadata_query = + clickhouse::_priv::row_insert_metadata_query(&db_name, table_name); + + println!("row_insert_metadata_query: {row_insert_metadata_query:?}"); + + let initial_count: u64 = client + .query(count_query) + .bind(&row_insert_metadata_query) + .fetch_one() + .await + .unwrap(); + + let mut insert = client.insert::(table_name).await.unwrap(); + + insert + .write(&Foo { + bar: 1, + baz: "Hello, world!".to_string(), + }) + .await + .unwrap(); + + insert.end().await.unwrap(); + + // Ensure `system.query_log` is fully written + flush_query_log(&client).await; + + let after_insert: u64 = client + .query(count_query) + .bind(&row_insert_metadata_query) + .fetch_one() + .await + .unwrap(); + + // If the database server has not been reset between test runs, `initial_count` will be nonzero. 
+ // + // Instead of asserting a specific value, we assert that the count has changed. + assert_ne!(after_insert, initial_count); + + let mut insert = client.insert::(table_name).await.unwrap(); + + insert + .write(&Foo { + bar: 2, + baz: "Hello, ClickHouse!".to_string(), + }) + .await + .unwrap(); + + insert.end().await.unwrap(); + + flush_query_log(&client).await; + + let final_count: u64 = client + .query(count_query) + .bind(&row_insert_metadata_query) + .fetch_one() + .await + .unwrap(); + + // Insert metadata is cached, so we should not have queried this table again. + assert_eq!(final_count, after_insert); +} + +#[tokio::test] +async fn clear_cached_metadata() { + #[derive(clickhouse::Row, serde::Serialize)] + struct Foo { + bar: i32, + baz: String, + } + + #[derive( + clickhouse::Row, + serde::Serialize, + serde::Deserialize, + PartialEq, + Eq, + Debug + )] + struct Foo2 { + bar: i32, + } + + let client = prepare_database!().with_validation(true); + + client + .query("CREATE TABLE foo(bar Int32, baz String) ENGINE = MergeTree PRIMARY KEY(bar)") + .execute() + .await + .unwrap(); + + let mut insert = client.insert::("foo").await.unwrap(); + + insert + .write(&Foo { + bar: 1, + baz: "Hello, world!".to_string(), + }) + .await + .unwrap(); + + insert.end().await.unwrap(); + + client + .query("ALTER TABLE foo DROP COLUMN baz") + .execute() + .await + .unwrap(); + + let mut insert = client.insert::("foo").await.unwrap(); + + insert + .write(&Foo { + bar: 2, + baz: "Hello, ClickHouse!".to_string(), + }) + .await + .unwrap(); + + dbg!( + insert + .end() + .await + .expect_err("Insert metadata is invalid; this should error!") + ); + + client.clear_cached_metadata().await; + + let write_invalid = AssertUnwindSafe(async { + let mut insert = client.insert::("foo").await.unwrap(); + + insert + .write(&Foo { + bar: 2, + baz: "Hello, ClickHouse!".to_string(), + }) + .await + .expect_err("`Foo` should no longer be valid for the table"); + }); + + 
assert_panic_msg!(write_invalid, ["bar", "baz"]); + + let mut insert = client.insert::("foo").await.unwrap(); + + insert.write(&Foo2 { bar: 3 }).await.unwrap(); + + insert.end().await.unwrap(); + + let rows = client + .query("SELECT * FROM foo ORDER BY bar") + .fetch_all::() + .await + .unwrap(); + + assert_eq!(*rows, [Foo2 { bar: 1 }, Foo2 { bar: 3 }]); +} diff --git a/tests/it/main.rs b/tests/it/main.rs index 4a5c38a3..2d2a78ee 100644 --- a/tests/it/main.rs +++ b/tests/it/main.rs @@ -78,7 +78,7 @@ macro_rules! assert_panic_msg { result.unwrap() ); let panic_msg = *result.unwrap_err().downcast::().unwrap(); - for &msg in $msg_parts { + for msg in $msg_parts { assert!( panic_msg.contains(msg), "panic message:\n{panic_msg}\ndid not contain the expected part:\n{msg}" @@ -89,14 +89,16 @@ macro_rules! assert_panic_msg { macro_rules! prepare_database { () => { - crate::_priv::prepare_database({ + crate::_priv::prepare_database(&test_database_name!()).await + }; +} + +macro_rules! test_database_name { + () => { + crate::_priv::make_db_name({ fn f() {} - fn type_name_of_val(_: T) -> &'static str { - std::any::type_name::() - } - type_name_of_val(f) + std::any::type_name_of_val(&f) }) - .await }; } @@ -168,7 +170,12 @@ where } pub(crate) async fn flush_query_log(client: &Client) { - client.query("SYSTEM FLUSH LOGS").execute().await.unwrap(); + client + .query("SYSTEM FLUSH LOGS") + .with_option("wait_end_of_query", "1") + .execute() + .await + .unwrap(); } pub(crate) async fn execute_statements(client: &Client, statements: &[&str]) { @@ -274,14 +281,13 @@ mod _priv { use super::*; use std::time::SystemTime; - pub(crate) async fn prepare_database(fn_path: &str) -> Client { - let db_name = make_db_name(fn_path); + pub(crate) async fn prepare_database(db_name: &str) -> Client { let client = get_client(); client .query("DROP DATABASE IF EXISTS ?") .with_option("wait_end_of_query", "1") - .bind(Identifier(&db_name)) + .bind(Identifier(db_name)) .execute() .await 
.unwrap_or_else(|err| panic!("cannot drop db {db_name}, cause: {err}")); @@ -289,7 +295,7 @@ mod _priv { client .query("CREATE DATABASE ?") .with_option("wait_end_of_query", "1") - .bind(Identifier(&db_name)) + .bind(Identifier(db_name)) .execute() .await .unwrap_or_else(|err| panic!("cannot create db {db_name}, cause: {err}")); @@ -301,7 +307,7 @@ mod _priv { // `it::compression::lz4::{{closure}}::f` -> // - "local" env: `chrs__compression__lz4` // - "cloud" env: `chrs__compression__lz4__{unix_millis}` - fn make_db_name(fn_path: &str) -> String { + pub(crate) fn make_db_name(fn_path: &str) -> String { assert!(fn_path.starts_with("it::")); let mut iter = fn_path.split("::").skip(1); let module = iter.next().unwrap(); diff --git a/tests/it/mock.rs b/tests/it/mock.rs index 00866f06..f4ab37c2 100644 --- a/tests/it/mock.rs +++ b/tests/it/mock.rs @@ -25,3 +25,18 @@ async fn provide() { tokio::time::advance(Duration::from_secs(100_000)).await; test_provide().await; } + +#[tokio::test] +async fn client_with_url() { + let mock = test::Mock::new(); + + // Existing usages before `with_mock()` was introduced should not silently break. 
+ let client = Client::default().with_url(mock.url()); + let expected = vec![SimpleRow::new(1, "one"), SimpleRow::new(2, "two")]; + + // FIXME: &expected is not allowed due to new trait bounds + mock.add(test::handlers::provide(expected.clone())); + + let actual = crate::fetch_rows::(&client, "doesn't matter").await; + assert_eq!(actual, expected); +} diff --git a/tests/it/rbwnat_smoke.rs b/tests/it/rbwnat_smoke.rs index 1abbb9c6..77ae3005 100644 --- a/tests/it/rbwnat_smoke.rs +++ b/tests/it/rbwnat_smoke.rs @@ -1,6 +1,6 @@ use crate::decimals::*; use crate::geo_types::{LineString, MultiLineString, MultiPolygon, Point, Polygon, Ring}; -use crate::{SimpleRow, create_simple_table, get_client, insert_and_select}; +use crate::{SimpleRow, create_simple_table, execute_statements, get_client, insert_and_select}; use clickhouse::Row; use clickhouse::sql::Identifier; use fxhash::FxHashMap; @@ -9,6 +9,7 @@ use linked_hash_map::LinkedHashMap; use serde::{Deserialize, Serialize}; use serde_repr::{Deserialize_repr, Serialize_repr}; use std::collections::HashMap; +use std::panic::AssertUnwindSafe; use std::str::FromStr; #[tokio::test] @@ -1412,3 +1413,263 @@ async fn interval() { } ); } + +// See https://clickhouse.com/docs/sql-reference/statements/create/table#ephemeral +#[tokio::test] +async fn ephemeral_columns() { + let table_name = "test_ephemeral_columns"; + + #[derive(Clone, Debug, Row, Serialize, PartialEq)] + struct DataInsert { + id: u64, + hexed: String, + } + + #[derive(Clone, Debug, Row, Deserialize, PartialEq)] + struct DataSelect { + id: u64, + raw: [u8; 3], + } + + let client = prepare_database!(); + client + .query( + " + CREATE OR REPLACE TABLE ? 
+ ( + id UInt64, + hexed String EPHEMERAL, + raw FixedString(3) DEFAULT unhex(hexed) + ) + ENGINE = MergeTree + ORDER BY id + ", + ) + .bind(Identifier(table_name)) + .execute() + .await + .unwrap(); + + let rows_to_insert = vec![ + DataInsert { + id: 1, + hexed: "666F6F".to_string(), // "foo" in hex + }, + DataInsert { + id: 2, + hexed: "626172".to_string(), // "bar" in hex + }, + ]; + + let mut insert = client.insert::(table_name).await.unwrap(); + for row in rows_to_insert.into_iter() { + insert.write(&row).await.unwrap(); + } + insert.end().await.unwrap(); + + let rows = client + .query("SELECT ?fields FROM ? ORDER BY () ASC") + .bind(Identifier(table_name)) + .fetch_all::() + .await + .unwrap(); + + assert_eq!( + rows, + vec![ + DataSelect { + id: 1, + raw: *b"foo", + }, + DataSelect { + id: 2, + raw: *b"bar", + } + ] + ); +} + +// See https://clickhouse.com/docs/sql-reference/statements/alter/column#materialize-column +#[tokio::test] +async fn materialized_columns() { + let table_name = "test_materialized_columns"; + + #[derive(Clone, Debug, Row, Serialize, Deserialize, PartialEq)] + struct Data { + x: i64, + } + + #[derive(Clone, Debug, Row, Serialize, Deserialize, PartialEq)] + struct DataWithMaterialized { + x: i64, + // MATERIALIZED columns cannot be inserted into + s: String, + } + + let client = prepare_database!(); + execute_statements( + &client, + &[ + &format!( + " + CREATE OR REPLACE TABLE {table_name} (x Int64) + ENGINE = MergeTree ORDER BY () PARTITION BY () + " + ), + &format!("INSERT INTO {table_name} SELECT * FROM system.numbers LIMIT 5"), + &format!("ALTER TABLE {table_name} ADD COLUMN s String MATERIALIZED toString(x)"), + &format!("ALTER TABLE {table_name} MATERIALIZE COLUMN s"), + ], + ) + .await; + + let rows = client + .query("SELECT ?fields FROM ? 
ORDER BY x ASC") + .bind(Identifier(table_name)) + .fetch_all::() + .await + .unwrap(); + + let expected_rows = (0..5) + .map(|x| DataWithMaterialized { + x, + s: x.to_string(), + }) + .collect::>(); + assert_eq!(rows, expected_rows); + + let insert_data = AssertUnwindSafe(async { + let _ = client + .insert::(table_name) + .await + .unwrap(); + }); + + assert_panic_msg!( + insert_data, + ["column s is immutable (declared as `MATERIALIZED`)"] + ); + + let rows_to_insert = (5..10).map(|x| Data { x }); + + let mut insert = client.insert::(table_name).await.unwrap(); + for row in rows_to_insert { + insert.write(&row).await.unwrap(); + } + insert.end().await.unwrap(); + + let rows_after_insert = client + .query("SELECT ?fields FROM ? ORDER BY x ASC") + .bind(Identifier(table_name)) + .fetch_all::() + .await + .unwrap(); + + let expected_rows_after_insert = (0..10) + .map(|x| DataWithMaterialized { + x, + s: x.to_string(), + }) + .collect::>(); + + assert_eq!(rows_after_insert, expected_rows_after_insert); +} + +// See https://clickhouse.com/docs/sql-reference/statements/create/table#alias +#[tokio::test] +async fn alias_columns() { + let table_name = "test_alias_columns"; + + #[derive(Clone, Debug, Row, Deserialize, PartialEq)] + struct Data { + id: u64, + size_bytes: i64, + size: String, + } + + #[derive(Clone, Debug, Row, Serialize, PartialEq)] + struct DataInsert { + id: u64, + size_bytes: i64, + } + + let client = prepare_database!(); + execute_statements( + &client, + &[ + &format!( + " + CREATE OR REPLACE TABLE {table_name} + ( + id UInt64, + size_bytes Int64, + size String ALIAS formatReadableSize(size_bytes) + ) + ENGINE = MergeTree + ORDER BY id; + ", + ), + &format!("INSERT INTO {table_name} VALUES (1, 4678899)"), + ], + ) + .await; + + let rows = client + .query("SELECT ?fields FROM ?") + .bind(Identifier(table_name)) + .fetch_all::() + .await + .unwrap(); + + let expected_rows = vec![Data { + id: 1, + size_bytes: 4678899, + size: "4.46 MiB".to_string(), + 
}]; + + assert_eq!(rows, expected_rows); + + let rows_to_insert = vec![ + DataInsert { + id: 2, + size_bytes: 123456, + }, + DataInsert { + id: 3, + size_bytes: 987654321, + }, + ]; + + let mut insert = client.insert::(table_name).await.unwrap(); + for row in &rows_to_insert { + insert.write(row).await.unwrap(); + } + insert.end().await.unwrap(); + + let rows_after_insert = client + .query("SELECT ?fields FROM ? ORDER BY id ASC") + .bind(Identifier(table_name)) + .fetch_all::() + .await + .unwrap(); + + let expected_rows_after_insert = vec![ + Data { + id: 1, + size_bytes: 4678899, + size: "4.46 MiB".to_string(), + }, + Data { + id: 2, + size_bytes: 123456, + size: "120.56 KiB".to_string(), + }, + Data { + id: 3, + size_bytes: 987654321, + size: "941.90 MiB".to_string(), + }, + ]; + + assert_eq!(rows_after_insert, expected_rows_after_insert); +} diff --git a/tests/it/rbwnat_validation.rs b/tests/it/rbwnat_validation.rs index b90f87d1..44996eb0 100644 --- a/tests/it/rbwnat_validation.rs +++ b/tests/it/rbwnat_validation.rs @@ -531,7 +531,14 @@ async fn issue_109_1() { let unwind = std::panic::AssertUnwindSafe(async { let _ = client.insert::("issue_109").await.unwrap(); }); - assert_panic_msg!(unwind, &["Data", "4 columns", "3 fields"]); + + assert_panic_msg!( + unwind, + &[ + "the following non-default columns are missing", + "en_id: String" + ] + ); } #[tokio::test]