6 changes: 6 additions & 0 deletions Cargo.toml
@@ -26,6 +26,11 @@ undocumented_unsafe_blocks = "warn"
all-features = true
rustdoc-args = ["--cfg", "docsrs"]

[[bench]]
name = "select_nyc_taxi_data"
harness = false
required-features = ["time"]

[[bench]]
name = "select_numbers"
harness = false
@@ -132,6 +137,7 @@ replace_with = { version = "0.1.7" }

[dev-dependencies]
criterion = "0.5.0"
tracy-client = { version = "0.18.0", features = ["enable"] }
serde = { version = "1.0.106", features = ["derive"] }
tokio = { version = "1.0.1", features = ["full", "test-util"] }
hyper = { version = "1.1", features = ["server"] }
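The `tracy-client` dev-dependency added above (note its `enable` feature) is presumably there for profiling the benches with the Tracy profiler. A minimal sketch of emitting a Tracy zone, assuming the crate's usual `Client::start()` and `span!` API:

```rust
use tracy_client::Client;

fn main() {
    // Start the Tracy client; with the "enable" feature compiled in,
    // zones are actually recorded and can be inspected in the Tracy UI.
    let _tracy = Client::start();

    // `span!` opens a named zone that ends when the guard is dropped.
    let _zone = tracy_client::span!("decode_rows"); // hypothetical zone name

    // ... code under measurement ...
}
```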
12 changes: 10 additions & 2 deletions benches/README.md
@@ -4,13 +4,16 @@ All cases are run with `cargo bench --bench <case>`.

## With a mocked server

These benchmarks are run against a mocked server, which is a simple HTTP server that responds with a fixed response.
This is useful to measure the overhead of the client itself (a minimal sketch of such a server is shown after the list):

* `select` checks throughput of `Client::query()`.
* `insert` checks throughput of `Client::insert()` and `Client::inserter()` (if the `inserter` feature is enabled).
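For illustration only (this is not the crate's actual mock), a fixed-response server of this kind might look like the sketch below; it assumes `hyper-util` and `http-body-util` are available alongside the `hyper` dev-dependency:

```rust
use std::convert::Infallible;

use http_body_util::Full;
use hyper::{body::Bytes, server::conn::http1, service::service_fn, Request, Response};
use hyper_util::rt::TokioIo;
use tokio::net::TcpListener;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let listener = TcpListener::bind("127.0.0.1:8123").await?;
    loop {
        let (stream, _) = listener.accept().await?;
        tokio::spawn(async move {
            // Every request gets the same canned body, so the measured cost
            // is dominated by the client, not by the server.
            let service = service_fn(|_req: Request<hyper::body::Incoming>| async {
                Ok::<_, Infallible>(Response::new(Full::new(Bytes::from("1\n"))))
            });
            let _ = http1::Builder::new()
                .serve_connection(TokioIo::new(stream), service)
                .await;
        });
    }
}
```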

### How to collect perf data

The crate's code runs on a thread named `testee`:

```bash
cargo bench --bench <name> &
perf record -p `ps -AT | grep testee | awk '{print $2}'` --call-graph dwarf,65528 --freq 5000 -g -- sleep 5
```

@@ -22,13 +25,18 @@ Then upload the `perf.script` file to [Firefox Profiler](https://profiler.firefox.com)
## With a running ClickHouse server

These benchmarks are run against a real ClickHouse server, so it must be started:

```bash
docker compose up -d
cargo bench --bench <case>
```

Cases:

* [select_numbers.rs](select_numbers.rs) measures the time to run a big SELECT query against the `system.numbers_mt` table.
* [select_nyc_taxi_data.rs](select_nyc_taxi_data.rs) measures the time to run a fairly large SELECT query
  (approximately 3 million records) against the `nyc_taxi.trips_small` table populated with
  [the sample dataset](https://clickhouse.com/docs/getting-started/example-datasets/nyc-taxi#create-the-table-trips).
  It requires the `time` feature: `cargo bench --bench select_nyc_taxi_data --features time`.

### How to collect perf data

80 changes: 80 additions & 0 deletions benches/select_nyc_taxi_data.rs
@@ -0,0 +1,80 @@
#![cfg(feature = "time")]

use clickhouse::{Client, Compression, Row};
use criterion::black_box;
use serde::Deserialize;
use serde_repr::Deserialize_repr;
use time::OffsetDateTime;

#[derive(Debug, Clone, Deserialize_repr)]
#[repr(i8)]
pub enum PaymentType {
    CSH = 1,
    CRE = 2,
    NOC = 3,
    DIS = 4,
    UNK = 5,
}

#[derive(Debug, Clone, Row, Deserialize)]
#[allow(dead_code)]
pub struct TripSmall {
    trip_id: u32,
    #[serde(with = "clickhouse::serde::time::datetime")]
    pickup_datetime: OffsetDateTime,
    #[serde(with = "clickhouse::serde::time::datetime")]
    dropoff_datetime: OffsetDateTime,
    pickup_longitude: Option<f64>,
    pickup_latitude: Option<f64>,
    dropoff_longitude: Option<f64>,
    dropoff_latitude: Option<f64>,
    passenger_count: u8,
    trip_distance: f32,
    fare_amount: f32,
    extra: f32,
    tip_amount: f32,
    tolls_amount: f32,
    total_amount: f32,
    payment_type: PaymentType,
    pickup_ntaname: String,
    dropoff_ntaname: String,
}

async fn bench(name: &str, compression: Compression) {
    let start = std::time::Instant::now();
    let (sum_trip_id, dec_mbytes, rec_mbytes) = do_bench(compression).await;
    // The sum of all `trip_id`s acts as a checksum that the whole dataset was read.
    assert_eq!(sum_trip_id, 3630387815532582);
    let elapsed = start.elapsed();
    let throughput = dec_mbytes / elapsed.as_secs_f64();
    println!("{name:>8} {elapsed:>7.3?} {throughput:>4.0} MiB/s {rec_mbytes:>4.0} MiB");
}

async fn do_bench(compression: Compression) -> (u64, f64, f64) {
    let client = Client::default()
        .with_compression(compression)
        .with_url("http://localhost:8123");

    let mut cursor = client
        .query("SELECT * FROM nyc_taxi.trips_small ORDER BY trip_id DESC")
        .fetch::<TripSmall>()
        .unwrap();

    let mut sum = 0;
    while let Some(row) = cursor.next().await.unwrap() {
        sum += row.trip_id as u64;
        // Prevent the compiler from optimizing the decoded row away.
        black_box(&row);
    }

    let dec_bytes = cursor.decoded_bytes();
    let dec_mbytes = dec_bytes as f64 / 1024.0 / 1024.0;
    let recv_bytes = cursor.received_bytes();
    let recv_mbytes = recv_bytes as f64 / 1024.0 / 1024.0;
    (sum, dec_mbytes, recv_mbytes)
}

#[tokio::main]
async fn main() {
    println!("compress elapsed throughput received");
    bench("none", Compression::None).await;
    bench("lz4", Compression::Lz4).await;
}
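
As with `select_numbers`, `harness = false` in `Cargo.toml` means this bench supplies its own `main` instead of the default Criterion harness, printing one summary line per compression mode.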