diff --git a/crates/core/src/contract/handler.rs b/crates/core/src/contract/handler.rs index c8bbe49fe..9f76aea93 100644 --- a/crates/core/src/contract/handler.rs +++ b/crates/core/src/contract/handler.rs @@ -664,6 +664,85 @@ pub mod test { Ok(()) } + + /// Regression test for issue #2278: Verifies that send_to_sender returns + /// an error when the response receiver is dropped, but does NOT crash or + /// break the channel. + /// + /// This tests that the channel infrastructure supports the fix in + /// contract/mod.rs where we changed `send_to_sender()?` to non-propagating + /// error handling, so the handler loop can continue even when a response + /// can't be delivered. + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn send_to_sender_fails_gracefully_when_receiver_dropped() -> anyhow::Result<()> { + let (send_halve, mut rcv_halve, _) = contract_handler_channel(); + + let contract = ContractContainer::Wasm(ContractWasmAPIVersion::V1(WrappedContract::new( + Arc::new(ContractCode::from(vec![0, 1, 2, 3])), + Parameters::from(vec![4, 5]), + ))); + let key = contract.key(); + + // Send a request + let h = GlobalExecutor::spawn({ + async move { + send_halve + .send_to_handler(ContractHandlerEvent::PutQuery { + key, + state: vec![6, 7, 8].into(), + related_contracts: RelatedContracts::default(), + contract: None, + }) + .await + } + }); + + // Receive the event from the handler side + let (id, ev) = + tokio::time::timeout(Duration::from_millis(100), rcv_halve.recv_from_sender()) + .await??; + + // Verify it's a PutQuery + let ContractHandlerEvent::PutQuery { state, .. } = ev else { + anyhow::bail!("expected PutQuery event"); + }; + assert_eq!(state.as_ref(), &[6, 7, 8]); + + // Abort the request handler task - this drops the oneshot receiver + // simulating a client disconnect + h.abort(); + // Wait a bit for the abort to take effect + tokio::time::sleep(Duration::from_millis(10)).await; + + // Try to send the response - this should fail because the receiver was dropped + // when we aborted the task. In the actual contract_handling loop (after the fix), + // this would just log and continue rather than propagating the error. + let send_result = rcv_halve + .send_to_sender( + id, + ContractHandlerEvent::PutResponse { + new_value: Ok(vec![0, 7].into()), + }, + ) + .await; + + // send_to_sender should fail because receiver was dropped + assert!( + send_result.is_err(), + "send_to_sender should fail when receiver is dropped, got {:?}", + send_result + ); + + // Verify the error is the expected NoEvHandlerResponse + // Use super:: to disambiguate from freenet_stdlib::prelude::ContractError + assert!( + matches!(send_result, Err(super::ContractError::NoEvHandlerResponse)), + "Expected NoEvHandlerResponse error, got {:?}", + send_result + ); + + Ok(()) + } } pub(super) mod in_memory { diff --git a/crates/core/src/contract/mod.rs b/crates/core/src/contract/mod.rs index 7be7c8eff..3fbb6cfef 100644 --- a/crates/core/src/contract/mod.rs +++ b/crates/core/src/contract/mod.rs @@ -55,7 +55,8 @@ where phase = "get_complete", "Fetched contract" ); - contract_handler + // Send response back to caller. If the caller disconnected, just log and continue. + if let Err(error) = contract_handler .channel() .send_to_sender( id, @@ -65,13 +66,13 @@ where }, ) .await - .map_err(|error| { - tracing::debug!( - error = %error, - "Shutting down contract handler" - ); - error - })?; + { + tracing::debug!( + error = %error, + contract = %key, + "Failed to send GET response (client may have disconnected)" + ); + } } Err(err) => { tracing::warn!( @@ -89,7 +90,8 @@ where ); return Err(ContractError::FatalExecutorError { key, error: err }); } - contract_handler + // Send error response back to caller. If the caller disconnected, just log and continue. + if let Err(error) = contract_handler .channel() .send_to_sender( id, @@ -99,13 +101,13 @@ where }, ) .await - .map_err(|error| { - tracing::debug!( - error = %error, - "Shutting down contract handler" - ); - error - })?; + { + tracing::debug!( + error = %error, + contract = %key, + "Failed to send GET error response (client may have disconnected)" + ); + } } } } @@ -168,17 +170,20 @@ where } }; - contract_handler + // Send response back to caller. If the caller disconnected (e.g., WebSocket closed), + // the response channel may be dropped. This is not fatal - the contract has already + // been stored, so we just log and continue processing other events. + if let Err(error) = contract_handler .channel() .send_to_sender(id, event_result) .await - .map_err(|error| { - tracing::debug!( - error = %error, - "Shutting down contract handler" - ); - error - })?; + { + tracing::debug!( + error = %error, + contract = %key, + "Failed to send PUT response (client may have disconnected)" + ); + } } ContractHandlerEvent::UpdateQuery { key, @@ -258,17 +263,19 @@ where } }; - contract_handler + // Send response back to caller. If the caller disconnected, the response channel + // may be dropped. This is not fatal - the update has already been applied. + if let Err(error) = contract_handler .channel() .send_to_sender(id, event_result) .await - .map_err(|error| { - tracing::debug!( - error = %error, - "Shutting down contract handler" - ); - error - })?; + { + tracing::debug!( + error = %error, + contract = %key, + "Failed to send UPDATE response (client may have disconnected)" + ); + } } ContractHandlerEvent::DelegateRequest { req, @@ -309,17 +316,19 @@ where } }; - contract_handler + // Send response back to caller. If the caller disconnected, the response channel + // may be dropped. This is not fatal - the delegate has already been processed. + if let Err(error) = contract_handler .channel() .send_to_sender(id, ContractHandlerEvent::DelegateResponse(response)) .await - .map_err(|error| { - tracing::debug!( - error = %error, - "Shutting down contract handler" - ); - error - })?; + { + tracing::debug!( + error = %error, + delegate_key = %delegate_key, + "Failed to send DELEGATE response (client may have disconnected)" + ); + } } ContractHandlerEvent::RegisterSubscriberListener { key, @@ -340,14 +349,19 @@ where ); }); - // FIXME: if there is an error senc actually an error back - contract_handler + // FIXME: if there is an error send actually an error back + // If the caller disconnected, just log and continue. + if let Err(error) = contract_handler .channel() .send_to_sender(id, ContractHandlerEvent::RegisterSubscriberListenerResponse) .await - .inspect_err(|error| { - tracing::debug!(%error, "shutting down contract handler"); - })?; + { + tracing::debug!( + error = %error, + contract = %key, + "Failed to send RegisterSubscriberListener response (client may have disconnected)" + ); + } } ContractHandlerEvent::QuerySubscriptions { callback } => { // Get subscription information from the executor and send it through the callback @@ -362,13 +376,17 @@ where .send(crate::message::QueryResult::NetworkDebug(network_debug)) .await; - contract_handler + // If the caller disconnected, just log and continue. + if let Err(error) = contract_handler .channel() .send_to_sender(id, ContractHandlerEvent::QuerySubscriptionsResponse) .await - .inspect_err(|error| { - tracing::debug!(%error, "shutting down contract handler"); - })?; + { + tracing::debug!( + error = %error, + "Failed to send QuerySubscriptions response (client may have disconnected)" + ); + } } _ => unreachable!("ContractHandlerEvent enum should be exhaustive here"), } diff --git a/crates/fdev/src/commands.rs b/crates/fdev/src/commands.rs index cc7670edd..edaac48d8 100644 --- a/crates/fdev/src/commands.rs +++ b/crates/fdev/src/commands.rs @@ -5,7 +5,9 @@ use freenet_stdlib::prelude::{ ContractCode, ContractContainer, ContractWasmAPIVersion, Parameters, WrappedContract, }; use freenet_stdlib::{ - client_api::{ClientRequest, ContractRequest, DelegateRequest, WebApi}, + client_api::{ + ClientRequest, ContractRequest, ContractResponse, DelegateRequest, HostResponse, WebApi, + }, prelude::*, }; use xz2::read::XzDecoder; @@ -170,9 +172,26 @@ async fn put_contract( tracing::debug!("Starting WebSocket client connection"); let mut client = start_api_client(other).await?; tracing::debug!("WebSocket client connected successfully"); - let result = execute_command(request, &mut client).await; - tracing::debug!(success = ?result.is_ok(), "WebSocket client operation complete"); - result + execute_command(request, &mut client).await?; + + // Wait for server response before closing connection + let response = client + .recv() + .await + .map_err(|e| anyhow::anyhow!("Failed to receive response: {e}"))?; + + match response { + HostResponse::ContractResponse(ContractResponse::PutResponse { key: response_key }) => { + tracing::info!(%response_key, "Contract published successfully"); + Ok(()) + } + HostResponse::ContractResponse(other) => { + anyhow::bail!("Unexpected contract response: {:?}", other) + } + other => { + anyhow::bail!("Unexpected response type: {:?}", other) + } + } } async fn put_delegate( @@ -204,7 +223,8 @@ For additional hardening is recommended to use a different cipher and nonce to e (cipher, nonce) }; - println!("Putting delegate {} ", delegate.key().encode()); + let delegate_key = delegate.key().clone(); + println!("Putting delegate {} ", delegate_key.encode()); let request = DelegateRequest::RegisterDelegate { delegate, cipher, @@ -212,7 +232,23 @@ For additional hardening is recommended to use a different cipher and nonce to e } .into(); let mut client = start_api_client(other).await?; - execute_command(request, &mut client).await + execute_command(request, &mut client).await?; + + // Wait for server response before closing connection + let response = client + .recv() + .await + .map_err(|e| anyhow::anyhow!("Failed to receive response: {e}"))?; + + match response { + HostResponse::DelegateResponse { key, values } => { + tracing::info!(%key, response_count = values.len(), "Delegate registered successfully"); + Ok(()) + } + other => { + anyhow::bail!("Unexpected response type: {:?}", other) + } + } } #[derive(clap::Parser, Clone, Debug)] @@ -253,7 +289,7 @@ pub async fn update(config: UpdateConfig, other: BaseConfig) -> anyhow::Result<( if config.release { anyhow::bail!("Cannot publish contracts in the network yet"); } - let key = ContractInstanceId::try_from(config.key)?.into(); + let key: ContractKey = ContractInstanceId::try_from(config.key)?.into(); println!("Updating contract {key}"); let data = { let mut buf = vec![]; @@ -262,7 +298,29 @@ pub async fn update(config: UpdateConfig, other: BaseConfig) -> anyhow::Result<( }; let request = ContractRequest::Update { key, data }.into(); let mut client = start_api_client(other).await?; - execute_command(request, &mut client).await + execute_command(request, &mut client).await?; + + // Wait for server response before closing connection + let response = client + .recv() + .await + .map_err(|e| anyhow::anyhow!("Failed to receive response: {e}"))?; + + match response { + HostResponse::ContractResponse(ContractResponse::UpdateResponse { + key: response_key, + summary, + }) => { + tracing::info!(%response_key, ?summary, "Contract updated successfully"); + Ok(()) + } + HostResponse::ContractResponse(other) => { + anyhow::bail!("Unexpected contract response: {:?}", other) + } + other => { + anyhow::bail!("Unexpected response type: {:?}", other) + } + } } pub(crate) async fn start_api_client(cfg: BaseConfig) -> anyhow::Result { diff --git a/crates/fdev/tests/websocket_response.rs b/crates/fdev/tests/websocket_response.rs new file mode 100644 index 000000000..a8e236f16 --- /dev/null +++ b/crates/fdev/tests/websocket_response.rs @@ -0,0 +1,125 @@ +//! Integration test verifying fdev properly waits for server responses +//! +//! This test ensures that the WebSocket client doesn't close the connection +//! before receiving a response from the server (fixing issue #2278). + +use freenet_stdlib::client_api::{ + ClientRequest, ContractRequest, ContractResponse, HostResponse, WebApi, +}; +use freenet_stdlib::prelude::*; +use std::net::Ipv4Addr; +use std::sync::atomic::{AtomicU16, Ordering}; +use std::sync::Arc; +use std::time::Duration; +use tokio::net::TcpListener; +use tokio::sync::oneshot; +use tokio_tungstenite::tungstenite::Message; + +static PORT: AtomicU16 = AtomicU16::new(54000); + +/// Test that the WebSocket client properly waits for a PutResponse +/// +/// Before the fix for #2278, fdev would close the connection before +/// receiving the response, causing "Connection reset without closing handshake" +/// errors on the server side. +#[tokio::test] +async fn test_websocket_client_waits_for_put_response() { + let port = PORT.fetch_add(1, Ordering::SeqCst); + + // Create a mock contract key for the response (base58 encoded) + let mock_key = ContractKey::from_id("11111111111111111111111111111111").expect("valid key"); + let response: HostResponse = + HostResponse::ContractResponse(ContractResponse::PutResponse { key: mock_key }); + + // Channel to signal when server is ready to accept connections + let (ready_tx, ready_rx) = oneshot::channel::<()>(); + + // Start the mock server + let listener = TcpListener::bind((Ipv4Addr::LOCALHOST, port)) + .await + .expect("bind"); + + let server_response = response.clone(); + let server_handle = tokio::spawn(async move { + // Signal that we're ready to accept connections + let _ = ready_tx.send(()); + + let (stream, _) = tokio::time::timeout(Duration::from_secs(5), listener.accept()) + .await + .expect("accept timeout") + .expect("accept"); + + let mut ws_stream = tokio_tungstenite::accept_async(stream) + .await + .expect("ws accept"); + + use futures::{SinkExt, StreamExt}; + + // Receive the request + let msg = tokio::time::timeout(Duration::from_secs(5), ws_stream.next()) + .await + .expect("receive timeout") + .expect("stream not empty") + .expect("receive"); + + // Verify we received a binary message (contract requests are binary) + match msg { + Message::Binary(_) => {} // Request received successfully + _ => panic!("expected binary message"), + }; + + // Send back the response + let response_bytes = bincode::serialize(&Ok::<_, freenet_stdlib::client_api::ClientError>( + server_response, + )) + .expect("serialize"); + ws_stream + .send(Message::Binary(response_bytes.into())) + .await + .expect("send response"); + }); + + // Wait for server to be ready before connecting + ready_rx.await.expect("server ready signal"); + + // Connect client + let url = format!("ws://127.0.0.1:{port}/v1/contract/command?encodingProtocol=native"); + let (stream, _) = tokio_tungstenite::connect_async(&url) + .await + .expect("connect"); + let mut client = WebApi::start(stream); + + // Create a minimal contract for the request + let code = ContractCode::from(vec![0u8; 32]); + let wrapped = WrappedContract::new(Arc::new(code), Parameters::from(vec![])); + let api_version = ContractWasmAPIVersion::V1(wrapped); + let contract = ContractContainer::from(api_version); + + // Send a Put request (simulating what fdev does) + let request = ClientRequest::ContractOp(ContractRequest::Put { + contract, + state: WrappedState::new(vec![]), + related_contracts: RelatedContracts::default(), + subscribe: false, + }); + + client.send(request).await.expect("send request"); + + // This is the key behavior: we must receive the response before dropping the client. + // Before the fix, fdev would exit here without waiting, causing connection reset. + let response = tokio::time::timeout(Duration::from_secs(5), client.recv()) + .await + .expect("response timeout") + .expect("receive response"); + + // Verify we got the expected response + match response { + HostResponse::ContractResponse(ContractResponse::PutResponse { key }) => { + assert_eq!(key, mock_key); + } + other => panic!("unexpected response: {:?}", other), + } + + // Wait for server to complete + server_handle.await.expect("server task"); +}