Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@ jobs:
- run: rustup override set nightly
- run: rustup show active-toolchain -v
# Serde 1.0.227 fails to build with `--cfg docsrs`, only pass it to our own packages
- run: cargo rustdoc -p clickhouse-derive -- -D warnings --cfg docsrs
- run: cargo rustdoc -p clickhouse-types -- -D warnings --cfg docsrs
- run: cargo rustdoc -p clickhouse -- -D warnings --cfg docsrs
- run: cargo rustdoc -p clickhouse-derive --all-features -- -D warnings --cfg docsrs
- run: cargo rustdoc -p clickhouse-types --all-features -- -D warnings --cfg docsrs
- run: cargo rustdoc -p clickhouse --all-features -- -D warnings --cfg docsrs

test:
runs-on: ubuntu-latest
Expand Down
3 changes: 2 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
name: clickhouse-rs
services:
clickhouse:
image: 'clickhouse/clickhouse-server:${CLICKHOUSE_VERSION-latest-alpine}'
# Note: using a fully qualified image name (including the registry) makes Podman happy without the need for further configuration.
image: 'docker.io/clickhouse/clickhouse-server:${CLICKHOUSE_VERSION-latest-alpine}'
container_name: 'clickhouse-rs-clickhouse-server'
environment:
CLICKHOUSE_SKIP_USER_SETUP: 1
Expand Down
15 changes: 12 additions & 3 deletions src/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use bytes::{Bytes, BytesMut};
use clickhouse_types::put_rbwnat_columns_header;
use hyper::{self, Request};
use replace_with::replace_with_or_abort;
use std::sync::Arc;
use std::{future::Future, marker::PhantomData, mem, panic, pin::Pin, time::Duration};
use tokio::{
task::JoinHandle,
Expand All @@ -34,11 +33,21 @@ const_assert!(BUFFER_SIZE.is_power_of_two()); // to use the whole buffer's capac
/// Otherwise, the whole `INSERT` will be aborted.
///
/// Rows are being sent progressively to spread network load.
///
/// # Note: Metadata is Cached
/// If [validation is enabled][Client::with_validation],
/// this helper will query the metadata for the target table to learn the column names and types.
///
/// To avoid querying this metadata every time, it is cached within the [`Client`].
///
/// Any concurrent changes to the table schema may cause insert failures if the metadata
/// is no longer correct. For correct functioning, call [`Client::clear_cached_metadata()`]
/// after any changes to the current database schema.
#[must_use]
pub struct Insert<T> {
state: InsertState,
buffer: BytesMut,
row_metadata: Option<Arc<RowMetadata>>,
row_metadata: Option<RowMetadata>,
#[cfg(feature = "lz4")]
compression: Compression,
send_timeout: Option<Duration>,
Expand Down Expand Up @@ -121,7 +130,7 @@ macro_rules! timeout {
}

impl<T> Insert<T> {
pub(crate) fn new(client: &Client, table: &str, row_metadata: Option<Arc<RowMetadata>>) -> Self
pub(crate) fn new(client: &Client, table: &str, row_metadata: Option<RowMetadata>) -> Self
where
T: Row,
{
Expand Down
196 changes: 125 additions & 71 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ pub use self::{
row::{Row, RowOwned, RowRead, RowWrite},
};
use self::{error::Result, http_client::HttpClient};
use crate::row_metadata::RowMetadata;
use crate::sql::Identifier;
use crate::row_metadata::{AccessType, ColumnDefaultKind, InsertMetadata, RowMetadata};

#[doc = include_str!("row_derive.md")]
pub use clickhouse_derive::Row;
use clickhouse_types::parse_rbwnat_columns_header;
use clickhouse_types::{Column, DataTypeNode};

use crate::_priv::row_insert_metadata_query;
use std::{collections::HashMap, fmt::Display, sync::Arc};
use tokio::sync::RwLock;

Expand Down Expand Up @@ -61,7 +61,7 @@ pub struct Client {
headers: HashMap<String, String>,
products_info: Vec<ProductInfo>,
validation: bool,
row_metadata_cache: Arc<RowMetadataCache>,
insert_metadata_cache: Arc<InsertMetadataCache>,

#[cfg(feature = "test-util")]
mocked: bool,
Expand Down Expand Up @@ -107,13 +107,8 @@ impl Default for Client {

/// Cache for [`InsertMetadata`] to avoid querying the column metadata of the same table
/// more than once. Key: the table name as passed to [`Client::insert`], which is not
/// necessarily fully qualified.
pub(crate) struct RowMetadataCache(RwLock<HashMap<String, Arc<RowMetadata>>>);

impl Default for RowMetadataCache {
fn default() -> Self {
RowMetadataCache(RwLock::new(HashMap::default()))
}
}
#[derive(Default)]
pub(crate) struct InsertMetadataCache(RwLock<HashMap<String, Arc<InsertMetadata>>>);

impl Client {
/// Creates a new client with a specified underlying HTTP client.
Expand All @@ -130,33 +125,55 @@ impl Client {
headers: HashMap::new(),
products_info: Vec::default(),
validation: true,
row_metadata_cache: Arc::new(RowMetadataCache::default()),
insert_metadata_cache: Arc::new(InsertMetadataCache::default()),
#[cfg(feature = "test-util")]
mocked: false,
}
}

/// Specifies ClickHouse's url. Should point to HTTP endpoint.
///
/// Automatically [clears the metadata cache][Self::clear_cached_metadata]
/// for this instance only.
///
/// # Examples
/// ```
/// # use clickhouse::Client;
/// let client = Client::default().with_url("http://localhost:8123");
/// ```
pub fn with_url(mut self, url: impl Into<String>) -> Self {
self.url = url.into();

// `with_mock()` didn't exist previously, so to not break existing usages,
// we need to be able to detect a mocked server using nothing but the URL.
#[cfg(feature = "test-util")]
if let Some(url) = test::Mock::mocked_url_to_real(&self.url) {
self.url = url;
self.mocked = true;
}

// Assume our cached metadata is invalid.
self.insert_metadata_cache = Default::default();

self
}

/// Specifies a database name.
///
/// Automatically [clears the metadata cache][Self::clear_cached_metadata]
/// for this instance only.
///
/// # Examples
/// ```
/// # use clickhouse::Client;
/// let client = Client::default().with_database("test");
/// ```
pub fn with_database(mut self, database: impl Into<String>) -> Self {
self.database = Some(database.into());

// Assume our cached metadata is invalid.
self.insert_metadata_cache = Default::default();

self
}

Expand Down Expand Up @@ -347,8 +364,12 @@ impl Client {
/// If `T` has unnamed fields, e.g. tuples.
pub async fn insert<T: Row>(&self, table: &str) -> Result<insert::Insert<T>> {
if self.get_validation() {
let metadata = self.get_row_metadata_for_insert::<T>(table).await?;
return Ok(insert::Insert::new(self, table, Some(metadata)));
let metadata = self.get_insert_metadata(table).await?;
return Ok(insert::Insert::new(
self,
table,
Some(metadata.to_row::<T>()),
));
}
Ok(insert::Insert::new(self, table, None))
}
Expand Down Expand Up @@ -382,11 +403,36 @@ impl Client {
/// in your specific use case. Additionally, writing smoke tests to ensure that
/// the row types match the ClickHouse schema is highly recommended,
/// if you plan to disable validation in your application.
///
/// # Note: Mocking
/// When using [`test::Mock`] with the `test-util` feature, validation is forced off.
///
/// This applies either when using [`Client::with_mock()`], or [`Client::with_url()`]
/// with a URL from [`test::Mock::url()`].
///
/// As of writing, the mocking facilities are unable to generate the `RowBinaryWithNamesAndTypes`
/// header required for validation to function.
pub fn with_validation(mut self, enabled: bool) -> Self {
self.validation = enabled;
self
}

/// Clear table metadata that was previously received and cached.
///
/// [`Insert`][crate::insert::Insert] uses cached metadata when sending data with validation.
/// If the table schema changes, this metadata needs to be re-fetched.
///
/// This method empties the metadata cache, so future insert queries fetch the metadata again.
/// The cache is shared with all cloned instances of this `Client` (using the same URL
/// and database), so they are affected as well.
///
/// This may need to wait to acquire a lock if a query is concurrently writing into the cache.
///
/// Cancel-safe.
pub async fn clear_cached_metadata(&self) {
    // Exclusive access is required to empty the map; the guard is released on return.
    let mut cache = self.insert_metadata_cache.0.write().await;
    cache.clear();
}

/// Used internally to check if the validation mode is enabled,
/// as it takes into account the `test-util` feature flag.
#[inline]
Expand All @@ -413,43 +459,58 @@ impl Client {
/// which is pointless in that kind of tests.
#[cfg(feature = "test-util")]
pub fn with_mock(mut self, mock: &test::Mock) -> Self {
self.url = mock.url().to_string();
self.url = mock.real_url().to_string();
self.mocked = true;
self
}

async fn get_row_metadata_for_insert<T: Row>(
&self,
table_name: &str,
) -> Result<Arc<RowMetadata>> {
let read_lock = self.row_metadata_cache.0.read().await;
match read_lock.get(table_name) {
Some(metadata) => Ok(metadata.clone()),
None => {
drop(read_lock);
// TODO: should it be moved to a cold function?
let mut write_lock = self.row_metadata_cache.0.write().await;
let db = match self.database {
Some(ref db) => db,
None => "default",
};
let mut bytes_cursor = self
.query("SELECT * FROM ? LIMIT 0")
.bind(Identifier(table_name))
// don't allow to override the client database set in the client instance
// with a `.with_option("database", "some_other_db")` call on the app side
.with_option("database", db)
.fetch_bytes("RowBinaryWithNamesAndTypes")?;
let mut buffer = Vec::<u8>::new();
while let Some(chunk) = bytes_cursor.next().await? {
buffer.extend_from_slice(&chunk);
}
let columns = parse_rbwnat_columns_header(&mut buffer.as_slice())?;
let metadata = Arc::new(RowMetadata::new_for_insert::<T>(columns));
write_lock.insert(table_name.to_string(), metadata.clone());
Ok(metadata)
/// Returns the insert metadata for `table_name`, using the client's cache when possible.
///
/// On a cache miss, the column names, types and default kinds are read with the query
/// built by `row_insert_metadata_query` for the client's database (`"default"` when none
/// is set), and the result is stored in the cache under `table_name`.
///
/// The write lock is held while the query runs. The cache is checked again once that lock
/// is acquired, so tasks that missed concurrently reuse the entry stored by the first one
/// instead of each repeating the query.
///
/// # Errors
/// Propagates errors from executing the metadata query, from parsing a column type with
/// `DataTypeNode::new`, and from parsing the `default_kind` value.
async fn get_insert_metadata(&self, table_name: &str) -> Result<Arc<InsertMetadata>> {
    {
        let read_lock = self.insert_metadata_cache.0.read().await;

        // FIXME: `table_name` is not necessarily fully qualified here
        if let Some(metadata) = read_lock.get(table_name) {
            return Ok(Arc::clone(metadata));
        }
    }

    // TODO: should it be moved to a cold function?
    let mut write_lock = self.insert_metadata_cache.0.write().await;

    // Another task may have populated the entry while we were waiting for the write lock.
    if let Some(metadata) = write_lock.get(table_name) {
        return Ok(Arc::clone(metadata));
    }

    let db = self.database.as_deref().unwrap_or("default");

    let mut columns_cursor = self
        .query(&row_insert_metadata_query(db, table_name))
        .fetch::<(String, String, String)>()?;

    let mut columns = Vec::new();
    let mut column_default_kinds = Vec::new();
    let mut column_lookup = HashMap::new();

    while let Some((name, type_, default_kind)) = columns_cursor.next().await? {
        let data_type = DataTypeNode::new(&type_)?;
        let default_kind = default_kind.parse::<ColumnDefaultKind>()?;

        // Maps the column name to its position in `columns`.
        column_lookup.insert(name.clone(), columns.len());

        columns.push(Column { name, data_type });

        column_default_kinds.push(default_kind);
    }

    let metadata = Arc::new(InsertMetadata {
        row_metadata: RowMetadata {
            columns,
            access_type: AccessType::WithSeqAccess, // ignored on insert
        },
        column_default_kinds,
        column_lookup,
    });

    write_lock.insert(table_name.to_string(), Arc::clone(&metadata));
    Ok(metadata)
}
}

Expand All @@ -463,6 +524,25 @@ pub mod _priv {
pub fn lz4_compress(uncompressed: &[u8]) -> super::Result<bytes::Bytes> {
crate::compression::lz4::compress(uncompressed)
}

// Also needed by `it::insert::cache_row_metadata()`
/// Builds the `system.columns` query selecting `name`, `type` and `default_kind`
/// for the columns of table `table` in database `db`.
///
/// Both `db` and `table` are appended through `crate::sql::escape::string`
/// rather than being spliced into the query text verbatim.
///
/// # Panics
/// If `crate::sql::escape::string` returns an error for either argument.
pub fn row_insert_metadata_query(db: &str, table: &str) -> String {
    let mut query = String::from(
        "SELECT name, type, default_kind FROM system.columns WHERE database = ",
    );
    crate::sql::escape::string(db, &mut query).unwrap();

    query.push_str(" AND table = ");
    crate::sql::escape::string(table, &mut query).unwrap();

    query
}
}

#[cfg(test)]
Expand Down Expand Up @@ -643,32 +723,6 @@ mod client_tests {
);
}

#[tokio::test]
async fn cache_row_metadata() {
let client = Client::default()
.with_url("http://localhost:8123")
.with_database("system");

let metadata = client
.get_row_metadata_for_insert::<SystemRolesRow>("roles")
.await
.unwrap();

assert_eq!(metadata.columns, SystemRolesRow::columns());
assert_eq!(metadata.access_type, AccessType::WithSeqAccess);

// we can now use a dummy client, cause the metadata is cached,
// and no calls to the database will be made
client
.with_url("whatever")
.get_row_metadata_for_insert::<SystemRolesRow>("roles")
.await
.unwrap();

assert_eq!(metadata.columns, SystemRolesRow::columns());
assert_eq!(metadata.access_type, AccessType::WithSeqAccess);
}

#[test]
fn it_does_follow_previous_configuration() {
let client = Client::default().with_option("async_insert", "1");
Expand Down
Loading