nulone/mcp-backpressure

mcp-backpressure

Backpressure and concurrency control middleware for FastMCP MCP servers.

Problem: LLMs can generate hundreds of parallel tool calls. This can exhaust resources and crash the server, and clients get no structured feedback telling them when to retry.

Solution: Middleware that limits concurrent executions, queues excess requests with timeout, and returns structured JSON-RPC overload errors.

Quickstart

from fastmcp import FastMCP
from mcp_backpressure import BackpressureMiddleware

mcp = FastMCP("MyServer")
mcp.add_middleware(BackpressureMiddleware(
    max_concurrent=5,      # Max parallel executions
    queue_size=10,         # Bounded queue for waiting requests
    queue_timeout=30.0,    # Queue wait timeout (seconds)
))

Installation

pip install mcp-backpressure

Features

  • Concurrency limiting: Semaphore-based control of parallel executions
  • Bounded queue: Optional FIFO queue with configurable size
  • Queue timeout: Automatic timeout for queued requests with cleanup
  • Structured errors: JSON-RPC compliant overload errors with detailed metrics
  • Metrics: Real-time counters for active, queued, and rejected requests
  • Callback hook: Optional notification on each overload event
  • No extra dependencies: Requires only FastMCP and Python 3.10+

Usage

Basic Configuration

from fastmcp import FastMCP
from mcp_backpressure import BackpressureMiddleware

mcp = FastMCP("MyServer")

def callback(error):
    # Receives an OverloadError for each rejected request
    print(f"Overloaded: {error.reason}")

mcp.add_middleware(BackpressureMiddleware(
    max_concurrent=5,           # Required: max parallel tool executions
    queue_size=10,              # Optional: bounded queue (0 = no queue)
    queue_timeout=30.0,         # Optional: seconds to wait in queue
    overload_error_code=-32001, # Optional: JSON-RPC error code
    on_overload=callback,       # Optional: called on each overload
))

Parameters

| Parameter | Type | Default | Description |
|---|---|---|---|
| max_concurrent | int | required | Maximum number of concurrent tool executions. Must be >= 1. |
| queue_size | int | 0 | Maximum queue size for waiting requests. Set to 0 to reject immediately when the limit is reached. |
| queue_timeout | float | 30.0 | Maximum time (seconds) a request can wait in the queue before timing out. Must be > 0. |
| overload_error_code | int | -32001 | JSON-RPC error code returned when the server is overloaded. |
| on_overload | Callable | None | Optional callback (error: OverloadError) -> None invoked on each overload. |

Error Handling

When the server is overloaded, requests are rejected with a structured JSON-RPC error:

{
  "code": -32001,
  "message": "SERVER_OVERLOADED",
  "data": {
    "reason": "queue_full",
    "active": 5,
    "queued": 10,
    "max_concurrent": 5,
    "queue_size": 10,
    "queue_timeout_ms": 30000,
    "retry_after_ms": 1000
  }
}

Overload Reasons

| Reason | Description |
|---|---|
| concurrency_limit | All execution slots full and no queue configured (queue_size=0) |
| queue_full | All execution slots and queue slots are full |
| queue_timeout | Request waited in queue longer than queue_timeout |
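
On the client side, the structured error can drive a retry policy. The following is a minimal sketch, not part of this library: it assumes the client has already decoded the JSON-RPC error object into a plain dict shaped like the example above. The helper name retry_delay_seconds, the 30-second cap, the jitter range, and the 1000 ms fallback when retry_after_ms is missing are all illustrative assumptions.

```python
import random
from typing import Optional

# Reasons documented in the "Overload Reasons" table.
OVERLOAD_REASONS = {"concurrency_limit", "queue_full", "queue_timeout"}


def retry_delay_seconds(
    error: dict,
    attempt: int,
    overload_code: int = -32001,  # must match overload_error_code on the server
    max_delay: float = 30.0,      # hypothetical cap, not a library setting
) -> Optional[float]:
    """Return a delay before retrying, or None if the error is not an overload."""
    if error.get("code") != overload_code:
        return None
    data = error.get("data") or {}
    if data.get("reason") not in OVERLOAD_REASONS:
        return None
    # Start from the server's hint; fall back to 1 second if it is absent.
    base = data.get("retry_after_ms", 1000) / 1000.0
    # Exponential backoff on top of the hint, capped at max_delay.
    delay = min(base * (2 ** attempt), max_delay)
    # Jitter so that many clients do not retry at the same instant.
    return delay * random.uniform(0.5, 1.0)
```

A caller would sleep for the returned delay and retry, and re-raise the error when the helper returns None.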

Metrics

Get real-time metrics from the middleware:

# `middleware` is the BackpressureMiddleware instance added to the server
metrics = middleware.get_metrics()  # Synchronous

print(f"Active: {metrics.active}")
print(f"Queued: {metrics.queued}")
print(f"Total rejected: {metrics.total_rejected}")
print(f"Rejected (concurrency): {metrics.rejected_concurrency_limit}")
print(f"Rejected (queue full): {metrics.rejected_queue_full}")
print(f"Rejected (timeout): {metrics.rejected_queue_timeout}")

For async contexts, use await middleware.get_metrics_async().

Callback Hook

Register a callback to be notified of each overload event:

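# Assumes OverloadError is exported at the package top level
from mcp_backpressure import BackpressureMiddleware, OverloadError

def on_overload(error: OverloadError) -> None:
    print(f"OVERLOAD: {error.reason} (active={error.active})")
    # Log to monitoring system, update metrics, etc.

middleware = BackpressureMiddleware(
    max_concurrent=5,
    queue_size=10,
    on_overload=on_overload,
)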
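
A callback that does more than print might tally overloads by reason and log them. The sketch below is one possible shape, using only the reason and active attributes shown above. The logger name and the module-level Counter are illustrative choices, and the SimpleNamespace object is a stand-in used only to exercise the callback without a running server.

```python
import logging
from collections import Counter
from types import SimpleNamespace

logger = logging.getLogger("backpressure")  # hypothetical logger name
overloads_by_reason = Counter()


def on_overload(error) -> None:
    """Tally overload events by reason and emit a warning for each one."""
    overloads_by_reason[error.reason] += 1
    logger.warning("overload reason=%s active=%s", error.reason, error.active)


# Stand-in for an OverloadError, for demonstration only.
fake_error = SimpleNamespace(reason="queue_full", active=5)
on_overload(fake_error)
on_overload(fake_error)
```

Keeping the callback this cheap is a deliberate choice here: it is called on each overload, which is when the server is already busiest.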

Examples

Simple Server

See examples/simple_server.py for a minimal FastMCP server with backpressure.

Load Simulation

Run examples/load_simulation.py to see backpressure behavior under heavy concurrent load:

python examples/load_simulation.py

This simulates 30 concurrent requests against a server limited to 5 concurrent executions with a queue of 10, demonstrating how the middleware handles overload.

How It Works

The middleware provides two-level limiting:

  1. Semaphore (max_concurrent): Controls active executions
  2. Bounded queue (queue_size): Holds waiting requests with timeout

Request flow:

  • If execution slot available → execute immediately
  • If execution slots full and queue not full → wait in queue with timeout
  • If queue full → reject with queue_full
  • If timeout in queue → reject with queue_timeout

Invariants (guaranteed under all conditions):

  • active <= max_concurrent ALWAYS
  • queued <= queue_size ALWAYS
  • Cancellation correctly frees slots and decrements counters
  • Queue timeout removes item from queue
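
The request flow and invariants above can be illustrated with a small asyncio sketch. This is not the library's implementation: the Limiter and Overloaded names are hypothetical, and the code only mirrors the documented behavior (a semaphore for execution slots, a counter-bounded wait with timeout, and cleanup in finally blocks so that cancellation frees slots).

```python
import asyncio


class Overloaded(Exception):
    """Raised when a request is rejected; reason matches the documented values."""

    def __init__(self, reason: str):
        super().__init__(reason)
        self.reason = reason


class Limiter:
    def __init__(self, max_concurrent: int, queue_size: int = 0, queue_timeout: float = 30.0):
        self._sem = asyncio.Semaphore(max_concurrent)
        self.queue_size = queue_size
        self.queue_timeout = queue_timeout
        self.active = 0
        self.queued = 0

    async def run(self, func):
        if self._sem.locked():
            # No execution slot is free: queue the request or reject it.
            if self.queue_size == 0:
                raise Overloaded("concurrency_limit")
            if self.queued >= self.queue_size:
                raise Overloaded("queue_full")
            self.queued += 1
            try:
                await asyncio.wait_for(self._sem.acquire(), self.queue_timeout)
            except asyncio.TimeoutError:
                raise Overloaded("queue_timeout") from None
            finally:
                # Runs on success, timeout, and cancellation alike.
                self.queued -= 1
        else:
            # A slot is free, so this acquire returns without suspending.
            await self._sem.acquire()
        self.active += 1
        try:
            return await func()
        finally:
            self.active -= 1
            self._sem.release()


async def simulate(n: int = 30):
    """Fire n requests at once against 5 slots and a queue of 10."""
    limiter = Limiter(max_concurrent=5, queue_size=10, queue_timeout=30.0)
    peak = 0

    async def job():
        nonlocal peak
        peak = max(peak, limiter.active)
        await asyncio.sleep(0.01)
        return "ok"

    results = await asyncio.gather(
        *(limiter.run(job) for _ in range(n)), return_exceptions=True
    )
    ok = sum(1 for r in results if r == "ok")
    rejected = [r.reason for r in results if isinstance(r, Overloaded)]
    return ok, rejected, peak


if __name__ == "__main__":
    print(asyncio.run(simulate()))
```

In this sketch, when all 30 requests arrive together, 5 run immediately, 10 wait in the queue and run later, and the remaining 15 are rejected with queue_full. The number of active jobs never exceeds 5.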

Development

Running Tests

python -m pytest tests/ -v

Linting

ruff check src/ tests/

Design Rationale

This library emerged from python-sdk #1698 (closed as "not planned"). Key design decisions:

  • Global limits only (v0.1): Per-client and per-tool limits deferred to v0.2+
  • Simple counters: No Prometheus/OTEL dependencies by default
  • JSON-RPC errors: Follows MCP protocol conventions
  • Monotonic time: Queue timeouts use time.monotonic() for reliability
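
To illustrate the monotonic-time point: a deadline computed from time.monotonic() is unaffected by wall-clock adjustments (NTP corrections, manual clock changes), which could otherwise shorten or lengthen a queue wait. The helper below is an illustrative sketch with a hypothetical name, not code from this library.

```python
import time


def remaining(deadline: float) -> float:
    """Seconds left until a monotonic deadline, never negative."""
    return max(0.0, deadline - time.monotonic())


queue_timeout = 0.05  # seconds; kept tiny for demonstration
deadline = time.monotonic() + queue_timeout

time.sleep(0.01)
left = remaining(deadline)  # a little under 0.04 seconds remain
```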

License

MIT

Contributing

Contributions welcome! Please open an issue before submitting PRs.

Changelog

See CHANGELOG.md
