Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 79 additions & 0 deletions crates/core/src/contract/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -664,6 +664,85 @@ pub mod test {

Ok(())
}

/// Regression test for issue #2278: Verifies that send_to_sender returns
/// an error when the response receiver is dropped, but does NOT crash or
/// break the channel.
///
/// This tests that the channel infrastructure supports the fix in
/// contract/mod.rs where we changed `send_to_sender()?` to non-propagating
/// error handling, so the handler loop can continue even when a response
/// can't be delivered.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn send_to_sender_fails_gracefully_when_receiver_dropped() -> anyhow::Result<()> {
let (send_halve, mut rcv_halve, _) = contract_handler_channel();

let contract = ContractContainer::Wasm(ContractWasmAPIVersion::V1(WrappedContract::new(
Arc::new(ContractCode::from(vec![0, 1, 2, 3])),
Parameters::from(vec![4, 5]),
)));
let key = contract.key();

// Send a request
let h = GlobalExecutor::spawn({
async move {
send_halve
.send_to_handler(ContractHandlerEvent::PutQuery {
key,
state: vec![6, 7, 8].into(),
related_contracts: RelatedContracts::default(),
contract: None,
})
.await
}
});

// Receive the event from the handler side
let (id, ev) =
tokio::time::timeout(Duration::from_millis(100), rcv_halve.recv_from_sender())
.await??;

// Verify it's a PutQuery
let ContractHandlerEvent::PutQuery { state, .. } = ev else {
anyhow::bail!("expected PutQuery event");
};
assert_eq!(state.as_ref(), &[6, 7, 8]);

// Abort the request handler task - this drops the oneshot receiver
// simulating a client disconnect
h.abort();
// Wait a bit for the abort to take effect
tokio::time::sleep(Duration::from_millis(10)).await;

// Try to send the response - this should fail because the receiver was dropped
// when we aborted the task. In the actual contract_handling loop (after the fix),
// this would just log and continue rather than propagating the error.
let send_result = rcv_halve
.send_to_sender(
id,
ContractHandlerEvent::PutResponse {
new_value: Ok(vec![0, 7].into()),
},
)
.await;

// send_to_sender should fail because receiver was dropped
assert!(
send_result.is_err(),
"send_to_sender should fail when receiver is dropped, got {:?}",
send_result
);

// Verify the error is the expected NoEvHandlerResponse
// Use super:: to disambiguate from freenet_stdlib::prelude::ContractError
assert!(
matches!(send_result, Err(super::ContractError::NoEvHandlerResponse)),
"Expected NoEvHandlerResponse error, got {:?}",
send_result
);

Ok(())
}
}

pub(super) mod in_memory {
Expand Down
116 changes: 67 additions & 49 deletions crates/core/src/contract/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ where
phase = "get_complete",
"Fetched contract"
);
contract_handler
// Send response back to caller. If the caller disconnected, just log and continue.
if let Err(error) = contract_handler
.channel()
.send_to_sender(
id,
Expand All @@ -65,13 +66,13 @@ where
},
)
.await
.map_err(|error| {
tracing::debug!(
error = %error,
"Shutting down contract handler"
);
error
})?;
{
tracing::debug!(
error = %error,
contract = %key,
"Failed to send GET response (client may have disconnected)"
);
}
}
Err(err) => {
tracing::warn!(
Expand All @@ -89,7 +90,8 @@ where
);
return Err(ContractError::FatalExecutorError { key, error: err });
}
contract_handler
// Send error response back to caller. If the caller disconnected, just log and continue.
if let Err(error) = contract_handler
.channel()
.send_to_sender(
id,
Expand All @@ -99,13 +101,13 @@ where
},
)
.await
.map_err(|error| {
tracing::debug!(
error = %error,
"Shutting down contract handler"
);
error
})?;
{
tracing::debug!(
error = %error,
contract = %key,
"Failed to send GET error response (client may have disconnected)"
);
}
}
}
}
Expand Down Expand Up @@ -168,17 +170,20 @@ where
}
};

contract_handler
// Send response back to caller. If the caller disconnected (e.g., WebSocket closed),
// the response channel may be dropped. This is not fatal - the contract has already
// been stored, so we just log and continue processing other events.
if let Err(error) = contract_handler
.channel()
.send_to_sender(id, event_result)
.await
.map_err(|error| {
tracing::debug!(
error = %error,
"Shutting down contract handler"
);
error
})?;
{
tracing::debug!(
error = %error,
contract = %key,
"Failed to send PUT response (client may have disconnected)"
);
}
}
ContractHandlerEvent::UpdateQuery {
key,
Expand Down Expand Up @@ -258,17 +263,19 @@ where
}
};

contract_handler
// Send response back to caller. If the caller disconnected, the response channel
// may be dropped. This is not fatal - the update has already been applied.
if let Err(error) = contract_handler
.channel()
.send_to_sender(id, event_result)
.await
.map_err(|error| {
tracing::debug!(
error = %error,
"Shutting down contract handler"
);
error
})?;
{
tracing::debug!(
error = %error,
contract = %key,
"Failed to send UPDATE response (client may have disconnected)"
);
}
}
ContractHandlerEvent::DelegateRequest {
req,
Expand Down Expand Up @@ -309,17 +316,19 @@ where
}
};

contract_handler
// Send response back to caller. If the caller disconnected, the response channel
// may be dropped. This is not fatal - the delegate has already been processed.
if let Err(error) = contract_handler
.channel()
.send_to_sender(id, ContractHandlerEvent::DelegateResponse(response))
.await
.map_err(|error| {
tracing::debug!(
error = %error,
"Shutting down contract handler"
);
error
})?;
{
tracing::debug!(
error = %error,
delegate_key = %delegate_key,
"Failed to send DELEGATE response (client may have disconnected)"
);
}
}
ContractHandlerEvent::RegisterSubscriberListener {
key,
Expand All @@ -340,14 +349,19 @@ where
);
});

// FIXME: if there is an error senc actually an error back
contract_handler
// FIXME: if there is an error send actually an error back
// If the caller disconnected, just log and continue.
if let Err(error) = contract_handler
.channel()
.send_to_sender(id, ContractHandlerEvent::RegisterSubscriberListenerResponse)
.await
.inspect_err(|error| {
tracing::debug!(%error, "shutting down contract handler");
})?;
{
tracing::debug!(
error = %error,
contract = %key,
"Failed to send RegisterSubscriberListener response (client may have disconnected)"
);
}
}
ContractHandlerEvent::QuerySubscriptions { callback } => {
// Get subscription information from the executor and send it through the callback
Expand All @@ -362,13 +376,17 @@ where
.send(crate::message::QueryResult::NetworkDebug(network_debug))
.await;

contract_handler
// If the caller disconnected, just log and continue.
if let Err(error) = contract_handler
.channel()
.send_to_sender(id, ContractHandlerEvent::QuerySubscriptionsResponse)
.await
.inspect_err(|error| {
tracing::debug!(%error, "shutting down contract handler");
})?;
{
tracing::debug!(
error = %error,
"Failed to send QuerySubscriptions response (client may have disconnected)"
);
}
}
_ => unreachable!("ContractHandlerEvent enum should be exhaustive here"),
}
Expand Down
74 changes: 66 additions & 8 deletions crates/fdev/src/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ use freenet_stdlib::prelude::{
ContractCode, ContractContainer, ContractWasmAPIVersion, Parameters, WrappedContract,
};
use freenet_stdlib::{
client_api::{ClientRequest, ContractRequest, DelegateRequest, WebApi},
client_api::{
ClientRequest, ContractRequest, ContractResponse, DelegateRequest, HostResponse, WebApi,
},
prelude::*,
};
use xz2::read::XzDecoder;
Expand Down Expand Up @@ -170,9 +172,26 @@ async fn put_contract(
tracing::debug!("Starting WebSocket client connection");
let mut client = start_api_client(other).await?;
tracing::debug!("WebSocket client connected successfully");
let result = execute_command(request, &mut client).await;
tracing::debug!(success = ?result.is_ok(), "WebSocket client operation complete");
result
execute_command(request, &mut client).await?;

// Wait for server response before closing connection
let response = client
.recv()
.await
.map_err(|e| anyhow::anyhow!("Failed to receive response: {e}"))?;

match response {
HostResponse::ContractResponse(ContractResponse::PutResponse { key: response_key }) => {
tracing::info!(%response_key, "Contract published successfully");
Ok(())
}
HostResponse::ContractResponse(other) => {
anyhow::bail!("Unexpected contract response: {:?}", other)
}
other => {
anyhow::bail!("Unexpected response type: {:?}", other)
}
}
}

async fn put_delegate(
Expand Down Expand Up @@ -204,15 +223,32 @@ For additional hardening is recommended to use a different cipher and nonce to e
(cipher, nonce)
};

println!("Putting delegate {} ", delegate.key().encode());
let delegate_key = delegate.key().clone();
println!("Putting delegate {} ", delegate_key.encode());
let request = DelegateRequest::RegisterDelegate {
delegate,
cipher,
nonce,
}
.into();
let mut client = start_api_client(other).await?;
execute_command(request, &mut client).await
execute_command(request, &mut client).await?;

// Wait for server response before closing connection
let response = client
.recv()
.await
.map_err(|e| anyhow::anyhow!("Failed to receive response: {e}"))?;

match response {
HostResponse::DelegateResponse { key, values } => {
tracing::info!(%key, response_count = values.len(), "Delegate registered successfully");
Ok(())
}
other => {
anyhow::bail!("Unexpected response type: {:?}", other)
}
}
}

#[derive(clap::Parser, Clone, Debug)]
Expand Down Expand Up @@ -253,7 +289,7 @@ pub async fn update(config: UpdateConfig, other: BaseConfig) -> anyhow::Result<(
if config.release {
anyhow::bail!("Cannot publish contracts in the network yet");
}
let key = ContractInstanceId::try_from(config.key)?.into();
let key: ContractKey = ContractInstanceId::try_from(config.key)?.into();
println!("Updating contract {key}");
let data = {
let mut buf = vec![];
Expand All @@ -262,7 +298,29 @@ pub async fn update(config: UpdateConfig, other: BaseConfig) -> anyhow::Result<(
};
let request = ContractRequest::Update { key, data }.into();
let mut client = start_api_client(other).await?;
execute_command(request, &mut client).await
execute_command(request, &mut client).await?;

// Wait for server response before closing connection
let response = client
.recv()
.await
.map_err(|e| anyhow::anyhow!("Failed to receive response: {e}"))?;

match response {
HostResponse::ContractResponse(ContractResponse::UpdateResponse {
key: response_key,
summary,
}) => {
tracing::info!(%response_key, ?summary, "Contract updated successfully");
Ok(())
}
HostResponse::ContractResponse(other) => {
anyhow::bail!("Unexpected contract response: {:?}", other)
}
other => {
anyhow::bail!("Unexpected response type: {:?}", other)
}
}
}

pub(crate) async fn start_api_client(cfg: BaseConfig) -> anyhow::Result<WebApi> {
Expand Down
Loading