-
Notifications
You must be signed in to change notification settings - Fork 586
Streamable HTTP resumability + redelivery + SSE polling via server-side disconnect #1077
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Draft
MackinnonBuck
wants to merge
2
commits into
main
Choose a base branch
from
mbuck/resumability-redelivery
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Draft
Changes from all commits
Commits
Show all changes
2 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -16,6 +16,8 @@ internal sealed partial class StreamableHttpClientSessionTransport : TransportBa | |
| private static readonly MediaTypeWithQualityHeaderValue s_applicationJsonMediaType = new("application/json"); | ||
| private static readonly MediaTypeWithQualityHeaderValue s_textEventStreamMediaType = new("text/event-stream"); | ||
|
|
||
| private static readonly TimeSpan s_defaultReconnectionDelay = TimeSpan.FromSeconds(1); | ||
|
|
||
| private readonly McpHttpClient _httpClient; | ||
| private readonly HttpClientTransportOptions _options; | ||
| private readonly CancellationTokenSource _connectionCts; | ||
|
|
@@ -106,7 +108,17 @@ internal async Task<HttpResponseMessage> SendHttpRequestAsync(JsonRpcMessage mes | |
| else if (response.Content.Headers.ContentType?.MediaType == "text/event-stream") | ||
| { | ||
| using var responseBodyStream = await response.Content.ReadAsStreamAsync(cancellationToken); | ||
| rpcResponseOrError = await ProcessSseResponseAsync(responseBodyStream, rpcRequest, cancellationToken).ConfigureAwait(false); | ||
| var sseState = await ProcessSseResponseAsync(responseBodyStream, rpcRequest, cancellationToken).ConfigureAwait(false); | ||
| rpcResponseOrError = sseState.Response; | ||
|
|
||
| // Resumability: If POST SSE stream ended without a response but we have a Last-Event-ID (from priming), | ||
| // attempt to resume by sending a GET request with Last-Event-ID header. The server will replay | ||
| // events from the event store, allowing us to receive the pending response. | ||
| if (rpcResponseOrError is null && rpcRequest is not null && sseState.LastEventId is not null) | ||
| { | ||
| var resumeResult = await SendGetSseRequestWithRetriesAsync(rpcRequest, sseState, cancellationToken).ConfigureAwait(false); | ||
| rpcResponseOrError = resumeResult.Response; | ||
| } | ||
| } | ||
|
|
||
| if (rpcRequest is null) | ||
|
|
@@ -188,54 +200,135 @@ public override async ValueTask DisposeAsync() | |
|
|
||
| private async Task ReceiveUnsolicitedMessagesAsync() | ||
| { | ||
| // Send a GET request to handle any unsolicited messages not sent over a POST response. | ||
| using var request = new HttpRequestMessage(HttpMethod.Get, _options.Endpoint); | ||
| request.Headers.Accept.Add(s_textEventStreamMediaType); | ||
| CopyAdditionalHeaders(request.Headers, _options.AdditionalHeaders, SessionId, _negotiatedProtocolVersion); | ||
| var state = new SseStreamState(); | ||
|
|
||
| // Server support for the GET request is optional. If it fails, we don't care. It just means we won't receive unsolicited messages. | ||
| HttpResponseMessage response; | ||
| try | ||
| { | ||
| response = await _httpClient.SendAsync(request, message: null, _connectionCts.Token).ConfigureAwait(false); | ||
| } | ||
| catch (HttpRequestException) | ||
| // Continuously receive unsolicited messages until cancelled | ||
| while (!_connectionCts.Token.IsCancellationRequested) | ||
| { | ||
| return; | ||
| var result = await SendGetSseRequestWithRetriesAsync( | ||
| relatedRpcRequest: null, | ||
| state, | ||
| _connectionCts.Token).ConfigureAwait(false); | ||
|
|
||
| // Update state for next reconnection attempt | ||
| state.UpdateFrom(result); | ||
|
|
||
| // If we exhausted retries without receiving any events, stop trying | ||
| if (result.LastEventId is null) | ||
| { | ||
| return; | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Sends a GET request for SSE with retry logic and resumability support. | ||
| /// </summary> | ||
| private async Task<SseStreamState> SendGetSseRequestWithRetriesAsync( | ||
| JsonRpcRequest? relatedRpcRequest, | ||
| SseStreamState state, | ||
| CancellationToken cancellationToken) | ||
| { | ||
| int attempt = 0; | ||
|
|
||
| // Delay before first attempt if we're reconnecting (have a Last-Event-ID) | ||
| bool shouldDelay = state.LastEventId is not null; | ||
|
|
||
| using (response) | ||
| while (attempt < _options.MaxReconnectionAttempts) | ||
| { | ||
| if (!response.IsSuccessStatusCode) | ||
| cancellationToken.ThrowIfCancellationRequested(); | ||
|
|
||
| if (shouldDelay) | ||
| { | ||
| return; | ||
| var delay = state.RetryInterval ?? s_defaultReconnectionDelay; | ||
| await Task.Delay(delay, cancellationToken).ConfigureAwait(false); | ||
| } | ||
| shouldDelay = true; | ||
|
|
||
| using var request = new HttpRequestMessage(HttpMethod.Get, _options.Endpoint); | ||
| request.Headers.Accept.Add(s_textEventStreamMediaType); | ||
| CopyAdditionalHeaders(request.Headers, _options.AdditionalHeaders, SessionId, _negotiatedProtocolVersion, state.LastEventId); | ||
|
|
||
| HttpResponseMessage response; | ||
| try | ||
| { | ||
| response = await _httpClient.SendAsync(request, message: null, cancellationToken).ConfigureAwait(false); | ||
| } | ||
| catch (HttpRequestException) | ||
| { | ||
| attempt++; | ||
| continue; | ||
| } | ||
|
|
||
| using (response) | ||
| { | ||
| if (!response.IsSuccessStatusCode) | ||
| { | ||
| attempt++; | ||
| continue; | ||
| } | ||
|
|
||
| using var responseStream = await response.Content.ReadAsStreamAsync(cancellationToken).ConfigureAwait(false); | ||
| var result = await ProcessSseResponseAsync(responseStream, relatedRpcRequest, cancellationToken).ConfigureAwait(false); | ||
|
|
||
| state.UpdateFrom(result); | ||
|
|
||
| if (result.Response is not null) | ||
| { | ||
| return state; | ||
| } | ||
|
|
||
| using var responseStream = await response.Content.ReadAsStreamAsync(_connectionCts.Token).ConfigureAwait(false); | ||
| await ProcessSseResponseAsync(responseStream, relatedRpcRequest: null, _connectionCts.Token).ConfigureAwait(false); | ||
| // Stream closed without the response | ||
| if (state.LastEventId is null) | ||
| { | ||
| // No event ID means server may not support resumability - don't retry indefinitely | ||
| attempt++; | ||
| } | ||
| else | ||
| { | ||
| // We have an event ID, so reconnection should work - reset attempts | ||
| attempt = 0; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What prevents us from ending up in an infinite loop? |
||
| } | ||
| } | ||
| } | ||
|
|
||
| return state; | ||
| } | ||
|
|
||
| private async Task<JsonRpcMessageWithId?> ProcessSseResponseAsync(Stream responseStream, JsonRpcRequest? relatedRpcRequest, CancellationToken cancellationToken) | ||
| private async Task<SseStreamState> ProcessSseResponseAsync( | ||
| Stream responseStream, | ||
| JsonRpcRequest? relatedRpcRequest, | ||
| CancellationToken cancellationToken) | ||
| { | ||
| var state = new SseStreamState(); | ||
|
|
||
| await foreach (SseItem<string> sseEvent in SseParser.Create(responseStream).EnumerateAsync(cancellationToken).ConfigureAwait(false)) | ||
| { | ||
| if (sseEvent.EventType != "message") | ||
| // Track event ID and retry interval for resumability | ||
| if (!string.IsNullOrEmpty(sseEvent.EventId)) | ||
| { | ||
| state.LastEventId = sseEvent.EventId; | ||
| } | ||
| if (sseEvent.ReconnectionInterval.HasValue) | ||
| { | ||
| state.RetryInterval = sseEvent.ReconnectionInterval.Value; | ||
| } | ||
|
|
||
| // Skip events with empty data (priming events, keep-alives) | ||
| if (string.IsNullOrEmpty(sseEvent.Data) || sseEvent.EventType != "message") | ||
| { | ||
| continue; | ||
| } | ||
|
|
||
| var rpcResponseOrError = await ProcessMessageAsync(sseEvent.Data, relatedRpcRequest, cancellationToken).ConfigureAwait(false); | ||
|
|
||
| // The server SHOULD end the HTTP response body here anyway, but we won't leave it to chance. This transport makes | ||
| // a GET request for any notifications that might need to be sent after the completion of each POST. | ||
| if (rpcResponseOrError is not null) | ||
| { | ||
| return rpcResponseOrError; | ||
| state.Response = rpcResponseOrError; | ||
| return state; | ||
| } | ||
| } | ||
|
|
||
| return null; | ||
| return state; | ||
| } | ||
|
|
||
| private async Task<JsonRpcMessageWithId?> ProcessMessageAsync(string data, JsonRpcRequest? relatedRpcRequest, CancellationToken cancellationToken) | ||
|
|
@@ -292,7 +385,8 @@ internal static void CopyAdditionalHeaders( | |
| HttpRequestHeaders headers, | ||
| IDictionary<string, string>? additionalHeaders, | ||
| string? sessionId, | ||
| string? protocolVersion) | ||
| string? protocolVersion, | ||
| string? lastEventId = null) | ||
| { | ||
| if (sessionId is not null) | ||
| { | ||
|
|
@@ -304,6 +398,11 @@ internal static void CopyAdditionalHeaders( | |
| headers.Add("MCP-Protocol-Version", protocolVersion); | ||
| } | ||
|
|
||
| if (lastEventId is not null) | ||
| { | ||
| headers.Add("Last-Event-ID", lastEventId); | ||
| } | ||
|
|
||
| if (additionalHeaders is null) | ||
| { | ||
| return; | ||
|
|
@@ -317,4 +416,21 @@ internal static void CopyAdditionalHeaders( | |
| } | ||
| } | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Tracks state across SSE stream connections. | ||
| /// </summary> | ||
| private struct SseStreamState | ||
| { | ||
| public JsonRpcMessageWithId? Response; | ||
| public string? LastEventId; | ||
| public TimeSpan? RetryInterval; | ||
|
|
||
| public void UpdateFrom(SseStreamState other) | ||
| { | ||
| Response ??= other.Response; | ||
| LastEventId ??= other.LastEventId; | ||
| RetryInterval ??= other.RetryInterval; | ||
| } | ||
| } | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
1 change: 0 additions & 1 deletion
1
src/ModelContextProtocol.Core/Protocol/JsonRpcMessageContext.cs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: canceled