From 431ac5d98bf56b40f1dc8dbf7eba5b8724dba09a Mon Sep 17 00:00:00 2001 From: yawzhang Date: Wed, 25 Jun 2025 15:20:32 +0800 Subject: [PATCH] Avoid dup-append logs for committing logs. This change avoids the following corner case: T1: lsn=100 is executing handle_commit T2: lsn=100 is requested to append from leader again, triggering raft_event and retrieving the existing rreq from the map T3: lsn=100 committed, the rreq is removed from the map and is cleared T4: during the raft_event process, the rreq has already been cleared, preventing further processing --- conanfile.py | 2 +- .../replication/repl_dev/raft_repl_dev.cpp | 29 ++++++++++--------- src/tests/test_common/raft_repl_test_base.hpp | 9 +++++- 3 files changed, 25 insertions(+), 15 deletions(-) diff --git a/conanfile.py b/conanfile.py index 0c1d6fbed..d3e8550a4 100644 --- a/conanfile.py +++ b/conanfile.py @@ -9,7 +9,7 @@ class HomestoreConan(ConanFile): name = "homestore" - version = "6.17.4" + version = "6.17.5" homepage = "https://github.com/eBay/Homestore" description = "HomeStore Storage Engine" diff --git a/src/lib/replication/repl_dev/raft_repl_dev.cpp b/src/lib/replication/repl_dev/raft_repl_dev.cpp index 04248a3b5..4e0d4e356 100644 --- a/src/lib/replication/repl_dev/raft_repl_dev.cpp +++ b/src/lib/replication/repl_dev/raft_repl_dev.cpp @@ -1423,13 +1423,6 @@ void RaftReplDev::handle_commit(repl_req_ptr_t rreq, bool recovery) { m_listener->on_commit(rreq->lsn(), rreq->header(), rreq->key(), {rreq->local_blkid()}, rreq); } - if (!recovery) { - auto prev_lsn = m_commit_upto_lsn.exchange(rreq->lsn()); - RD_DBG_ASSERT_GT(rreq->lsn(), prev_lsn, - "Out of order commit of lsns, it is not expected in RaftReplDev. cur_lsns={}, prev_lsns={}", - rreq->lsn(), prev_lsn); - } - // Remove the request from repl_key map only after the listener operation is completed. // This prevents unnecessary block allocation in the following scenario: // 1. 
The follower processes a commit for LSN 100 and remove rreq from rep_key map before listener commit @@ -1440,6 +1433,13 @@ void RaftReplDev::handle_commit(repl_req_ptr_t rreq, bool recovery) { m_repl_key_req_map.erase(rreq->rkey()); // Remove the request from lsn map. m_state_machine->unlink_lsn_to_req(rreq->lsn(), rreq); + + if (!recovery) { + auto prev_lsn = m_commit_upto_lsn.exchange(rreq->lsn()); + RD_DBG_ASSERT_GT(rreq->lsn(), prev_lsn, + "Out of order commit of lsns, it is not expected in RaftReplDev. cur_lsns={}, prev_lsns={}", + rreq->lsn(), prev_lsn); + } if (!rreq->is_proposer()) rreq->clear(); } @@ -1878,18 +1878,21 @@ nuraft::cb_func::ReturnCode RaftReplDev::raft_event(nuraft::cb_func::Type type, m_commit_upto_lsn.load(), raft_req->get_commit_idx()); auto reqs = sisl::VectorPool< repl_req_ptr_t >::alloc(); - auto last_commit_lsn = uint64_cast(get_last_commit_lsn()); + auto local_start_lsn = uint64_cast(m_data_journal->start_index()); + auto local_last_lsn = uint64_cast(m_data_journal->next_slot() - 1); + // set term=0 when there is no log entry, which happens when the SM first boots or after a Baseline Re-sync + auto local_last_term = local_start_lsn > local_last_lsn ? 0 : m_data_journal->term_at(local_last_lsn); for (unsigned long i = 0; i < entries.size(); i++) { auto& entry = entries[i]; auto lsn = start_lsn + i; auto term = entry->get_term(); if (entry->get_val_type() != nuraft::log_val_type::app_log) { continue; } if (entry->get_buf_ptr()->size() == 0) { continue; } - // skipping localize for already committed log(dup), they anyway will be discard - by nuraft before append_log. 
- if (lsn <= last_commit_lsn) { - RD_LOGT(NO_TRACE_ID, "Raft channel: term {}, lsn {}, skipping dup, last_commit_lsn {}", term, lsn, - last_commit_lsn); + // skipping localize for already appended log with the same term(dup), they anyway will be discarded + // or rolled back by nuraft before append_log; this avoids dup-append for committing logs + if (lsn <= local_last_lsn && term == local_last_term) { + RD_LOGT(NO_TRACE_ID, "Raft channel: term {}, lsn {}, skipping dup, local_last_lsn {}, local_last_term {}", + term, lsn, local_last_lsn, local_last_term); continue; } // Those LSNs already in logstore but not yet committed, will be dedup here, diff --git a/src/tests/test_common/raft_repl_test_base.hpp b/src/tests/test_common/raft_repl_test_base.hpp index 934256594..b70b1ab7b 100644 --- a/src/tests/test_common/raft_repl_test_base.hpp +++ b/src/tests/test_common/raft_repl_test_base.hpp @@ -614,7 +614,14 @@ class RaftReplDevTestBase : public testing::Test { data_size == nullptr ? std::abs(std::lround(num_blks_gen(g_re))) * block_size : *data_size; this->generate_writes(size, block_size, db); }); - if (wait_for_commit) { g_helper->runner().execute().get(); } + if (wait_for_commit) { + g_helper->runner().execute().get(); + // wait for related rreqs to be removed from the map, to avoid rreqs being reused in this case: + // 1. follower is committing rreq + // 2. follower receives duplicated append log entries from leader, then gets the rreq from map + // 3. follower finishes commit, clears rreq, then the append thread holds an empty rreq. + std::this_thread::sleep_for(std::chrono::seconds{1}); + } break; } else { LOGINFO("{} entries were written on the leader_uuid={} my_uuid={}", num_entries,