Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions src/lean_spec/subspecs/networking/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
GossipAttestationEvent,
GossipBlockEvent,
NetworkEvent,
NetworkEventSource,
NetworkService,
PeerConnectedEvent,
PeerDisconnectedEvent,
Expand Down Expand Up @@ -63,7 +62,6 @@
"GossipAttestationEvent",
"GossipBlockEvent",
"NetworkEvent",
"NetworkEventSource",
"NetworkService",
"PeerConnectedEvent",
"PeerDisconnectedEvent",
Expand Down
4 changes: 1 addition & 3 deletions src/lean_spec/subspecs/networking/client/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
"""
Network Client Module.

Bridges the transport layer to the sync service by implementing
the NetworkRequester and NetworkEventSource protocols.
Bridges the transport layer to the sync service.

Components
----------
Expand All @@ -11,7 +10,6 @@
Handles BlocksByRoot and Status requests.

LiveNetworkEventSource
Implements NetworkEventSource.
Bridges connection events to NetworkService events.
"""

Expand Down
13 changes: 6 additions & 7 deletions src/lean_spec/subspecs/networking/client/event_source.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
"""
Network event source bridging transport to sync service.

This module implements NetworkEventSource, producing events from real
network connections. It bridges the gap between the low-level transport
layer (QUIC ConnectionManager) and the high-level sync service.
This module produces events from real network connections. It bridges the
gap between the low-level transport layer (QUIC ConnectionManager) and the
high-level sync service.


WHY THIS MODULE EXISTS
Expand Down Expand Up @@ -125,8 +125,8 @@
from lean_spec.subspecs.networking.reqresp.handler import (
REQRESP_PROTOCOL_IDS,
BlockLookup,
DefaultRequestHandler,
ReqRespServer,
RequestHandler,
)
from lean_spec.subspecs.networking.reqresp.message import Status
from lean_spec.subspecs.networking.service.events import (
Expand Down Expand Up @@ -551,7 +551,6 @@ class LiveNetworkEventSource:
"""
Produces NetworkEvent objects from real network connections.

Implements the NetworkEventSource protocol for use with NetworkService.
Bridges the transport layer (ConnectionManager) to the event-driven
sync layer.

Expand Down Expand Up @@ -658,7 +657,7 @@ class LiveNetworkEventSource:
Tracked for cleanup on shutdown. Tasks remove themselves on completion.
"""

_reqresp_handler: DefaultRequestHandler = field(init=False)
_reqresp_handler: RequestHandler = field(init=False)
"""Handler for inbound ReqResp requests.

Provides chain data to peers requesting Status or BlocksByRoot.
Expand All @@ -683,7 +682,7 @@ class LiveNetworkEventSource:
def __post_init__(self) -> None:
"""Initialize handlers with current configuration."""
object.__setattr__(self, "_gossip_handler", GossipHandler(fork_digest=self._fork_digest))
object.__setattr__(self, "_reqresp_handler", DefaultRequestHandler())
object.__setattr__(self, "_reqresp_handler", RequestHandler())
object.__setattr__(self, "_reqresp_server", ReqRespServer(handler=self._reqresp_handler))
object.__setattr__(
self, "_gossipsub_behavior", GossipsubBehavior(params=GossipsubParameters())
Expand Down
4 changes: 0 additions & 4 deletions src/lean_spec/subspecs/networking/reqresp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,8 @@
from .handler import (
REQRESP_PROTOCOL_IDS,
BlockLookup,
DefaultRequestHandler,
ReqRespServer,
RequestHandler,
ResponseStream,
StreamResponseAdapter,
)
from .message import (
Expand All @@ -39,9 +37,7 @@
"decode_request",
# Inbound handlers
"BlockLookup",
"DefaultRequestHandler",
"RequestHandler",
"ReqRespServer",
"ResponseStream",
"StreamResponseAdapter",
]
131 changes: 7 additions & 124 deletions src/lean_spec/subspecs/networking/reqresp/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,6 @@
Keeping them separate makes each flow easier to understand and test.


WHY HANDLERS USE ResponseStream ABSTRACTION
-------------------------------------------
Handlers receive a ResponseStream instead of a raw transport stream.
This design provides three benefits:

1. Testability: Unit tests provide mock streams without network I/O.
2. Flexibility: Different transports work with the same handlers.
3. Clarity: Handlers focus on protocol logic, not wire format encoding.

The ResponseStream translates high-level operations (send success, send error) into the
wire format defined in codec.py.


WIRE FORMAT
-----------
All responses use the same wire format from codec.py:
Expand Down Expand Up @@ -73,10 +60,8 @@
from __future__ import annotations

import logging
from abc import ABC, abstractmethod
from collections.abc import Awaitable, Callable
from dataclasses import dataclass
from typing import Protocol

from lean_spec.snappy import SnappyDecompressionError, frame_decompress
from lean_spec.subspecs.containers import SignedBlockWithAttestation
Expand All @@ -96,55 +81,9 @@
logger = logging.getLogger(__name__)


class ResponseStream(Protocol):
"""
Protocol for sending chunked responses to peers.

Abstracts the underlying stream transport, allowing handlers to send
responses without knowing the wire format details.

Response Types
--------------
- Success: Contains SSZ-encoded response data.
- Error: Contains UTF-8 error message.

Both types are encoded using the same wire format from codec.py.
"""

async def send_success(self, ssz_data: bytes) -> None:
"""
Send a SUCCESS response chunk.

Args:
ssz_data: SSZ-encoded response payload.
"""
...

async def send_error(self, code: ResponseCode, message: str) -> None:
"""
Send an error response and close the stream.

Args:
code: Error code (INVALID_REQUEST, SERVER_ERROR, RESOURCE_UNAVAILABLE).
message: Human-readable error description.
"""
...

async def finish(self) -> None:
"""
Signal end of response stream.

Called after all response chunks have been sent.
Closes the stream gracefully.
"""
...


@dataclass(slots=True)
class StreamResponseAdapter:
"""Adapts a transport Stream to the ResponseStream protocol.

Encodes responses using the wire format from codec.py and writes
"""Encodes responses using the wire format from codec.py and writes
them to the underlying stream.
"""

Expand Down Expand Up @@ -175,61 +114,6 @@ async def finish(self) -> None:
await self._stream.close()


class RequestHandler(ABC):
"""
Abstract base for request handlers.

Implementations provide the logic for responding to specific request types.
The sync service or network layer implements this to provide chain data.


HANDLER CONTRACT
----------------
Handlers MUST:

- Send at least one response (success or error) via ResponseStream.
- Not raise exceptions (errors should be sent as error responses).
- Be idempotent (same request may arrive multiple times).


CONCURRENCY
-----------
Handlers may be called concurrently for different requests.
Implementations should be thread-safe if accessing shared state.
"""

@abstractmethod
async def handle_status(self, request: Status, response: ResponseStream) -> None:
"""
Handle incoming Status request.

The handler should respond with our current chain status.

Args:
request: Peer's status message.
response: Stream for sending our status response.
"""
...

@abstractmethod
async def handle_blocks_by_root(
self,
request: BlocksByRootRequest,
response: ResponseStream,
) -> None:
"""
Handle incoming BlocksByRoot request.

The handler should send each requested block as a separate response chunk.
Blocks we do not have should be skipped (or RESOURCE_UNAVAILABLE sent).

Args:
request: List of block roots being requested.
response: Stream for sending block responses.
"""
...


BlockLookup = Callable[[Bytes32], Awaitable[SignedBlockWithAttestation | None]]
"""Type alias for block lookup function.

Expand All @@ -238,12 +122,11 @@ async def handle_blocks_by_root(


@dataclass(slots=True)
class DefaultRequestHandler(RequestHandler):
class RequestHandler:
"""
Default request handler implementation.
Request handler for inbound peer requests.

Uses callbacks to retrieve chain data.
Suitable for use with NetworkEventSource.


STATUS HANDLING
Expand All @@ -265,7 +148,7 @@ class DefaultRequestHandler(RequestHandler):
block_lookup: BlockLookup | None = None
"""Callback to look up blocks by root."""

async def handle_status(self, request: Status, response: ResponseStream) -> None:
async def handle_status(self, request: Status, response: StreamResponseAdapter) -> None:
"""
Handle incoming Status request.

Expand Down Expand Up @@ -299,7 +182,7 @@ async def handle_status(self, request: Status, response: ResponseStream) -> None
async def handle_blocks_by_root(
self,
request: BlocksByRootRequest,
response: ResponseStream,
response: StreamResponseAdapter,
) -> None:
"""
Handle incoming BlocksByRoot request.
Expand Down Expand Up @@ -373,7 +256,7 @@ class ReqRespServer:
2. Decode the request (remove length prefix, decompress Snappy).
3. Deserialize SSZ bytes to the appropriate type.
4. Dispatch to handler.
5. Handler sends response(s) via ResponseStream.
5. Handler sends response(s) via StreamResponseAdapter.
6. Close stream.


Expand Down Expand Up @@ -517,7 +400,7 @@ async def _dispatch(
self,
protocol_id: str,
ssz_bytes: bytes,
response: ResponseStream,
response: StreamResponseAdapter,
) -> None:
"""
Dispatch a request to the appropriate handler.
Expand Down
3 changes: 0 additions & 3 deletions src/lean_spec/subspecs/networking/service/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
GossipAttestationEvent,
GossipBlockEvent,
NetworkEvent,
NetworkEventSource,
PeerConnectedEvent,
PeerDisconnectedEvent,
PeerStatusEvent,
Expand All @@ -18,8 +17,6 @@
__all__ = [
# Service
"NetworkService",
# Protocol
"NetworkEventSource",
# Events
"GossipAttestationEvent",
"GossipBlockEvent",
Expand Down
Loading
Loading