Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class HomestoreConan(ConanFile):
name = "homestore"
version = "7.2.2"
version = "7.3.0"

homepage = "https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
Expand Down
8 changes: 8 additions & 0 deletions src/include/homestore/replication/repl_dev.h
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,10 @@ class ReplDevListener {
virtual void on_complete_replace_member(const std::string& task_id, const replica_member_info& member_out,
const replica_member_info& member_in, trace_id_t tid) = 0;

/// @brief Called when a replace-member task is being cleaned up (rollback).
/// @param task_id Identifier of the replace-member task being cleaned.
/// @param member_out Info of the replica recorded as the outgoing member of the task.
/// @param member_in Info of the replica recorded as the incoming member of the task.
/// @param tid Trace id supplied by the caller.
virtual void on_clean_replace_member_task(const std::string& task_id, const replica_member_info& member_out,
const replica_member_info& member_in, trace_id_t tid) = 0;

/// @brief Called when a member is removed.
/// @param member Replica id of the member being removed.
/// @param tid Trace id supplied by the caller.
virtual void on_remove_member(const replica_id_t& member, trace_id_t tid) = 0;

Expand Down Expand Up @@ -553,6 +557,10 @@ class ReplDev {
/// this API can return empty result.
virtual std::vector< peer_info > get_replication_status() const = 0;

/// @brief Get all members in the replication quorum
/// @return List of replica IDs that are part of the replication group. The list may be empty; the Raft-backed
/// implementation returns an empty list when its group message service is not available.
virtual std::vector< replica_id_t > get_replication_quorum() = 0;

/// @brief Gets the group_id this repldev is working for
/// @return group_id
virtual group_id_t group_id() const = 0;
Expand Down
72 changes: 63 additions & 9 deletions src/lib/replication/repl_dev/raft_repl_dev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
#include "fetch_data_rpc_generated.h"
#include <nuraft_mesg/common.hpp>

#include <boost/uuid/string_generator.hpp>

namespace homestore {
std::atomic< uint64_t > RaftReplDev::s_next_group_ordinal{1};

Expand Down Expand Up @@ -1853,16 +1855,50 @@ void RaftReplDev::clean_replace_member_task(repl_req_ptr_t rreq) {
auto task_id = std::string(r_cast< const char* >(rreq->header().cbytes()));
RD_LOGI(rreq->traceID(), "Raft repl clean_replace_member_task commit, task_id={}", task_id);

std::unique_lock lg{m_sb_mtx};
auto persisted_task_id = get_replace_member_task_id();
if (!persisted_task_id.empty()) {
RD_DBG_ASSERT(persisted_task_id == task_id,
"Invalid task_id in clean_replace_member_task message, received {}, persisted {}", task_id,
persisted_task_id);
m_rd_sb->replace_member_task = replace_member_task_superblk{};
m_rd_sb.write();
replica_member_info member_out;
replica_member_info member_in;

// Step 1: Check and read member info from superblk
{
std::unique_lock lg{m_sb_mtx};
auto persisted_task_id = get_replace_member_task_id();
if (persisted_task_id.empty()) {
RD_LOGI(rreq->traceID(), "Raft repl clean_replace_member_task: task not found, task_id={}", task_id);
return;
}

if (persisted_task_id != task_id) {
RD_LOGW(rreq->traceID(),
"Raft repl clean_replace_member_task: task_id mismatch, received={}, persisted={}, skip cleaning",
task_id, persisted_task_id);
return;
}

// Read member info from superblk
member_out.id = m_rd_sb->replace_member_task.replica_out;
member_in.id = m_rd_sb->replace_member_task.replica_in;
}

// Step 2: Call listener callback to rollback membership in HomeObject
if (member_out.id != boost::uuids::nil_uuid() && member_in.id != boost::uuids::nil_uuid()) {
RD_LOGI(rreq->traceID(),
"Raft repl clean_replace_member_task, callback to listener, task_id={}, member_out={}, member_in={}",
task_id, boost::uuids::to_string(member_out.id), boost::uuids::to_string(member_in.id));
m_listener->on_clean_replace_member_task(task_id, member_out, member_in, rreq->traceID());
} else {
RD_LOGW(rreq->traceID(), "Raft repl clean_replace_member_task: invalid member info, skip callback");
}

// Step 3: Clear the replace_member task from superblk
{
std::unique_lock lg{m_sb_mtx};
auto persisted_task_id = get_replace_member_task_id();
if (!persisted_task_id.empty()) {
m_rd_sb->replace_member_task = replace_member_task_superblk{};
m_rd_sb.write();
RD_LOGI(rreq->traceID(), "Raft repl replace_member_task has been cleared, task_id={}", task_id);
}
}
RD_LOGI(rreq->traceID(), "Raft repl replace_member_task has been cleared, task_id={}", task_id);
}

void RaftReplDev::update_truncation_boundary(repl_lsn_t truncation_upper_limit) {
Expand Down Expand Up @@ -1980,6 +2016,24 @@ std::vector< peer_info > RaftReplDev::get_replication_status() const {
return pi;
}

std::vector< replica_id_t > RaftReplDev::get_replication_quorum() {
std::vector< replica_id_t > member_ids;
auto msg_service = group_msg_service();

if (msg_service) {
std::list< nuraft_mesg::replica_config > cluster_config;
msg_service->get_cluster_config(cluster_config);
for (auto const& config : cluster_config) {
member_ids.push_back(boost::uuids::string_generator()(config.peer_id));
}
RD_LOGD(NO_TRACE_ID, "get_replication_quorum: found {} members in cluster config", member_ids.size());
} else {
RD_LOGW(NO_TRACE_ID, "get_replication_quorum: msg_service is null, returning empty member list");
}

return member_ids;
}

void RaftReplDev::reconcile_leader() {
int32_t my_priority = raft_server()->get_srv_config(m_raft_server_id)->get_priority();
if (!is_leader()) {
Expand Down
1 change: 1 addition & 0 deletions src/lib/replication/repl_dev/raft_repl_dev.h
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,7 @@ class RaftReplDev : public ReplDev,
bool is_leader() const override;
replica_id_t get_leader_id() const override;
std::vector< peer_info > get_replication_status() const override;
std::vector< replica_id_t > get_replication_quorum() override;
std::set< replica_id_t > get_active_peers() const;
group_id_t group_id() const override { return m_group_id; }
void reconcile_leader() override;
Expand Down
3 changes: 3 additions & 0 deletions src/lib/replication/repl_dev/solo_repl_dev.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ class SoloReplDev : public ReplDev {
return std::vector< peer_info >{
peer_info{.id_ = m_group_id, .replication_idx_ = 0, .last_succ_resp_us_ = 0, .priority_ = 1}};
}
std::vector< replica_id_t > get_replication_quorum() override {
return std::vector< replica_id_t >{m_group_id};
}
void reconcile_leader() override {}
void yield_leadership(bool immediate_yield, replica_id_t candidate) override {}
bool is_ready_for_traffic() const override { return true; }
Expand Down
6 changes: 6 additions & 0 deletions src/tests/test_common/raft_repl_test_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,12 @@ class TestReplicatedDB : public homestore::ReplDevListener {
boost::uuids::to_string(member_out.id), boost::uuids::to_string(member_in.id));
}

/// Test listener hook for a cleaned (rolled back) replace-member task: it only logs the task id and the ids of
/// the outgoing and incoming members; `tid` is not used.
void on_clean_replace_member_task(const std::string& task_id, const replica_member_info& member_out,
                                  const replica_member_info& member_in, trace_id_t tid) override {
    const auto out_id_str = boost::uuids::to_string(member_out.id);
    const auto in_id_str = boost::uuids::to_string(member_in.id);
    LOGINFO("[Replica={}] clean replace member task {} out {} in {}", g_helper->replica_num(), task_id, out_id_str,
            in_id_str);
}

/// Test listener hook for member removal: logs this replica's number and the removed member; `tid` is not used.
void on_remove_member(const replica_id_t& member, trace_id_t tid) override {
    const auto replica_no = g_helper->replica_num();
    LOGINFO("[Replica={}] remove member, member {}", replica_no, member);
}
Expand Down
2 changes: 2 additions & 0 deletions src/tests/test_solo_repl_dev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ class SoloReplDevTest : public testing::Test {
const replica_member_info& member_in, trace_id_t tid) override {}
// No-op: this test listener takes no action when a replace-member task completes.
void on_complete_replace_member(const std::string& task_id, const replica_member_info& member_out,
const replica_member_info& member_in, trace_id_t tid) override {}
// No-op: this test listener takes no action when a replace-member task is cleaned.
void on_clean_replace_member_task(const std::string& task_id, const replica_member_info& member_out,
const replica_member_info& member_in, trace_id_t tid) override {}
// No-op: this test listener takes no action when a member is removed.
void on_remove_member(const replica_id_t& member, trace_id_t tid) override {}
// No-op: this test listener takes no action on the destroy notification for a group.
void on_destroy(const group_id_t& group_id) override {}
// No-op: this test listener ignores committed-LSN notifications.
void notify_committed_lsn(int64_t lsn) override {}
Expand Down
Loading