Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/lean_spec/subspecs/networking/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,6 @@

Per Ethereum spec, message IDs are the first 20 bytes of SHA256(domain + topic_len + topic + data).
"""

MAX_ERROR_MESSAGE_SIZE: Final[int] = 256
"""Maximum error message size in bytes per Ethereum P2P spec (ErrorMessage: List[byte, 256])."""
13 changes: 2 additions & 11 deletions src/lean_spec/subspecs/networking/reqresp/__init__.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
"""ReqResp specs for the Lean Ethereum consensus specification."""

from .codec import (
CONTEXT_BYTES_LENGTH,
CodecError,
ForkDigestMismatchError,
ResponseCode,
decode_request,
encode_request,
prepend_context_bytes,
validate_context_bytes,
)
from .handler import (
REQRESP_PROTOCOL_IDS,
Expand All @@ -17,7 +13,7 @@
ReqRespServer,
RequestHandler,
ResponseStream,
YamuxResponseStream,
StreamResponseAdapter,
)
from .message import (
BLOCKS_BY_ROOT_PROTOCOL_V1,
Expand All @@ -38,19 +34,14 @@
"Status",
# Codec
"CodecError",
"ForkDigestMismatchError",
"ResponseCode",
"encode_request",
"decode_request",
# Context bytes
"CONTEXT_BYTES_LENGTH",
"prepend_context_bytes",
"validate_context_bytes",
# Inbound handlers
"BlockLookup",
"DefaultRequestHandler",
"RequestHandler",
"ReqRespServer",
"ResponseStream",
"YamuxResponseStream",
"StreamResponseAdapter",
]
72 changes: 0 additions & 72 deletions src/lean_spec/subspecs/networking/reqresp/codec.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,78 +92,6 @@ class CodecError(Exception):
"""


class ForkDigestMismatchError(CodecError):
"""Raised when context bytes (fork_digest) do not match expected value.

Context bytes are 4 bytes prepended to each response chunk in
protocols that return fork-specific data (BlocksByRange, BlobSidecars).
"""

def __init__(self, expected: bytes, actual: bytes) -> None:
"""Initialize with expected and actual fork digests."""
self.expected = expected
self.actual = actual
super().__init__(f"Fork digest mismatch: expected {expected.hex()}, got {actual.hex()}")


CONTEXT_BYTES_LENGTH: int = 4
"""Length of context bytes (fork_digest) in responses."""


def validate_context_bytes(data: bytes, expected_fork_digest: bytes) -> bytes:
"""
Validate and strip context bytes from a response chunk.

Some req/resp protocols prepend a 4-byte fork_digest (context bytes)
to each response chunk. This allows clients to verify they're receiving
data for the expected fork.

Args:
data: Response data with context bytes prepended.
expected_fork_digest: Expected 4-byte fork_digest.

Returns:
Response data with context bytes stripped.

Raises:
CodecError: If data is too short to contain context bytes.
ForkDigestMismatchError: If context bytes don't match expected.
"""
if len(data) < CONTEXT_BYTES_LENGTH:
raise CodecError(
f"Response too short for context bytes: {len(data)} < {CONTEXT_BYTES_LENGTH}"
)

context_bytes = data[:CONTEXT_BYTES_LENGTH]
if context_bytes != expected_fork_digest:
raise ForkDigestMismatchError(expected_fork_digest, context_bytes)

return data[CONTEXT_BYTES_LENGTH:]


def prepend_context_bytes(data: bytes, fork_digest: bytes) -> bytes:
"""
Prepend context bytes (fork_digest) to a response chunk.

Used when sending responses for protocols that require context bytes.

Args:
data: Response data to send.
fork_digest: 4-byte fork_digest to prepend.

Returns:
Response data with context bytes prepended.

Raises:
ValueError: If fork_digest is not exactly 4 bytes.
"""
if len(fork_digest) != CONTEXT_BYTES_LENGTH:
raise ValueError(
f"Fork digest must be {CONTEXT_BYTES_LENGTH} bytes, got {len(fork_digest)}"
)
return fork_digest + data


def encode_request(ssz_data: bytes) -> bytes:
"""
Encode an SSZ-serialized request for transmission.
Expand Down
86 changes: 25 additions & 61 deletions src/lean_spec/subspecs/networking/reqresp/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
This design provides three benefits:

1. Testability: Unit tests provide mock streams without network I/O.
2. Flexibility: Different transports (yamux, memory, etc.) work with the same handlers.
2. Flexibility: Different transports work with the same handlers.
3. Clarity: Handlers focus on protocol logic, not wire format encoding.

The ResponseStream translates high-level operations (send success, send error) into the
Expand Down Expand Up @@ -75,13 +75,14 @@
import logging
from abc import ABC, abstractmethod
from collections.abc import Awaitable, Callable
from dataclasses import dataclass, field
from dataclasses import dataclass
from typing import Protocol

from lean_spec.snappy import frame_decompress
from lean_spec.snappy import SnappyDecompressionError, frame_decompress
from lean_spec.subspecs.containers import SignedBlockWithAttestation
from lean_spec.subspecs.networking.config import MAX_ERROR_MESSAGE_SIZE
from lean_spec.subspecs.networking.transport.connection.types import Stream
from lean_spec.subspecs.networking.varint import decode_varint
from lean_spec.subspecs.networking.varint import VarintError, decode_varint
from lean_spec.types import Bytes32

from .codec import ResponseCode
Expand All @@ -94,9 +95,6 @@

logger = logging.getLogger(__name__)

REQUEST_TIMEOUT_SECONDS: float = 10.0
"""Default timeout for processing inbound requests."""


class ResponseStream(Protocol):
"""
Expand Down Expand Up @@ -143,68 +141,34 @@ async def finish(self) -> None:


@dataclass(slots=True)
class YamuxResponseStream:
"""
ResponseStream implementation wrapping a yamux stream.
class StreamResponseAdapter:
"""Adapts a transport Stream to the ResponseStream protocol.

Encodes responses using the wire format from codec.py and writes
them to the underlying stream.
"""

_stream: Stream
"""Underlying yamux stream."""
"""Underlying transport stream."""

async def send_success(self, ssz_data: bytes) -> None:
"""
Send a SUCCESS response chunk.
"""Send a SUCCESS response chunk.

Args:
ssz_data: SSZ-encoded response payload.
"""
# Encode the response using the protocol wire format.
#
# ResponseCode.SUCCESS (0x00) tells the peer this chunk contains valid data.
# The encode method handles:
#
# 1. Prepending the response code byte
# 2. Adding the varint length prefix
# 3. Compressing with Snappy framing
encoded = ResponseCode.SUCCESS.encode(ssz_data)

# Write using sync write + async drain for compatibility with both
# raw QUIC streams (async write) and wrapper streams (sync write + drain).
write_result = self._stream.write(encoded)
if hasattr(write_result, "__await__"):
await write_result
drain = getattr(self._stream, "drain", None)
if drain is not None:
await drain()
await self._stream.write(encoded)

async def send_error(self, code: ResponseCode, message: str) -> None:
"""
Send an error response.
"""Send an error response.

Args:
code: Error code.
message: Human-readable error description.
"""
# Error messages must be UTF-8 encoded per the Ethereum P2P spec.
#
# The spec mandates UTF-8 for interoperability across clients.
# Common error codes:
#
# - INVALID_REQUEST (1): Malformed request, bad SSZ, protocol violation
# - SERVER_ERROR (2): Internal failure, handler exception
# - RESOURCE_UNAVAILABLE (3): Block/blob not found
encoded = code.encode(message.encode("utf-8"))

# Write using sync write + async drain for compatibility.
write_result = self._stream.write(encoded)
if hasattr(write_result, "__await__"):
await write_result
drain = getattr(self._stream, "drain", None)
if drain is not None:
await drain()
encoded = code.encode(message.encode("utf-8")[:MAX_ERROR_MESSAGE_SIZE])
await self._stream.write(encoded)

async def finish(self) -> None:
"""Close the stream gracefully."""
Expand Down Expand Up @@ -425,24 +389,17 @@ class ReqRespServer:
handler: RequestHandler
"""Handler for processing requests."""

_pending_data: dict[int, bytearray] = field(default_factory=dict)
"""Buffer for accumulating request data by stream ID.

Request data may arrive in multiple chunks. We accumulate until
the stream closes, then process the complete request.
"""

async def handle_stream(self, stream: Stream, protocol_id: str) -> None:
"""
Handle an incoming ReqResp stream.

Reads the request, decodes it, and dispatches to the appropriate handler.

Args:
stream: Incoming yamux stream.
stream: Incoming transport stream.
protocol_id: Negotiated protocol ID.
"""
response = YamuxResponseStream(_stream=stream)
response = StreamResponseAdapter(_stream=stream)

try:
# Step 1: Read and decode the request.
Expand Down Expand Up @@ -523,20 +480,27 @@ async def _read_request(self, stream: Stream) -> bytes:
# Try to decode the varint
try:
declared_length, varint_size = decode_varint(bytes(buffer))
except Exception:
except VarintError:
# Need more data for varint
continue

# Guard against unbounded compressed data.
# Snappy's worst-case expansion is ~(n + n/6 + 1024).
max_compressed = declared_length + declared_length // 6 + 1024

# Now read until we can successfully decompress
compressed_data = buffer[varint_size:]

while True:
if len(compressed_data) > max_compressed:
return b""

try:
decompressed = frame_decompress(bytes(compressed_data))
if len(decompressed) == declared_length:
return decompressed
# Length mismatch - need more data
except Exception:
except SnappyDecompressionError:
# Decompression failed - need more data
pass

Expand All @@ -545,7 +509,7 @@ async def _read_request(self, stream: Stream) -> bytes:
# Stream closed, try one more decompress
try:
return frame_decompress(bytes(compressed_data))
except Exception:
except SnappyDecompressionError:
return bytes(buffer)
compressed_data.extend(chunk)

Expand Down
4 changes: 0 additions & 4 deletions src/lean_spec/subspecs/networking/reqresp/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@
from ..config import MAX_REQUEST_BLOCKS
from ..types import ProtocolId

# --- Status v1 ---

STATUS_PROTOCOL_V1: ProtocolId = "/leanconsensus/req/status/1/ssz_snappy"
"""The protocol ID for the Status v1 request/response message."""

Expand All @@ -42,8 +40,6 @@ class Status(Container):
"""The client's current head checkpoint."""


# --- BlocksByRoot v1 ---

BLOCKS_BY_ROOT_PROTOCOL_V1: ProtocolId = "/leanconsensus/req/blocks_by_root/1/ssz_snappy"
"""The protocol ID for the BlocksByRoot v1 request/response message."""

Expand Down
Loading
Loading