diff --git a/rosbag2_cpp/include/rosbag2_cpp/writer.hpp b/rosbag2_cpp/include/rosbag2_cpp/writer.hpp index ba4ea2d48d..3d65554fdd 100644 --- a/rosbag2_cpp/include/rosbag2_cpp/writer.hpp +++ b/rosbag2_cpp/include/rosbag2_cpp/writer.hpp @@ -88,6 +88,7 @@ class ROSBAG2_CPP_PUBLIC Writer final const rosbag2_storage::StorageOptions & storage_options, const ConverterOptions & converter_options = ConverterOptions()); + void close(); /** * Create a new topic in the underlying storage. Needs to be called for every topic used within * a message which is passed to write(...). diff --git a/rosbag2_cpp/src/rosbag2_cpp/writer.cpp b/rosbag2_cpp/src/rosbag2_cpp/writer.cpp index 3ee33b0f21..1a8d8c8088 100644 --- a/rosbag2_cpp/src/rosbag2_cpp/writer.cpp +++ b/rosbag2_cpp/src/rosbag2_cpp/writer.cpp @@ -63,6 +63,9 @@ void Writer::open( writer_impl_->open(storage_options, converter_options); } +void Writer::close() { + writer_impl_->close(); +} void Writer::create_topic(const rosbag2_storage::TopicMetadata & topic_with_type) { std::lock_guard writer_lock(writer_mutex_); diff --git a/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp b/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp index 15bb690626..52fb5a078b 100644 --- a/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp +++ b/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp @@ -30,6 +30,7 @@ #include "rosbag2_cpp/logging.hpp" #include "rosbag2_storage/storage_options.hpp" +#include namespace rosbag2_cpp { @@ -161,6 +162,12 @@ void SequentialWriter::close() metadata_io_->write_metadata(base_folder_, metadata_); } + if (storage_) { + auto info = std::make_shared(); + info->closed_file = storage_->get_relative_file_path(); + callback_manager_.execute_callbacks(bag_events::BagEvent::WRITE_SPLIT, info); + } + storage_.reset(); // Necessary to ensure that the storage is destroyed before the factory storage_factory_.reset(); } diff --git a/rosbag2_py/src/rosbag2_py/_transport.cpp b/rosbag2_py/src/rosbag2_py/_transport.cpp index bbfa21b73a..9dc3c5dd04 100644 --- a/rosbag2_py/src/rosbag2_py/_transport.cpp +++ b/rosbag2_py/src/rosbag2_py/_transport.cpp @@ -142,7 +142,8 @@ class Player class Recorder { private: - std::unique_ptr exec_; + static std::unique_ptr exec_; + static std::shared_ptr recorder_; public: Recorder() @@ -151,7 +152,8 @@ class Recorder exec_ = std::make_unique(); std::signal( SIGTERM, [](int /* signal */) { - rclcpp::shutdown(); + if (exec_) {exec_->cancel();} + if (recorder_) {recorder_->stop();} }); } @@ -160,7 +162,7 @@ class Recorder rclcpp::shutdown(); } - void record( + static void record( const rosbag2_storage::StorageOptions & storage_options, RecordOptions & record_options) { @@ -169,11 +171,11 @@ class Recorder } auto writer = rosbag2_transport::ReaderWriterFactory::make_writer(record_options); - auto recorder = std::make_shared( + recorder_ = std::make_shared( std::move(writer), storage_options, record_options); - recorder->record(); + recorder_->record(); - exec_->add_node(recorder); + exec_->add_node(recorder_); // Release the GIL for long-running record, so that calling Python code can use other threads { py::gil_scoped_release release; @@ -181,7 +183,7 @@ class Recorder } } - void cancel() + static void cancel() { exec_->cancel(); } diff --git a/rosbag2_transport/include/rosbag2_transport/recorder.hpp b/rosbag2_transport/include/rosbag2_transport/recorder.hpp index 3806c7b837..12a7c30062 100644 --- a/rosbag2_transport/include/rosbag2_transport/recorder.hpp +++ b/rosbag2_transport/include/rosbag2_transport/recorder.hpp @@ -97,6 +97,10 @@ class Recorder : public rclcpp::Node ROSBAG2_TRANSPORT_PUBLIC void pause(); + /// Stop the recording. + ROSBAG2_TRANSPORT_PUBLIC + void stop(); + /// Resume recording. ROSBAG2_TRANSPORT_PUBLIC void resume(); diff --git a/rosbag2_transport/src/rosbag2_transport/recorder.cpp b/rosbag2_transport/src/rosbag2_transport/recorder.cpp index 6c736669fd..d24ab95865 100644 --- a/rosbag2_transport/src/rosbag2_transport/recorder.cpp +++ b/rosbag2_transport/src/rosbag2_transport/recorder.cpp @@ -107,12 +107,18 @@ Recorder::Recorder( Recorder::~Recorder() { keyboard_handler_->delete_key_press_callback(toggle_paused_key_callback_handle_); + stop(); +} + +void Recorder::stop() +{ stop_discovery_ = true; if (discovery_future_.valid()) { discovery_future_.wait(); } - + paused_ = true; subscriptions_.clear(); + writer_->close(); { std::lock_guard lock(event_publisher_thread_mutex_); @@ -126,6 +132,7 @@ Recorder::~Recorder() void Recorder::record() { + paused_ = record_options_.start_paused; topic_qos_profile_overrides_ = record_options_.topic_qos_profile_overrides; if (record_options_.rmw_serialization_format.empty()) { throw std::runtime_error("No serialization format specified!"); @@ -158,6 +165,7 @@ void Recorder::record() callbacks.write_split_callback = [this](rosbag2_cpp::bag_events::BagSplitInfo & info) { { + RCLCPP_INFO(get_logger(), "Event publisher thread: Trigger"); std::lock_guard lock(event_publisher_thread_mutex_); bag_split_info_ = info; write_split_has_occurred_ = true; @@ -194,6 +202,7 @@ void Recorder::event_publisher_thread_main() auto message = rosbag2_interfaces::msg::WriteSplitEvent(); message.closed_file = bag_split_info_.closed_file; message.opened_file = bag_split_info_.opened_file; + RCLCPP_INFO(get_logger(), "Event publisher thread: Publish"); split_event_pub_->publish(message); }