Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
216 changes: 118 additions & 98 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ rust-version = "1.82"
publish = false

[workspace.dependencies]
async-nats = { version = "0.39.0", features = ["service"] }
async-nats = { git = "https://github.com/systeminit/nats.rs.git", branch = "nick/d94750e", features = ["service"] }
async-openai = "0.26.0"
async-recursion = "1.1.1"
async-trait = "0.1.83"
Expand Down
4 changes: 3 additions & 1 deletion lib/naxum/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ pub use self::error::Error;
pub use self::json::Json;
pub use self::make_service::IntoMakeService;
pub use self::message::{Extensions, Head, HeadRef, Message, MessageHead};
pub use self::serve::{serve, serve_with_incoming_limit};
pub use self::serve::{
serve, serve_with_incoming_limit, serve_with_incoming_limit_and_force_reconnect_sender,
};
pub use self::service_ext::ServiceExt;

pub use async_nats::StatusCode;
Expand Down
50 changes: 47 additions & 3 deletions lib/naxum/src/serve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use tokio::{
};
use tokio_util::{sync::CancellationToken, task::TaskTracker};
use tower::{Service, ServiceExt};
use tracing::{debug, error, info, trace};
use tracing::{debug, error, info, trace, warn};

use crate::{
message::{Message, MessageHead},
Expand Down Expand Up @@ -52,6 +52,32 @@ where
{
Serve {
stream,
force_reconnect_sender: None,
make_service,
limit: limit.into(),
_service_marker: PhantomData,
_stream_error_marker: PhantomData,
_request_marker: PhantomData,
}
}

pub fn serve_with_incoming_limit_and_force_reconnect_sender<M, S, T, E, R>(
stream: T,
force_reconnect_sender: tokio::sync::mpsc::Sender<()>,
make_service: M,
limit: impl Into<Option<usize>>,
) -> Serve<M, S, T, E, R>
where
M: for<'a> Service<IncomingMessage<'a, R>, Error = Infallible, Response = S>,
S: Service<Message<R>, Response = Response, Error = Infallible> + Clone + Send + 'static,
S::Future: Send,
T: Stream<Item = Result<R, E>>,
E: error::Error,
R: MessageHead,
{
Serve {
stream,
force_reconnect_sender: Some(force_reconnect_sender),
make_service,
limit: limit.into(),
_service_marker: PhantomData,
Expand All @@ -63,6 +89,7 @@ where
#[must_use = "futures must be awaited or polled"]
pub struct Serve<M, S, T, E, R> {
stream: T,
force_reconnect_sender: Option<tokio::sync::mpsc::Sender<()>>,
make_service: M,
limit: Option<usize>,
_service_marker: PhantomData<S>,
Expand All @@ -88,6 +115,7 @@ impl<M, S, T, E, R> Serve<M, S, T, E, R> {
{
WithGracefulShutdown {
stream: self.stream,
force_reconnect_sender: self.force_reconnect_sender,
make_service: self.make_service,
limit: self.limit,
signal,
Expand All @@ -102,6 +130,7 @@ impl<M, S, T, E, R> Serve<M, S, T, E, R> {
#[must_use = "futures must be awaited or polled"]
pub struct WithGracefulShutdown<M, S, T, E, R, F> {
stream: T,
force_reconnect_sender: Option<tokio::sync::mpsc::Sender<()>>,
make_service: M,
limit: Option<usize>,
signal: F,
Expand Down Expand Up @@ -183,6 +212,20 @@ where
Some(Err(err)) => {
    error!(si.error.message = ?err, "failed to read next message from stream");
    metric!(counter.naxum.next_message.failed = 1);
    // When a force-reconnect channel was supplied, signal its receiver that
    // the stream produced an error. Borrow the sender instead of cloning it:
    // `try_send` only needs `&self`.
    if let Some(sender) = self.force_reconnect_sender.as_ref() {
        warn!(
            si.investigation.name = "verideath",
            "sending (non-blocking try_send) force reconnect in naxum..."
        );
        // `try_send` never waits: it returns an error immediately when the
        // channel is full or closed, so this loop is never stalled here.
        if let Err(err) = sender.try_send(()) {
            error!(si.error.message = ?err, "could not send force reconnect in naxum");
        } else {
            warn!(
                si.investigation.name = "verideath",
                "sent (non-blocking try_send) force reconnect in naxum!"
            );
        }
    }
    // Keep serving: a single failed read does not end the loop.
    continue;
},
None => {
Expand All @@ -194,7 +237,8 @@ where
}
};

trace!(subject = msg.subject().as_str(), "message received");
// let subject = msg.subject().to_string();
// info!(naxum.next_message.subject = %subject, "message received");
metric!(counter.naxum.next_message.processing = 1);

poll_fn(|cx| make_service.poll_ready(cx))
Expand All @@ -209,7 +253,7 @@ where
tracker.spawn(async move {
let _result = tower_svc.oneshot(msg).await;
metric!(counter.naxum.next_message.processing = -1);
trace!("message processed");
// info!(naxum.next_message.subject = %subject, "message processed");

drop(permit);
});
Expand Down
12 changes: 10 additions & 2 deletions lib/si-data-nats/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1074,7 +1074,7 @@ impl Client {
#[instrument(
name = "nats_client.force_reconnect",
skip_all,
level = "debug",
level = "info",
fields(
messaging.client_id = self.metadata.messaging_client_id(),
messaging.nats.server.id = self.metadata.messaging_nats_server_id(),
Expand All @@ -1094,12 +1094,20 @@ impl Client {
)
)]
pub async fn force_reconnect(&self) -> Result<()> {
let span = current_span_for_instrument_at!("debug");
let span = current_span_for_instrument_at!("info");

warn!(
si.investigation.name = "verideath",
"asking inner client for force reconnect..."
);
self.inner
.force_reconnect()
.await
.map_err(|err| span.record_err(Error::NatsReconnect(err)))?;
warn!(
si.investigation.name = "verideath",
"force reconnect succeeded from client wrapper's perspective"
);

span.record_ok();
Ok(())
Expand Down
48 changes: 37 additions & 11 deletions lib/veritech-server/src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,13 @@ use si_pool_noodle::{
ManagementResultSuccess, ProgressMessage, ResolverFunctionResultSuccess,
SchemaVariantDefinitionResultSuccess, SensitiveStrings, ValidationResultSuccess,
};
use std::{collections::HashMap, result, str::Utf8Error, sync::Arc, time::Duration};
use std::{
collections::HashMap,
result,
str::Utf8Error,
sync::{atomic::Ordering, Arc},
time::Duration,
};
use telemetry::prelude::*;
use telemetry_utils::metric;
use thiserror::Error;
Expand Down Expand Up @@ -207,6 +213,7 @@ where
let nats_for_publisher = state.nats.clone();
let publisher = Publisher::new(&nats_for_publisher, &reply_mailbox);
let execution_id = request.execution_id().to_owned();
let execution_kind = request.kind().to_owned();
let cyclone_request = CycloneRequest::from_parts(request.clone(), sensitive_strings);

let (kill_sender, kill_receiver) = oneshot::channel::<()>();
Expand All @@ -232,13 +239,23 @@ where
span.record_err(err)
})?;

let mut count = 1;
while let Some(msg) = progress.next().await {
match msg {
Ok(ProgressMessage::OutputStream(output)) => {
publisher.publish_output(&output).await.map_err(|err| {
request.dec_run_metric();
span.record_err(err)
})?;
publisher
.publish_output(
&output,
execution_id.to_owned(),
execution_kind.to_owned(),
count,
)
.await
.map_err(|err| {
request.dec_run_metric();
span.record_err(err)
})?;
count += 1;
}
Ok(ProgressMessage::Heartbeat) => {
trace!("received heartbeat message");
Expand All @@ -249,10 +266,13 @@ where
}
}
}
publisher.finalize_output().await.map_err(|err| {
request.dec_run_metric();
span.record_err(err)
})?;
publisher
.finalize_output(execution_id.to_owned(), execution_kind.to_owned())
.await
.map_err(|err| {
request.dec_run_metric();
span.record_err(err)
})?;

let function_result = progress.finish().await.map_err(|err| {
request.dec_run_metric();
Expand Down Expand Up @@ -285,7 +305,10 @@ where
match result {
// Got an Ok - let anyone subscribing to a reply know
Ok(function_result) => {
if let Err(err) = publisher.publish_result(&function_result).await {
if let Err(err) = publisher
.publish_result(&function_result, execution_id, execution_kind)
.await
{
error!(si.error.message = ?err, "failed to publish errored result");
}

Expand Down Expand Up @@ -329,7 +352,10 @@ where
}
};
request.dec_run_metric();
if let Err(err) = publisher.publish_result(&func_result_error).await {
if let Err(err) = publisher
.publish_result(&func_result_error, execution_id, execution_kind)
.await
{
error!(si.error.message = ?err, "failed to publish errored result");
}
}
Expand Down
10 changes: 7 additions & 3 deletions lib/veritech-server/src/handlers/kill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use naxum::{
};
use si_data_nats::Subject;
use si_pool_noodle::{
FunctionResult, FunctionResultFailure, FunctionResultFailureError,
CycloneRequestable, FunctionResult, FunctionResultFailure, FunctionResultFailureError,
FunctionResultFailureErrorKind, KillExecutionRequest,
};
use telemetry::prelude::*;
Expand Down Expand Up @@ -37,12 +37,13 @@ async fn kill_execution_request_task(
) {
let publisher = Publisher::new(&state.nats, &reply_mailbox);

let execution_kind = request.kind().to_owned();
let execution_id = request.execution_id;

let result = match kill_execution_request(state, execution_id.to_owned()).await {
Ok(()) => FunctionResult::Success(()),
Err(err) => FunctionResult::Failure(FunctionResultFailure::new(
execution_id,
execution_id.to_owned(),
FunctionResultFailureError {
kind: FunctionResultFailureErrorKind::KilledExecution,
message: err.to_string(),
Expand All @@ -51,7 +52,10 @@ async fn kill_execution_request_task(
)),
};

if let Err(err) = publisher.publish_result(&result).await {
if let Err(err) = publisher
.publish_result(&result, execution_id, execution_kind)
.await
{
error!(?err, "failed to publish result");
}
}
Expand Down
Loading