Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions rosbag2_cpp/include/rosbag2_cpp/writer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ class ROSBAG2_CPP_PUBLIC Writer final
const rosbag2_storage::StorageOptions & storage_options,
const ConverterOptions & converter_options = ConverterOptions());

void close();
/**
* Create a new topic in the underlying storage. Needs to be called for every topic used within
* a message which is passed to write(...).
Expand Down
3 changes: 3 additions & 0 deletions rosbag2_cpp/src/rosbag2_cpp/writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ void Writer::open(
writer_impl_->open(storage_options, converter_options);
}

void Writer::close() {
writer_impl_->close();
}
void Writer::create_topic(const rosbag2_storage::TopicMetadata & topic_with_type)
{
std::lock_guard<std::mutex> writer_lock(writer_mutex_);
Expand Down
7 changes: 7 additions & 0 deletions rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "rosbag2_cpp/logging.hpp"

#include "rosbag2_storage/storage_options.hpp"
#include <iostream>

namespace rosbag2_cpp
{
Expand Down Expand Up @@ -161,6 +162,12 @@ void SequentialWriter::close()
metadata_io_->write_metadata(base_folder_, metadata_);
}

if (storage_) {
auto info = std::make_shared<bag_events::BagSplitInfo>();
info->closed_file = storage_->get_relative_file_path();
callback_manager_.execute_callbacks(bag_events::BagEvent::WRITE_SPLIT, info);
}

storage_.reset(); // Necessary to ensure that the storage is destroyed before the factory
storage_factory_.reset();
}
Expand Down
16 changes: 9 additions & 7 deletions rosbag2_py/src/rosbag2_py/_transport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,8 @@ class Player
class Recorder
{
private:
std::unique_ptr<rclcpp::executors::SingleThreadedExecutor> exec_;
static std::unique_ptr<rclcpp::executors::SingleThreadedExecutor> exec_;
static std::shared_ptr<rosbag2_transport::Recorder> recorder_;

public:
Recorder()
Expand All @@ -151,7 +152,8 @@ class Recorder
exec_ = std::make_unique<rclcpp::executors::SingleThreadedExecutor>();
std::signal(
SIGTERM, [](int /* signal */) {
rclcpp::shutdown();
if (exec_) {exec_->cancel();}
if (recorder_) {recorder_->stop();}
});
}

Expand All @@ -160,7 +162,7 @@ class Recorder
rclcpp::shutdown();
}

void record(
static void record(
const rosbag2_storage::StorageOptions & storage_options,
RecordOptions & record_options)
{
Expand All @@ -169,19 +171,19 @@ class Recorder
}

auto writer = rosbag2_transport::ReaderWriterFactory::make_writer(record_options);
auto recorder = std::make_shared<rosbag2_transport::Recorder>(
recorder_ = std::make_shared<rosbag2_transport::Recorder>(
std::move(writer), storage_options, record_options);
recorder->record();
recorder_->record();

exec_->add_node(recorder);
exec_->add_node(recorder_);
// Release the GIL for long-running record, so that calling Python code can use other threads
{
py::gil_scoped_release release;
exec_->spin();
}
}

void cancel()
static void cancel()
{
exec_->cancel();
}
Expand Down
4 changes: 4 additions & 0 deletions rosbag2_transport/include/rosbag2_transport/recorder.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ class Recorder : public rclcpp::Node
ROSBAG2_TRANSPORT_PUBLIC
void pause();

/// Stop the recording.
ROSBAG2_TRANSPORT_PUBLIC
void stop();

/// Resume recording.
ROSBAG2_TRANSPORT_PUBLIC
void resume();
Expand Down
11 changes: 10 additions & 1 deletion rosbag2_transport/src/rosbag2_transport/recorder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,18 @@ Recorder::Recorder(
Recorder::~Recorder()
{
keyboard_handler_->delete_key_press_callback(toggle_paused_key_callback_handle_);
stop();
}

void Recorder::stop()
{
stop_discovery_ = true;
if (discovery_future_.valid()) {
discovery_future_.wait();
}

paused_ = true;
subscriptions_.clear();
writer_->close();

{
std::lock_guard<std::mutex> lock(event_publisher_thread_mutex_);
Expand All @@ -126,6 +132,7 @@ Recorder::~Recorder()

void Recorder::record()
{
paused_ = record_options_.start_paused;
topic_qos_profile_overrides_ = record_options_.topic_qos_profile_overrides;
if (record_options_.rmw_serialization_format.empty()) {
throw std::runtime_error("No serialization format specified!");
Expand Down Expand Up @@ -158,6 +165,7 @@ void Recorder::record()
callbacks.write_split_callback =
[this](rosbag2_cpp::bag_events::BagSplitInfo & info) {
{
RCLCPP_INFO(get_logger(), "Event publisher thread: Trigger");
std::lock_guard<std::mutex> lock(event_publisher_thread_mutex_);
bag_split_info_ = info;
write_split_has_occurred_ = true;
Expand Down Expand Up @@ -194,6 +202,7 @@ void Recorder::event_publisher_thread_main()
auto message = rosbag2_interfaces::msg::WriteSplitEvent();
message.closed_file = bag_split_info_.closed_file;
message.opened_file = bag_split_info_.opened_file;
RCLCPP_INFO(get_logger(), "Event publisher thread: Publish");
split_event_pub_->publish(message);
}

Expand Down