Skip to content
This repository was archived by the owner on Aug 30, 2022. It is now read-only.
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
318 changes: 308 additions & 10 deletions rust/Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ members = [
# internals
"benches",
"examples",
"e2e",
]

[workspace.metadata]
Expand Down
2 changes: 2 additions & 0 deletions rust/e2e/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
**/*.log
**/*.pem
36 changes: 36 additions & 0 deletions rust/e2e/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
[package]
name = "e2e"
version = "0.0.0"
authors = ["Xayn Engineering <engineering@xaynet.dev>"]
edition = "2018"
description = "End-to-end tests for Xaynet"
readme = "../../README.md"
homepage = "https://xaynet.dev/"
repository = "https://github.com/xaynetwork/xaynet/"
keywords = ["xaynet", "e2e"]
categories = ["testing"]
license-file = "../../LICENSE"
publish = false

[dependencies]
anyhow = "1.0.40"
async-trait = "0.1.48"
chrono = { version = "0.4.19" }
config = "0.11.0"
console = "0.14.1"
futures = "0.3.13"
indicatif = "0.15.0"
influxdb = { version = "0.4.0", default-features = false, features = ["h1-client", "use-serde"] }
kube = "0.51.0"
k8s-openapi = { version = "0.11.0", default-features = false, features = ["v1_19"] }
reqwest = { version = "0.11.2", default-features = false, features = ["json", "gzip", "stream"] }
serde = { version = "1.0.125", features = ["derive"] }
serde_json = { version = "1.0.64" }
serde_yaml = "0.8.17"
tokio = { version = "1.4.0", features = ["full"] }
toml = "0.5.8"
tracing = "0.1.25"
tracing-subscriber = "0.2.17"
xaynet-core = { path = "../xaynet-core" }
xaynet-sdk = { path = "../xaynet-sdk", features = ["reqwest-client"] }
xaynet-server = { path = "../xaynet-server", features = ["model-persistence"] }
78 changes: 78 additions & 0 deletions rust/e2e/src/bin/test_case_1.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
use e2e::{
test_client::builder::{TestClientBuilder, TestClientBuilderSettings},
test_env::{utils, TestEnvironment, TestEnvironmentSettings},
};
use tokio::{
signal,
time::{timeout, Duration},
};
use tracing::info;
use xaynet_server::state_machine::phases::PhaseName;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
let env_settings = TestEnvironmentSettings::from_file("src/bin/test_case_1")?;
let env = TestEnvironment::new(env_settings.clone()).await?;

tokio::select! {
res = timeout(Duration::from_secs(6000), run(env)) => {
res?
}
_ = signal::ctrl_c() => { Ok(()) }
}
}

async fn run(mut env: TestEnvironment) -> anyhow::Result<()> {
let k8s = env.get_k8s_client().await?;
k8s.deploy_with_image_and_config(env.get_env_settings().coordinator.config)
.await?;
let handle = k8s
.save_coordinator_logs("src/bin/test_case_1/coordinator.log")
.await?;

let _pfi_guard = k8s.port_forward_influxdb()?;
let _pfc_guard = if env.get_env_settings().api_client.certificates.is_none() {
Some(k8s.port_forward_coordinator().await?)
} else {
None
};

let mut api_client = env.get_api_client()?;
let mut influx_client = env.get_influx_client();

info!("wait until clients are ready");
let _ = tokio::join!(
utils::wait_until_client_is_ready(&mut api_client),
utils::wait_until_client_is_ready(&mut influx_client),
);
utils::wait_until_phase(&influx_client, PhaseName::Sum).await;

////////////////////////////////////////////////////////////////////////////////////////////////

let coordinator_settings = env.get_coordinator_settings()?;
let test_client_builder_settings = TestClientBuilderSettings::from(coordinator_settings);

let mut test_client_builder = TestClientBuilder::new(test_client_builder_settings, api_client);

////////////////////////////////////////////////////////////////////////////////////////////////

for round in 0..10 {
info!("Round: {}", round);

let mut runner = test_client_builder.build_clients().await?;
info!("run sum clients...");
runner.run_sum_clients().await?;
utils::wait_until_phase(&influx_client, PhaseName::Update).await;
info!("run update clients...");
runner.run_update_clients().await?;
utils::wait_until_phase(&influx_client, PhaseName::Sum2).await;
info!("run sum2 clients...");
runner.run_sum2_clients().await?;
utils::wait_until_phase(&influx_client, PhaseName::Sum).await;
}

////////////////////////////////////////////////////////////////////////////////////////////////

timeout(Duration::from_secs(10), handle).await???;
Ok(())
}
31 changes: 31 additions & 0 deletions rust/e2e/src/bin/test_case_1/Env.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
filter = "test_case=debug,e2e=debug,xaynet=info"

[k8s]
namespace = "xaynet"
coordinator_pod_label = "app=coordinator"
coordinator_image = "xaynetwork/xaynet:development"
influxdb_pod_name = "influxdb-0"
redis_pod_name = "redis-master-0"
s3_pod_label = "minio"

[coordinator]
config = "src/bin/test_case_1/config.toml"

[influx]
url = "http://localhost:8086"
db = "metrics"

[redis]
url = "redis://localhost/"

[s3]
access_key = "minio"
secret_access_key = "minio123"
region = ["minio", "http://localhost:9000"]

[api_client]
address = "http://localhost:8081"
# tls
# address = "https://dev-coordinator.xayn.com"
# certificates = [ "src/bin/test_case_1/dev-coordinator-xayn-com.pem" ]
# identity =
45 changes: 45 additions & 0 deletions rust/e2e/src/bin/test_case_1/config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
[log]
filter = "xaynet=debug,http=warn,info"

[api]
bind_address = "0.0.0.0:8081"
tls_certificate = "/app/ssl/tls.pem"
tls_key = "/app/ssl/tls.key"

[pet.sum]
prob = 0.5
count = { min = 10, max = 100 }
time = { min = 5, max = 3600 }

[pet.update]
prob = 0.9
count = { min = 3, max = 10000 }
time = { min = 10, max = 3600 }

[pet.sum2]
count = { min = 5, max = 100 }
time = { min = 5, max = 3600 }

[mask]
group_type = "Prime"
data_type = "F32"
bound_type = "B0"
model_type = "M3"

[model]
length = 1

[metrics.influxdb]
url = "http://influxdb:8086"
db = "metrics"

[redis]
url = "redis://127.0.0.1/"

[s3]
access_key = "minio"
secret_access_key = "minio123"
region = ["minio", "http://minio:9000"]

[restore]
enable = false
3 changes: 3 additions & 0 deletions rust/e2e/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
pub mod test_client;
pub mod test_env;
pub mod utils;
118 changes: 118 additions & 0 deletions rust/e2e/src/test_client/builder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
use std::sync::Arc;

use anyhow::bail;
use xaynet_core::{
crypto::SigningKeyPair,
mask::{FromPrimitives, Model},
};
use xaynet_sdk::{client::Client as ApiClient, XaynetClient};
use xaynet_server::settings::Settings as CoordinatorSettings;

use super::{
runner::ClientRunner,
utils::{default_sum_client, default_update_client, generate_client, ClientType, LocalModel},
};
use crate::utils::concurrent_futures::ConcurrentFutures;

pub struct TestClientBuilderSettings {
number_of_sum: u64,
number_of_update: u64,
number_of_sum2: u64,
model_length: usize,
}

impl TestClientBuilderSettings {
pub fn new(
number_of_sum: u64,
number_of_update: u64,
number_of_sum2: u64,
model_length: usize,
) -> Self {
Self {
number_of_sum,
number_of_update,
number_of_sum2,
model_length,
}
}
}

impl From<CoordinatorSettings> for TestClientBuilderSettings {
fn from(settings: CoordinatorSettings) -> Self {
Self {
number_of_sum: settings.pet.sum.count.min,
number_of_update: settings.pet.update.count.min,
number_of_sum2: settings.pet.sum2.count.min,
model_length: settings.model.length,
}
}
}

pub struct TestClientBuilder {
settings: TestClientBuilderSettings,
api_client: ApiClient<reqwest::Client>,
model: Arc<Model>,
}

impl TestClientBuilder {
pub fn new(
settings: TestClientBuilderSettings,
api_client: ApiClient<reqwest::Client>,
) -> Self {
let model = Model::from_primitives(vec![1; settings.model_length].into_iter()).unwrap();
Self {
api_client,
settings,
model: Arc::new(model),
}
}

pub async fn build_client<F, R>(
&mut self,
r#type: &ClientType,
func: F,
) -> anyhow::Result<ConcurrentFutures<R>>
where
F: Fn(SigningKeyPair, ApiClient<reqwest::Client>, LocalModel) -> R,
R: Send + 'static + futures::Future,
<R as futures::Future>::Output: Send + 'static,
{
let round_params = self.api_client.get_round_params().await?;
let mut clients = ConcurrentFutures::<R>::new(100);

let number_of_clients = match r#type {
ClientType::Sum => self.settings.number_of_sum,
ClientType::Update => self.settings.number_of_update,
_ => bail!("client type is not supported"),
};

for _ in 0..number_of_clients {
let key_pair = generate_client(r#type, &round_params);
let client = func(
key_pair,
self.api_client.clone(),
LocalModel(self.model.clone()),
);

clients.push(client);
}

Ok(clients)
}

pub async fn build_clients(&mut self) -> anyhow::Result<ClientRunner> {
let sum_clients = self
.build_client(&ClientType::Sum, default_sum_client)
.await?;

let update_clients = self
.build_client(&ClientType::Update, default_update_client)
.await?;

Ok(ClientRunner::new(
sum_clients,
update_clients,
self.settings.number_of_sum2,
))
}
}
3 changes: 3 additions & 0 deletions rust/e2e/src/test_client/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
pub mod builder;
pub mod runner;
pub mod utils;
Loading