rosbag2_cpp: Add support for reindexing compressed bags#2326
rosbag2_cpp: Add support for reindexing compressed bags#2326BhuvanB404 wants to merge 1 commit intoros2:rollingfrom
Conversation
Fixes ros2#990 The reindexer used to fail for compressed files(.zstd) This fixes it by decompressing the files to /tmp before reindexing Signed-off-by: BhuvanB <bhuvanb1408@gmail.com>
f520e7d to
6bc3677
Compare
There was a problem hiding this comment.
Pull request overview
This PR adds support for reindexing compressed ROS bag files (.zstd) to address issue #990 where the reindex operation would fail when encountering compressed bag files. The implementation decompresses files before reading their metadata, then reconstructs the metadata.yaml file with proper compression information.
Changes:
- Extended the Reindexer class to handle compressed bag files by integrating with the compression system
- Added compression factory dependency to rosbag2_cpp and rosbag2_py packages
- Updated regex pattern to detect compressed file extensions
- Added logic to decompress files and populate compression metadata fields
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 10 comments.
Show a summary per file
| File | Description |
|---|---|
| rosbag2_cpp/src/rosbag2_cpp/reindexer.cpp | Core implementation: added decompression logic, updated regex pattern for compressed files, and added compression metadata handling |
| rosbag2_cpp/include/rosbag2_cpp/reindexer.hpp | Added CompressionFactory member, updated constructor signature, changed destructor declaration |
| rosbag2_cpp/CMakeLists.txt | Added rosbag2_compression dependency |
| rosbag2_py/src/rosbag2_py/_reindexer.cpp | Added compression_factory.hpp include |
| rosbag2_py/CMakeLists.txt | Added rosbag2_compression dependency |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
|
|
||
| if (file_string.size() > 5 && file_string.substr(file_string.size() - 5) == ".zstd") { | ||
| if (!compression_factory_) { | ||
| compression_factory_ = std::make_unique<rosbag2_compression::CompressionFactory>(); | ||
| } | ||
|
|
||
| // Decompress to /tmp | ||
| auto decompressor = compression_factory_->create_decompressor("zstd"); | ||
| uri_to_read = decompressor->decompress_uri(file_string); | ||
|
|
||
| if (metadata_.compression_format.empty()) { | ||
| metadata_.compression_format = "zstd"; | ||
| metadata_.compression_mode = "FILE"; |
There was a problem hiding this comment.
The decompression logic hardcodes support for only the ".zstd" extension. This approach has several issues:
-
It only supports zstd compression, but the compression system is pluggable and could support other formats. The compression format should be detected dynamically from the file extension or from discovered metadata.
-
The hardcoded string ".zstd" check is fragile. Consider using the decompressor's
get_decompression_identifier()method or checking against available compression plugins. -
The code doesn't handle the case where the compression factory fails to create a decompressor (if
create_decompressorreturns nullptr), which could cause a null pointer dereference.
Consider refactoring to:
- Extract the file extension and use it to dynamically determine the compression format
- Add error handling for when decompressor creation fails
- Support all available compression formats, not just zstd
| if (file_string.size() > 5 && file_string.substr(file_string.size() - 5) == ".zstd") { | |
| if (!compression_factory_) { | |
| compression_factory_ = std::make_unique<rosbag2_compression::CompressionFactory>(); | |
| } | |
| // Decompress to /tmp | |
| auto decompressor = compression_factory_->create_decompressor("zstd"); | |
| uri_to_read = decompressor->decompress_uri(file_string); | |
| if (metadata_.compression_format.empty()) { | |
| metadata_.compression_format = "zstd"; | |
| metadata_.compression_mode = "FILE"; | |
| // Detect compression format from file extension (e.g. ".zstd", ".lz4") | |
| const std::string extension = fs::path(file_string).extension().string(); | |
| if (!extension.empty()) { | |
| // Strip leading '.' to get the compression format identifier | |
| const std::string compression_format = extension.substr(1); | |
| if (!compression_factory_) { | |
| compression_factory_ = std::make_unique<rosbag2_compression::CompressionFactory>(); | |
| } | |
| // Attempt to create a decompressor for the detected format | |
| auto decompressor = compression_factory_->create_decompressor(compression_format); | |
| if (!decompressor) { | |
| ROSBAG2_CPP_LOG_ERROR_STREAM( | |
| "Failed to create decompressor for format '" << compression_format << | |
| "' when processing file '" << file_string << "'. Proceeding without decompression."); | |
| } else { | |
| // Decompress to a temporary location | |
| uri_to_read = decompressor->decompress_uri(file_string); | |
| if (metadata_.compression_format.empty()) { | |
| metadata_.compression_format = compression_format; | |
| metadata_.compression_mode = "FILE"; | |
| } |
| @@ -226,6 +251,13 @@ void Reindexer::aggregate_metadata( | |||
| } | |||
| } | |||
|
|
|||
| rosbag2_storage::FileInformation file_info; | |||
| file_info.path = original_filename; | |||
| file_info.starting_time = temp_metadata.starting_time; | |||
| file_info.duration = temp_metadata.duration; | |||
| file_info.message_count = temp_metadata.message_count; | |||
| metadata_.files.push_back(file_info); | |||
|
|
|||
| bag_reader->close(); | |||
There was a problem hiding this comment.
The decompressed temporary files created by decompress_uri are never cleaned up. The zstd decompressor creates files in the same directory as the source file (by calling replace_extension() on the input path), which means temporary decompressed files will be left in the bag directory after reindexing completes.
This is a resource leak that could accumulate large amounts of disk space over time. The temporary files should be explicitly deleted after the bag_reader is closed, or the decompression should be directed to a true temporary directory with automatic cleanup.
| // Decompress to /tmp | ||
| auto decompressor = compression_factory_->create_decompressor("zstd"); | ||
| uri_to_read = decompressor->decompress_uri(file_string); |
There was a problem hiding this comment.
The PR description states that files are decompressed to "/tmp", but the actual implementation uses decompress_uri which decompresses files to the same directory as the source file (by removing the compression extension). This is a discrepancy between the PR description and the implementation.
The current behavior could cause issues:
- Files are left in the bag directory rather than a temporary location
- If the bag directory is read-only or has limited space, decompression will fail
- The decompressed files will persist after reindexing completes
Consider either updating the implementation to actually use a temporary directory, or updating the PR description to match the actual behavior.
|
|
||
| if (file_string.size() > 5 && file_string.substr(file_string.size() - 5) == ".zstd") { | ||
| if (!compression_factory_) { | ||
| compression_factory_ = std::make_unique<rosbag2_compression::CompressionFactory>(); | ||
| } | ||
|
|
||
| // Decompress to /tmp | ||
| auto decompressor = compression_factory_->create_decompressor("zstd"); | ||
| uri_to_read = decompressor->decompress_uri(file_string); | ||
|
|
There was a problem hiding this comment.
There's trailing whitespace at the end of line 188. This should be removed to maintain code cleanliness.
| if (file_string.size() > 5 && file_string.substr(file_string.size() - 5) == ".zstd") { | |
| if (!compression_factory_) { | |
| compression_factory_ = std::make_unique<rosbag2_compression::CompressionFactory>(); | |
| } | |
| // Decompress to /tmp | |
| auto decompressor = compression_factory_->create_decompressor("zstd"); | |
| uri_to_read = decompressor->decompress_uri(file_string); | |
| if (file_string.size() > 5 && file_string.substr(file_string.size() - 5) == ".zstd") { | |
| if (!compression_factory_) { | |
| compression_factory_ = std::make_unique<rosbag2_compression::CompressionFactory>(); | |
| } | |
| // Decompress to /tmp | |
| auto decompressor = compression_factory_->create_decompressor("zstd"); | |
| uri_to_read = decompressor->decompress_uri(file_string); |
| #include "rosbag2_cpp/serialization_format_converter_factory_interface.hpp" | ||
| #include "rosbag2_cpp/visibility_control.hpp" | ||
|
|
||
| // Forward declaration due to circular dependency |
There was a problem hiding this comment.
The forward declaration comment mentions "circular dependency" but doesn't explain what the circular dependency is. Since rosbag2_cpp depends on rosbag2_compression (added in CMakeLists.txt), and this is a public header, the forward declaration is actually being used to avoid including the full header in a public API. The comment should be clarified to explain this is to minimize header dependencies in the public API, not to avoid a circular dependency.
| // Forward declaration due to circular dependency | |
| // Forward declaration to avoid including rosbag2_compression headers in this public API | |
| // and to minimize header dependencies. |
| if (file_string.size() > 5 && file_string.substr(file_string.size() - 5) == ".zstd") { | ||
| if (!compression_factory_) { | ||
| compression_factory_ = std::make_unique<rosbag2_compression::CompressionFactory>(); | ||
| } | ||
|
|
||
| // Decompress to /tmp | ||
| auto decompressor = compression_factory_->create_decompressor("zstd"); | ||
| uri_to_read = decompressor->decompress_uri(file_string); | ||
|
|
||
| if (metadata_.compression_format.empty()) { | ||
| metadata_.compression_format = "zstd"; | ||
| metadata_.compression_mode = "FILE"; | ||
| } | ||
| } |
There was a problem hiding this comment.
For large compressed bag files, decompressing each file synchronously could take a significant amount of time and disk I/O. The decompression creates a full uncompressed copy of each bag file, potentially doubling the disk space usage temporarily.
Consider adding:
- A progress indicator or logging to inform users about decompression progress
- A warning message if the bag directory doesn't have sufficient free space for decompression
- Documentation about the temporary disk space requirements for reindexing compressed bags
Additionally, consider whether there's a way to extract only the metadata from compressed files without fully decompressing them, which would be much more efficient.
| compression_factory_(std::move(compression_factory)) | ||
| { | ||
| regex_bag_pattern_ = R"(.+_(\d+)\.([a-zA-Z0-9])+)"; | ||
| regex_bag_pattern_ = R"(.+_(\d+)\.([a-zA-Z0-9]+)(?:\.([a-zA-Z0-9]+))?)"; |
There was a problem hiding this comment.
The updated regex pattern may not correctly match compressed bag files. The pattern R"(.+_(\d+)\.([a-zA-Z0-9]+)(?:\.([a-zA-Z0-9]+))?)" makes the compression extension optional via the non-capturing group with ?. However, this could incorrectly match files without compression extensions as having optional compression extensions.
Additionally, the pattern should more precisely match the expected file naming convention for compressed bags (e.g., bagname_0.db3.zstd). The current pattern doesn't enforce that the base extension is followed by a compression extension.
|
|
||
| if (metadata_.compression_format.empty()) { | ||
| metadata_.compression_format = "zstd"; | ||
| metadata_.compression_mode = "FILE"; |
There was a problem hiding this comment.
The compression mode is hardcoded to "FILE" as a string literal. This should use the proper enum value or constant. Looking at the compression system, there should be a function like compression_mode_to_string that converts the enum to string.
Additionally, the check if (metadata_.compression_format.empty()) means that only the first compressed file will set the compression format and mode. If different files in the bag have different compression formats, this will silently ignore them, which could lead to incorrect metadata.
| metadata_.compression_mode = "FILE"; | |
| metadata_.compression_mode = | |
| rosbag2_compression::compression_mode_to_string( | |
| rosbag2_compression::CompressionMode::FILE); | |
| } else if (metadata_.compression_format != "zstd") { | |
| ROSBAG2_CPP_LOG_WARN_STREAM( | |
| "Mixed compression formats detected while reindexing bag. " | |
| "Clearing bag-level compression metadata to avoid incorrect description."); | |
| metadata_.compression_format.clear(); | |
| metadata_.compression_mode.clear(); |
|
|
||
| // Decompress to /tmp | ||
| auto decompressor = compression_factory_->create_decompressor("zstd"); | ||
| uri_to_read = decompressor->decompress_uri(file_string); |
There was a problem hiding this comment.
If the decompress_uri call throws an exception (e.g., due to a corrupted compressed file or insufficient disk space), the reindexer will fail without a clear error message indicating which file caused the problem. Consider adding try-catch error handling around the decompression operation to provide better error messages that include the filename being processed.
| uri_to_read = decompressor->decompress_uri(file_string); | |
| try { | |
| uri_to_read = decompressor->decompress_uri(file_string); | |
| } catch (const std::exception & e) { | |
| ROSBAG2_CPP_LOG_ERROR_STREAM( | |
| "Failed to decompress bag file '" << file_string << | |
| "' for reindexing: " << e.what()); | |
| throw; | |
| } catch (...) { | |
| ROSBAG2_CPP_LOG_ERROR_STREAM( | |
| "Failed to decompress bag file '" << file_string << | |
| "' for reindexing due to an unknown error."); | |
| throw; | |
| } |
| if (file_string.size() > 5 && file_string.substr(file_string.size() - 5) == ".zstd") { | ||
| if (!compression_factory_) { | ||
| compression_factory_ = std::make_unique<rosbag2_compression::CompressionFactory>(); | ||
| } | ||
|
|
||
| // Decompress to /tmp | ||
| auto decompressor = compression_factory_->create_decompressor("zstd"); | ||
| uri_to_read = decompressor->decompress_uri(file_string); | ||
|
|
||
| if (metadata_.compression_format.empty()) { | ||
| metadata_.compression_format = "zstd"; | ||
| metadata_.compression_mode = "FILE"; | ||
| } | ||
| } |
There was a problem hiding this comment.
This PR adds support for reindexing compressed bags, which is a significant new feature, but there are no new test cases added to verify this functionality works correctly. The existing test in test_reindexer.cpp only tests uncompressed bags.
Consider adding test coverage for:
- Reindexing bags with zstd compression
- Verifying that temporary decompressed files are cleaned up
- Verifying that the compression metadata (format and mode) are correctly set
- Handling errors when decompression fails
The test could create a compressed bag, remove the metadata.yaml, and then verify that reindexing reconstructs the metadata correctly with proper compression information.
The reindexer used to fail for compressed files(.zstd) This fixes it by decompressing the files to /tmp before reindexing
Description
Fixes #990
Is this user-facing behavior change?
user can now reindex compressed files using rosbag
Did you use Generative AI?
Additional Information