diff --git a/Cargo.lock b/Cargo.lock index 973a9f06..1783d424 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -162,7 +162,7 @@ dependencies = [ "nom", "num-traits", "rusticata-macros", - "thiserror", + "thiserror 1.0.69", "time", ] @@ -449,7 +449,7 @@ dependencies = [ "serde_json", "serde_repr", "serde_urlencoded", - "thiserror", + "thiserror 1.0.69", "tokio", "tokio-util", "url", @@ -752,7 +752,7 @@ dependencies = [ "serde", "serde_json", "specifications 3.0.0", - "thiserror", + "thiserror 1.0.69", ] [[package]] @@ -772,7 +772,7 @@ dependencies = [ "serde", "serde_json", "specifications 3.0.0 (git+https://github.com/epi-project/brane)", - "thiserror", + "thiserror 1.0.69", ] [[package]] @@ -1410,7 +1410,7 @@ dependencies = [ "console", "shell-words", "tempfile", - "thiserror", + "thiserror 1.0.69", "zeroize", ] @@ -1939,7 +1939,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d2ebc8013b4426d5b81a4364c419a95ed0b404af2b82e2457de52d9348f0e474" dependencies = [ "combine", - "thiserror", + "thiserror 1.0.69", ] [[package]] @@ -2068,15 +2068,6 @@ dependencies = [ "http 0.2.12", ] -[[package]] -name = "heck" -version = "0.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6d621efb26863f0e9924c6ac577e8275e5e6b77455db64ffa6c65c904e9e132c" -dependencies = [ - "unicode-segmentation", -] - [[package]] name = "heck" version = "0.4.1" @@ -2107,12 +2098,6 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6fe2267d4ed49bc07b63801559be28c718ea06c4738b7a03c94df7386d2cde46" -[[package]] -name = "histogram" -version = "0.6.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "12cb882ccb290b8646e554b157ab0b71e64e8d5bef775cd66b6531e52d302669" - [[package]] name = "http" version = "0.2.12" @@ -2624,6 +2609,15 @@ dependencies = [ "either", ] +[[package]] +name = "itertools" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b192c782037fadd9cfa75548310488aabdbf3d2da73885b31bd0abd03351285" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.11" @@ -2694,7 +2688,7 @@ dependencies = [ "juniper", "serde", "serde_json", - "thiserror", + "thiserror 1.0.69", "tokio", "warp", ] @@ -3040,27 +3034,6 @@ dependencies = [ "autocfg", ] -[[package]] -name = "num_enum" -version = "0.6.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a015b430d3c108a207fd776d2e2196aaf8b1cf8cf93253e3a097ff3085076a1" -dependencies = [ - "num_enum_derive", -] - -[[package]] -name = "num_enum_derive" -version = "0.6.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "96667db765a921f7b295ffee8b60472b686a51d4f21c2ee4ffdb94c7013b65a6" -dependencies = [ - "proc-macro-crate", - "proc-macro2", - "quote", - "syn 2.0.87", -] - [[package]] name = "number_prefix" version = "0.4.0" @@ -3317,16 +3290,6 @@ dependencies = [ "serde", ] -[[package]] -name = "proc-macro-crate" -version = "1.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f4c021e1093a56626774e81216a4ce732a735e5bad4868a03f3ed65ca0c3919" -dependencies = [ - "once_cell", - "toml_edit 0.19.15", -] - [[package]] name = "proc-macro2" version = "1.0.86" @@ -3353,7 +3316,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "81bddcdb20abf9501610992b6759a4c888aef7d1a7247ef75e2404275ac24af1" dependencies = [ "anyhow", - "itertools", + "itertools 0.11.0", "proc-macro2", "quote", "syn 2.0.87", @@ -3366,7 +3329,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "72c71c0c79b9701efe4e1e4b563b2016dd4ee789eb99badcb09d61ac4b92e4a2" dependencies = [ "libc", - "thiserror", + "thiserror 1.0.69", ] [[package]] @@ -3451,11 +3414,11 @@ dependencies = [ [[package]] name = "rand_pcg" -version = "0.3.1" +version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "59cad018caf63deb318e5a4586d99a24424a364f40f1e5778c29aca23f4fc73e" +checksum = "b48ac3f7ffaab7fac4d2376632268aa5f89abdb55f7ebf8f4d11fffccb2320f7" dependencies = [ - "rand_core 0.6.4", + "rand_core 0.9.1", ] [[package]] @@ -3521,7 +3484,7 @@ checksum = "bd283d9651eeda4b2a83a43c1c91b266c40fd76ecd39a50a8c630ae69dc72891" dependencies = [ "getrandom 0.2.15", "libredox", - "thiserror", + "thiserror 1.0.69", ] [[package]] @@ -3862,9 +3825,9 @@ dependencies = [ [[package]] name = "scylla" -version = "0.12.0" +version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "03d2db76aa23f55d2ece5354e1a3778633098a3d1ea76153f494d71e92cd02d8" +checksum = "d6f7075a5b5be59fd767b83567bf0c02e9c29626f47d8c4493946b2d6076eb17" dependencies = [ "arc-swap", "async-trait", @@ -3873,20 +3836,17 @@ dependencies = [ "chrono", "dashmap", "futures", - "histogram", - "itertools", + "hashbrown 0.14.5", + "itertools 0.14.0", + "lazy_static", "lz4_flex", - "num_enum", - "rand 0.8.5", + "rand 0.9.0", "rand_pcg", "scylla-cql", - "scylla-macros", "smallvec", "snap", "socket2", - "strum 0.23.0", - "strum_macros 0.23.1", - "thiserror", + "thiserror 2.0.12", "tokio", "tracing", "uuid", @@ -3894,27 +3854,30 @@ dependencies = [ [[package]] name = "scylla-cql" -version = "0.1.0" +version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "345626c0dd5d9624c413daaba854685bba6a65cff4eb5ea0fb0366df16901f67" +checksum = "85f1decfbb5ff3eb8da7883937c9bca318ccfa220d50bd5a2cdb573fb29bc19b" dependencies = [ "async-trait", "byteorder", "bytes", + "chrono", + "itertools 0.14.0", "lz4_flex", - "num_enum", "scylla-macros", "snap", - "thiserror", + "stable_deref_trait", + "thiserror 2.0.12", "tokio", "uuid", + "yoke", ] [[package]] name = "scylla-macros" -version = "0.4.0" +version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb6085ff9c3fd7e5163826901d39164ab86f11bdca16b2f766a00c528ff9cef9" +checksum = "6c72a4a475ecca0ab180b995f63e079763baaa453ec1a84f6811ebd8d8fe18e8" dependencies = [ "darling", "proc-macro2", @@ -4159,7 +4122,7 @@ checksum = "adc4e5204eb1910f40f9cfa375f6f05b68c3abac4b6fd879c8ff5e7ae8a0a085" dependencies = [ "num-bigint", "num-traits", - "thiserror", + "thiserror 1.0.69", "time", ] @@ -4218,13 +4181,13 @@ dependencies = [ "env_logger", "futures", "human-panic 1.2.3", - "itertools", + "itertools 0.11.0", "libc", "log", "nix 0.27.1", "num-derive", "num-traits", - "thiserror", + "thiserror 1.0.69", "tokio", "url", "windows", @@ -4357,12 +4320,6 @@ version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" -[[package]] -name = "strum" -version = "0.23.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cae14b91c7d11c9a851d3fbc80a963198998c2a64eec840477fa92d8ce9b70bb" - [[package]] name = "strum" version = "0.24.1" @@ -4387,19 +4344,6 @@ dependencies = [ "strum_macros 0.27.1", ] -[[package]] -name = "strum_macros" -version = "0.23.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5bb0dc7ee9c15cea6199cde9a127fa16a4c5819af85395457ad72d68edc85a38" -dependencies = [ - "heck 0.3.3", - "proc-macro2", - "quote", - "rustversion", - "syn 1.0.109", -] - [[package]] name = "strum_macros" version = "0.24.3" @@ -4574,7 +4518,16 @@ version = "1.0.69" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b6aaf5339b578ea85b50e080feb250a3e8ae8cfcdff9a461c9ec2904bc923f52" dependencies = [ - "thiserror-impl", + "thiserror-impl 1.0.69", +] + +[[package]] +name = "thiserror" +version = "2.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "567b8a2dae586314f7be2a752ec7474332959c6460e02bde30d702a66d488708" +dependencies = [ + "thiserror-impl 2.0.12", ] [[package]] @@ -4588,6 +4541,17 @@ dependencies = [ "syn 2.0.87", ] +[[package]] +name = "thiserror-impl" +version = "2.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f7cf42b4507d8ea322120659672cf1b9dbb93f8f2d4ecfd6e51350ff5b17a1d" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.87", +] + [[package]] name = "time" version = "0.3.39" @@ -4759,7 +4723,7 @@ dependencies = [ "serde", "serde_spanned", "toml_datetime", - "toml_edit 0.22.16", + "toml_edit", ] [[package]] @@ -4771,17 +4735,6 @@ dependencies = [ "serde", ] -[[package]] -name = "toml_edit" -version = "0.19.15" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b5bb770da30e5cbfde35a2d7b9b8a2c4b8ef89548a7a6aeab5c9a576e3e7421" -dependencies = [ - "indexmap 2.2.6", - "toml_datetime", - "winnow 0.5.40", -] - [[package]] name = "toml_edit" version = "0.22.16" @@ -4792,7 +4745,7 @@ dependencies = [ "serde", "serde_spanned", "toml_datetime", - "winnow 0.6.15", + "winnow", ] [[package]] @@ -4911,7 +4864,7 @@ dependencies = [ "log", "rand 0.8.5", "sha1", - "thiserror", + "thiserror 1.0.69", "url", "utf-8", ] @@ -5412,15 +5365,6 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" -[[package]] -name = "winnow" -version = "0.5.40" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f593a95398737aeed53e489c785df13f3618e41dbcd6718c6addbf1395aa6876" -dependencies = [ - "memchr", -] - [[package]] name = "winnow" version = "0.6.15" @@ -5501,7 +5445,7 @@ dependencies = [ "nom", "oid-registry", "rusticata-macros", - "thiserror", + "thiserror 1.0.69", "time", ] diff --git a/brane-api/Cargo.toml b/brane-api/Cargo.toml index a845e5ac..e0d7dfec 100644 --- a/brane-api/Cargo.toml +++ b/brane-api/Cargo.toml @@ -25,7 +25,7 @@ prost = "0.12.0" rand = "0.8.5" # rdkafka = { version = "0.31", features = ["cmake-build"] } reqwest = { version = "0.11.27", features = ["rustls-tls-manual-roots"] } -scylla = "0.12.0" +scylla = "1.0.0" serde = { version = "1.0.204", features = ["derive"] } serde_json = "1.0.120" serde_yaml = { version = "0.0.10", package = "serde_yml" } diff --git a/brane-api/Cargo.toml.orig b/brane-api/Cargo.toml.orig new file mode 100644 index 00000000..b80863ab --- /dev/null +++ b/brane-api/Cargo.toml.orig @@ -0,0 +1,55 @@ +[package] +name = "brane-api" +rust-version = "1.74" +edition = "2021" +version.workspace = true +repository.workspace = true +license.workspace = true +authors.workspace = true + +[dependencies] +async-compression = { version = "0.3.15", features = ["tokio","gzip"] } +bytes = "1.2.0" +chrono = "0.4.35" +clap = { version = "4.5.6", features = ["derive","env"] } +dotenvy = "0.15" +enum-debug.workspace = true +env_logger = "0.10.0" +error-trace.workspace = true +futures = "0.3.24" +juniper = { version = "0.16.1", features = ["chrono"] } +juniper_warp = "0.8.0" +# k8s-openapi = { version = "0.14", default-features = false, features = ["v1_23"] } +log = "0.4.22" +prost = "0.12.0" +rand = "0.8.5" +# rdkafka = { version = "0.31", features = ["cmake-build"] } +<<<<<<< Updated upstream +reqwest = { version = "0.11.27", features = ["rustls-tls-manual-roots"] } +scylla = "0.12.0" +||||||| Stash base +reqwest = { version = "0.12.0", features = ["rustls-tls-manual-roots"] } +scylla = "0.12.0" +======= +reqwest = { version = "0.12.0", features = ["rustls-tls-manual-roots"] } +scylla = "1.0.0" +>>>>>>> Stashed changes +serde = { version = "1.0.204", features = ["derive"] } +serde_json = "1.0.120" +serde_yaml = { version = "0.0.10", package = "serde_yml" } +tempfile = "3.10.1" +time = "0.3.16" +tokio = { version = "1.38.0", default-features = false, features = ["macros", "rt", "signal"] } +tokio-stream = "0.1.6" +tokio-tar = "0.3.0" +tokio-util = { version = "0.7.1", features = ["codec"] } +uuid = { version = "1.7.0", features = ["serde", "v4"] } +warp = "0.3.2" + +brane-cfg = { path = "../brane-cfg" } +brane-prx = { path = "../brane-prx" } +brane-shr = { path = "../brane-shr" } +specifications = { path = "../specifications" } + +[lints] +workspace = true diff --git a/brane-api/src/errors.rs b/brane-api/src/errors.rs index f3e3de8e..45eaf3ca 100644 --- a/brane-api/src/errors.rs +++ b/brane-api/src/errors.rs @@ -20,7 +20,7 @@ use brane_cfg::node::NodeKind; use brane_shr::formatters::PrettyListFormatter; use enum_debug::EnumDebug as _; use reqwest::StatusCode; -use scylla::transport::errors::NewSessionError; +use scylla::errors::NewSessionError; use specifications::address::Address; use specifications::version::Version; @@ -165,20 +165,20 @@ pub enum PackageError { MissingDigest { name: String }, /// Failed to define the `brane.package` type in the Scylla database. - PackageTypeDefineError { err: scylla::transport::errors::QueryError }, + PackageTypeDefineError { err: scylla::errors::ExecutionError }, /// Failed to define the package table in the Scylla database. - PackageTableDefineError { err: scylla::transport::errors::QueryError }, + PackageTableDefineError { err: scylla::errors::ExecutionError }, /// Failed to insert a new package in the database. - PackageInsertError { name: String, err: scylla::transport::errors::QueryError }, + PackageInsertError { name: String, err: scylla::errors::ExecutionError }, /// Failed to query for the given package in the Scylla database. - VersionsQueryError { name: String, err: scylla::transport::errors::QueryError }, + VersionsQueryError { name: String, err: scylla::errors::ExecutionError }, /// Failed to parse a Version string VersionParseError { raw: String, err: specifications::version::ParseError }, /// No versions found for the given package NoVersionsFound { name: String }, /// Failed to query the database for the file of the given package. - PathQueryError { name: String, version: Version, err: scylla::transport::errors::QueryError }, + PathQueryError { name: String, version: Version, err: scylla::errors::ExecutionError }, /// The given package was unknown. UnknownPackage { name: String, version: Version }, /// Failed to get the metadata of a file. diff --git a/brane-api/src/packages.rs b/brane-api/src/packages.rs index a5bc6b2b..62434abb 100644 --- a/brane-api/src/packages.rs +++ b/brane-api/src/packages.rs @@ -25,8 +25,7 @@ use bytes::Buf; use log::{debug, error, info, warn}; use rand::Rng; use rand::distributions::Alphanumeric; -use scylla::macros::{FromUserType, IntoUserType}; -use scylla::{SerializeCql, Session}; +use scylla::client::session::Session; use specifications::package::PackageInfo; use specifications::version::Version; // use tar::Archive; @@ -163,7 +162,7 @@ impl TryFrom for PackageUdt { pub async fn ensure_db_table(scylla: &Session) -> Result<(), Error> { // Define the `brane.package` type if let Err(err) = scylla - .query( + .query_unpaged( "CREATE TYPE IF NOT EXISTS brane.package ( created bigint , description text @@ -186,7 +185,7 @@ pub async fn ensure_db_table(scylla: &Session) -> Result<(), Error> { // Define the `brane.packages` table if let Err(err) = scylla - .query( + .query_unpaged( "CREATE TABLE IF NOT EXISTS brane.packages ( name text , version text @@ -214,7 +213,7 @@ pub async fn ensure_db_table(scylla: &Session) -> Result<(), Error> { /// - `package`: The PackageInfo struct that describes the package, and is what we will insert. Note, however, that not _all_ information will make it; only the info present in a `PackageUdt` struct will. /// - `path`: The Path where the container image may be found. /// -/// # Returusn +/// # Returns /// Nothing, but does change the target Scylla database to include the new package. /// /// # Errors @@ -227,7 +226,7 @@ async fn insert_package_into_db(scylla: &Arc, package: &PackageInfo, pa // Insert it if let Err(err) = scylla - .query( + .query_unpaged( "INSERT INTO brane.packages ( name , version @@ -269,7 +268,7 @@ pub async fn download(name: String, version: String, context: Context) -> Result // Attempt to resolve the version from the Scylla database in the context debug!("Resolving version '{}'...", version); let version: Version = if version.to_lowercase() == "latest" { - let versions = match context.scylla.query("SELECT version FROM brane.packages WHERE name=?", vec![&name]).await { + let versions = match context.scylla.query_unpaged("SELECT version FROM brane.packages WHERE name=?", vec![&name]).await { Ok(versions) => versions, Err(err) => { fail!(Error::VersionsQueryError { name, err }); @@ -316,7 +315,7 @@ pub async fn download(name: String, version: String, context: Context) -> Result // With the version resolved, query the filename debug!("Retrieving filename for package '{}'@{}", name, version); let file: PathBuf = - match context.scylla.query("SELECT file FROM brane.packages WHERE name=? AND version=?", vec![&name, &version.to_string()]).await { + match context.scylla.query_unpaged("SELECT file FROM brane.packages WHERE name=? AND version=?", vec![&name, &version.to_string()]).await { Ok(file) => { if let Some(rows) = file.rows { if rows.is_empty() { diff --git a/brane-api/src/packages.rs.orig b/brane-api/src/packages.rs.orig new file mode 100644 index 00000000..388a6438 --- /dev/null +++ b/brane-api/src/packages.rs.orig @@ -0,0 +1,604 @@ +// PACKAGES.rs +// by Lut99 +// +// Created: +// 17 Oct 2022, 15:18:32 +// Last edited: +// 08 Feb 2024, 16:16:22 +// Auto updated? +// Yes +// +// Description: +//! Defines things that relate to packages. +// + +use std::borrow::Cow; +use std::convert::{TryFrom, TryInto}; +use std::path::{Path, PathBuf}; +use std::str::FromStr; +use std::sync::Arc; + +use async_compression::tokio::bufread::GzipDecoder; +use brane_cfg::info::Info as _; +use brane_cfg::node::{CentralConfig, NodeConfig, NodeKind}; +use bytes::Buf; +use log::{debug, error, info, warn}; +use rand::Rng; +<<<<<<< Updated upstream +use rand::distributions::Alphanumeric; +use scylla::macros::{FromUserType, IntoUserType}; +use scylla::{SerializeCql, Session}; +||||||| Stash base +use rand::distr::Alphanumeric; +use scylla::macros::{FromUserType, IntoUserType}; +use scylla::{SerializeCql, Session}; +======= +use rand::distr::Alphanumeric; +use scylla::client::session::Session; +>>>>>>> Stashed changes +use specifications::package::PackageInfo; +use specifications::version::Version; +// use tar::Archive; +use tempfile::TempDir; +use tokio::fs as tfs; +use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader}; +use tokio_stream::StreamExt; +use tokio_tar::{Archive, Entries, Entry}; +use uuid::Uuid; +use warp::http::{HeaderValue, StatusCode}; +use warp::hyper::Body; +use warp::hyper::body::{Bytes, Sender}; +use warp::reply::Response; +use warp::{Rejection, Reply}; + +pub use crate::errors::PackageError as Error; +use crate::spec::Context; + + +/***** HELPER MACROS *****/ +/// Macro that early quits from a warp function by printing the error and then returning a 500. +macro_rules! fail { + ($err:expr) => {{ + // Implement a phony type that does implement reject (whatever) + struct InternalError; + impl std::fmt::Debug for InternalError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "An internal error has occurred.") } + } + impl warp::reject::Reject for InternalError {} + + // Now write the error to stderr and the internal error to the client + let err = $err; + error!("{}", err); + return Err(warp::reject::custom(InternalError)); + }}; + + ($path:ident, $err:expr) => {{ + // In this overload, we attempt to clear the existing file first + let path = &$path; + if path.is_file() { + if let Err(err) = tfs::remove_file(&path).await { + warn!("Failed to remove temporary download result '{}': {}", path.display(), err); + } + } else if path.is_dir() { + if let Err(err) = tfs::remove_dir_all(&path).await { + warn!("Failed to remove temporary download results '{}': {}", path.display(), err); + } + } + + // Move to the normal overload for the rest + fail!($err) + }}; +} + + + + + +/***** AUXILLARY STRUCTS *****/ +/// Defines the contents of a single Scylla database row that describes a package. +#[derive(Clone, IntoUserType, FromUserType, SerializeCql)] +pub struct PackageUdt { + pub created: i64, + pub description: String, + pub detached: bool, + pub digest: String, + pub functions_as_json: String, + pub id: Uuid, + pub kind: String, + pub name: String, + pub owners: Vec, + pub types_as_json: String, + pub version: String, +} + +impl TryFrom for PackageUdt { + type Error = Error; + + fn try_from(package: PackageInfo) -> Result { + // First, serialize the functions and the types as JSON + let functions_as_json: String = match serde_json::to_string(&package.functions) { + Ok(funcs) => funcs, + Err(err) => { + return Err(Error::FunctionsSerializeError { name: package.name, err }); + }, + }; + let types_as_json: String = match serde_json::to_string(&package.types) { + Ok(types) => types, + Err(err) => { + return Err(Error::TypesSerializeError { name: package.name, err }); + }, + }; + + // Assert that there is a digest + let digest: String = match package.digest { + Some(digest) => digest, + None => { + return Err(Error::MissingDigest { name: package.name }); + }, + }; + + // We can then simply populate the package info + Ok(Self { + created: package.created.timestamp_millis(), + description: package.description, + detached: package.detached, + digest, + functions_as_json, + id: package.id, + kind: String::from(package.kind), + name: package.name, + owners: package.owners, + types_as_json, + version: package.version.to_string(), + }) + } +} + + + + + +/***** AUXILLARY FUNCTIONS *****/ +/// Ensures that the packages table is present in the given Scylla database. +/// +/// # Arguments +/// - `scylla`: The Scylla database session that allows us to talk to it. +/// +/// # Returns +/// Nothing, but does change the target Scylla database to include the new table if it didn't already. +/// +/// # Errors +/// This function errors if the communication with the given database failed too. +pub async fn ensure_db_table(scylla: &Session) -> Result<(), Error> { + // Define the `brane.package` type + if let Err(err) = scylla + .query_unpaged( + "CREATE TYPE IF NOT EXISTS brane.package ( + created bigint + , description text + , detached boolean + , digest text + , functions_as_json text + , id uuid + , kind text + , name text + , owners list + , types_as_json text + , version text + )", + &[], + ) + .await + { + return Err(Error::PackageTypeDefineError { err }); + } + + // Define the `brane.packages` table + if let Err(err) = scylla + .query_unpaged( + "CREATE TABLE IF NOT EXISTS brane.packages ( + name text + , version text + , file text + , package frozen + , PRIMARY KEY (name, version) + )", + &[], + ) + .await + { + return Err(Error::PackageTableDefineError { err }); + } + + // Done + Ok(()) +} + + + +/// Inserts the given package into the given Scylla database. +/// +/// # Arguments +/// - `scylla`: The Scylla database session that allows us to talk to it. +/// - `package`: The PackageInfo struct that describes the package, and is what we will insert. Note, however, that not _all_ information will make it; only the info present in a `PackageUdt` struct will. +/// - `path`: The Path where the container image may be found. +/// +/// # Returns +/// Nothing, but does change the target Scylla database to include the new package. +/// +/// # Errors +/// This function errors if the communication with the given database failed too or if the given PackageInfo could not be converted to a PackageUdt for some reason. +async fn insert_package_into_db(scylla: &Arc, package: &PackageInfo, path: impl AsRef) -> Result<(), Error> { + let path: &Path = path.as_ref(); + + // Attempt to convert the package + let package: PackageUdt = package.clone().try_into()?; + + // Insert it + if let Err(err) = scylla + .query_unpaged( + "INSERT INTO brane.packages ( + name + , version + , file + , package + ) VALUES(?, ?, ?, ?) + ", + (&package.name, &package.version, path.to_string_lossy().to_string(), &package), + ) + .await + { + return Err(Error::PackageInsertError { name: package.name, err }); + } + + // Done + Ok(()) +} + + + + + +/***** LIBRARY *****/ +/// Downloads a file from the `brane-api` "registry" to the client. +/// +/// # Arguments +/// - `name`: The name of the package (container) to download. +/// - `version`: The version of the package (container) to download. May be 'latest'. +/// - `context`: The Context that describes some properties of the running environment, such as the location where the container images are stored. +/// +/// # Returns +/// A reply with as body the container archive. This archive will likely not be compressed (for now). +/// +/// # Errors +/// This function errors if resolving a 'latest' version failed, the requested package/version pair did not exist, the Scylla database was unreachable or we failed to read the image file. +pub async fn download(name: String, version: String, context: Context) -> Result { + info!("Handling GET on '/packages/{}/{}' (i.e., pull package)", name, version); + + // Attempt to resolve the version from the Scylla database in the context + debug!("Resolving version '{}'...", version); + let version: Version = if version.to_lowercase() == "latest" { + let versions = match context.scylla.query_unpaged("SELECT version FROM brane.packages WHERE name=?", vec![&name]).await { + Ok(versions) => versions, + Err(err) => { + fail!(Error::VersionsQueryError { name, err }); + }, + }; + let mut latest: Option = None; + if let Some(rows) = versions.rows { + for row in rows { + // Get the string value + let version: &str = row.columns[0].as_ref().unwrap().as_text().unwrap(); + + // Attempt to parse + let version: Version = match Version::from_str(version) { + Ok(version) => version, + Err(err) => { + fail!(Error::VersionParseError { raw: version.into(), err }); + }, + }; + + // Finally, find the most recent one + if latest.is_none() || version > *latest.as_ref().unwrap() { + latest = Some(version); + } + } + } + + // Error if none was found + match latest { + Some(version) => version, + None => { + error!("{}", Error::NoVersionsFound { name }); + return Err(warp::reject::not_found()); + }, + } + } else { + match Version::from_str(&version) { + Ok(version) => version, + Err(err) => { + fail!(Error::VersionParseError { raw: version, err }); + }, + } + }; + + // With the version resolved, query the filename + debug!("Retrieving filename for package '{}'@{}", name, version); + let file: PathBuf = + match context.scylla.query_unpaged("SELECT file FROM brane.packages WHERE name=? AND version=?", vec![&name, &version.to_string()]).await { + Ok(file) => { + if let Some(rows) = file.rows { + if rows.is_empty() { + error!("{}", Error::UnknownPackage { name, version }); + return Err(warp::reject::not_found()); + } + if rows.len() > 1 { + panic!("Database contains {} entries with the same name & version ('{}' & '{}')", rows.len(), name, version); + } + rows[0].columns[0].as_ref().unwrap().as_text().unwrap().into() + } else { + error!("{}", Error::UnknownPackage { name, version }); + return Err(warp::reject::not_found()); + } + }, + Err(err) => { + fail!(Error::PathQueryError { name, version, err }); + }, + }; + + // Retrieve the size of the file for the content length + let length: u64 = match tfs::metadata(&file).await { + Ok(metadata) => metadata.len(), + Err(err) => { + fail!(Error::FileMetadataError { path: file, err }); + }, + }; + + // Open a stream to said file + debug!("Sending back reply with compressed archive..."); + let (mut body_sender, body): (Sender, Body) = Body::channel(); + + // Spawn a tokio task that handles the rest while we return the response header + tokio::spawn(async move { + // Open the archive file to read + let mut handle: tfs::File = match tfs::File::open(&file).await { + Ok(handle) => handle, + Err(err) => { + fail!(Error::FileOpenError { path: file, err }); + }, + }; + + // Read it chunk-by-chunk + // (The size of the buffer, like most of the code but edited for not that library cuz it crashes during compilation, has been pulled from https://docs.rs/stream-body/latest/stream_body/) + let mut buf: [u8; 1024 * 16] = [0; 1024 * 16]; + loop { + // Read the chunk + let bytes: usize = match handle.read(&mut buf).await { + Ok(bytes) => bytes, + Err(err) => { + fail!(Error::FileReadError { path: file, err }); + }, + }; + if bytes == 0 { + break; + } + + // Send that with the body + if let Err(err) = body_sender.send_data(Bytes::copy_from_slice(&buf[..bytes])).await { + fail!(Error::FileSendError { path: file, err }); + } + } + + // Done + Ok(()) + }); + + // Done (at least, this task is) + let mut response: Response = Response::new(body); + response.headers_mut().insert("Content-Disposition", HeaderValue::from_static("attachment; filename=image.tar")); + response.headers_mut().insert("Content-Length", HeaderValue::from(length)); + Ok(response) +} + +/// Uploads a new package (container) to the central registry. +/// +/// # Arguments +/// - `package_archive`: The Bytes of the package archive to store somewhere. +/// - `context`: The Context that stores properties about the environment, such as the directory where we store the container files. +/// +/// # Returns +/// The Warp reply that contains the status code of the thing (e.g., OK if everything went fine). +/// +/// # Errors +/// This function errors if we fail to either write the package info to the Scylla database or the package archive to the local filesystem. +pub async fn upload(package_archive: S, context: Context) -> Result +where + S: StreamExt> + Unpin, + B: Buf, +{ + info!("Handling POST on '/packages' (i.e., upload new package)"); + let mut package_archive = package_archive; + + + + /* Step 0: Load config files */ + // Load the node config file + let node_config: NodeConfig = match NodeConfig::from_path(&context.node_config_path) { + Ok(config) => config, + Err(err) => { + fail!(Error::NodeConfigLoadError { err }); + }, + }; + let central: &CentralConfig = match node_config.node.try_central() { + Some(central) => central, + None => { + fail!(Error::NodeConfigUnexpectedKind { + path: context.node_config_path, + got: node_config.node.kind(), + expected: NodeKind::Central, + }); + }, + }; + + + + /* Step 1: Write the _uploadable_ archive */ + // Open a temporary directory + debug!("Preparing filesystem..."); + let tempdir: TempDir = match TempDir::new() { + Ok(tempdir) => tempdir, + Err(err) => { + fail!(Error::TempDirCreateError { err }); + }, + }; + let tempdir_path: &Path = tempdir.path(); + + // Generate a unique ID for the image name. + let id: String = rand::thread_rng().sample_iter(&Alphanumeric).take(8).map(char::from).collect(); + + // Attempt to open a new file + let tar_path: PathBuf = tempdir_path.join(format!("{id}.tar.gz")); + let mut handle = match tfs::File::create(&tar_path).await { + Ok(handle) => handle, + Err(err) => { + fail!(Error::TarCreateError { path: tar_path, err }); + }, + }; + + // Start writing the stream to it + debug!("Downloading submitted archive to '{}'...", tar_path.display()); + while let Some(chunk) = package_archive.next().await { + // Unwrap the chunk + let mut chunk: B = match chunk { + Ok(chunk) => chunk, + Err(err) => { + fail!(Error::BodyReadError { err }); + }, + }; + + // Write the chunk to the Tokio file + if let Err(err) = handle.write_all_buf(&mut chunk).await { + fail!(Error::TarWriteError { path: tar_path, err }); + } + } + + // Wait until the handle is finished writing + if let Err(err) = handle.shutdown().await { + fail!(Error::TarFlushError { path: tar_path, err }); + } + + + + /* Step 2: Extract the archive into a package info and container image. */ + // Re-open the file + debug!("Extracting submitted archive file..."); + let info_path: PathBuf = tempdir_path.join("package.yml"); + let image_path: PathBuf = central.paths.packages.join(format!("{id}.tar")); + { + let handle: tfs::File = match tfs::File::open(&tar_path).await { + Ok(handle) => handle, + Err(err) => { + fail!(Error::TarReopenError { path: tar_path, err }); + }, + }; + + // Wrap it in the unarchiver & decompressor + let dec: GzipDecoder> = GzipDecoder::new(BufReader::new(handle)); + let mut tar: Archive> = Archive::new(dec); + + // Iterate over the entries in the stream + let mut entries: Entries<_> = match tar.entries() { + Ok(entries) => entries, + Err(err) => { + fail!(Error::TarEntriesError { path: tar_path, err }); + }, + }; + let mut i: usize = 0; + let mut did_info: bool = false; + let mut did_image: bool = false; + while let Some(entry) = entries.next().await { + // Unwrap the entry + let mut entry: Entry<_> = match entry { + Ok(entry) => entry, + Err(err) => { + fail!(Error::TarEntryError { path: tar_path, entry: i, err }); + }, + }; + + // Attempt to get its path + let entry_path: Cow = match entry.path() { + Ok(path) => path, + Err(err) => { + fail!(Error::TarEntryPathError { path: tar_path, entry: i, err }); + }, + }; + + // Attempt to extract it based on the type of file + if entry_path == PathBuf::from("package.yml") { + // Extract as such + debug!("Extracting '{}/package.yml' to '{}'...", tar_path.display(), info_path.display()); + if let Err(err) = entry.unpack(&info_path).await { + fail!(Error::TarFileUnpackError { file: PathBuf::from("package.yml"), tarball: tar_path, target: info_path, err }); + } + did_info = true; + } else if entry_path == PathBuf::from("image.tar") { + // Extract as such + debug!("Extracting '{}/image.tar' to '{}'...", tar_path.display(), image_path.display()); + if let Err(err) = entry.unpack(&image_path).await { + fail!(Error::TarFileUnpackError { file: PathBuf::from("image.tar"), tarball: tar_path, target: image_path, err }); + } + did_image = true; + } else { + debug!("Ignoring irrelevant entry '{}' in '{}'", entry_path.display(), tar_path.display()); + } + + // Advance the index for debugging purposes + i += 1; + } + + // Assert that both of our relevant files must have been present + if !did_info || !did_image { + fail!(Error::TarMissingEntries { expected: vec!["package.yml", "image.tar"], path: tar_path }); + } + } + + + + /* Step 3: Insert the package into the DB */ + debug!("Reading package info '{}'...", info_path.display()); + // Read the extracted package info + let sinfo: String = match tfs::read_to_string(&info_path).await { + Ok(sinfo) => sinfo, + Err(err) => { + fail!(Error::PackageInfoReadError { path: info_path, err }); + }, + }; + let info: PackageInfo = match serde_yaml::from_str(&sinfo) { + Ok(info) => info, + Err(err) => { + fail!(Error::PackageInfoParseError { path: info_path, err }); + }, + }; + + // Copy the image tar to the proper location + let result_path: PathBuf = central.paths.packages.join(format!("{}-{}.tar", info.name, info.version)); + debug!("Moving image '{}' to '{}'...", image_path.display(), result_path.display()); + if let Err(err) = tfs::rename(&image_path, &result_path).await { + fail!(image_path, Error::FileMoveError { from: image_path, to: result_path, err }); + } + + // Call the insert function to store the dataset in the registry + debug!("Inserting package '{}' (version {}) into Scylla DB...", info.name, info.version); + if let Err(err) = insert_package_into_db(&context.scylla, &info, &result_path).await { + fail!(result_path, err); + } + + + + /* Step 4: Done */ + // The package has now been added + debug!("Upload of package '{}' (version {}) complete.", info.name, info.version); + Ok(StatusCode::OK) + + // Note that the temporary directory is automagically removed +} diff --git a/brane-api/src/schema.rs b/brane-api/src/schema.rs index 6a65d46a..043b9a4b 100644 --- a/brane-api/src/schema.rs +++ b/brane-api/src/schema.rs @@ -18,7 +18,6 @@ use std::str::FromStr; use chrono::{DateTime, TimeZone, Utc}; use juniper::{EmptySubscription, FieldResult, GraphQLObject, RootNode, graphql_object}; use log::{debug, info}; -use scylla::IntoTypedRows; use specifications::version::Version; use crate::packages::PackageUdt; @@ -81,7 +80,9 @@ impl Query { debug!("Querying Scylla database..."); let mut packages: Vec = vec![]; - if let Some(rows) = scylla.query(query, &(like,)).await?.rows { + if let Ok(rows_result) = scylla.query_unpaged(query, &(like,)).await?.into_rows_result() { + // FIXME: We are going to need to bubble these different errors up + let rows = rows_result.rows(); // Search for all matches of this package for row in rows.into_typed::<(PackageUdt,)>() { let (package,) = row?; @@ -163,7 +164,7 @@ impl Mutations { // Get the image file first, tho debug!("Querying file path from Scylla database..."); let query = "SELECT file FROM brane.packages WHERE name = ? AND version = ?"; - let file = scylla.query(query, &(&name, &version)).await?; + let file = scylla.query_unpaged(query, &(&name, &version)).await?; if let Some(rows) = file.rows { if rows.is_empty() { return Ok("OK!"); @@ -173,7 +174,7 @@ impl Mutations { // Delete the thing from the database debug!("Deleting package from Scylla database..."); let query = "DELETE FROM brane.packages WHERE name = ? AND version = ?"; - scylla.query(query, &(&name, &version)).await?; + scylla.query_unpaged(query, &(&name, &version)).await?; // Delete the file debug!("Deleting container file '{}'...", file.display()); diff --git a/brane-api/src/spec.rs b/brane-api/src/spec.rs index 36fc6401..0c2e0271 100644 --- a/brane-api/src/spec.rs +++ b/brane-api/src/spec.rs @@ -16,7 +16,7 @@ use std::path::PathBuf; use std::sync::Arc; use brane_prx::client::ProxyClient; -use scylla::Session; +use scylla::client::session::Session; /***** LIBRARY *****/