diff --git a/src/lean_spec/subspecs/networking/__init__.py b/src/lean_spec/subspecs/networking/__init__.py index dcd3024a..37a5d10e 100644 --- a/src/lean_spec/subspecs/networking/__init__.py +++ b/src/lean_spec/subspecs/networking/__init__.py @@ -26,7 +26,6 @@ GossipAttestationEvent, GossipBlockEvent, NetworkEvent, - NetworkEventSource, NetworkService, PeerConnectedEvent, PeerDisconnectedEvent, @@ -63,7 +62,6 @@ "GossipAttestationEvent", "GossipBlockEvent", "NetworkEvent", - "NetworkEventSource", "NetworkService", "PeerConnectedEvent", "PeerDisconnectedEvent", diff --git a/src/lean_spec/subspecs/networking/client/__init__.py b/src/lean_spec/subspecs/networking/client/__init__.py index 62b384b7..e2e71016 100644 --- a/src/lean_spec/subspecs/networking/client/__init__.py +++ b/src/lean_spec/subspecs/networking/client/__init__.py @@ -1,8 +1,7 @@ """ Network Client Module. -Bridges the transport layer to the sync service by implementing -the NetworkRequester and NetworkEventSource protocols. +Bridges the transport layer to the sync service. Components ---------- @@ -11,7 +10,6 @@ Handles BlocksByRoot and Status requests. LiveNetworkEventSource - Implements NetworkEventSource. Bridges connection events to NetworkService events. """ diff --git a/src/lean_spec/subspecs/networking/client/event_source.py b/src/lean_spec/subspecs/networking/client/event_source.py index e70ac404..3a6e300f 100644 --- a/src/lean_spec/subspecs/networking/client/event_source.py +++ b/src/lean_spec/subspecs/networking/client/event_source.py @@ -1,9 +1,9 @@ """ Network event source bridging transport to sync service. -This module implements NetworkEventSource, producing events from real -network connections. It bridges the gap between the low-level transport -layer (QUIC ConnectionManager) and the high-level sync service. +This module produces events from real network connections. It bridges the +gap between the low-level transport layer (QUIC ConnectionManager) and the +high-level sync service. WHY THIS MODULE EXISTS @@ -125,8 +125,8 @@ from lean_spec.subspecs.networking.reqresp.handler import ( REQRESP_PROTOCOL_IDS, BlockLookup, - DefaultRequestHandler, ReqRespServer, + RequestHandler, ) from lean_spec.subspecs.networking.reqresp.message import Status from lean_spec.subspecs.networking.service.events import ( @@ -551,7 +551,6 @@ class LiveNetworkEventSource: """ Produces NetworkEvent objects from real network connections. - Implements the NetworkEventSource protocol for use with NetworkService. Bridges the transport layer (ConnectionManager) to the event-driven sync layer. @@ -658,7 +657,7 @@ class LiveNetworkEventSource: Tracked for cleanup on shutdown. Tasks remove themselves on completion. """ - _reqresp_handler: DefaultRequestHandler = field(init=False) + _reqresp_handler: RequestHandler = field(init=False) """Handler for inbound ReqResp requests. Provides chain data to peers requesting Status or BlocksByRoot. @@ -683,7 +682,7 @@ class LiveNetworkEventSource: def __post_init__(self) -> None: """Initialize handlers with current configuration.""" object.__setattr__(self, "_gossip_handler", GossipHandler(fork_digest=self._fork_digest)) - object.__setattr__(self, "_reqresp_handler", DefaultRequestHandler()) + object.__setattr__(self, "_reqresp_handler", RequestHandler()) object.__setattr__(self, "_reqresp_server", ReqRespServer(handler=self._reqresp_handler)) object.__setattr__( self, "_gossipsub_behavior", GossipsubBehavior(params=GossipsubParameters()) diff --git a/src/lean_spec/subspecs/networking/reqresp/__init__.py b/src/lean_spec/subspecs/networking/reqresp/__init__.py index b51d92a3..d1169eab 100644 --- a/src/lean_spec/subspecs/networking/reqresp/__init__.py +++ b/src/lean_spec/subspecs/networking/reqresp/__init__.py @@ -9,10 +9,8 @@ from .handler import ( REQRESP_PROTOCOL_IDS, BlockLookup, - DefaultRequestHandler, ReqRespServer, RequestHandler, - ResponseStream, StreamResponseAdapter, ) from .message import ( @@ -39,9 +37,7 @@ "decode_request", # Inbound handlers "BlockLookup", - "DefaultRequestHandler", "RequestHandler", "ReqRespServer", - "ResponseStream", "StreamResponseAdapter", ] diff --git a/src/lean_spec/subspecs/networking/reqresp/handler.py b/src/lean_spec/subspecs/networking/reqresp/handler.py index d92a7cfd..49da15d4 100644 --- a/src/lean_spec/subspecs/networking/reqresp/handler.py +++ b/src/lean_spec/subspecs/networking/reqresp/handler.py @@ -20,19 +20,6 @@ Keeping them separate makes each flow easier to understand and test. -WHY HANDLERS USE ResponseStream ABSTRACTION -------------------------------------------- -Handlers receive a ResponseStream instead of a raw transport stream. -This design provides three benefits: - -1. Testability: Unit tests provide mock streams without network I/O. -2. Flexibility: Different transports work with the same handlers. -3. Clarity: Handlers focus on protocol logic, not wire format encoding. - -The ResponseStream translates high-level operations (send success, send error) into the -wire format defined in codec.py. - - WIRE FORMAT ----------- All responses use the same wire format from codec.py: @@ -73,10 +60,8 @@ from __future__ import annotations import logging -from abc import ABC, abstractmethod from collections.abc import Awaitable, Callable from dataclasses import dataclass -from typing import Protocol from lean_spec.snappy import SnappyDecompressionError, frame_decompress from lean_spec.subspecs.containers import SignedBlockWithAttestation @@ -96,55 +81,9 @@ logger = logging.getLogger(__name__) -class ResponseStream(Protocol): - """ - Protocol for sending chunked responses to peers. - - Abstracts the underlying stream transport, allowing handlers to send - responses without knowing the wire format details. - - Response Types - -------------- - - Success: Contains SSZ-encoded response data. - - Error: Contains UTF-8 error message. - - Both types are encoded using the same wire format from codec.py. - """ - - async def send_success(self, ssz_data: bytes) -> None: - """ - Send a SUCCESS response chunk. - - Args: - ssz_data: SSZ-encoded response payload. - """ - ... - - async def send_error(self, code: ResponseCode, message: str) -> None: - """ - Send an error response and close the stream. - - Args: - code: Error code (INVALID_REQUEST, SERVER_ERROR, RESOURCE_UNAVAILABLE). - message: Human-readable error description. - """ - ... - - async def finish(self) -> None: - """ - Signal end of response stream. - - Called after all response chunks have been sent. - Closes the stream gracefully. - """ - ... - - @dataclass(slots=True) class StreamResponseAdapter: - """Adapts a transport Stream to the ResponseStream protocol. - - Encodes responses using the wire format from codec.py and writes + """Encodes responses using the wire format from codec.py and writes them to the underlying stream. """ @@ -175,61 +114,6 @@ async def finish(self) -> None: await self._stream.close() -class RequestHandler(ABC): - """ - Abstract base for request handlers. - - Implementations provide the logic for responding to specific request types. - The sync service or network layer implements this to provide chain data. - - - HANDLER CONTRACT - ---------------- - Handlers MUST: - - - Send at least one response (success or error) via ResponseStream. - - Not raise exceptions (errors should be sent as error responses). - - Be idempotent (same request may arrive multiple times). - - - CONCURRENCY - ----------- - Handlers may be called concurrently for different requests. - Implementations should be thread-safe if accessing shared state. - """ - - @abstractmethod - async def handle_status(self, request: Status, response: ResponseStream) -> None: - """ - Handle incoming Status request. - - The handler should respond with our current chain status. - - Args: - request: Peer's status message. - response: Stream for sending our status response. - """ - ... - - @abstractmethod - async def handle_blocks_by_root( - self, - request: BlocksByRootRequest, - response: ResponseStream, - ) -> None: - """ - Handle incoming BlocksByRoot request. - - The handler should send each requested block as a separate response chunk. - Blocks we do not have should be skipped (or RESOURCE_UNAVAILABLE sent). - - Args: - request: List of block roots being requested. - response: Stream for sending block responses. - """ - ... - - BlockLookup = Callable[[Bytes32], Awaitable[SignedBlockWithAttestation | None]] """Type alias for block lookup function. @@ -238,12 +122,11 @@ async def handle_blocks_by_root( @dataclass(slots=True) -class DefaultRequestHandler(RequestHandler): +class RequestHandler: """ - Default request handler implementation. + Request handler for inbound peer requests. Uses callbacks to retrieve chain data. - Suitable for use with NetworkEventSource. STATUS HANDLING @@ -265,7 +148,7 @@ class DefaultRequestHandler(RequestHandler): block_lookup: BlockLookup | None = None """Callback to look up blocks by root.""" - async def handle_status(self, request: Status, response: ResponseStream) -> None: + async def handle_status(self, request: Status, response: StreamResponseAdapter) -> None: """ Handle incoming Status request. @@ -299,7 +182,7 @@ async def handle_status(self, request: Status, response: ResponseStream) -> None async def handle_blocks_by_root( self, request: BlocksByRootRequest, - response: ResponseStream, + response: StreamResponseAdapter, ) -> None: """ Handle incoming BlocksByRoot request. @@ -373,7 +256,7 @@ class ReqRespServer: 2. Decode the request (remove length prefix, decompress Snappy). 3. Deserialize SSZ bytes to the appropriate type. 4. Dispatch to handler. - 5. Handler sends response(s) via ResponseStream. + 5. Handler sends response(s) via StreamResponseAdapter. 6. Close stream. @@ -517,7 +400,7 @@ async def _dispatch( self, protocol_id: str, ssz_bytes: bytes, - response: ResponseStream, + response: StreamResponseAdapter, ) -> None: """ Dispatch a request to the appropriate handler. diff --git a/src/lean_spec/subspecs/networking/service/__init__.py b/src/lean_spec/subspecs/networking/service/__init__.py index a42be8c5..d04c0194 100644 --- a/src/lean_spec/subspecs/networking/service/__init__.py +++ b/src/lean_spec/subspecs/networking/service/__init__.py @@ -8,7 +8,6 @@ GossipAttestationEvent, GossipBlockEvent, NetworkEvent, - NetworkEventSource, PeerConnectedEvent, PeerDisconnectedEvent, PeerStatusEvent, @@ -18,8 +17,6 @@ __all__ = [ # Service "NetworkService", - # Protocol - "NetworkEventSource", # Events "GossipAttestationEvent", "GossipBlockEvent", diff --git a/src/lean_spec/subspecs/networking/service/events.py b/src/lean_spec/subspecs/networking/service/events.py index 91b6334b..4a18eeb6 100644 --- a/src/lean_spec/subspecs/networking/service/events.py +++ b/src/lean_spec/subspecs/networking/service/events.py @@ -1,17 +1,16 @@ """ -Network Event Types and Source Protocol. +Network Event Types. -This module defines the event types that flow from the network layer to the -sync service, plus the abstract protocol that event sources must implement. +Event types that flow from the network layer to the sync service. Event Flow ---------- -The network layer (libp2p or test mock) produces events as an async stream. +The network layer produces events as an async stream. The network service consumes these events and routes them to sync handlers. :: - Event Source (async iterator) + LiveNetworkEventSource (async iterator) | Network Service (pattern matching dispatch) | @@ -23,7 +22,6 @@ from __future__ import annotations from dataclasses import dataclass -from typing import Protocol, runtime_checkable from lean_spec.subspecs.containers import SignedBlockWithAttestation from lean_spec.subspecs.containers.attestation import SignedAttestation @@ -120,57 +118,3 @@ class PeerDisconnectedEvent: | PeerDisconnectedEvent ) """Union of all network event types for pattern matching dispatch.""" - - -@runtime_checkable -class NetworkEventSource(Protocol): - """ - Abstract source of network events. - - This protocol defines the interface that network implementations must - provide. It is an async iterator that yields NetworkEvent objects and - supports publishing outbound messages. - - Any class that implements async iteration over NetworkEvent can serve - as a source. - - Usage - ----- - :: - - async for event in event_source: - await handle_event(event) - - The source controls backpressure. When the consumer is slow, the - source naturally pauses due to async iteration semantics. - """ - - def __aiter__(self) -> NetworkEventSource: - """Return self as async iterator.""" - ... - - async def __anext__(self) -> NetworkEvent: - """ - Yield the next network event. - - Blocks until an event is available. - - Returns: - Next event from the network. - - Raises: - StopAsyncIteration: When no more events will arrive. - """ - ... - - async def publish(self, topic: str, data: bytes) -> None: - """ - Publish a message to all connected peers on a topic. - - Used to broadcast locally-produced blocks and attestations. - - Args: - topic: Gossip topic string. - data: Message bytes to publish. - """ - ... diff --git a/src/lean_spec/subspecs/networking/service/service.py b/src/lean_spec/subspecs/networking/service/service.py index 193c64f1..57f8fb1a 100644 --- a/src/lean_spec/subspecs/networking/service/service.py +++ b/src/lean_spec/subspecs/networking/service/service.py @@ -29,6 +29,7 @@ from lean_spec.snappy import frame_compress from lean_spec.subspecs.containers import SignedBlockWithAttestation from lean_spec.subspecs.containers.attestation import SignedAttestation +from lean_spec.subspecs.networking.client.event_source import LiveNetworkEventSource from lean_spec.subspecs.networking.gossipsub.topic import GossipTopic from lean_spec.subspecs.networking.peer.info import PeerInfo from lean_spec.subspecs.networking.types import ConnectionState @@ -37,7 +38,6 @@ GossipAttestationEvent, GossipBlockEvent, NetworkEvent, - NetworkEventSource, PeerConnectedEvent, PeerDisconnectedEvent, PeerStatusEvent, @@ -69,8 +69,8 @@ class NetworkService: sync_service: SyncService """Sync service that receives routed events.""" - event_source: NetworkEventSource - """Source of network events (libp2p wrapper or test mock).""" + event_source: LiveNetworkEventSource + """Source of network events from the transport layer.""" fork_digest: str = field(default="0x00000000") """Fork digest for gossip topics (4-byte hex string).""" diff --git a/src/lean_spec/subspecs/networking/transport/__init__.py b/src/lean_spec/subspecs/networking/transport/__init__.py index 26e43ab7..15b08ab3 100644 --- a/src/lean_spec/subspecs/networking/transport/__init__.py +++ b/src/lean_spec/subspecs/networking/transport/__init__.py @@ -12,7 +12,7 @@ Components: - quic/: QUIC transport with libp2p-tls authentication - multistream/: Protocol negotiation - - connection/: Connection abstractions (re-exports QUIC types) + - connection/: Stream protocol and re-exports of QUIC types - identity/: secp256k1 keypairs and identity proofs QUIC provides encryption and multiplexing natively, eliminating the need @@ -24,7 +24,7 @@ - libp2p/specs quic, tls, multistream-select """ -from .connection import Connection, ConnectionManager, Stream +from .connection import ConnectionManager, Stream from .identity import ( NOISE_IDENTITY_PREFIX, IdentityKeypair, @@ -44,7 +44,6 @@ __all__ = [ # Connection management - "Connection", "Stream", "ConnectionManager", # QUIC transport diff --git a/src/lean_spec/subspecs/networking/transport/connection/__init__.py b/src/lean_spec/subspecs/networking/transport/connection/__init__.py index f36067e3..bb3a064c 100644 --- a/src/lean_spec/subspecs/networking/transport/connection/__init__.py +++ b/src/lean_spec/subspecs/networking/transport/connection/__init__.py @@ -6,17 +6,16 @@ eliminating the need for separate encryption and multiplexing layers. Exports: - - Connection, Stream: Protocol classes for type annotations + - Stream: Protocol class for type annotations - ConnectionManager: QuicConnectionManager for actual use - QuicConnection, QuicStream: Concrete implementations """ from ..quic.connection import QuicConnection, QuicStream from ..quic.connection import QuicConnectionManager as ConnectionManager -from .types import Connection, Stream +from .types import Stream __all__ = [ - "Connection", "Stream", "ConnectionManager", "QuicConnection", diff --git a/src/lean_spec/subspecs/networking/transport/connection/types.py b/src/lean_spec/subspecs/networking/transport/connection/types.py index 2bd2a651..fcd02f12 100644 --- a/src/lean_spec/subspecs/networking/transport/connection/types.py +++ b/src/lean_spec/subspecs/networking/transport/connection/types.py @@ -92,76 +92,3 @@ async def reset(self) -> None: cases where graceful close isn't needed. """ ... - - -@runtime_checkable -class Connection(Protocol): - """ - A secure, multiplexed connection to a peer. - - Connections wrap the QUIC transport stack. Once established, streams - can be opened for different protocols. - - Example usage: - connection = await transport.connect("/ip4/127.0.0.1/udp/9000/quic-v1") - stream = await connection.open_stream("/leanconsensus/req/status/1/ssz_snappy") - # ... use stream ... - await connection.close() - """ - - @property - def peer_id(self) -> str: - """ - Remote peer's ID. - - Derived from their public key during TLS handshake. - Format: Base58-encoded multihash (e.g., "12D3KooW...") - """ - ... - - @property - def remote_addr(self) -> str: - """ - Remote address in multiaddr format. - - Example: "/ip4/192.168.1.1/udp/9000/quic-v1" - """ - ... - - async def open_stream(self, protocol: str) -> Stream: - """ - Open a new stream for the given protocol. - - Performs multistream-select negotiation before returning. - - Args: - protocol: Protocol ID to negotiate (e.g., "/leanconsensus/req/status/1/ssz_snappy") - - Returns: - Open stream ready for read/write. - - Raises: - NegotiationError: If protocol not supported by peer. - ConnectionError: If connection has failed. - """ - ... - - async def accept_stream(self) -> Stream: - """ - Accept an incoming stream from the peer. - - Blocks until a new stream is opened by the remote side. - - Returns: - New stream opened by peer. - """ - ... - - async def close(self) -> None: - """ - Close the connection gracefully. - - All streams are closed and the underlying QUIC connection - is terminated. - """ - ... diff --git a/src/lean_spec/subspecs/node/node.py b/src/lean_spec/subspecs/node/node.py index c00c9695..55bc09c7 100644 --- a/src/lean_spec/subspecs/node/node.py +++ b/src/lean_spec/subspecs/node/node.py @@ -28,7 +28,8 @@ from lean_spec.subspecs.containers.state import Validators from lean_spec.subspecs.containers.validator import ValidatorIndex from lean_spec.subspecs.forkchoice import Store -from lean_spec.subspecs.networking import NetworkEventSource, NetworkService +from lean_spec.subspecs.networking import NetworkService +from lean_spec.subspecs.networking.client.event_source import LiveNetworkEventSource from lean_spec.subspecs.ssz.hash import hash_tree_root from lean_spec.subspecs.storage import Database, SQLiteDatabase from lean_spec.subspecs.sync import BlockCache, NetworkRequester, PeerManager, SyncService @@ -50,7 +51,7 @@ class NodeConfig: validators: Validators """Initial validator set for genesis state.""" - event_source: NetworkEventSource + event_source: LiveNetworkEventSource """Source of network events.""" network: NetworkRequester diff --git a/tests/lean_spec/subspecs/networking/reqresp/test_handler.py b/tests/lean_spec/subspecs/networking/reqresp/test_handler.py index 95c38433..7eefbc95 100644 --- a/tests/lean_spec/subspecs/networking/reqresp/test_handler.py +++ b/tests/lean_spec/subspecs/networking/reqresp/test_handler.py @@ -14,8 +14,8 @@ ) from lean_spec.subspecs.networking.reqresp.handler import ( REQRESP_PROTOCOL_IDS, - DefaultRequestHandler, ReqRespServer, + RequestHandler, StreamResponseAdapter, ) from lean_spec.subspecs.networking.reqresp.message import ( @@ -190,13 +190,13 @@ async def test_finish_closes_stream(self) -> None: assert stream.closed is True -class TestDefaultRequestHandlerStatus: - """Tests for DefaultRequestHandler.handle_status.""" +class TestRequestHandlerStatus: + """Tests for RequestHandler.handle_status.""" async def test_handle_status_returns_our_status(self) -> None: """Returns our configured status on valid request.""" our_status = make_test_status() - handler = DefaultRequestHandler(our_status=our_status) + handler = RequestHandler(our_status=our_status) response = MockResponseStream() peer_status = Status( @@ -204,7 +204,7 @@ async def test_handle_status_returns_our_status(self) -> None: head=Checkpoint(root=Bytes32(b"\xbb" * 32), slot=Slot(150)), ) - await handler.handle_status(peer_status, response) + await handler.handle_status(peer_status, response) # type: ignore[arg-type] assert len(response.errors) == 0 assert len(response.successes) == 1 @@ -216,11 +216,11 @@ async def test_handle_status_returns_our_status(self) -> None: async def test_handle_status_no_status_returns_error(self) -> None: """Returns SERVER_ERROR when no status is configured.""" - handler = DefaultRequestHandler() # No our_status set + handler = RequestHandler() # No our_status set response = MockResponseStream() peer_status = make_test_status() - await handler.handle_status(peer_status, response) + await handler.handle_status(peer_status, response) # type: ignore[arg-type] assert len(response.successes) == 0 assert len(response.errors) == 1 @@ -230,7 +230,7 @@ async def test_handle_status_no_status_returns_error(self) -> None: async def test_handle_status_ignores_peer_status(self) -> None: """Peer's status does not affect our response.""" our_status = make_test_status() - handler = DefaultRequestHandler(our_status=our_status) + handler = RequestHandler(our_status=our_status) response = MockResponseStream() # Peer claims different chain state @@ -239,7 +239,7 @@ async def test_handle_status_ignores_peer_status(self) -> None: head=Checkpoint(root=Bytes32(b"\xee" * 32), slot=Slot(10000)), ) - await handler.handle_status(peer_status, response) + await handler.handle_status(peer_status, response) # type: ignore[arg-type] # Our response is independent of peer's status returned_status = Status.decode_bytes(response.successes[0]) @@ -247,8 +247,8 @@ async def test_handle_status_ignores_peer_status(self) -> None: assert returned_status.finalized.slot == Slot(100) -class TestDefaultRequestHandlerBlocksByRoot: - """Tests for DefaultRequestHandler.handle_blocks_by_root.""" +class TestRequestHandlerBlocksByRoot: + """Tests for RequestHandler.handle_blocks_by_root.""" async def test_handle_blocks_by_root_returns_found_blocks(self) -> None: """Sends SUCCESS response for each found block.""" @@ -264,14 +264,14 @@ async def test_handle_blocks_by_root_returns_found_blocks(self) -> None: async def lookup(root: Bytes32) -> SignedBlockWithAttestation | None: return block_roots.get(bytes(root)) - handler = DefaultRequestHandler(block_lookup=lookup) + handler = RequestHandler(block_lookup=lookup) response = MockResponseStream() request = BlocksByRootRequest( roots=RequestedBlockRoots(data=[Bytes32(b"\x11" * 32), Bytes32(b"\x22" * 32)]) ) - await handler.handle_blocks_by_root(request, response) + await handler.handle_blocks_by_root(request, response) # type: ignore[arg-type] assert len(response.errors) == 0 assert len(response.successes) == 2 @@ -293,7 +293,7 @@ async def lookup(root: Bytes32) -> SignedBlockWithAttestation | None: return block1 return None - handler = DefaultRequestHandler(block_lookup=lookup) + handler = RequestHandler(block_lookup=lookup) response = MockResponseStream() # Request two blocks, only one exists @@ -306,7 +306,7 @@ async def lookup(root: Bytes32) -> SignedBlockWithAttestation | None: ) ) - await handler.handle_blocks_by_root(request, response) + await handler.handle_blocks_by_root(request, response) # type: ignore[arg-type] # Only one block returned, no errors assert len(response.errors) == 0 @@ -314,12 +314,12 @@ async def lookup(root: Bytes32) -> SignedBlockWithAttestation | None: async def test_handle_blocks_by_root_no_lookup_returns_error(self) -> None: """Returns SERVER_ERROR when no lookup callback is configured.""" - handler = DefaultRequestHandler() # No block_lookup set + handler = RequestHandler() # No block_lookup set response = MockResponseStream() request = BlocksByRootRequest(roots=RequestedBlockRoots(data=[Bytes32(b"\x11" * 32)])) - await handler.handle_blocks_by_root(request, response) + await handler.handle_blocks_by_root(request, response) # type: ignore[arg-type] assert len(response.successes) == 0 assert len(response.errors) == 1 @@ -332,12 +332,12 @@ async def test_handle_blocks_by_root_empty_request(self) -> None: async def lookup(root: Bytes32) -> SignedBlockWithAttestation | None: return None - handler = DefaultRequestHandler(block_lookup=lookup) + handler = RequestHandler(block_lookup=lookup) response = MockResponseStream() request = BlocksByRootRequest(roots=RequestedBlockRoots(data=[])) - await handler.handle_blocks_by_root(request, response) + await handler.handle_blocks_by_root(request, response) # type: ignore[arg-type] assert len(response.errors) == 0 assert len(response.successes) == 0 @@ -353,7 +353,7 @@ async def lookup(root: Bytes32) -> SignedBlockWithAttestation | None: return block2 return None - handler = DefaultRequestHandler(block_lookup=lookup) + handler = RequestHandler(block_lookup=lookup) response = MockResponseStream() # First block causes error, second succeeds @@ -361,7 +361,7 @@ async def lookup(root: Bytes32) -> SignedBlockWithAttestation | None: roots=RequestedBlockRoots(data=[Bytes32(b"\x11" * 32), Bytes32(b"\x22" * 32)]) ) - await handler.handle_blocks_by_root(request, response) + await handler.handle_blocks_by_root(request, response) # type: ignore[arg-type] # Second block still returned despite first lookup failing assert len(response.errors) == 0 @@ -377,7 +377,7 @@ class TestReqRespServer: async def test_handle_status_request(self) -> None: """Full Status request/response flow through ReqRespServer.""" our_status = make_test_status() - handler = DefaultRequestHandler(our_status=our_status) + handler = RequestHandler(our_status=our_status) server = ReqRespServer(handler=handler) # Build wire-format request @@ -414,7 +414,7 @@ async def lookup(root: Bytes32) -> SignedBlockWithAttestation | None: return block1 return None - handler = DefaultRequestHandler(block_lookup=lookup) + handler = RequestHandler(block_lookup=lookup) server = ReqRespServer(handler=handler) # Build wire-format request @@ -436,7 +436,7 @@ async def lookup(root: Bytes32) -> SignedBlockWithAttestation | None: async def test_empty_request_returns_error(self) -> None: """Empty request data returns INVALID_REQUEST error.""" - handler = DefaultRequestHandler(our_status=make_test_status()) + handler = RequestHandler(our_status=make_test_status()) server = ReqRespServer(handler=handler) stream = MockStream(request_data=b"") @@ -452,7 +452,7 @@ async def test_empty_request_returns_error(self) -> None: async def test_decode_error_returns_invalid_request(self) -> None: """Malformed wire data returns INVALID_REQUEST error.""" - handler = DefaultRequestHandler(our_status=make_test_status()) + handler = RequestHandler(our_status=make_test_status()) server = ReqRespServer(handler=handler) # Invalid snappy data after length prefix @@ -469,7 +469,7 @@ async def test_decode_error_returns_invalid_request(self) -> None: async def test_invalid_ssz_returns_invalid_request(self) -> None: """Valid wire format but invalid SSZ returns INVALID_REQUEST.""" - handler = DefaultRequestHandler(our_status=make_test_status()) + handler = RequestHandler(our_status=make_test_status()) server = ReqRespServer(handler=handler) # Valid wire format but SSZ is too short for Status (needs 80 bytes) @@ -488,7 +488,7 @@ async def test_invalid_ssz_returns_invalid_request(self) -> None: async def test_unknown_protocol_returns_error(self) -> None: """Unknown protocol ID returns SERVER_ERROR.""" - handler = DefaultRequestHandler(our_status=make_test_status()) + handler = RequestHandler(our_status=make_test_status()) server = ReqRespServer(handler=handler) # Valid request data but unknown protocol @@ -508,7 +508,7 @@ async def test_unknown_protocol_returns_error(self) -> None: async def test_stream_closed_on_completion(self) -> None: """Stream is always closed after handling, even on success.""" - handler = DefaultRequestHandler(our_status=make_test_status()) + handler = RequestHandler(our_status=make_test_status()) server = ReqRespServer(handler=handler) status = make_test_status() @@ -521,7 +521,7 @@ async def test_stream_closed_on_completion(self) -> None: async def test_stream_closed_on_error(self) -> None: """Stream is closed even when handling fails.""" - handler = DefaultRequestHandler() # No status configured + handler = RequestHandler() # No status configured server = ReqRespServer(handler=handler) status = make_test_status() @@ -567,7 +567,7 @@ class TestIntegration: async def test_roundtrip_status_request(self) -> None: """Full encode -> server -> decode roundtrip for Status.""" our_status = make_test_status() - handler = DefaultRequestHandler(our_status=our_status) + handler = RequestHandler(our_status=our_status) server = ReqRespServer(handler=handler) # Client side: encode request @@ -608,7 +608,7 @@ async def test_roundtrip_blocks_by_root_request(self) -> None: async def lookup(root: Bytes32) -> SignedBlockWithAttestation | None: return blocks_by_root.get(bytes(root)) - handler = DefaultRequestHandler(block_lookup=lookup) + handler = RequestHandler(block_lookup=lookup) server = ReqRespServer(handler=handler) # Client side: encode request @@ -643,7 +643,7 @@ async def lookup(root: Bytes32) -> SignedBlockWithAttestation | None: return block1 return None - handler = DefaultRequestHandler(block_lookup=lookup) + handler = RequestHandler(block_lookup=lookup) server = ReqRespServer(handler=handler) # Request two blocks, only one exists @@ -770,7 +770,7 @@ class TestReqRespServerChunkedRead: async def test_handle_chunked_status_request(self) -> None: """Request data arriving in multiple chunks is assembled correctly.""" our_status = make_test_status() - handler = DefaultRequestHandler(our_status=our_status) + handler = RequestHandler(our_status=our_status) server = ReqRespServer(handler=handler) # Build wire-format request @@ -794,7 +794,7 @@ async def test_handle_chunked_status_request(self) -> None: async def test_handle_single_byte_chunks(self) -> None: """Request data arriving one byte at a time is handled.""" our_status = make_test_status() - handler = DefaultRequestHandler(our_status=our_status) + handler = RequestHandler(our_status=our_status) server = ReqRespServer(handler=handler) peer_status = make_test_status() @@ -823,7 +823,7 @@ async def test_invalid_blocks_by_root_ssz(self) -> None: async def lookup(root: Bytes32) -> SignedBlockWithAttestation | None: return None - handler = DefaultRequestHandler(block_lookup=lookup) + handler = RequestHandler(block_lookup=lookup) server = ReqRespServer(handler=handler) # Valid wire format but wrong SSZ structure for BlocksByRootRequest @@ -842,7 +842,7 @@ async def lookup(root: Bytes32) -> SignedBlockWithAttestation | None: async def test_truncated_varint_returns_error(self) -> None: """Truncated varint in request returns INVALID_REQUEST.""" - handler = DefaultRequestHandler(our_status=make_test_status()) + handler = RequestHandler(our_status=make_test_status()) server = ReqRespServer(handler=handler) # Varint with continuation bit set but no following byte @@ -858,8 +858,8 @@ async def test_truncated_varint_returns_error(self) -> None: assert code == ResponseCode.INVALID_REQUEST -class TestDefaultRequestHandlerEdgeCases: - """Edge cases for DefaultRequestHandler.""" +class TestRequestHandlerEdgeCases: + """Edge cases for RequestHandler.""" async def test_blocks_by_root_single_block(self) -> None: """Single block request returns correctly.""" @@ -871,12 +871,12 @@ async def lookup(r: Bytes32) -> SignedBlockWithAttestation | None: return block return None - handler = DefaultRequestHandler(block_lookup=lookup) + handler = RequestHandler(block_lookup=lookup) response = MockResponseStream() request = BlocksByRootRequest(roots=RequestedBlockRoots(data=[root])) - await handler.handle_blocks_by_root(request, response) + await handler.handle_blocks_by_root(request, response) # type: ignore[arg-type] assert len(response.errors) == 0 assert len(response.successes) == 1 @@ -890,7 +890,7 @@ async def test_blocks_by_root_all_missing(self) -> None: async def lookup(root: Bytes32) -> SignedBlockWithAttestation | None: return None - handler = DefaultRequestHandler(block_lookup=lookup) + handler = RequestHandler(block_lookup=lookup) response = MockResponseStream() request = BlocksByRootRequest( @@ -903,7 +903,7 @@ async def lookup(root: Bytes32) -> SignedBlockWithAttestation | None: ) ) - await handler.handle_blocks_by_root(request, response) + await handler.handle_blocks_by_root(request, response) # type: ignore[arg-type] assert len(response.errors) == 0 assert len(response.successes) == 0 @@ -922,7 +922,7 @@ async def test_blocks_by_root_mixed_found_missing(self) -> None: async def lookup(root: Bytes32) -> SignedBlockWithAttestation | None: return blocks.get(bytes(root)) - handler = DefaultRequestHandler(block_lookup=lookup) + handler = RequestHandler(block_lookup=lookup) response = MockResponseStream() request = BlocksByRootRequest( @@ -935,7 +935,7 @@ async def lookup(root: Bytes32) -> SignedBlockWithAttestation | None: ) ) - await handler.handle_blocks_by_root(request, response) + await handler.handle_blocks_by_root(request, response) # type: ignore[arg-type] assert len(response.errors) == 0 assert len(response.successes) == 2 @@ -949,17 +949,17 @@ async def lookup(root: Bytes32) -> SignedBlockWithAttestation | None: async def test_status_update_after_initialization(self) -> None: """Status can be updated after handler creation.""" - handler = DefaultRequestHandler() + handler = RequestHandler() response1 = MockResponseStream() # First request with no status - await handler.handle_status(make_test_status(), response1) + await handler.handle_status(make_test_status(), response1) # type: ignore[arg-type] # Update status handler.our_status = make_test_status() response2 = MockResponseStream() - await handler.handle_status(make_test_status(), response2) + await handler.handle_status(make_test_status(), response2) # type: ignore[arg-type] # First request should fail assert len(response1.successes) == 0 @@ -974,7 +974,7 @@ class TestConcurrentRequestHandling: async def test_concurrent_status_requests(self) -> None: """Multiple concurrent status requests are handled independently.""" our_status = make_test_status() - handler = DefaultRequestHandler(our_status=our_status) + handler = RequestHandler(our_status=our_status) server = ReqRespServer(handler=handler) # Create multiple streams with requests @@ -1015,7 +1015,7 @@ async def lookup(r: Bytes32) -> SignedBlockWithAttestation | None: return None our_status = make_test_status() - handler = DefaultRequestHandler(our_status=our_status, block_lookup=lookup) + handler = RequestHandler(our_status=our_status, block_lookup=lookup) server = ReqRespServer(handler=handler) # Status request @@ -1108,7 +1108,7 @@ class TestHandlerExceptionRecovery: async def test_stream_closed_despite_close_exception(self) -> None: """Stream close is attempted even if it raises an exception.""" - handler = DefaultRequestHandler(our_status=make_test_status()) + handler = RequestHandler(our_status=make_test_status()) server = ReqRespServer(handler=handler) request_bytes = encode_request(make_test_status().encode_bytes()) @@ -1125,7 +1125,7 @@ async def test_stream_closed_despite_close_exception(self) -> None: async def test_error_response_sent_despite_write_exception(self) -> None: """Error handling continues even when write fails.""" - handler = DefaultRequestHandler() # No status + handler = RequestHandler() # No status server = ReqRespServer(handler=handler) request_bytes = encode_request(make_test_status().encode_bytes()) @@ -1176,7 +1176,7 @@ class TestReadRequestBufferLimit: async def test_read_request_rejects_oversized_compressed_data(self) -> None: """Unbounded compressed data stream is rejected.""" - handler = DefaultRequestHandler(our_status=make_test_status()) + handler = RequestHandler(our_status=make_test_status()) server = ReqRespServer(handler=handler) # Send a small varint claiming length 10, then flood with garbage data diff --git a/tests/lean_spec/subspecs/networking/test_network_service.py b/tests/lean_spec/subspecs/networking/test_network_service.py index 9e657f4d..6f8f9cb1 100644 --- a/tests/lean_spec/subspecs/networking/test_network_service.py +++ b/tests/lean_spec/subspecs/networking/test_network_service.py @@ -78,7 +78,7 @@ async def test_block_added_to_store_blocks_dict( source = MockEventSource(events=events) network_service = NetworkService( sync_service=sync_service, - event_source=source, + event_source=source, # type: ignore[arg-type] ) await network_service.run() @@ -113,7 +113,7 @@ async def test_store_head_updated_after_block( source = MockEventSource(events=events) network_service = NetworkService( sync_service=sync_service, - event_source=source, + event_source=source, # type: ignore[arg-type] ) await network_service.run() @@ -147,7 +147,7 @@ async def test_block_ignored_in_idle_state_store_unchanged( source = MockEventSource(events=events) network_service = NetworkService( sync_service=sync_service, - event_source=source, + event_source=source, # type: ignore[arg-type] ) await network_service.run() @@ -194,7 +194,7 @@ async def test_attestation_processed_by_store( source = MockEventSource(events=events) network_service = NetworkService( sync_service=sync_service, - event_source=source, + event_source=source, # type: ignore[arg-type] ) await network_service.run() @@ -237,7 +237,7 @@ async def test_attestation_ignored_in_idle_state( source = MockEventSource(events=events) network_service = NetworkService( sync_service=sync_service, - event_source=source, + event_source=source, # type: ignore[arg-type] ) await network_service.run() @@ -270,7 +270,7 @@ async def test_peer_status_triggers_idle_to_syncing( source = MockEventSource(events=events) network_service = NetworkService( sync_service=sync_service, - event_source=source, + event_source=source, # type: ignore[arg-type] ) await network_service.run() @@ -296,7 +296,7 @@ async def test_peer_status_updates_peer_manager( source = MockEventSource(events=events) network_service = NetworkService( sync_service=sync_service, - event_source=source, + event_source=source, # type: ignore[arg-type] ) await network_service.run() @@ -351,7 +351,7 @@ async def test_full_sync_flow_status_then_block( source = MockEventSource(events=events) network_service = NetworkService( sync_service=sync_service, - event_source=source, + event_source=source, # type: ignore[arg-type] ) await network_service.run() @@ -397,7 +397,7 @@ async def test_block_before_status_is_ignored( source = MockEventSource(events=events) network_service = NetworkService( sync_service=sync_service, - event_source=source, + event_source=source, # type: ignore[arg-type] ) await network_service.run() @@ -442,7 +442,7 @@ async def test_multiple_blocks_chain_extension( source = MockEventSource(events=events) network_service = NetworkService( sync_service=sync_service, - event_source=source, + event_source=source, # type: ignore[arg-type] ) await network_service.run() diff --git a/tests/lean_spec/subspecs/node/test_node.py b/tests/lean_spec/subspecs/node/test_node.py index b743f05a..6720f0cc 100644 --- a/tests/lean_spec/subspecs/node/test_node.py +++ b/tests/lean_spec/subspecs/node/test_node.py @@ -37,7 +37,7 @@ def node_config() -> NodeConfig: return NodeConfig( genesis_time=Uint64(1704067200), validators=make_validators(3), - event_source=MockEventSource(), + event_source=MockEventSource(), # type: ignore[arg-type] network=MockNetworkRequester(), time_fn=lambda: 1704067200.0, )