-
Notifications
You must be signed in to change notification settings - Fork 10
feat: implement retryable state and preserve message context on failure #161
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
📝 WalkthroughWalkthroughAdds a Retryable processed-message state, a storage API to mark failed messages retryable, and updates core message-processing failure and recovery paths to preserve and reuse message context (message_event_id, mls_group_id, epoch) across failures and retries. Changes
Sequence Diagram(s)mermaid mermaid Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Possibly related PRs
Suggested labels
Suggested reviewers
🚥 Pre-merge checks | ✅ 4✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing touches
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
✅ Coverage: 88.74% → 89.99% (+1.25%) |
fd4fc49 to
067ca95
Compare
✅ Coverage: 88.74% → 90.68% (+1.94%) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
crates/mdk-core/src/messages.rs (1)
1787-1804: Preserveepoch/mls_group_idfrom the existing record when transitioning to Failed.
Right now onlymessage_event_idis preserved;epochandmls_group_idare overwritten with the current group values. For wrong-epoch/mismatch paths this records the wrong context and defeats the preservation goal. Consider reusing existing values with fallback to the current group.💡 Suggested pattern (apply to the failure branches above)
- let message_event_id = existing_record.as_ref().and_then(|r| r.message_event_id); + let message_event_id = existing_record.as_ref().and_then(|r| r.message_event_id); + let mls_group_id = existing_record + .as_ref() + .and_then(|r| r.mls_group_id.clone()) + .or_else(|| Some(group.mls_group_id.clone())); + let epoch = existing_record + .as_ref() + .and_then(|r| r.epoch) + .or_else(|| Some(group.epoch)); ... - epoch: Some(group.epoch), - mls_group_id: Some(group.mls_group_id.clone()), + epoch, + mls_group_id,Also applies to: 1839-1856, 1868-1885, 1897-1914, 1939-1956
🤖 Fix all issues with AI agents
In `@crates/mdk-core/src/messages.rs`:
- Around line 1615-1656: When marking a Retryable processed_message as
MessageState::Processed (in the retry branch around
processed_message.message_event_id handling), also clear/overwrite its prior
failure metadata: set processed_message.failure_reason to None (or empty) and
update processed_message.processed_at to the current timestamp before calling
self.storage().save_processed_message(processed_message.clone()). Likewise
ensure the corresponding stored message state is updated to
MessageState::Processed (already done via message.state) and persisted with
save_message; use the same timestamp source you use elsewhere (e.g., Utc::now())
so downstream consumers do not see stale failure_reason/processed_at when
ProcessedMessageState::Processed is stored.
🧹 Nitpick comments (1)
crates/mdk-memory-storage/src/messages.rs (1)
751-847: Moveusestatements to module scope (guideline).
In-function imports at Lines 753, 800, and 814 violate the “use statements at top of scope” rule. Consider hoisting them to the test module level.As per coding guidelines, keep `use` statements at the top of their scope.♻️ Proposed refactor
#[cfg(test)] mod tests { use std::collections::BTreeSet; use mdk_storage_traits::groups::GroupStorage; use mdk_storage_traits::groups::types::{Group, GroupState}; + use mdk_storage_traits::messages::error::MessageError; + use mdk_storage_traits::messages::types::ProcessedMessage; use nostr::Keys; use super::*; @@ #[test] fn test_mark_processed_message_retryable() { - use mdk_storage_traits::messages::types::ProcessedMessage; - let storage = MdkMemoryStorage::new(); @@ #[test] fn test_mark_nonexistent_message_retryable_fails() { - use mdk_storage_traits::messages::error::MessageError; - let storage = MdkMemoryStorage::new(); @@ #[test] fn test_mark_non_failed_message_retryable_fails() { - use mdk_storage_traits::messages::error::MessageError; - use mdk_storage_traits::messages::types::ProcessedMessage; - let storage = MdkMemoryStorage::new();
067ca95 to
d5d1b8d
Compare
✅ Coverage: 88.74% → 90.63% (+1.89%) |
- Preserve message_event_id, epoch, and mls_group_id when transitioning to Failed state - Allow reprocessing of messages in Retryable state - Update message processing logic to handle retry scenarios - Fix message state persistence in storage implementations
d5d1b8d to
b0c6793
Compare
✅ Coverage: 88.74% → 90.63% (+1.89%) |
|
@coderabbitai review |
✅ Actions performedReview triggered.
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@crates/mdk-core/src/messages.rs`:
- Around line 1615-1658: The current pattern "if let Some(message_event_id) =
processed_message.message_event_id && let Ok(Some(mut message)) =
self.get_message(...)" hides storage errors by treating Err as a missing
message; change this to explicitly match the result of
self.get_message(&group.mls_group_id, &message_event_id) (or use the ? operator)
so that Err(e) is propagated as a storage error (convert to Error::Message or
propagate the underlying Error) while preserving the Ok(Some(_)) branch for
using cached content and treating Ok(None) as the unprocessable case; update
references in this block (ProcessedMessageState::Retryable, get_message,
storage().save_message, save_processed_message, and
MessageProcessingResult::Unprocessable) accordingly.
In `@crates/mdk-storage-traits/CHANGELOG.md`:
- Around line 28-30: Move the "Retryable Message State" changelog entry into the
"Breaking changes" section and update the text to explicitly state that adding
the `Retryable` variant to `ProcessedMessageState` (without `#[non_exhaustive]`)
and adding the `mark_processed_message_retryable` method to the `MessageStorage`
trait are breaking changes; reference both `ProcessedMessageState::Retryable`
and the `MessageStorage::mark_processed_message_retryable` method so downstream
users know to update exhaustive matches and trait implementations.
…ng changes - Changed get_message error handling in Retryable branch from if-let pattern that silently swallowed Err to explicit match with proper error propagation - Moved Retryable Message State changelog entry to Breaking changes section - Updated changelog to explicitly reference ProcessedMessageState::Retryable and MessageStorage::mark_processed_message_retryable() as breaking changes Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
✅ Coverage: 88.74% → 90.6% (+1.86%) |
✅ Coverage: 88.74% → 90.71% (+1.97%) |
jgmontoya
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice! Leaving some comments 👀
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This file is huge. Even if most of it is tests it's still a couple thousand lines of non-test code. Is there a way we could refactor it so that its easier to work with?
| // Retryable messages are ones that previously failed due to wrong epoch keys | ||
| // but have been marked for retry after a rollback. For our own messages, | ||
| // we should have cached content - try to retrieve and return it. | ||
| tracing::debug!(target: "mdk_core::messages::process_message", "Retrying own message after rollback"); | ||
|
|
||
| if let Some(message_event_id) = processed_message.message_event_id { | ||
| match self | ||
| .get_message(&group.mls_group_id, &message_event_id) | ||
| .map_err(|e| Error::Message(e.to_string()))? | ||
| { | ||
| Some(mut message) => { | ||
| // Update states to mark as successfully processed | ||
| message.state = message_types::MessageState::Processed; | ||
| self.storage() | ||
| .save_message(message) | ||
| .map_err(|e| Error::Message(e.to_string()))?; | ||
|
|
||
| processed_message.state = | ||
| message_types::ProcessedMessageState::Processed; | ||
| processed_message.failure_reason = None; | ||
| processed_message.processed_at = Timestamp::now(); | ||
| self.storage() | ||
| .save_processed_message(processed_message.clone()) | ||
| .map_err(|e| Error::Message(e.to_string()))?; | ||
|
|
||
| tracing::info!( | ||
| target: "mdk_core::messages::process_message", | ||
| "Successfully retried own cached message after rollback" | ||
| ); | ||
| let message = self | ||
| .get_message(&group.mls_group_id, &message_event_id) | ||
| .map_err(|e| Error::Message(e.to_string()))? | ||
| .ok_or(Error::MessageNotFound)?; | ||
| return Ok(MessageProcessingResult::ApplicationMessage( | ||
| message, | ||
| )); | ||
| } | ||
| None => { | ||
| // No cached content available - fall through to Unprocessable | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // No cached content available - this shouldn't happen for our own messages, | ||
| // but if it does, we can't recover | ||
| tracing::warn!( | ||
| target: "mdk_core::messages::process_message", | ||
| "Retryable own message has no cached content - cannot recover" | ||
| ); | ||
| Ok(MessageProcessingResult::Unprocessable { | ||
| mls_group_id: group.mls_group_id.clone(), | ||
| }) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could this perhaps be extracted? This whole method is now over 300 lines ☠️ It looks similar to the message_types::ProcessedMessageState::Created branch 👀
| let message = self | ||
| .get_message(&group.mls_group_id, &message_event_id) | ||
| .map_err(|e| Error::Message(e.to_string()))? | ||
| .ok_or(Error::MessageNotFound)?; | ||
| return Ok(MessageProcessingResult::ApplicationMessage( | ||
| message, | ||
| )); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need to refetch it here? Can't we return the message we already had in scope?
| /// Helper to save a failed message while preserving context from existing records | ||
| /// or using explicit context if provided. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just above these lines there was this piece of comment:
/// Saves a failed processed message record to prevent reprocessing
///
This used to "belong" to the save_failed_processed_message method. Should we move it back there?
| /// Message not found or not in expected state | ||
| NotFound, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we maybe have two distinct errors (NotFound / InvalidState) instead of using NotFound for both?
| } | ||
|
|
||
| #[test] | ||
| fn test_save_failed_processed_message_preserves_message_event_id() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice test! How about also asserting that failure_reason is set to the expected sanitized value ("own_message" per classify_failure_reason). This would catch regressions if someone changes the sanitization logic.
Summary
This PR implements better handling for retryable message states and ensures message context (like
message_event_id) is preserved when messages fail processing.Fixes #160
Changes
process_messageinmdk-coreto look up existing records and preservemessage_event_id,epoch, andmls_group_idwhen creating aFailedstate record.Retryable.Reasoning
Previously, failing messages might lose context about their origin, making it difficult to retry them correctly or correlate them with the original send event. This change ensures that even if processing fails, we keep the link to the original message event.
This PR adds a Retryable processed-message state and updates message processing to preserve original message context (message_event_id, epoch, mls_group_id) when failures occur, enabling safe retries and better correlation with the original send event. It also adds storage support to mark failed messages as retryable and updates tests and changelogs to reflect the new behavior.
What changed:
Security impact:
Protocol changes:
API surface:
Testing: