diff --git a/Cargo.lock b/Cargo.lock index fb858fc..8754b3f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,6 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -version = 3 +version = 4 [[package]] name = "addr2line" @@ -333,9 +333,9 @@ checksum = "0c4b4d0bd25bd0b74681c0ad21497610ce1b7c91b1022cd21c80c6fbdd9476b0" [[package]] name = "axum" -version = "0.7.5" +version = "0.7.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3a6c9af12842a67734c9a2e355436e5d03b22383ed60cf13cd0c18fbfe3dcbcf" +checksum = "504e3947307ac8326a5437504c517c4b56716c9d98fac0028c2acc7ca47d70ae" dependencies = [ "async-trait", "axum-core", @@ -344,6 +344,8 @@ dependencies = [ "http 1.1.0", "http-body 1.0.1", "http-body-util", + "hyper 1.4.1", + "hyper-util", "itoa", "matchit", "memchr", @@ -352,10 +354,15 @@ dependencies = [ "pin-project-lite", "rustversion", "serde", - "sync_wrapper", - "tower", + "serde_json", + "serde_path_to_error", + "serde_urlencoded", + "sync_wrapper 1.0.1", + "tokio", + "tower 0.5.1", "tower-layer", "tower-service", + "tracing", ] [[package]] @@ -373,9 +380,10 @@ dependencies = [ "mime", "pin-project-lite", "rustversion", - "sync_wrapper", + "sync_wrapper 1.0.1", "tower-layer", "tower-service", + "tracing", ] [[package]] @@ -618,6 +626,7 @@ version = "0.4.0" dependencies = [ "async-recursion", "async-trait", + "axum", "filesize", "log", "oxigraph", @@ -626,9 +635,11 @@ dependencies = [ "query_processing", "representation", "reqwest", + "serde", "sparesults", "spargebra 0.3.0-alpha.5-parparse", "thiserror", + "tokio", "uuid", "virtualization", "virtualized_query", @@ -1506,7 +1517,7 @@ dependencies = [ "pin-project-lite", "socket2", "tokio", - "tower", + "tower 0.4.13", "tower-service", "tracing", ] @@ -3413,7 +3424,7 @@ dependencies = [ "serde", "serde_json", "serde_urlencoded", - "sync_wrapper", + "sync_wrapper 1.0.1", "system-configuration", "tokio", "tokio-native-tls", @@ -3664,6 +3675,16 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_path_to_error" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af99884400da37c88f5e9146b7f1fd0fbcae8f6eec4e9da38b67d05486f814a6" +dependencies = [ + "itoa", + "serde", +] + [[package]] name = "serde_urlencoded" version = "0.7.1" @@ -3950,6 +3971,12 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "sync_wrapper" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" + [[package]] name = "sync_wrapper" version = "1.0.1" @@ -4217,7 +4244,7 @@ dependencies = [ "socket2", "tokio", "tokio-stream", - "tower", + "tower 0.4.13", "tower-layer", "tower-service", "tracing", @@ -4243,17 +4270,33 @@ dependencies = [ "tracing", ] +[[package]] +name = "tower" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2873938d487c3cfb9aed7546dc9f2711d867c9f90c46b889989a2cb84eba6b4f" +dependencies = [ + "futures-core", + "futures-util", + "pin-project-lite", + "sync_wrapper 0.1.2", + "tokio", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "tower-layer" -version = "0.3.2" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c20c8dbed6283a09604c3e69b4b7eeb54e298b8a600d4d5ecb5ad39de609f1d0" +checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e" [[package]] name = "tower-service" -version = "0.3.2" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52" +checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3" [[package]] name = "tracing" @@ -4261,6 +4304,7 @@ version = "0.1.40" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" dependencies = [ + "log", "pin-project-lite", "tracing-attributes", "tracing-core", diff --git a/Cargo.toml b/Cargo.toml index ca37070..e42cd0a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -50,6 +50,8 @@ opcua = {version="0.12.0", features = ["vendored-openssl"]} url = "2.5.2" uuid = {version = "1.10.0", features = ["fast-rng", "v4"]} +axum = "0.7.7" + [patch.crates-io] oxrdf = { git = 'https://github.com/magbak/oxigraph.git', rev = "b13df973ed2785de2ac41066ca4b62d88d3f5d40"} oxttl = { git = 'https://github.com/magbak/oxigraph.git', rev = "b13df973ed2785de2ac41066ca4b62d88d3f5d40"} diff --git a/lib/chrontext/Cargo.toml b/lib/chrontext/Cargo.toml index 2ee27eb..920b9e2 100644 --- a/lib/chrontext/Cargo.toml +++ b/lib/chrontext/Cargo.toml @@ -33,6 +33,9 @@ async-trait.workspace = true oxigraph.workspace = true filesize.workspace = true uuid.workspace = true +axum.workspace = true +tokio.workspace = true +serde.workspace = true [features] opcua = ["virtualization/opcua"] diff --git a/lib/chrontext/src/lib.rs b/lib/chrontext/src/lib.rs index bd46e49..41b0875 100644 --- a/lib/chrontext/src/lib.rs +++ b/lib/chrontext/src/lib.rs @@ -11,3 +11,4 @@ pub mod rewriting; pub mod sparql_database; mod sparql_result_to_polars; pub mod splitter; +pub mod web; diff --git a/lib/chrontext/src/web.rs b/lib/chrontext/src/web.rs new file mode 100644 index 0000000..2381309 --- /dev/null +++ b/lib/chrontext/src/web.rs @@ -0,0 +1,61 @@ +use std::sync::Arc; + +use axum::response::Html; +use axum::Form; +use axum::{self, Router}; +use axum::{extract::State, routing::get}; + +use oxigraph::sparql::{results::QueryResultsFormat, QueryResults, QuerySolutionIter}; +use oxrdf::Variable; +use serde::Deserialize; + +use crate::sparql_database::SparqlQueryable; + +#[derive(Clone)] +struct AppState { + sparql_engine: Arc<(dyn SparqlQueryable)>, +} + +pub async fn launch_web(sparql_engine: Arc<(dyn SparqlQueryable)>, address: &str) { + let state = AppState { sparql_engine }; + + let app: Router = Router::new() + .route("/", get(|| async { "Hello, World!" })) + .route("/query", get(get_query).post(post_query)) + .with_state(state); + + let listener = tokio::net::TcpListener::bind(address).await.unwrap(); + axum::serve(listener, app).await.unwrap(); +} + +async fn get_query() -> Html<&'static str> { + Html(include_str!("web/sparql.html")) +} + +#[derive(Deserialize, Debug)] +struct SparqlQuery { + query: String, +} + +async fn post_query(State(state): State, Form(form): Form) -> String { + let sparql_engine = state.sparql_engine; + + let query = spargebra::Query::parse(&form.query, None).unwrap(); + + let query_result: QueryResults = match sparql_engine.execute(&query).await { + Ok(v) => { + let variables: Arc<[Variable]> = v.first().unwrap().variables().into(); + let iter = v + .into_iter() + .map(|qs| Ok(qs.values().iter().map(|t| t.clone()).collect())); + let qsi = QuerySolutionIter::new(variables, iter); + qsi.into() + } + Err(_) => todo!(), + }; + + let mut results = Vec::new(); + query_result.write(&mut results, QueryResultsFormat::Json); + + String::from_utf8_lossy(&results).into() +} diff --git a/lib/chrontext/src/web/sparql.html b/lib/chrontext/src/web/sparql.html new file mode 100644 index 0000000..f08a8f7 --- /dev/null +++ b/lib/chrontext/src/web/sparql.html @@ -0,0 +1,24 @@ + + + + + + Chrontext - Query + + + + + +
+ + diff --git a/lib/flight/src/server.rs b/lib/flight/src/server.rs index b2a5353..88daa66 100644 --- a/lib/flight/src/server.rs +++ b/lib/flight/src/server.rs @@ -17,8 +17,8 @@ // specific language governing permissions and limitations // under the License. -use std::net::AddrParseError; use futures::stream::BoxStream; +use std::net::AddrParseError; use std::pin::Pin; use std::sync::Arc; use tonic::{Code, Request, Response, Status, Streaming}; @@ -33,11 +33,11 @@ use bincode::deserialize; use bincode::serialize; use chrontext::engine::Engine; use futures::{stream, Stream}; +use log::info; use polars::io::SerWriter; use polars::prelude::{IpcCompression, IpcStreamWriter}; use thiserror::*; use tonic::transport::Server; -use log::info; #[derive(Error, Debug)] pub enum ChrontextFlightServerError { @@ -104,7 +104,9 @@ impl FlightService for ChrontextFlightService { info!("Got do_get request: {:?}", request); let query_string: String = deserialize(request.get_ref().ticket.as_ref()).unwrap(); let (mut df, map, _context) = self - .engine.as_ref().unwrap() + .engine + .as_ref() + .unwrap() .clone() .query(&query_string) .await @@ -170,7 +172,9 @@ impl ChrontextFlightServer { pub async fn serve(self, addr: &str) -> Result<(), ChrontextFlightServerError> { info!("Starting server on {}", addr); - let addr = addr.parse().map_err(|x|ChrontextFlightServerError::AddrParseError(x))?; + let addr = addr + .parse() + .map_err(|x| ChrontextFlightServerError::AddrParseError(x))?; let svc = FlightServiceServer::new(self.chrontext_flight_service.clone()); Server::builder() @@ -181,4 +185,4 @@ impl ChrontextFlightServer { info!("Shutdown server"); Ok(()) } -} \ No newline at end of file +} diff --git a/py_chrontext/src/errors.rs b/py_chrontext/src/errors.rs index 959604d..379512f 100644 --- a/py_chrontext/src/errors.rs +++ b/py_chrontext/src/errors.rs @@ -1,11 +1,10 @@ use chrontext::errors::ChrontextError as RustChrontextError; +use flight::client::ChrontextFlightClientError; +use flight::server::ChrontextFlightServerError; use oxrdf::IriParseError; use pyo3::{create_exception, exceptions::PyException, prelude::*}; use spargebra::SparqlSyntaxError; use thiserror::Error; -use flight::client::ChrontextFlightClientError; -use flight::server::ChrontextFlightServerError; - #[derive(Error, Debug)] pub enum PyChrontextError { @@ -49,12 +48,8 @@ impl std::convert::From for PyErr { PyChrontextError::MultipleVirtualizedDatabasesError => { MultipleVirtualizedDatabasesError::new_err("") } - PyChrontextError::FlightClientError(x) => { - FlightClientError::new_err(x.to_string()) - } - PyChrontextError::FlightServerError(x) => { - FlightServerError::new_err(x.to_string()) - } + PyChrontextError::FlightClientError(x) => FlightClientError::new_err(x.to_string()), + PyChrontextError::FlightServerError(x) => FlightServerError::new_err(x.to_string()), } } } diff --git a/py_chrontext/src/lib.rs b/py_chrontext/src/lib.rs index 0d9b3b2..89bc217 100644 --- a/py_chrontext/src/lib.rs +++ b/py_chrontext/src/lib.rs @@ -35,6 +35,8 @@ static GLOBAL: Jemalloc = Jemalloc; #[global_allocator] static GLOBAL: MiMalloc = MiMalloc; +use std::thread; + use crate::errors::PyChrontextError; use chrontext::engine::{Engine, EngineConfig}; use chrontext::sparql_database::sparql_embedded_oxigraph::EmbeddedOxigraphConfig; @@ -271,19 +273,41 @@ impl PyEngine { }) } - pub fn serve_flight(&mut self, address: &str, py:Python) -> PyResult<()> { + pub fn serve_flight(&mut self, address: &str, py: Python) -> PyResult<()> { py.allow_threads(move || { if self.engine.is_none() { self.init()?; } - let flight_server = ChrontextFlightServer::new(Some(Arc::new(self.engine.take().unwrap()))); + let flight_server = + ChrontextFlightServer::new(Some(Arc::new(self.engine.take().unwrap()))); let mut builder = Builder::new_multi_thread(); builder.enable_all(); builder .build() .unwrap() .block_on(flight_server.serve(address)) - .map_err(|x|PyChrontextError::FlightServerError(x))?; + .map_err(|x| PyChrontextError::FlightServerError(x))?; + + Ok(()) + }) + } + + pub fn serve_web(&mut self, address: &str, py: Python) -> PyResult<()> { + let address = address.to_owned(); + py.allow_threads(move || { + if self.engine.is_none() { + self.init()?; + } + let sparql_database = self.engine.as_mut().unwrap().sparql_database.clone(); + thread::spawn(move || { + let mut builder = Builder::new_multi_thread(); + builder.enable_all(); + builder + .build() + .unwrap() + .block_on(chrontext::web::launch_web(sparql_database, &address)); + }); + Ok(()) }) } @@ -292,14 +316,14 @@ impl PyEngine { #[derive(Clone)] #[pyclass(name = "FlightClient")] pub struct PyFlightClient { - uri:String + uri: String, } #[pymethods] impl PyFlightClient { #[new] - pub fn new(uri:String) -> PyResult { - Ok(Self {uri}) + pub fn new(uri: String) -> PyResult { + Ok(Self { uri }) } pub fn query( @@ -321,10 +345,10 @@ impl PyFlightClient { .build() .unwrap() .block_on(client.query(&sparql)) - .map_err(|x|PyChrontextError::FlightClientError(x))?; + .map_err(|x| PyChrontextError::FlightClientError(x))?; Ok(sm) }); - match res { + match res { Ok(sm) => { let EagerSolutionMappings { mut mappings, @@ -343,10 +367,9 @@ impl PyFlightClient { py, )?; Ok(pydf) - }, - Err(e) => Err(e) + } + Err(e) => Err(e), } - } }