Persist Transient Local topics in snapshot mode#2279
Draft
tonynajjar wants to merge 3 commits intoros2:rollingfrom
Draft
Persist Transient Local topics in snapshot mode#2279tonynajjar wants to merge 3 commits intoros2:rollingfrom
tonynajjar wants to merge 3 commits intoros2:rollingfrom
Conversation
Signed-off-by: Tony Najjar <tony.najjar@dexory.com>
tonynajjar
commented
Dec 22, 2025
| message_lost = !message_cache_->push(converted_msg); | ||
| // Check if this topic uses transient_local durability | ||
| if (transient_local_topics_.find(message->topic_name) != transient_local_topics_.end()) { | ||
| message_cache_->push_transient_local(converted_msg); |
Author
There was a problem hiding this comment.
TODO: handling of lost messages?
tonynajjar
commented
Dec 22, 2025
| std::lock_guard<std::mutex> cache_lock(transient_local_buffer_mutex_); | ||
| // Store/update the latest message for this topic | ||
| // This ensures we always have the most recent state for transient local topics | ||
| transient_local_messages_[msg->topic_name] = std::move(msg); |
Author
There was a problem hiding this comment.
TODO: store more than just the last msg?
tonynajjar
commented
Dec 22, 2025
Comment on lines
120
to
126
| // Use the front message timestamp as the snapshot start time for transient local messages. | ||
| // This may not be the absolute earliest timestamp in the buffer (messages can arrive | ||
| // out of order), but it's close enough - typically within 1-2ms of the true minimum. | ||
| // This avoids the overhead of searching through all messages for the exact minimum. | ||
| if (!consumer_data.empty()) { | ||
| snapshot_start_time = consumer_data.front()->recv_timestamp; | ||
| } |
tonynajjar
commented
Dec 22, 2025
Comment on lines
+134
to
+135
| updated_msg->recv_timestamp = snapshot_start_time; | ||
| updated_msg->send_timestamp = snapshot_start_time; |
Author
There was a problem hiding this comment.
TODO: not sure if they should have the same timestamp
c539c97 to
21ab3a8
Compare
tonynajjar
commented
Jan 23, 2026
| return true; | ||
| } | ||
|
|
||
| bool MessageCacheCircularBuffer::push_front(CacheBufferInterface::buffer_element_t msg) |
Author
There was a problem hiding this comment.
This will need to change to respect the max buffer size as well
Author
|
Hi @MichaelOrlov, did you have the chance to take a look? |
Contributor
|
@tonynajjar Sorry, not yet. I decided that we need to land the first feature for the time-based snapshot. I just merged it yesterday. Then we can figure out how other staff related to the snapshot will align. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Description
Summary
This PR implements preservation of topics with
TRANSIENT_LOCALQoS durability in snapshot mode recording. Previously, transient local topics (like/tf_static, parameter events, or latched topics) could be lost when the circular buffer rolled over, even though they contain critical state information. This feature ensures these messages are always included in snapshots.Problem
In snapshot mode, rosbag2 uses a circular buffer to maintain a rolling window of messages. When a snapshot is triggered, the buffer contents are dumped to disk. However, topics with
TRANSIENT_LOCALdurability semantics—which should persist their latest message for late-joining subscribers—were not treated specially and could be overwritten before snapshot capture.Solution
The implementation adds a separate storage mechanism for transient local messages:
1. Cache Layer (
CircularMessageCache)transient_local_messages_map to store latest message per transient local topicpush_transient_local()method for separate message routingswap_buffers()to merge transient local messages into snapshots with adjusted timestamps2. Writer Layer (
SequentialWriter,Writer)mark_topic_as_transient_local()API to register topicswrite()to route transient local messages to protected storagetransient_local_topics_set to track registered topics3. Recorder Integration
TransientLocalQoS durability during subscription4. Interface Updates
push_transient_local()toMessageCacheInterfacewith default fallbackmark_topic_as_transient_local()toBaseWriterInterfaceFixes #1886
Is this user-facing behavior change?
No
Did you use Generative AI?
Clause Sonnet 4.5
Additional Information