diff --git a/conanfile.py b/conanfile.py index 3d6747b74..dc04a0856 100644 --- a/conanfile.py +++ b/conanfile.py @@ -9,7 +9,7 @@ class HomestoreConan(ConanFile): name = "homestore" - version = "7.2.2" + version = "7.3.0" homepage = "https://github.com/eBay/Homestore" description = "HomeStore Storage Engine" diff --git a/src/include/homestore/replication/repl_dev.h b/src/include/homestore/replication/repl_dev.h index 6f92f3a65..58688f341 100644 --- a/src/include/homestore/replication/repl_dev.h +++ b/src/include/homestore/replication/repl_dev.h @@ -404,6 +404,10 @@ class ReplDevListener { virtual void on_complete_replace_member(const std::string& task_id, const replica_member_info& member_out, const replica_member_info& member_in, trace_id_t tid) = 0; + /// @brief Called when clean replace member task (rollback). + virtual void on_clean_replace_member_task(const std::string& task_id, const replica_member_info& member_out, + const replica_member_info& member_in, trace_id_t tid) = 0; + /// @brief Called when remove a member. virtual void on_remove_member(const replica_id_t& member, trace_id_t tid) = 0; @@ -553,6 +557,10 @@ class ReplDev { /// this API can return empty result. virtual std::vector< peer_info > get_replication_status() const = 0; + /// @brief Get all members in the replication quorum + /// @return List of replica IDs that are part of the replication group + virtual std::vector< replica_id_t > get_replication_quorum() = 0; + /// @brief Gets the group_id this repldev is working for /// @return group_id virtual group_id_t group_id() const = 0; diff --git a/src/lib/replication/repl_dev/raft_repl_dev.cpp b/src/lib/replication/repl_dev/raft_repl_dev.cpp index b1b97d828..63b052b31 100644 --- a/src/lib/replication/repl_dev/raft_repl_dev.cpp +++ b/src/lib/replication/repl_dev/raft_repl_dev.cpp @@ -24,6 +24,8 @@ #include "fetch_data_rpc_generated.h" #include +#include + namespace homestore { std::atomic< uint64_t > RaftReplDev::s_next_group_ordinal{1}; @@ -1853,16 +1855,50 @@ void RaftReplDev::clean_replace_member_task(repl_req_ptr_t rreq) { auto task_id = std::string(r_cast< const char* >(rreq->header().cbytes())); RD_LOGI(rreq->traceID(), "Raft repl clean_replace_member_task commit, task_id={}", task_id); - std::unique_lock lg{m_sb_mtx}; - auto persisted_task_id = get_replace_member_task_id(); - if (!persisted_task_id.empty()) { - RD_DBG_ASSERT(persisted_task_id == task_id, - "Invalid task_id in clean_replace_member_task message, received {}, persisted {}", task_id, - persisted_task_id); - m_rd_sb->replace_member_task = replace_member_task_superblk{}; - m_rd_sb.write(); + replica_member_info member_out; + replica_member_info member_in; + + // Step 1: Check and read member info from superblk + { + std::unique_lock lg{m_sb_mtx}; + auto persisted_task_id = get_replace_member_task_id(); + if (persisted_task_id.empty()) { + RD_LOGI(rreq->traceID(), "Raft repl clean_replace_member_task: task not found, task_id={}", task_id); + return; + } + + if (persisted_task_id != task_id) { + RD_LOGW(rreq->traceID(), + "Raft repl clean_replace_member_task: task_id mismatch, received={}, persisted={}, skip cleaning", + task_id, persisted_task_id); + return; + } + + // Read member info from superblk + member_out.id = m_rd_sb->replace_member_task.replica_out; + member_in.id = m_rd_sb->replace_member_task.replica_in; + } + + // Step 2: Call listener callback to rollback membership in HomeObject + if (member_out.id != boost::uuids::nil_uuid() && member_in.id != boost::uuids::nil_uuid()) { + RD_LOGI(rreq->traceID(), + "Raft repl clean_replace_member_task, callback to listener, task_id={}, member_out={}, member_in={}", + task_id, boost::uuids::to_string(member_out.id), boost::uuids::to_string(member_in.id)); + m_listener->on_clean_replace_member_task(task_id, member_out, member_in, rreq->traceID()); + } else { + RD_LOGW(rreq->traceID(), "Raft repl clean_replace_member_task: invalid member info, skip callback"); + } + + // Step 3: Clear the replace_member task from superblk + { + std::unique_lock lg{m_sb_mtx}; + auto persisted_task_id = get_replace_member_task_id(); + if (!persisted_task_id.empty()) { + m_rd_sb->replace_member_task = replace_member_task_superblk{}; + m_rd_sb.write(); + RD_LOGI(rreq->traceID(), "Raft repl replace_member_task has been cleared, task_id={}", task_id); + } } - RD_LOGI(rreq->traceID(), "Raft repl replace_member_task has been cleared, task_id={}", task_id); } void RaftReplDev::update_truncation_boundary(repl_lsn_t truncation_upper_limit) { @@ -1980,6 +2016,24 @@ std::vector< peer_info > RaftReplDev::get_replication_status() const { return pi; } +std::vector< replica_id_t > RaftReplDev::get_replication_quorum() { + std::vector< replica_id_t > member_ids; + auto msg_service = group_msg_service(); + + if (msg_service) { + std::list< nuraft_mesg::replica_config > cluster_config; + msg_service->get_cluster_config(cluster_config); + for (auto const& config : cluster_config) { + member_ids.push_back(boost::uuids::string_generator()(config.peer_id)); + } + RD_LOGD(NO_TRACE_ID, "get_replication_quorum: found {} members in cluster config", member_ids.size()); + } else { + RD_LOGW(NO_TRACE_ID, "get_replication_quorum: msg_service is null, returning empty member list"); + } + + return member_ids; +} + void RaftReplDev::reconcile_leader() { int32_t my_priority = raft_server()->get_srv_config(m_raft_server_id)->get_priority(); if (!is_leader()) { diff --git a/src/lib/replication/repl_dev/raft_repl_dev.h b/src/lib/replication/repl_dev/raft_repl_dev.h index ada14203c..d97d44cba 100644 --- a/src/lib/replication/repl_dev/raft_repl_dev.h +++ b/src/lib/replication/repl_dev/raft_repl_dev.h @@ -290,6 +290,7 @@ class RaftReplDev : public ReplDev, bool is_leader() const override; replica_id_t get_leader_id() const override; std::vector< peer_info > get_replication_status() const override; + std::vector< replica_id_t > get_replication_quorum() override; std::set< replica_id_t > get_active_peers() const; group_id_t group_id() const override { return m_group_id; } void reconcile_leader() override; diff --git a/src/lib/replication/repl_dev/solo_repl_dev.h b/src/lib/replication/repl_dev/solo_repl_dev.h index b4698d046..4c9fcced2 100644 --- a/src/lib/replication/repl_dev/solo_repl_dev.h +++ b/src/lib/replication/repl_dev/solo_repl_dev.h @@ -78,6 +78,9 @@ class SoloReplDev : public ReplDev { return std::vector< peer_info >{ peer_info{.id_ = m_group_id, .replication_idx_ = 0, .last_succ_resp_us_ = 0, .priority_ = 1}}; } + std::vector< replica_id_t > get_replication_quorum() override { + return std::vector< replica_id_t >{m_group_id}; + } void reconcile_leader() override {} void yield_leadership(bool immediate_yield, replica_id_t candidate) override {} bool is_ready_for_traffic() const override { return true; } diff --git a/src/tests/test_common/raft_repl_test_base.hpp b/src/tests/test_common/raft_repl_test_base.hpp index 88c4eaf0a..3c313c452 100644 --- a/src/tests/test_common/raft_repl_test_base.hpp +++ b/src/tests/test_common/raft_repl_test_base.hpp @@ -374,6 +374,12 @@ class TestReplicatedDB : public homestore::ReplDevListener { boost::uuids::to_string(member_out.id), boost::uuids::to_string(member_in.id)); } + void on_clean_replace_member_task(const std::string& task_id, const replica_member_info& member_out, + const replica_member_info& member_in, trace_id_t tid) override { + LOGINFO("[Replica={}] clean replace member task {} out {} in {}", g_helper->replica_num(), task_id, + boost::uuids::to_string(member_out.id), boost::uuids::to_string(member_in.id)); + } + void on_remove_member(const replica_id_t& member, trace_id_t tid) override { LOGINFO("[Replica={}] remove member, member {}", g_helper->replica_num(), member); } diff --git a/src/tests/test_solo_repl_dev.cpp b/src/tests/test_solo_repl_dev.cpp index d87c7eb71..9b64b6f24 100644 --- a/src/tests/test_solo_repl_dev.cpp +++ b/src/tests/test_solo_repl_dev.cpp @@ -137,6 +137,8 @@ class SoloReplDevTest : public testing::Test { const replica_member_info& member_in, trace_id_t tid) override {} void on_complete_replace_member(const std::string& task_id, const replica_member_info& member_out, const replica_member_info& member_in, trace_id_t tid) override {} + void on_clean_replace_member_task(const std::string& task_id, const replica_member_info& member_out, + const replica_member_info& member_in, trace_id_t tid) override {} void on_remove_member(const replica_id_t& member, trace_id_t tid) override {} void on_destroy(const group_id_t& group_id) override {} void notify_committed_lsn(int64_t lsn) override {}