From 2396f349f80e336d9640c3c9710e476e9b0b439b Mon Sep 17 00:00:00 2001 From: aditya Date: Wed, 14 Dec 2022 09:20:14 -0800 Subject: [PATCH 1/4] add close function and event publish on close --- rosbag2_cpp/include/rosbag2_cpp/writer.hpp | 1 + rosbag2_cpp/src/rosbag2_cpp/writer.cpp | 3 +++ rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp | 8 ++++++++ rosbag2_transport/include/rosbag2_transport/recorder.hpp | 4 ++++ rosbag2_transport/src/rosbag2_transport/recorder.cpp | 6 ++++++ 5 files changed, 22 insertions(+) diff --git a/rosbag2_cpp/include/rosbag2_cpp/writer.hpp b/rosbag2_cpp/include/rosbag2_cpp/writer.hpp index ba4ea2d48d..3d65554fdd 100644 --- a/rosbag2_cpp/include/rosbag2_cpp/writer.hpp +++ b/rosbag2_cpp/include/rosbag2_cpp/writer.hpp @@ -88,6 +88,7 @@ class ROSBAG2_CPP_PUBLIC Writer final const rosbag2_storage::StorageOptions & storage_options, const ConverterOptions & converter_options = ConverterOptions()); + void close(); /** * Create a new topic in the underlying storage. Needs to be called for every topic used within * a message which is passed to write(...). diff --git a/rosbag2_cpp/src/rosbag2_cpp/writer.cpp b/rosbag2_cpp/src/rosbag2_cpp/writer.cpp index 3ee33b0f21..1a8d8c8088 100644 --- a/rosbag2_cpp/src/rosbag2_cpp/writer.cpp +++ b/rosbag2_cpp/src/rosbag2_cpp/writer.cpp @@ -63,6 +63,9 @@ void Writer::open( writer_impl_->open(storage_options, converter_options); } +void Writer::close() { + writer_impl_->close(); +} void Writer::create_topic(const rosbag2_storage::TopicMetadata & topic_with_type) { std::lock_guard writer_lock(writer_mutex_); diff --git a/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp b/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp index 15bb690626..f20c75173c 100644 --- a/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp +++ b/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp @@ -30,6 +30,7 @@ #include "rosbag2_cpp/logging.hpp" #include "rosbag2_storage/storage_options.hpp" +#include namespace rosbag2_cpp { @@ -161,6 +162,13 @@ void SequentialWriter::close() metadata_io_->write_metadata(base_folder_, metadata_); } + if (storage_) { + auto info = std::make_shared(); + info->closed_file = storage_->get_relative_file_path(); + std::cout << "Closed file " << info->closed_file << std::endl; + callback_manager_.execute_callbacks(bag_events::BagEvent::WRITE_SPLIT, info); + } + storage_.reset(); // Necessary to ensure that the storage is destroyed before the factory storage_factory_.reset(); } diff --git a/rosbag2_transport/include/rosbag2_transport/recorder.hpp b/rosbag2_transport/include/rosbag2_transport/recorder.hpp index 3806c7b837..12a7c30062 100644 --- a/rosbag2_transport/include/rosbag2_transport/recorder.hpp +++ b/rosbag2_transport/include/rosbag2_transport/recorder.hpp @@ -97,6 +97,10 @@ class Recorder : public rclcpp::Node ROSBAG2_TRANSPORT_PUBLIC void pause(); + /// Stop the recording. + ROSBAG2_TRANSPORT_PUBLIC + void stop(); + /// Resume recording. ROSBAG2_TRANSPORT_PUBLIC void resume(); diff --git a/rosbag2_transport/src/rosbag2_transport/recorder.cpp b/rosbag2_transport/src/rosbag2_transport/recorder.cpp index 6c736669fd..b60e7a9f07 100644 --- a/rosbag2_transport/src/rosbag2_transport/recorder.cpp +++ b/rosbag2_transport/src/rosbag2_transport/recorder.cpp @@ -124,6 +124,10 @@ Recorder::~Recorder() } } +void Recorder::stop() { + writer_->close(); +} + void Recorder::record() { topic_qos_profile_overrides_ = record_options_.topic_qos_profile_overrides; @@ -158,6 +162,7 @@ void Recorder::record() callbacks.write_split_callback = [this](rosbag2_cpp::bag_events::BagSplitInfo & info) { { + RCLCPP_INFO(get_logger(), "Event publisher thread: Trigger"); std::lock_guard lock(event_publisher_thread_mutex_); bag_split_info_ = info; write_split_has_occurred_ = true; @@ -194,6 +199,7 @@ void Recorder::event_publisher_thread_main() auto message = rosbag2_interfaces::msg::WriteSplitEvent(); message.closed_file = bag_split_info_.closed_file; message.opened_file = bag_split_info_.opened_file; + RCLCPP_INFO(get_logger(), "Event publisher thread: Publish"); split_event_pub_->publish(message); } From 89a0a72776d871a307477bf74d7aef9544d7122e Mon Sep 17 00:00:00 2001 From: aditya Date: Wed, 14 Dec 2022 10:28:30 -0800 Subject: [PATCH 2/4] remove cout --- rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp b/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp index f20c75173c..52fb5a078b 100644 --- a/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp +++ b/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp @@ -165,7 +165,6 @@ void SequentialWriter::close() if (storage_) { auto info = std::make_shared(); info->closed_file = storage_->get_relative_file_path(); - std::cout << "Closed file " << info->closed_file << std::endl; callback_manager_.execute_callbacks(bag_events::BagEvent::WRITE_SPLIT, info); } From 0bc3baf755c0de4f5e11517d2f5ab3b8508856cb Mon Sep 17 00:00:00 2001 From: Michael Orlov Date: Sun, 18 Dec 2022 18:13:50 -0800 Subject: [PATCH 3/4] Move routines from Recorder's destructor to Recorder::stop() Signed-off-by: Michael Orlov --- .../src/rosbag2_transport/recorder.cpp | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/rosbag2_transport/src/rosbag2_transport/recorder.cpp b/rosbag2_transport/src/rosbag2_transport/recorder.cpp index b60e7a9f07..d24ab95865 100644 --- a/rosbag2_transport/src/rosbag2_transport/recorder.cpp +++ b/rosbag2_transport/src/rosbag2_transport/recorder.cpp @@ -107,12 +107,18 @@ Recorder::Recorder( Recorder::~Recorder() { keyboard_handler_->delete_key_press_callback(toggle_paused_key_callback_handle_); + stop(); +} + +void Recorder::stop() +{ stop_discovery_ = true; if (discovery_future_.valid()) { discovery_future_.wait(); } - + paused_ = true; subscriptions_.clear(); + writer_->close(); { std::lock_guard lock(event_publisher_thread_mutex_); @@ -124,12 +130,9 @@ Recorder::~Recorder() } } -void Recorder::stop() { - writer_->close(); -} - void Recorder::record() { + paused_ = record_options_.start_paused; topic_qos_profile_overrides_ = record_options_.topic_qos_profile_overrides; if (record_options_.rmw_serialization_format.empty()) { throw std::runtime_error("No serialization format specified!"); From 0cfd7cc62dc28987bfd6f71d40d68361cdd03aa7 Mon Sep 17 00:00:00 2001 From: Michael Orlov Date: Sun, 18 Dec 2022 18:15:35 -0800 Subject: [PATCH 4/4] Gracefully handle SIGTERM signal for rosbag2 Recorder Signed-off-by: Michael Orlov --- rosbag2_py/src/rosbag2_py/_transport.cpp | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/rosbag2_py/src/rosbag2_py/_transport.cpp b/rosbag2_py/src/rosbag2_py/_transport.cpp index bbfa21b73a..9dc3c5dd04 100644 --- a/rosbag2_py/src/rosbag2_py/_transport.cpp +++ b/rosbag2_py/src/rosbag2_py/_transport.cpp @@ -142,7 +142,8 @@ class Player class Recorder { private: - std::unique_ptr exec_; + static std::unique_ptr exec_; + static std::shared_ptr recorder_; public: Recorder() @@ -151,7 +152,8 @@ class Recorder exec_ = std::make_unique(); std::signal( SIGTERM, [](int /* signal */) { - rclcpp::shutdown(); + if (exec_) {exec_->cancel();} + if (recorder_) {recorder_->stop();} }); } @@ -160,7 +162,7 @@ class Recorder rclcpp::shutdown(); } - void record( + static void record( const rosbag2_storage::StorageOptions & storage_options, RecordOptions & record_options) { @@ -169,11 +171,11 @@ class Recorder } auto writer = rosbag2_transport::ReaderWriterFactory::make_writer(record_options); - auto recorder = std::make_shared( + recorder_ = std::make_shared( std::move(writer), storage_options, record_options); - recorder->record(); + recorder_->record(); - exec_->add_node(recorder); + exec_->add_node(recorder_); // Release the GIL for long-running record, so that calling Python code can use other threads { py::gil_scoped_release release; @@ -181,7 +183,7 @@ class Recorder } } - void cancel() + static void cancel() { exec_->cancel(); }