Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class HomestoreConan(ConanFile):
name = "homestore"
version = "7.1.2"
version = "7.1.3"

homepage = "https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
Expand Down
12 changes: 11 additions & 1 deletion src/lib/replication/service/raft_repl_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
*********************************************************************************/
#include <sisl/logging/logging.h>
#include <iomgr/io_environment.hpp>
#include <iomgr/iomgr_flip.hpp>
#include <chrono>

#include <boost/uuid/string_generator.hpp>
Expand Down Expand Up @@ -655,7 +656,16 @@ void RaftReplService::start_repl_service_timers() {

m_replace_member_sync_check_timer_hdl = iomanager.schedule_global_timer(
HS_DYNAMIC_CONFIG(consensus.replace_member_sync_check_interval_ms) * 1000 * 1000, true /* recurring */, nullptr,
iomgr::reactor_regex::all_worker, [this](void*) { monitor_replace_member_replication_status(); },
iomgr::reactor_regex::all_worker,
[this](void*) {
#ifdef _PRERELEASE
if (iomgr_flip::instance()->test_flip("skip_monitor_replace_member_replication_status")) {
LOGINFOMOD(replication, "flip skip_monitor_replace_member_replication_status triggered");
return;
}
#endif
monitor_replace_member_replication_status();
},
true /* wait_to_schedule */);
}

Expand Down
2 changes: 1 addition & 1 deletion src/tests/test_common/homestore_test_common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ class HSTestHelper {
freq.set_count(count);
freq.set_percent(percent);
m_fc.inject_noreturn_flip(flip_name, {null_cond}, freq);
LOGDEBUG("Flip {} set", flip_name);
LOGINFO("Flip {} set", flip_name);
}

void set_delay_flip(const std::string flip_name, uint64_t delay_usec, uint32_t count = 1, uint32_t percent = 100) {
Expand Down
51 changes: 38 additions & 13 deletions src/tests/test_raft_repl_dev_dynamic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@ class ReplDevDynamicTest : public RaftReplDevTestBase {
}
};

#ifdef _PRERELEASE
TEST_F(ReplDevDynamicTest, ReplaceMember) {
LOGINFO("ReplaceMember test started replica={}", g_helper->replica_num());
g_helper->set_basic_flip("skip_monitor_replace_member_replication_status", 1000);
// Write some IOs, replace a member, then validate the data on all members except the one that is replaced out.
auto db = dbs_.back();
auto num_replicas = SISL_OPTIONS["replicas"].as< uint32_t >();
Expand Down Expand Up @@ -66,6 +68,11 @@ TEST_F(ReplDevDynamicTest, ReplaceMember) {
replace_member(db, new_task_id, g_helper->replica_id(member_out), g_helper->replica_id(member_in), 0,
ReplServiceError::REPLACE_MEMBER_TASK_MISMATCH);
});
// Restore the periodic check, so that it can take over if the manual
// monitor_replace_member_replication_status call fails.
LOGINFO("restore monitor_replace_member_replication_status")
g_helper->remove_flip("skip_monitor_replace_member_replication_status");

if (is_replica_num_in({0, 1, member_in})) {
// Skip the member which is going to be replaced. Validate data on all other replica's.
LOGINFO("Validate all data written so far by reading them replica={}", g_helper->replica_num());
Expand Down Expand Up @@ -96,10 +103,11 @@ TEST_F(ReplDevDynamicTest, ReplaceMember) {
LOGINFO("ReplaceMember test done replica={}", g_helper->replica_num());
}

// After replace member is in progress, rollback replace member operation before complete_replace_member is
// called(triggered after 60s).
// Once a replace member is in progress, roll back the replace member operation (complete_replace_member is disabled).
TEST_F(ReplDevDynamicTest, ReplaceMemberRollback) {
LOGINFO("ReplaceMember test started replica={}", g_helper->replica_num());
// don't execute complete_replace_member in the background reaper thread within this test.
g_helper->set_basic_flip("skip_monitor_replace_member_replication_status", 1000);
// Write some IOs, replace a member, then validate the data on all members except the one that is replaced out.
auto db = dbs_.back();
auto num_replicas = SISL_OPTIONS["replicas"].as< uint32_t >();
Expand All @@ -125,6 +133,12 @@ TEST_F(ReplDevDynamicTest, ReplaceMemberRollback) {
LOGDEBUG("Not added to group yet")
std::this_thread::sleep_for(std::chrono::microseconds(300));
}
// Need to wait for the log to be caught up. There is a known issue: if the removed member cannot catch up
// within 5*HB after responding to leave_cluster_request, it has no chance to detect that it was removed from the
// group. Other tests remove a normal member that has all the logs; here we remove the new member, which is
// very likely to be behind.
wait_for_commits(num_io_entries);
LOGINFO("Member in got all commits");
}

g_helper->sync_for_verify_start(num_members);
Expand All @@ -141,9 +155,8 @@ TEST_F(ReplDevDynamicTest, ReplaceMemberRollback) {

g_helper->sync_for_verify_start(num_members);
LOGINFO("rollback triggered, sync_for_verify_start replica={} ", g_helper->replica_num());
// verify member_in is removed from group.

if (g_helper->replica_num() == member_in) {
// The member_in will have the repl dev destroyed.
auto repl_dev = std::dynamic_pointer_cast< RaftReplDev >(db->repl_dev());
while (repl_dev && !repl_dev->is_destroyed()) {
std::this_thread::sleep_for(std::chrono::seconds(1));
Expand All @@ -164,12 +177,13 @@ TEST_F(ReplDevDynamicTest, ReplaceMemberRollback) {
}

g_helper->sync_for_cleanup_start(num_members);
g_helper->remove_flip("skip_monitor_replace_member_replication_status");
LOGINFO("ReplaceMember test done replica={}", g_helper->replica_num());
}

TEST_F(ReplDevDynamicTest, TwoMemberDown) {
LOGINFO("TwoMemberDown test started replica={}", g_helper->replica_num());

g_helper->set_basic_flip("skip_monitor_replace_member_replication_status", 1000);
// Bring two members of a group down so that the leader cannot reach a quorum.
// We set the custom quorum size to 1 and call replace member.
// The leader should then do some writes to validate that it has reached the quorum size.
Expand Down Expand Up @@ -238,6 +252,7 @@ TEST_F(ReplDevDynamicTest, TwoMemberDown) {
}

g_helper->sync_for_cleanup_start(num_members);
g_helper->remove_flip("skip_monitor_replace_member_replication_status");
LOGINFO("TwoMemberDown test done replica={}", g_helper->replica_num());
}

Expand All @@ -246,6 +261,7 @@ TEST_F(ReplDevDynamicTest, OutMemberDown) {
// replica0 should be able to baseline resync to replica4(new member).
// Write some IOs, replace a member, then validate the data on all members except the one that is replaced out.
LOGINFO("OutMemberDown test started replica={}", g_helper->replica_num());
g_helper->set_basic_flip("skip_monitor_replace_member_replication_status", 1000);
auto db = dbs_.back();
auto num_replicas = SISL_OPTIONS["replicas"].as< uint32_t >();
auto num_members = SISL_OPTIONS["replicas"].as< uint32_t >() + SISL_OPTIONS["spare_replicas"].as< uint32_t >();
Expand Down Expand Up @@ -298,6 +314,8 @@ TEST_F(ReplDevDynamicTest, OutMemberDown) {
check_replace_member_status(db, task_id, g_helper->replica_id(member_out), g_helper->replica_id(member_in)),
ReplaceMemberStatus::IN_PROGRESS);
});
LOGINFO("restore monitor_replace_member_replication_status")
g_helper->remove_flip("skip_monitor_replace_member_replication_status");
// Since the out_member is stopped, it cannot respond to the remove_srv request; as a result the first attempt gets a
// CANCELLED error, so the waiting time is longer than in other tests.
if (g_helper->replica_num() == 2) {
Expand All @@ -317,14 +335,12 @@ TEST_F(ReplDevDynamicTest, OutMemberDown) {
g_helper->sync_for_test_start(num_members);
if (g_helper->replica_num() != 2) {
this->run_on_leader(db, [this, db, &task_id, member_out, member_in] {
auto status = check_replace_member_status(db, task_id, g_helper->replica_id(member_out),
g_helper->replica_id(member_in));
// out_member is down, so it can not response to remove req. Based on nuraft logic, leader will wait for
// timeout and remove it automatically. Simulate next complete_replace_member retry.
if (status == ReplaceMemberStatus::IN_PROGRESS) {
auto& raft_repl_svc = dynamic_cast< RaftReplService& >(hs()->repl_service());
raft_repl_svc.monitor_replace_member_replication_status();
LOGINFO("Simulate reaper thread to complete_replace_member");
while (check_replace_member_status(
db, task_id, g_helper->replica_id(member_out),
// out_member is down, so it can not response to remove req. Based on nuraft logic, leader will
// wait for timeout and remove it automatically. Simulate next complete_replace_member retry.
g_helper->replica_id(member_in)) == ReplaceMemberStatus::IN_PROGRESS) {
LOGINFO("wait for reaper thread to complete_replace_member");
std::this_thread::sleep_for(std::chrono::seconds(1));
}
ASSERT_EQ(check_replace_member_status(db, task_id, g_helper->replica_id(member_out),
Expand All @@ -337,6 +353,7 @@ TEST_F(ReplDevDynamicTest, OutMemberDown) {
}

TEST_F(ReplDevDynamicTest, LeaderReplace) {
g_helper->set_basic_flip("skip_monitor_replace_member_replication_status", 1000);
// replica0(leader), replica1 and replica2 are up. Replace replica0(leader) with replica3.
// replica0 will yield leadership, another replica will become the leader, and that leader
// will do baseline resync to replica4(new member).
Expand Down Expand Up @@ -390,6 +407,9 @@ TEST_F(ReplDevDynamicTest, LeaderReplace) {
check_replace_member_status(db, task_id, g_helper->replica_id(member_out), g_helper->replica_id(member_in)),
ReplaceMemberStatus::IN_PROGRESS);
});
// restore the periodical check.
LOGINFO("restore monitor_replace_member_replication_status")
g_helper->remove_flip("skip_monitor_replace_member_replication_status");
if (g_helper->replica_num() == member_out) {
// The out member will have the repl dev destroyed.
auto repl_dev = std::dynamic_pointer_cast< RaftReplDev >(db->repl_dev());
Expand Down Expand Up @@ -417,6 +437,7 @@ TEST_F(ReplDevDynamicTest, OneMemberRestart) {
// replica0 should be able to baseline resync to replica4(new member).
// Write some IOs, replace a member, then validate the data on all members except the one that is replaced out.
LOGINFO("OneMemberRestart test started replica={}", g_helper->replica_num());
g_helper->set_basic_flip("skip_monitor_replace_member_replication_status", 1000);
auto db = dbs_.back();
auto num_replicas = SISL_OPTIONS["replicas"].as< uint32_t >();
auto num_members = SISL_OPTIONS["replicas"].as< uint32_t >() + SISL_OPTIONS["spare_replicas"].as< uint32_t >();
Expand Down Expand Up @@ -459,6 +480,9 @@ TEST_F(ReplDevDynamicTest, OneMemberRestart) {
check_replace_member_status(db, task_id, g_helper->replica_id(member_out), g_helper->replica_id(member_in)),
ReplaceMemberStatus::IN_PROGRESS);
});
LOGINFO("restore monitor_replace_member_replication_status")
g_helper->remove_flip("skip_monitor_replace_member_replication_status");

if (g_helper->replica_num() == member_out) {
// The out member will have the repl dev destroyed.
auto repl_dev = std::dynamic_pointer_cast< RaftReplDev >(db->repl_dev());
Expand All @@ -479,6 +503,7 @@ TEST_F(ReplDevDynamicTest, OneMemberRestart) {
});
LOGINFO("OneMemberRestart test done replica={}", g_helper->replica_num());
}
#endif

TEST_F(ReplDevDynamicTest, ValidateRequest) {
LOGINFO("ValidateRequest test started replica={}", g_helper->replica_num());
Expand Down
Loading