From cae1eff97fcbf87b73aed105a1da2bd0f2d8a3fd Mon Sep 17 00:00:00 2001 From: Dmytro Kozhevin Date: Fri, 13 Feb 2026 19:36:07 -0500 Subject: [PATCH 01/11] VS project fix --- Builds/VisualStudio/stellar-core.vcxproj | 12 ++++---- .../VisualStudio/stellar-core.vcxproj.filters | 28 ++++++++++++++++++- 2 files changed, 34 insertions(+), 6 deletions(-) diff --git a/Builds/VisualStudio/stellar-core.vcxproj b/Builds/VisualStudio/stellar-core.vcxproj index 0aa028f9d2..4fdc05cc75 100644 --- a/Builds/VisualStudio/stellar-core.vcxproj +++ b/Builds/VisualStudio/stellar-core.vcxproj @@ -144,7 +144,7 @@ - $(MSBuildProjectDirectory)\build_rust.bat $(OutDir) debug next + $(MSBuildProjectDirectory)\build_rust.bat $(OutDir) debug next @@ -207,7 +207,7 @@ exit /b 0 - $(MSBuildProjectDirectory)\build_rust.bat $(OutDir) debug curr + $(MSBuildProjectDirectory)\build_rust.bat $(OutDir) debug curr @@ -273,7 +273,7 @@ exit /b 0 - $(MSBuildProjectDirectory)\build_rust.bat $(OutDir) debug next + $(MSBuildProjectDirectory)\build_rust.bat $(OutDir) debug next @@ -337,7 +337,7 @@ exit /b 0 - $(MSBuildProjectDirectory)\build_rust.bat $(OutDir) release next + $(MSBuildProjectDirectory)\build_rust.bat $(OutDir) release next @@ -400,7 +400,7 @@ exit /b 0 - $(MSBuildProjectDirectory)\build_rust.bat $(OutDir) release curr + $(MSBuildProjectDirectory)\build_rust.bat $(OutDir) release curr @@ -577,6 +577,7 @@ exit /b 0 + @@ -1045,6 +1046,7 @@ exit /b 0 + diff --git a/Builds/VisualStudio/stellar-core.vcxproj.filters b/Builds/VisualStudio/stellar-core.vcxproj.filters index 7782662115..26d095e903 100644 --- a/Builds/VisualStudio/stellar-core.vcxproj.filters +++ b/Builds/VisualStudio/stellar-core.vcxproj.filters @@ -1323,7 +1323,6 @@ ledger - main @@ -1423,6 +1422,21 @@ invariant + + invariant + + + ledger + + + util + + + util + + + lib\tracy + @@ -2524,6 +2538,18 @@ invariant + + invariant + + + ledger + + + util + + + util + From b86a02a97b7ae5f6436ff0428a198feca474cb29 Mon Sep 17 00:00:00 2001 From: Dmytro Kozhevin Date: Fri, 13 Feb 2026 17:10:44 -0500 Subject: [PATCH 02/11] Enhance parallel tx set building benchmark with 3 new scenarios Add 3 new benchmark scenarios to improve coverage: - Low conflict (0.1 conflicts/tx, 5 RO, 1 RW) - Medium conflict (0.5 conflicts/tx, 2 RO, 2 RW) - Dense RW-only (5 conflicts/tx, 0 RO, 3 RW) Also add structured output header for easier comparison. 
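For illustration, the scenario triples map to per-transaction footprints roughly as "draw a few keys from a small shared pool, the rest from an effectively unique key space". The sketch below is a hypothetical generator, not the helper actually used in TxSetTests.cpp; makeScenarioFootprint, the shared-pool size and the probability model are illustrative assumptions only.

    #include <algorithm>
    #include <cstdint>
    #include <random>
    #include <vector>

    struct TxFootprint
    {
        std::vector<uint64_t> readOnlyKeys;
        std::vector<uint64_t> readWriteKeys;
    };

    // Hypothetical scenario generator: keys drawn from the small shared pool
    // collide with other transactions, keys drawn from the full 64-bit space
    // are effectively unique, so conflictsPerTx tunes the expected number of
    // conflicting transactions per transaction.
    TxFootprint
    makeScenarioFootprint(double conflictsPerTx, int roKeys, int rwKeys,
                          uint64_t sharedPoolSize, std::mt19937_64& rng)
    {
        TxFootprint fp;
        int totalKeys = roKeys + rwKeys;
        double pShared =
            totalKeys == 0 ? 0.0 : std::min(1.0, conflictsPerTx / totalKeys);
        std::bernoulli_distribution drawShared(pShared);
        std::uniform_int_distribution<uint64_t> sharedKey(0, sharedPoolSize - 1);
        std::uniform_int_distribution<uint64_t> uniqueKey;
        auto nextKey = [&]() {
            return drawShared(rng) ? sharedKey(rng) : uniqueKey(rng);
        };
        for (int i = 0; i < roKeys; ++i)
        {
            fp.readOnlyKeys.push_back(nextKey());
        }
        for (int i = 0; i < rwKeys; ++i)
        {
            fp.readWriteKeys.push_back(nextKey());
        }
        return fp;
    }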
--- src/herder/test/TxSetTests.cpp | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/src/herder/test/TxSetTests.cpp b/src/herder/test/TxSetTests.cpp index 376bfef3a1..888bae8e00 100644 --- a/src/herder/test/TxSetTests.cpp +++ b/src/herder/test/TxSetTests.cpp @@ -3321,12 +3321,31 @@ TEST_CASE("parallel tx set building benchmark", << ", mean duration: " << 1e-6 * totalDuration / iterCount << " ms" << std::endl; }; + std::cout << "=== Parallel Tx Set Building Benchmark ===" << std::endl; + std::cout << "TX_COUNT=" << MEAN_INCLUDED_TX_COUNT * TX_COUNT_MEMPOOL_MULTIPLIER + << " CLUSTER_COUNT=" << CLUSTER_COUNT + << " STAGES=" << MIN_STAGE_COUNT << "-" << MAX_STAGE_COUNT + << std::endl; + std::cout << "---" << std::endl; + // Fully independent (no conflicts) - stresses bin packing & cluster scan runBenchmark(0, 0, 0); + // Very sparse conflicts - mostly independent fast path + runBenchmark(0.1, 5, 1); + // Mix of independent and small conflict clusters + runBenchmark(0.5, 2, 2); + // Rare conflicts with large RO fan-out runBenchmark(1, 1000, 1); + // RW-only small conflict groups + runBenchmark(5, 0, 3); + // Moderate conflicts, large RO fan-out runBenchmark(10, 40, 1); + // Heavy conflicts, large RO fan-out runBenchmark(20, 40, 1); + // Moderate balanced RO/RW conflicts runBenchmark(10, 10, 10); + // Very heavy conflicts runBenchmark(50, 50, 5); + std::cout << "===" << std::endl; } } // namespace } // namespace stellar From 74efd25316a85bbb63a2ced27f1cbd5336798007 Mon Sep 17 00:00:00 2001 From: Dmytro Kozhevin Date: Fri, 13 Feb 2026 17:29:20 -0500 Subject: [PATCH 03/11] Add global conflict mask to Stage for fast-path getConflictingClusters Add mAllConflictTxs BitSet member to Stage that accumulates the union of all successfully-added transactions' conflict sets. getConflictingClusters checks mAllConflictTxs.get(tx.mId) first - if false, avoids the O(C) cluster scan entirely. Benchmark: worst case (conflicts=1,1000,1) improved ~49% (1148ms -> 585ms). Other scenarios within noise. --- src/herder/ParallelTxSetBuilder.cpp | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/src/herder/ParallelTxSetBuilder.cpp b/src/herder/ParallelTxSetBuilder.cpp index 96901eed16..08bbb308eb 100644 --- a/src/herder/ParallelTxSetBuilder.cpp +++ b/src/herder/ParallelTxSetBuilder.cpp @@ -130,6 +130,9 @@ class Stage { mClusters = std::move(newClusters.value()); mInstructions += tx.mInstructions; + // Update the global conflict mask so future lookups can + // fast-path when a tx has no conflicts with any cluster. + mAllConflictTxs.inplaceUnion(tx.mConflictTxs); return true; } @@ -181,6 +184,9 @@ class Stage mBinPacking = std::move(newPacking.value()); mInstructions += tx.mInstructions; mBinInstructions = newBinInstructions; + // Update the global conflict mask so future lookups can + // fast-path when a tx has no conflicts with any cluster. + mAllConflictTxs.inplaceUnion(tx.mConflictTxs); return true; } @@ -206,6 +212,12 @@ class Stage getConflictingClusters(BuilderTx const& tx) const { std::unordered_set conflictingClusters; + // Fast path: if the tx's id is not in any cluster's conflict set, + // there are no conflicting clusters. + if (!mAllConflictTxs.get(tx.mId)) + { + return conflictingClusters; + } for (auto const& cluster : mClusters) { if (cluster->mConflictTxs.get(tx.mId)) @@ -374,6 +386,10 @@ class Stage uint64_t mInstructions = 0; ParallelPartitionConfig mConfig; bool mTriedCompactingBinPacking = false; + // Union of all clusters' mConflictTxs. 
Used as a fast-path check in + // getConflictingClusters to avoid scanning all clusters when the + // transaction has no conflicts with any existing cluster. + BitSet mAllConflictTxs; }; struct ParallelPhaseBuildResult From e9f1e50752a6279cc84ff8d6d5269815e325d3f7 Mon Sep 17 00:00:00 2001 From: Dmytro Kozhevin Date: Fri, 13 Feb 2026 19:42:43 -0500 Subject: [PATCH 04/11] Replace O(C) cluster scan with O(K) tx-to-cluster lookup Add mTxToCluster vector mapping tx id -> cluster pointer, enabling getConflictingClusters to iterate tx.mConflictTxs set bits and look up which cluster each conflicting tx belongs to. This is O(K) where K is the number of conflict tx ids, vs the previous O(C) cluster scan. The mapping is updated after each successful tryAdd by walking the new merged cluster's txIds bitset. For the full bin-packing path we save the new cluster pointer before binPacking() sorts the vector. Benchmark across all scenarios shows 5-16% improvement beyond Step 2. Best improvement: conflicts=20,40,1 now at 480ms (was 675ms baseline, -29%). --- src/herder/ParallelTxSetBuilder.cpp | 40 +++++++++++++++++++++++++---- 1 file changed, 35 insertions(+), 5 deletions(-) diff --git a/src/herder/ParallelTxSetBuilder.cpp b/src/herder/ParallelTxSetBuilder.cpp index 08bbb308eb..c8ea251de3 100644 --- a/src/herder/ParallelTxSetBuilder.cpp +++ b/src/herder/ParallelTxSetBuilder.cpp @@ -92,7 +92,9 @@ struct Cluster class Stage { public: - Stage(ParallelPartitionConfig cfg) : mConfig(cfg) + Stage(ParallelPartitionConfig cfg, size_t txCount) + : mConfig(cfg) + , mTxToCluster(txCount, nullptr) { mBinPacking.resize(mConfig.mClustersPerStage); mBinInstructions.resize(mConfig.mClustersPerStage); @@ -133,6 +135,7 @@ class Stage // Update the global conflict mask so future lookups can // fast-path when a tx has no conflicts with any cluster. mAllConflictTxs.inplaceUnion(tx.mConflictTxs); + updateTxToCluster(*mClusters.back()); return true; } @@ -171,6 +174,9 @@ class Stage } // Try to recompute the bin-packing from scratch with a more efficient // heuristic. + // Save a reference to the new merged cluster before binPacking sorts + // the vector (after sort, .back() may no longer be the new cluster). + auto newCluster = newClusters->back(); std::vector newBinInstructions; auto newPacking = binPacking(*newClusters, newBinInstructions); // Even if the new cluster is below the limit, it may invalidate the @@ -187,6 +193,7 @@ class Stage // Update the global conflict mask so future lookups can // fast-path when a tx has no conflicts with any cluster. mAllConflictTxs.inplaceUnion(tx.mConflictTxs); + updateTxToCluster(*newCluster); return true; } @@ -218,16 +225,34 @@ class Stage { return conflictingClusters; } - for (auto const& cluster : mClusters) + // O(K) lookup: iterate the conflict tx ids and find their + // clusters via the tx-to-cluster mapping, instead of scanning + // all clusters (which would be O(C)). 
+ size_t conflictTxId = 0; + while (tx.mConflictTxs.nextSet(conflictTxId)) { - if (cluster->mConflictTxs.get(tx.mId)) + auto* cluster = mTxToCluster[conflictTxId]; + if (cluster != nullptr) { - conflictingClusters.insert(cluster.get()); + conflictingClusters.insert(cluster); } + ++conflictTxId; } return conflictingClusters; } + void + updateTxToCluster(Cluster const& cluster) + { + auto* clusterPtr = &cluster; + size_t txId = 0; + while (cluster.mTxIds.nextSet(txId)) + { + mTxToCluster[txId] = clusterPtr; + ++txId; + } + } + bool inPlaceBinPacking( Cluster const& newCluster, @@ -390,6 +415,10 @@ class Stage // getConflictingClusters to avoid scanning all clusters when the // transaction has no conflicts with any existing cluster. BitSet mAllConflictTxs; + // Maps tx id -> cluster pointer for O(K) conflict lookup. + // Sized to the total number of transactions; nullptr means the tx + // has not been added to this stage. + std::vector mTxToCluster; }; struct ParallelPhaseBuildResult @@ -411,7 +440,8 @@ buildSurgePricedParallelSorobanPhaseWithStageCount( ZoneScoped; ParallelPartitionConfig partitionCfg(stageCount, sorobanCfg); - std::vector stages(partitionCfg.mStageCount, partitionCfg); + std::vector stages(partitionCfg.mStageCount, + Stage(partitionCfg, txFrames.size())); // Visit the transactions in the surge pricing queue and try to add them to // at least one of the stages. From 0da71869dc9611b38bb0f5c944d7e27037cecd0f Mon Sep 17 00:00:00 2001 From: Dmytro Kozhevin Date: Fri, 13 Feb 2026 19:41:39 -0500 Subject: [PATCH 05/11] In-place mClusters mutation with rollback Replace createNewClusters (which copied all N shared_ptrs) with in-place mutation of mClusters. Conflicting clusters are saved for rollback, removed from the vector via compaction scan, and the new merged cluster is appended. On failure, rollback restores the original state. For the common no-conflict case this reduces shared_ptr operations from O(N) copies to a single push_back. Benchmark: 20-35% improvement across all scenarios vs Step 3. Combined from baseline: worst case -65% (1148ms -> 401ms), zero-conflict -32.5% (635ms -> 428ms). --- src/herder/ParallelTxSetBuilder.cpp | 118 ++++++++++++++++++---------- 1 file changed, 75 insertions(+), 43 deletions(-) diff --git a/src/herder/ParallelTxSetBuilder.cpp b/src/herder/ParallelTxSetBuilder.cpp index c8ea251de3..9270c56f3b 100644 --- a/src/herder/ParallelTxSetBuilder.cpp +++ b/src/herder/ParallelTxSetBuilder.cpp @@ -115,27 +115,46 @@ class Stage // First, find all clusters that conflict with the new transaction. auto conflictingClusters = getConflictingClusters(tx); - // Then, try creating new clusters by merging the conflicting clusters - // together and adding the new transaction to the resulting cluster. - // Note, that the new cluster is guaranteed to be added at the end of - // `newClusters`. - auto newClusters = createNewClusters(tx, conflictingClusters); - // Fail fast if a new cluster will end up too large to fit into the - // stage and thus no new clusters could be created. - if (!newClusters) + // Check if the merged cluster would exceed the instruction limit. + uint64_t mergedInstructions = tx.mInstructions; + for (auto const* cluster : conflictingClusters) + { + mergedInstructions += cluster->mInstructions; + } + if (mergedInstructions > mConfig.mInstructionsPerCluster) { return false; } + + // Create the merged cluster from the new transaction and all + // conflicting clusters. 
+ auto newCluster = std::make_shared(tx); + for (auto const* cluster : conflictingClusters) + { + newCluster->merge(*cluster); + } + + // Mutate mClusters in-place: remove conflicting clusters (saving + // them for potential rollback) and append the new merged cluster. + // This avoids the O(N) shared_ptr copy overhead of creating a + // new cluster vector on every tryAdd call. + std::vector> savedClusters; + if (!conflictingClusters.empty()) + { + savedClusters.reserve(conflictingClusters.size()); + removeConflictingClusters(conflictingClusters, savedClusters); + } + mClusters.push_back(newCluster); + // If it's possible to pack the newly-created cluster into one of the // bins 'in-place' without rebuilding the bin-packing, we do so. - if (inPlaceBinPacking(*newClusters->back(), conflictingClusters)) + if (inPlaceBinPacking(*newCluster, conflictingClusters)) { - mClusters = std::move(newClusters.value()); mInstructions += tx.mInstructions; // Update the global conflict mask so future lookups can // fast-path when a tx has no conflicts with any cluster. mAllConflictTxs.inplaceUnion(tx.mConflictTxs); - updateTxToCluster(*mClusters.back()); + updateTxToCluster(*newCluster); return true; } @@ -168,25 +187,23 @@ class Stage { if (mTriedCompactingBinPacking) { + rollbackClusters(newCluster.get(), savedClusters); return false; } mTriedCompactingBinPacking = true; } // Try to recompute the bin-packing from scratch with a more efficient - // heuristic. - // Save a reference to the new merged cluster before binPacking sorts - // the vector (after sort, .back() may no longer be the new cluster). - auto newCluster = newClusters->back(); + // heuristic. binPacking() sorts mClusters in-place. std::vector newBinInstructions; - auto newPacking = binPacking(*newClusters, newBinInstructions); + auto newPacking = binPacking(mClusters, newBinInstructions); // Even if the new cluster is below the limit, it may invalidate the // stage as a whole in case if we can no longer pack the clusters into // the required number of bins. if (!newPacking) { + rollbackClusters(newCluster.get(), savedClusters); return false; } - mClusters = std::move(newClusters.value()); mBinPacking = std::move(newPacking.value()); mInstructions += tx.mInstructions; mBinInstructions = newBinInstructions; @@ -286,40 +303,55 @@ class Stage return false; } - std::optional>> - createNewClusters( - BuilderTx const& tx, - std::unordered_set const& txConflicts) const + // Remove conflicting clusters from mClusters in-place, saving them + // in 'saved' for potential rollback. Uses a compaction scan: O(N) + // moves but no shared_ptr copies (which involve atomic refcounts). + void + removeConflictingClusters( + std::unordered_set const& toRemove, + std::vector>& saved) { - uint64_t newInstructions = tx.mInstructions; - for (auto const* cluster : txConflicts) - { - newInstructions += cluster->mInstructions; - } - - // Fast-fail condition to ensure that the new cluster doesn't exceed - // the instructions limit. 
- if (newInstructions > mConfig.mInstructionsPerCluster) - { - return std::nullopt; - } - auto newCluster = std::make_shared(tx); - for (auto const* cluster : txConflicts) + size_t writePos = 0; + for (size_t readPos = 0; readPos < mClusters.size(); ++readPos) { - newCluster->merge(*cluster); + if (toRemove.find(mClusters[readPos].get()) != toRemove.end()) + { + saved.push_back(std::move(mClusters[readPos])); + } + else + { + if (writePos != readPos) + { + mClusters[writePos] = std::move(mClusters[readPos]); + } + ++writePos; + } } + mClusters.resize(writePos); + } - std::vector> newClusters; - newClusters.reserve(mClusters.size() + 1 - txConflicts.size()); - for (auto const& cluster : mClusters) + // Rollback an in-place mutation: find and remove the merged cluster, + // then restore the saved conflicting clusters. + void + rollbackClusters( + Cluster const* mergedCluster, + std::vector>& savedClusters) + { + // Find and swap-pop the merged cluster. + for (size_t i = 0; i < mClusters.size(); ++i) { - if (txConflicts.find(cluster.get()) == txConflicts.end()) + if (mClusters[i].get() == mergedCluster) { - newClusters.push_back(cluster); + mClusters[i] = std::move(mClusters.back()); + mClusters.pop_back(); + break; } } - newClusters.push_back(newCluster); - return std::make_optional(std::move(newClusters)); + // Restore the saved conflicting clusters. + for (auto& saved : savedClusters) + { + mClusters.push_back(std::move(saved)); + } } // Simple bin-packing first-fit-decreasing heuristic From 39b69e494fa8d762b9540c8f09f314802712584e Mon Sep 17 00:00:00 2001 From: Dmytro Kozhevin Date: Fri, 13 Feb 2026 19:43:11 -0500 Subject: [PATCH 06/11] Fix by-value captures - Change txFrames and sorobanCfg from by-value to by-reference capture in the thread lambda, eliminating O(N) shared_ptr copies per thread. Benchmark: modest improvement on conflict-heavy scenarios (conflicts=1,1000,1: 401->381ms, conflicts=10,10,10: 576->524ms). --- src/herder/ParallelTxSetBuilder.cpp | 28 +++++++++++++++------------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/src/herder/ParallelTxSetBuilder.cpp b/src/herder/ParallelTxSetBuilder.cpp index 9270c56f3b..7faa71f7e9 100644 --- a/src/herder/ParallelTxSetBuilder.cpp +++ b/src/herder/ParallelTxSetBuilder.cpp @@ -8,6 +8,8 @@ #include "transactions/TransactionFrameBase.h" #include "util/BitSet.h" +#include +#include #include namespace stellar @@ -235,16 +237,16 @@ class Stage std::unordered_set getConflictingClusters(BuilderTx const& tx) const { - std::unordered_set conflictingClusters; // Fast path: if the tx's id is not in any cluster's conflict set, // there are no conflicting clusters. if (!mAllConflictTxs.get(tx.mId)) { - return conflictingClusters; + return {}; } // O(K) lookup: iterate the conflict tx ids and find their // clusters via the tx-to-cluster mapping, instead of scanning // all clusters (which would be O(C)). 
+ std::unordered_set conflictingClusters; size_t conflictTxId = 0; while (tx.mConflictTxs.nextSet(conflictTxId)) { @@ -493,7 +495,7 @@ buildSurgePricedParallelSorobanPhaseWithStageCount( if (added) { return SurgePricingPriorityQueue::VisitTxResult::PROCESSED; - } + } // If a transaction didn't fit into any of the stages, we consider it // to have been excluded due to resource limits and thus notify the // surge pricing queue that surge pricing should be triggered ( @@ -617,21 +619,21 @@ buildSurgePricedParallelSorobanPhase( for (auto const& key : footprint.readWrite) { txsWithRwKey[key].push_back(i); - } } + } for (auto const& [key, rwTxIds] : txsWithRwKey) - { - // RW-RW conflicts - for (size_t i = 0; i < rwTxIds.size(); ++i) { - for (size_t j = i + 1; j < rwTxIds.size(); ++j) + // RW-RW conflicts + for (size_t i = 0; i < rwTxIds.size(); ++i) { + for (size_t j = i + 1; j < rwTxIds.size(); ++j) + { builderTxs[rwTxIds[i]]->mConflictTxs.set(rwTxIds[j]); builderTxs[rwTxIds[j]]->mConflictTxs.set(rwTxIds[i]); + } } - } - // RO-RW conflicts + // RO-RW conflicts auto roIt = txsWithRoKey.find(key); if (roIt != txsWithRoKey.end()) { @@ -643,9 +645,9 @@ buildSurgePricedParallelSorobanPhase( builderTxs[roTxIds[i]]->mConflictTxs.set(rwTxIds[j]); builderTxs[rwTxIds[j]]->mConflictTxs.set(roTxIds[i]); } + } } } - } // Process the transactions in the surge pricing (decreasing fee) order. // This also automatically ensures that the resource limits are respected @@ -668,8 +670,8 @@ buildSurgePricedParallelSorobanPhase( stageCount <= cfg.SOROBAN_PHASE_MAX_STAGE_COUNT; ++stageCount) { size_t resultIndex = stageCount - cfg.SOROBAN_PHASE_MIN_STAGE_COUNT; - threads.emplace_back([queue, &builderTxForTx, txFrames, stageCount, - sorobanCfg, laneConfig, resultIndex, &results, + threads.emplace_back([queue, &builderTxForTx, &txFrames, stageCount, + &sorobanCfg, laneConfig, resultIndex, &results, ledgerVersion]() { results.at(resultIndex) = buildSurgePricedParallelSorobanPhaseWithStageCount( From 145733cef4670cb753fa21aae96a184559bf248c Mon Sep 17 00:00:00 2001 From: Dmytro Kozhevin Date: Fri, 13 Feb 2026 18:33:21 -0500 Subject: [PATCH 07/11] Replace hash-map conflict detection with sort-based approach Replace UnorderedMap footprint maps with a flat vector of (keyHash, txId, isRW) entries sorted by precomputed hash. This avoids ~400K hash map node allocations and multiple rehashings, providing much better cache behavior through sequential memory access. The sort-based approach: 1. Collects all footprint entries into a single flat vector with precomputed LedgerKey hashes (one allocation). 2. Sorts by hash for cache-friendly grouping. 3. Linear scan finds groups sharing the same key hash. 4. Conflict marking (RW-RW and RO-RW) is done in the same scan, eliminating the separate conflict marking phase. 
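In sketch form, the sort-and-scan looks like the standalone snippet below. FpEntry mirrors the struct added to ParallelTxSetBuilder.cpp, while markConflict is a stand-in for setting bits in the per-transaction conflict BitSets; the real code additionally splits each group into RO/RW lists and skips same-tx hash collisions the same way the txId check here does.

    #include <algorithm>
    #include <cstddef>
    #include <cstdint>
    #include <vector>

    struct FpEntry
    {
        size_t keyHash; // precomputed hash of the footprint LedgerKey
        uint32_t txId;
        bool isRW;
    };

    // markConflict(a, b) is expected to record the conflict symmetrically.
    template <typename MarkConflict>
    void
    markFootprintConflicts(std::vector<FpEntry>& entries,
                           MarkConflict markConflict)
    {
        // 1. Sort by hash so entries sharing a key end up adjacent.
        std::sort(entries.begin(), entries.end(),
                  [](FpEntry const& a, FpEntry const& b) {
                      return a.keyHash < b.keyHash;
                  });
        // 2. Linear scan over equal-hash groups.
        for (size_t start = 0; start < entries.size();)
        {
            size_t end = start + 1;
            while (end < entries.size() &&
                   entries[end].keyHash == entries[start].keyHash)
            {
                ++end;
            }
            // 3. Within a group, every RW entry conflicts with every entry
            //    of a different transaction (RW-RW and RO-RW alike).
            for (size_t i = start; i < end; ++i)
            {
                if (!entries[i].isRW)
                {
                    continue;
                }
                for (size_t j = start; j < end; ++j)
                {
                    if (j != i && entries[j].txId != entries[i].txId)
                    {
                        markConflict(entries[i].txId, entries[j].txId);
                    }
                }
            }
            start = end;
        }
    }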
Profile improvement (no-conflict scenario): footprintMaps: 240ms -> 70ms (hashSort) conflictMark: 27ms -> 1ms (conflictScan) Combined: 267ms -> 71ms (3.8x faster) Benchmark improvement from previous step: conflicts=0,0,0: 452.6 -> 179.4ms (-60%) conflicts=0.1,5,1: 424.0 -> 196.3ms (-54%) conflicts=1,1000,1: 381.4 -> 170.5ms (-55%) conflicts=10,10,10: 524.4 -> 294.1ms (-44%) --- src/herder/ParallelTxSetBuilder.cpp | 133 +++++++++++++++++++++------- 1 file changed, 99 insertions(+), 34 deletions(-) diff --git a/src/herder/ParallelTxSetBuilder.cpp b/src/herder/ParallelTxSetBuilder.cpp index 7faa71f7e9..ec2b8fc40a 100644 --- a/src/herder/ParallelTxSetBuilder.cpp +++ b/src/herder/ParallelTxSetBuilder.cpp @@ -533,11 +533,11 @@ buildSurgePricedParallelSorobanPhaseWithStageCount( it = clusterIdToStageCluster .emplace(clusterId, resStage.size()) .first; - resStage.emplace_back(); - } - totalInclusionFee += txFrames[txId]->getInclusionFee(); + resStage.emplace_back(); + } + totalInclusionFee += txFrames[txId]->getInclusionFee(); resStage[it->second].push_back(txFrames[txId]); - }); + }); // Algorithm ensures that clusters are populated from first to last and // no empty clusters are generated. for (auto const& cluster : resStage) @@ -592,63 +592,128 @@ buildSurgePricedParallelSorobanPhase( // Before trying to include any transactions, find all the pairs of the // conflicting transactions and mark the conflicts in the builderTxs. // - // In order to find the conflicts, we build the maps from the footprint - // keys to transactions, then mark the conflicts between the transactions - // that share RW key, or between the transactions that share RO and RW key. + // We use a sort-based approach: collect all footprint entries into a flat + // vector tagged with (key hash, tx id, RO/RW), sort by hash, + // then scan for groups sharing the same key hash. This is significantly + // faster in practice than using hash map lookups. // - // The approach here is optimized towards the low number of conflicts, - // specifically when there are no conflicts at all, the complexity is just - // O(total_footprint_entry_count). The worst case is roughly - // O(max_tx_footprint_size * transaction_count ^ 2), which is equivalent - // to the complexity of the straightforward approach of iterating over all - // the transaction pairs. + // With 64-bit hashes and typical + // footprint sizes, collisions are exceedingly rare . // // This also has the further optimization potential: we could populate the // key maps and even the conflicting transactions eagerly in tx queue, thus // amortizing the costs across the whole ledger duration. - UnorderedMap> txsWithRoKey; - UnorderedMap> txsWithRwKey; + struct FpEntry + { + size_t keyHash; + uint32_t txId; + bool isRW; + }; + + // Count total footprint entries for a single allocation. 
+ size_t totalFpEntries = 0; + for (auto const& txFrame : txFrames) + { + auto const& fp = txFrame->sorobanResources().footprint; + totalFpEntries += fp.readOnly.size() + fp.readWrite.size(); + } + + std::vector fpEntries; + fpEntries.reserve(totalFpEntries); + std::hash keyHasher; for (size_t i = 0; i < txFrames.size(); ++i) { - auto const& txFrame = txFrames[i]; - auto const& footprint = txFrame->sorobanResources().footprint; + auto const& footprint = txFrames[i]->sorobanResources().footprint; for (auto const& key : footprint.readOnly) { - txsWithRoKey[key].push_back(i); + fpEntries.push_back( + {keyHasher(key), static_cast(i), false}); } for (auto const& key : footprint.readWrite) { - txsWithRwKey[key].push_back(i); + fpEntries.push_back( + {keyHasher(key), static_cast(i), true}); + } } + + // Sort by hash for cache-friendly grouping. + std::sort(fpEntries.begin(), fpEntries.end(), + [](FpEntry const& a, FpEntry const& b) { + return a.keyHash < b.keyHash; + }); + + // Scan sorted entries for groups sharing the same hash, then mark + // conflicts between transactions that share RW keys (RW-RW and RO-RW). + // Conservatively treat hash collisions as potential conflicts - collisions + // should generally be rare and allocating collisions to the same thread + // is guaranteed to be safe (while disambiguating the conflicts would be + // expensive and complex). Collision probability is really low (K^2/2^64). + for (size_t groupStart = 0; groupStart < fpEntries.size();) + { + size_t groupEnd = groupStart + 1; + while (groupEnd < fpEntries.size() && + fpEntries[groupEnd].keyHash == fpEntries[groupStart].keyHash) + { + ++groupEnd; } - for (auto const& [key, rwTxIds] : txsWithRwKey) + // Skip singleton groups — no possible conflicts. + if (groupEnd - groupStart < 2) { - // RW-RW conflicts - for (size_t i = 0; i < rwTxIds.size(); ++i) + groupStart = groupEnd; + continue; + } + + // Collect all entries matching. + std::vector roTxs; + std::vector rwTxs; + for (size_t i = groupStart; i < groupEnd; ++i) + { + if (fpEntries[i].isRW) + { + rwTxs.push_back(fpEntries[i].txId); + } + else + { + roTxs.push_back(fpEntries[i].txId); + } + } + // RW-RW conflicts + for (size_t i = 0; i < rwTxs.size(); ++i) + { + for (size_t j = i + 1; j < rwTxs.size(); ++j) { - for (size_t j = i + 1; j < rwTxIds.size(); ++j) + // In a rare case of hash collision within a transaction, we + // might have the same transaction appear several times in the + // same group. + if (rwTxs[i] == rwTxs[j]) { - builderTxs[rwTxIds[i]]->mConflictTxs.set(rwTxIds[j]); - builderTxs[rwTxIds[j]]->mConflictTxs.set(rwTxIds[i]); + continue; } + builderTxs[rwTxs[i]].mConflictTxs.set(rwTxs[j]); + builderTxs[rwTxs[j]].mConflictTxs.set(rwTxs[i]); } - // RO-RW conflicts - auto roIt = txsWithRoKey.find(key); - if (roIt != txsWithRoKey.end()) + } + // RO-RW conflicts + for (size_t i = 0; i < roTxs.size(); ++i) { - auto const& roTxIds = roIt->second; - for (size_t i = 0; i < roTxIds.size(); ++i) + for (size_t j = 0; j < rwTxs.size(); ++j) { - for (size_t j = 0; j < rwTxIds.size(); ++j) + // In a rare case of hash collision within a transaction, we + // might have the same transaction appear several times in the + // same group. 
+ if (roTxs[i] == rwTxs[j]) { - builderTxs[roTxIds[i]]->mConflictTxs.set(rwTxIds[j]); - builderTxs[rwTxIds[j]]->mConflictTxs.set(roTxIds[i]); - } + continue; } + builderTxs[roTxs[i]].mConflictTxs.set(rwTxs[j]); + builderTxs[rwTxs[j]].mConflictTxs.set(roTxs[i]); } } + groupStart = groupEnd; + } + // Process the transactions in the surge pricing (decreasing fee) order. // This also automatically ensures that the resource limits are respected // for all the dimensions besides instructions. From fdcc54a914deee3d9fcaecd31dbbefab9bffee8d Mon Sep 17 00:00:00 2001 From: Dmytro Kozhevin Date: Fri, 13 Feb 2026 18:56:08 -0500 Subject: [PATCH 08/11] Replace SurgePricingPriorityQueue with pre-sorted vector. Worker threads receive read-only references instead of expensive deep-copied std::set trees. Each thread does a simple linear scan over the sorted order, checking anyGreater() to skip oversized txs. It's not great that a bit of logic is duplicated with SurgePricingPriorityQueue now, but on the other hand the parallel tx set nomination is different enough from the mempool prioritization and going forward it might diverge even more, so this duplication doesn't seem too problematic to me (and it's just a few lines in the end). Benchmark (10000 txs, 128 clusters, 1-4 stages, 5 iterations): conflicts=0,0,0: 179.4 -> 194.8ms (noise; +8.6%) conflicts=0.1,5,1: 196.3 -> 150.0ms (-23.6%) conflicts=0.5,2,2: 192.9 -> 142.2ms (-26.3%) conflicts=1,1000,1: 170.5 -> 139.0ms (-18.5%) conflicts=5,0,3: 176.8 -> 156.4ms (-11.5%) conflicts=10,40,1: 231.3 -> 194.0ms (-16.1%) conflicts=20,40,1: 253.8 -> 195.3ms (-23.0%) conflicts=10,10,10: 294.1 -> 250.9ms (-14.7%) conflicts=50,50,5: 265.0 -> 199.6ms (-24.7%) 8/9 scenarios improved, up to 26% faster. Eliminates O(N log N) std::set insertions and 4 expensive tree deep-copies per build. --- src/herder/ParallelTxSetBuilder.cpp | 136 +++++++++++++++++----------- src/herder/SurgePricingUtils.cpp | 25 ++--- src/herder/SurgePricingUtils.h | 58 ++++++------ src/herder/TxSetFrame.cpp | 17 ++-- 4 files changed, 137 insertions(+), 99 deletions(-) diff --git a/src/herder/ParallelTxSetBuilder.cpp b/src/herder/ParallelTxSetBuilder.cpp index ec2b8fc40a..aa8df37cdd 100644 --- a/src/herder/ParallelTxSetBuilder.cpp +++ b/src/herder/ParallelTxSetBuilder.cpp @@ -464,12 +464,12 @@ struct ParallelPhaseBuildResult ParallelPhaseBuildResult buildSurgePricedParallelSorobanPhaseWithStageCount( - SurgePricingPriorityQueue queue, + std::vector const& sortedTxOrder, + std::vector const& txResources, Resource const& laneLimit, std::unordered_map const& builderTxForTx, TxFrameList const& txFrames, uint32_t stageCount, - SorobanNetworkConfig const& sorobanCfg, - std::shared_ptr laneConfig, uint32_t ledgerVersion) + SorobanNetworkConfig const& sorobanCfg) { ZoneScoped; ParallelPartitionConfig partitionCfg(stageCount, sorobanCfg); @@ -477,13 +477,32 @@ buildSurgePricedParallelSorobanPhaseWithStageCount( std::vector stages(partitionCfg.mStageCount, Stage(partitionCfg, txFrames.size())); - // Visit the transactions in the surge pricing queue and try to add them to - // at least one of the stages. - auto visitor = [&stages, - &builderTxForTx](TransactionFrameBaseConstPtr const& tx) { - bool added = false; - auto builderTxIt = builderTxForTx.find(tx); + // Iterate transactions in decreasing fee order and try greedily pack them + // into one of the stages until the limits are reached. 
Transactions that + // don't fit into any of the stages are skipped and surge pricing will be + // triggered for the transaction set. + Resource laneLeft = laneLimit; + bool hadTxNotFittingLane = false; + + for (size_t txIdx : sortedTxOrder) + { + auto const& txRes = txResources[txIdx]; + + // Check if the transaction fits within the remaining lane resource + // limits. This mirrors the anyGreater check in popTopTxs that skips + // transactions exceeding resource limits. + if (anyGreater(txRes, laneLeft)) + { + hadTxNotFittingLane = true; + continue; + } + + // Try to add the transaction to one of the stages. + auto const& txFrame = txFrames[txIdx]; + auto builderTxIt = builderTxForTx.find(txFrame); releaseAssert(builderTxIt != builderTxForTx.end()); + + bool added = false; for (auto& stage : stages) { if (stage.tryAdd(*builderTxIt->second)) @@ -492,26 +511,23 @@ buildSurgePricedParallelSorobanPhaseWithStageCount( break; } } + if (added) { - return SurgePricingPriorityQueue::VisitTxResult::PROCESSED; + // Transaction included in the stage, update the remaining lane + // resources. + laneLeft -= txRes; + } + else + { + // Transaction didn't fit into any of the stages, mark that lane + // limits were exceeded to trigger surge pricing. + hadTxNotFittingLane = true; + } } - // If a transaction didn't fit into any of the stages, we consider it - // to have been excluded due to resource limits and thus notify the - // surge pricing queue that surge pricing should be triggered ( - // REJECTED imitates the behavior for exceeding the resource limit - // within the queue itself). - return SurgePricingPriorityQueue::VisitTxResult::REJECTED; - }; ParallelPhaseBuildResult result; - std::vector laneLeftUntilLimitUnused; - queue.popTopTxs(/* allowGaps */ true, visitor, laneLeftUntilLimitUnused, - result.mHadTxNotFittingLane, ledgerVersion); - // There is only a single fee lane for Soroban, so there is only a single - // flag that indicates whether there was a transaction that didn't fit into - // lane (and thus all transactions are surge priced at once). - releaseAssert(result.mHadTxNotFittingLane.size() == 1); + result.mHadTxNotFittingLane = {hadTxNotFittingLane}; // At this point the stages have been filled with transactions and we just // need to place the full transactions into the respective stages/clusters. @@ -533,11 +549,11 @@ buildSurgePricedParallelSorobanPhaseWithStageCount( it = clusterIdToStageCluster .emplace(clusterId, resStage.size()) .first; - resStage.emplace_back(); - } - totalInclusionFee += txFrames[txId]->getInclusionFee(); + resStage.emplace_back(); + } + totalInclusionFee += txFrames[txId]->getInclusionFee(); resStage[it->second].push_back(txFrames[txId]); - }); + }); // Algorithm ensures that clusters are populated from first to last and // no empty clusters are generated. for (auto const& cluster : resStage) @@ -670,19 +686,19 @@ buildSurgePricedParallelSorobanPhase( for (size_t i = groupStart; i < groupEnd; ++i) { if (fpEntries[i].isRW) - { + { rwTxs.push_back(fpEntries[i].txId); - } - else - { + } + else + { roTxs.push_back(fpEntries[i].txId); } - } - // RW-RW conflicts + } + // RW-RW conflicts for (size_t i = 0; i < rwTxs.size(); ++i) - { - for (size_t j = i + 1; j < rwTxs.size(); ++j) { + for (size_t j = i + 1; j < rwTxs.size(); ++j) + { // In a rare case of hash collision within a transaction, we // might have the same transaction appear several times in the // same group. 
@@ -692,13 +708,13 @@ buildSurgePricedParallelSorobanPhase( } builderTxs[rwTxs[i]].mConflictTxs.set(rwTxs[j]); builderTxs[rwTxs[j]].mConflictTxs.set(rwTxs[i]); + } } - } - // RO-RW conflicts + // RO-RW conflicts for (size_t i = 0; i < roTxs.size(); ++i) - { - for (size_t j = 0; j < rwTxs.size(); ++j) { + for (size_t j = 0; j < rwTxs.size(); ++j) + { // In a rare case of hash collision within a transaction, we // might have the same transaction appear several times in the // same group. @@ -714,18 +730,34 @@ buildSurgePricedParallelSorobanPhase( groupStart = groupEnd; } - // Process the transactions in the surge pricing (decreasing fee) order. - // This also automatically ensures that the resource limits are respected - // for all the dimensions besides instructions. - SurgePricingPriorityQueue queue( - /* isHighestPriority */ true, laneConfig, + // Sort transactions in decreasing inclusion fee order. + TxFeeComparator txComparator( + /* isGreater */ true, stellar::rand_uniform(0, std::numeric_limits::max())); + std::vector sortedTxOrder(txFrames.size()); + std::iota(sortedTxOrder.begin(), sortedTxOrder.end(), 0); + std::sort(sortedTxOrder.begin(), sortedTxOrder.end(), + [&txFrames, &txComparator](size_t a, size_t b) { + return txComparator(txFrames[a], txFrames[b]); + }); + + // Precompute per-transaction resources to avoid repeated virtual calls + // and heap allocations across threads. + std::vector txResources; + txResources.reserve(txFrames.size()); for (auto const& tx : txFrames) { - queue.add(tx, ledgerVersion); + txResources.push_back( + tx->getResources(/* useByteLimitInClassic */ false, ledgerVersion)); } - // Create a worker thread for each stage count. + // Get the lane limit. Soroban uses a single generic lane. + auto const& laneLimits = laneConfig->getLaneLimits(); + releaseAssert(laneLimits.size() == 1); + auto const& laneLimit = laneLimits[0]; + + // Create a worker thread for each stage count. The sorted order and + // precomputed resources are shared across all threads (read-only). 
std::vector threads; uint32_t stageCountOptions = cfg.SOROBAN_PHASE_MAX_STAGE_COUNT - cfg.SOROBAN_PHASE_MIN_STAGE_COUNT + 1; @@ -735,13 +767,13 @@ buildSurgePricedParallelSorobanPhase( stageCount <= cfg.SOROBAN_PHASE_MAX_STAGE_COUNT; ++stageCount) { size_t resultIndex = stageCount - cfg.SOROBAN_PHASE_MIN_STAGE_COUNT; - threads.emplace_back([queue, &builderTxForTx, &txFrames, stageCount, - &sorobanCfg, laneConfig, resultIndex, &results, - ledgerVersion]() { + threads.emplace_back([&sortedTxOrder, &txResources, &laneLimit, + &builderTxForTx, &txFrames, stageCount, + &sorobanCfg, resultIndex, &results]() { results.at(resultIndex) = buildSurgePricedParallelSorobanPhaseWithStageCount( - std::move(queue), builderTxForTx, txFrames, stageCount, - sorobanCfg, laneConfig, ledgerVersion); + sortedTxOrder, txResources, laneLimit, builderTxForTx, + txFrames, stageCount, sorobanCfg); }); } for (auto& thread : threads) diff --git a/src/herder/SurgePricingUtils.cpp b/src/herder/SurgePricingUtils.cpp index 40514992dd..744ba40320 100644 --- a/src/herder/SurgePricingUtils.cpp +++ b/src/herder/SurgePricingUtils.cpp @@ -70,8 +70,7 @@ computeBetterFee(TransactionFrameBase const& tx, int64_t refFeeBid, return minFee; } -SurgePricingPriorityQueue::TxComparator::TxComparator(bool isGreater, - size_t _seed) +TxFeeComparator::TxFeeComparator(bool isGreater, size_t _seed) : mIsGreater(isGreater) #ifndef BUILD_TESTS , mSeed(_seed) @@ -80,41 +79,37 @@ SurgePricingPriorityQueue::TxComparator::TxComparator(bool isGreater, } bool -SurgePricingPriorityQueue::TxComparator::operator()( - TransactionFrameBasePtr const& tx1, - TransactionFrameBasePtr const& tx2) const +TxFeeComparator::operator()(TransactionFrameBasePtr const& tx1, + TransactionFrameBasePtr const& tx2) const { return txLessThan(tx1, tx2) ^ mIsGreater; } bool -SurgePricingPriorityQueue::TxComparator::compareFeeOnly( - TransactionFrameBase const& tx1, TransactionFrameBase const& tx2) const +TxFeeComparator::compareFeeOnly(TransactionFrameBase const& tx1, + TransactionFrameBase const& tx2) const { return compareFeeOnly(tx1.getInclusionFee(), tx1.getNumOperations(), tx2.getInclusionFee(), tx2.getNumOperations()); } bool -SurgePricingPriorityQueue::TxComparator::compareFeeOnly(int64_t tx1Bid, - uint32_t tx1Ops, - int64_t tx2Bid, - uint32_t tx2Ops) const +TxFeeComparator::compareFeeOnly(int64_t tx1Bid, uint32_t tx1Ops, int64_t tx2Bid, + uint32_t tx2Ops) const { bool isLess = feeRate3WayCompare(tx1Bid, tx1Ops, tx2Bid, tx2Ops) < 0; return isLess ^ mIsGreater; } bool -SurgePricingPriorityQueue::TxComparator::isGreater() const +TxFeeComparator::isGreater() const { return mIsGreater; } bool -SurgePricingPriorityQueue::TxComparator::txLessThan( - TransactionFrameBasePtr const& tx1, - TransactionFrameBasePtr const& tx2) const +TxFeeComparator::txLessThan(TransactionFrameBasePtr const& tx1, + TransactionFrameBasePtr const& tx2) const { auto cmp3 = feeRate3WayCompare(*tx1, *tx2); diff --git a/src/herder/SurgePricingUtils.h b/src/herder/SurgePricingUtils.h index 19927bc857..0c353dbf07 100644 --- a/src/herder/SurgePricingUtils.h +++ b/src/herder/SurgePricingUtils.h @@ -21,6 +21,36 @@ int feeRate3WayCompare(int64_t lFeeBid, uint32_t lNbOps, int64_t rFeeBid, int64_t computeBetterFee(TransactionFrameBase const& tx, int64_t refFeeBid, uint32_t refNbOps); +// Transaction comparator for transaction prioritization and surge pricing +// based on transaction inclusion fees. 
+// operator() is suitable for sorting the transactions while shuffling +// transactions with the same fee rate in random order. +// compareFeeOnly can be used to just determine whether one transaction has a +// strictly higher fee rate than the other. +class TxFeeComparator +{ + public: + TxFeeComparator(bool isGreater, size_t seed); + + bool operator()(TransactionFrameBasePtr const& tx1, + TransactionFrameBasePtr const& tx2) const; + + bool compareFeeOnly(TransactionFrameBase const& tx1, + TransactionFrameBase const& tx2) const; + bool compareFeeOnly(int64_t tx1Bid, uint32_t tx1Ops, int64_t tx2Bid, + uint32_t tx2Ops) const; + bool isGreater() const; + + private: + bool txLessThan(TransactionFrameBasePtr const& tx1, + TransactionFrameBasePtr const& tx2) const; + + bool const mIsGreater; +#ifndef BUILD_TESTS + size_t mSeed; +#endif +}; + // Configuration for multi-lane transaction limiting and surge pricing. // // This configuration defines how many 'lanes' are there to compare and limit @@ -225,31 +255,7 @@ class SurgePricingPriorityQueue std::nullopt); private: - class TxComparator - { - public: - TxComparator(bool isGreater, size_t seed); - - bool operator()(TransactionFrameBasePtr const& tx1, - TransactionFrameBasePtr const& tx2) const; - - bool compareFeeOnly(TransactionFrameBase const& tx1, - TransactionFrameBase const& tx2) const; - bool compareFeeOnly(int64_t tx1Bid, uint32_t tx1Ops, int64_t tx2Bid, - uint32_t tx2Ops) const; - bool isGreater() const; - - private: - bool txLessThan(TransactionFrameBasePtr const& tx1, - TransactionFrameBasePtr const& tx2) const; - - bool const mIsGreater; -#ifndef BUILD_TESTS - size_t mSeed; -#endif - }; - - using TxSortedSet = std::set; + using TxSortedSet = std::set; using LaneIter = std::pair; // Iterator for walking the queue from top to bottom, possibly restricted @@ -285,7 +291,7 @@ class SurgePricingPriorityQueue Iterator getTop() const; - TxComparator const mComparator; + TxFeeComparator const mComparator; std::shared_ptr mLaneConfig; std::vector const& mLaneLimits; diff --git a/src/herder/TxSetFrame.cpp b/src/herder/TxSetFrame.cpp index 602f0cf181..952345e607 100644 --- a/src/herder/TxSetFrame.cpp +++ b/src/herder/TxSetFrame.cpp @@ -940,15 +940,20 @@ makeTxSetFromTransactions( static_cast(i)); } } - + if (!valid) + { + throw std::runtime_error("Created invalid tx set frame - shape is " + "mismatched after roundtrip."); + } // We already trimmed invalid transactions in an earlier call to // `trimInvalid`, so skip transaction validation here - valid = valid && outputApplicableTxSet->checkValidInternal( - app, lowerBoundCloseTimeOffset, - upperBoundCloseTimeOffset, true); - if (!valid) + auto validationResult = outputApplicableTxSet->checkValidInternalWithResult( + app, lowerBoundCloseTimeOffset, upperBoundCloseTimeOffset, true); + if (validationResult != TxSetValidationResult::VALID) { - throw std::runtime_error("Created invalid tx set frame"); + throw std::runtime_error(fmt::format( + FMT_STRING("Created invalid tx set frame, validation result: {:s}"), + toString(validationResult))); } return std::make_pair(outputTxSet, std::move(outputApplicableTxSet)); From e7de538acc29a2b43f5946d03909c73ba1dc2725 Mon Sep 17 00:00:00 2001 From: Dmytro Kozhevin Date: Tue, 17 Feb 2026 14:20:27 -0500 Subject: [PATCH 09/11] Misc optimizations: - get rid of shared_ptr - get rid of unnecessary maps Up to 40% improvement on some benchmarks (180->110ms for no conflicts scenario). 
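One visible consequence of dropping shared_ptr is that Stage becomes move-only (its copy operations are deleted), so the per-stage vector can no longer be built with the fill constructor and is reserved and emplace-constructed instead. A minimal illustration of that pattern, with placeholder members rather than the real Stage/Cluster fields:

    #include <cstddef>
    #include <memory>
    #include <vector>

    struct Cluster
    {
        // placeholder for the real cluster state
    };

    struct Stage
    {
        Stage(int cfg, size_t txCount)
            : mCfg(cfg), mTxToCluster(txCount, nullptr)
        {
        }
        // Holding unique_ptr instead of shared_ptr means copies are gone for
        // good; the copy operations are deleted explicitly to make that clear.
        Stage(Stage const&) = delete;
        Stage& operator=(Stage const&) = delete;
        Stage(Stage&&) = default;
        Stage& operator=(Stage&&) = default;

        int mCfg;
        std::vector<std::unique_ptr<Cluster>> mClusters;
        std::vector<Cluster const*> mTxToCluster;
    };

    // std::vector<Stage>(n, Stage(...)) would require a copyable Stage, so
    // the stages are constructed in place, one at a time.
    std::vector<Stage>
    makeStages(int cfg, size_t stageCount, size_t txCount)
    {
        std::vector<Stage> stages;
        stages.reserve(stageCount);
        for (size_t i = 0; i < stageCount; ++i)
        {
            stages.emplace_back(cfg, txCount);
        }
        return stages;
    }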
--- src/herder/ParallelTxSetBuilder.cpp | 134 +++++++++++++--------------- src/herder/test/TxSetTests.cpp | 3 +- 2 files changed, 66 insertions(+), 71 deletions(-) diff --git a/src/herder/ParallelTxSetBuilder.cpp b/src/herder/ParallelTxSetBuilder.cpp index aa8df37cdd..a8377e56cf 100644 --- a/src/herder/ParallelTxSetBuilder.cpp +++ b/src/herder/ParallelTxSetBuilder.cpp @@ -94,9 +94,14 @@ struct Cluster class Stage { public: + Stage(const Stage&) = delete; + Stage& operator=(const Stage&) = delete; + + Stage(Stage&&) = default; + Stage& operator=(Stage&&) = default; + Stage(ParallelPartitionConfig cfg, size_t txCount) - : mConfig(cfg) - , mTxToCluster(txCount, nullptr) + : mConfig(cfg), mTxToCluster(txCount, nullptr) { mBinPacking.resize(mConfig.mClustersPerStage); mBinInstructions.resize(mConfig.mClustersPerStage); @@ -130,7 +135,7 @@ class Stage // Create the merged cluster from the new transaction and all // conflicting clusters. - auto newCluster = std::make_shared(tx); + auto newCluster = std::make_unique(tx); for (auto const* cluster : conflictingClusters) { newCluster->merge(*cluster); @@ -138,25 +143,24 @@ class Stage // Mutate mClusters in-place: remove conflicting clusters (saving // them for potential rollback) and append the new merged cluster. - // This avoids the O(N) shared_ptr copy overhead of creating a - // new cluster vector on every tryAdd call. - std::vector> savedClusters; + std::vector> savedClusters; if (!conflictingClusters.empty()) { savedClusters.reserve(conflictingClusters.size()); removeConflictingClusters(conflictingClusters, savedClusters); } - mClusters.push_back(newCluster); + mClusters.push_back(std::move(newCluster)); // If it's possible to pack the newly-created cluster into one of the // bins 'in-place' without rebuilding the bin-packing, we do so. - if (inPlaceBinPacking(*newCluster, conflictingClusters)) + auto* addedCluster = mClusters.back().get(); + if (inPlaceBinPacking(*addedCluster, conflictingClusters)) { mInstructions += tx.mInstructions; // Update the global conflict mask so future lookups can // fast-path when a tx has no conflicts with any cluster. mAllConflictTxs.inplaceUnion(tx.mConflictTxs); - updateTxToCluster(*newCluster); + updateTxToCluster(*addedCluster); return true; } @@ -189,7 +193,7 @@ class Stage { if (mTriedCompactingBinPacking) { - rollbackClusters(newCluster.get(), savedClusters); + rollbackClusters(addedCluster, savedClusters); return false; } mTriedCompactingBinPacking = true; @@ -203,7 +207,7 @@ class Stage // the required number of bins. if (!newPacking) { - rollbackClusters(newCluster.get(), savedClusters); + rollbackClusters(addedCluster, savedClusters); return false; } mBinPacking = std::move(newPacking.value()); @@ -212,7 +216,7 @@ class Stage // Update the global conflict mask so future lookups can // fast-path when a tx has no conflicts with any cluster. mAllConflictTxs.inplaceUnion(tx.mConflictTxs); - updateTxToCluster(*newCluster); + updateTxToCluster(*addedCluster); return true; } @@ -250,7 +254,7 @@ class Stage size_t conflictTxId = 0; while (tx.mConflictTxs.nextSet(conflictTxId)) { - auto* cluster = mTxToCluster[conflictTxId]; + auto const* cluster = mTxToCluster[conflictTxId]; if (cluster != nullptr) { conflictingClusters.insert(cluster); @@ -306,12 +310,11 @@ class Stage } // Remove conflicting clusters from mClusters in-place, saving them - // in 'saved' for potential rollback. Uses a compaction scan: O(N) - // moves but no shared_ptr copies (which involve atomic refcounts). 
+ // in 'saved' for potential rollback. void removeConflictingClusters( std::unordered_set const& toRemove, - std::vector>& saved) + std::vector>& saved) { size_t writePos = 0; for (size_t readPos = 0; readPos < mClusters.size(); ++readPos) @@ -335,9 +338,8 @@ class Stage // Rollback an in-place mutation: find and remove the merged cluster, // then restore the saved conflicting clusters. void - rollbackClusters( - Cluster const* mergedCluster, - std::vector>& savedClusters) + rollbackClusters(Cluster const* mergedCluster, + std::vector>& savedClusters) { // Find and swap-pop the merged cluster. for (size_t i = 0; i < mClusters.size(); ++i) @@ -361,7 +363,7 @@ class Stage // This has around 11/9 maximum approximation ratio, which probably has // the best complexity/performance tradeoff out of all the heuristics. std::optional> - binPacking(std::vector>& clusters, + binPacking(std::vector>& clusters, std::vector& binInsns) const { // We could consider dropping the sort here in order to save some time @@ -415,7 +417,7 @@ class Stage // Looked at another way: two clusters that _aren't_ merged by the end of // the process of forming clusters _are_ data-independent and _could_ // potentially run in parallel. - std::vector> mClusters; + std::vector> mClusters; // The clusters formed by data dependency merging may, however, // significantly outnumber the maximum _allowed_ amount of parallelism in // the stage -- a number called `ledgerMaxDependentTxClusters` in CAP-0063 @@ -466,16 +468,18 @@ ParallelPhaseBuildResult buildSurgePricedParallelSorobanPhaseWithStageCount( std::vector const& sortedTxOrder, std::vector const& txResources, Resource const& laneLimit, - std::unordered_map const& - builderTxForTx, - TxFrameList const& txFrames, uint32_t stageCount, - SorobanNetworkConfig const& sorobanCfg) + std::vector const& builderTxs, TxFrameList const& txFrames, + uint32_t stageCount, SorobanNetworkConfig const& sorobanCfg) { ZoneScoped; ParallelPartitionConfig partitionCfg(stageCount, sorobanCfg); - std::vector stages(partitionCfg.mStageCount, - Stage(partitionCfg, txFrames.size())); + std::vector stages; + stages.reserve(partitionCfg.mStageCount); + for (uint32_t i = 0; i < partitionCfg.mStageCount; ++i) + { + stages.emplace_back(partitionCfg, txFrames.size()); + } // Iterate transactions in decreasing fee order and try greedily pack them // into one of the stages until the limits are reached. Transactions that @@ -498,14 +502,10 @@ buildSurgePricedParallelSorobanPhaseWithStageCount( } // Try to add the transaction to one of the stages. 
- auto const& txFrame = txFrames[txIdx]; - auto builderTxIt = builderTxForTx.find(txFrame); - releaseAssert(builderTxIt != builderTxForTx.end()); - bool added = false; for (auto& stage : stages) { - if (stage.tryAdd(*builderTxIt->second)) + if (stage.tryAdd(builderTxs[txIdx])) { added = true; break; @@ -538,22 +538,20 @@ buildSurgePricedParallelSorobanPhaseWithStageCount( auto& resStage = result.mStages.emplace_back(); resStage.reserve(partitionCfg.mClustersPerStage); - std::unordered_map clusterIdToStageCluster; + std::vector binToStageCluster( + partitionCfg.mClustersPerStage, std::numeric_limits::max()); - stage.visitAllTransactions( - [&resStage, &txFrames, &clusterIdToStageCluster, - &totalInclusionFee](size_t clusterId, size_t txId) { - auto it = clusterIdToStageCluster.find(clusterId); - if (it == clusterIdToStageCluster.end()) - { - it = clusterIdToStageCluster - .emplace(clusterId, resStage.size()) - .first; - resStage.emplace_back(); - } - totalInclusionFee += txFrames[txId]->getInclusionFee(); - resStage[it->second].push_back(txFrames[txId]); - }); + stage.visitAllTransactions([&resStage, &txFrames, &binToStageCluster, + &totalInclusionFee](size_t binId, + size_t txId) { + if (binToStageCluster[binId] == std::numeric_limits::max()) + { + binToStageCluster[binId] = resStage.size(); + resStage.emplace_back(); + } + totalInclusionFee += txFrames[txId]->getInclusionFee(); + resStage[binToStageCluster[binId]].push_back(txFrames[txId]); + }); // Algorithm ensures that clusters are populated from first to last and // no empty clusters are generated. for (auto const& cluster : resStage) @@ -594,15 +592,11 @@ buildSurgePricedParallelSorobanPhase( double const MAX_INCLUSION_FEE_TOLERANCE_FOR_STAGE_COUNT = 0.999; // Simplify the transactions to the minimum necessary amount of data. - std::unordered_map - builderTxForTx; - std::vector> builderTxs; + std::vector builderTxs; builderTxs.reserve(txFrames.size()); for (size_t i = 0; i < txFrames.size(); ++i) { - auto const& txFrame = txFrames[i]; - builderTxs.emplace_back(std::make_unique(i, *txFrame)); - builderTxForTx.emplace(txFrame, builderTxs.back().get()); + builderTxs.emplace_back(i, *txFrames[i]); } // Before trying to include any transactions, find all the pairs of the @@ -686,19 +680,19 @@ buildSurgePricedParallelSorobanPhase( for (size_t i = groupStart; i < groupEnd; ++i) { if (fpEntries[i].isRW) - { + { rwTxs.push_back(fpEntries[i].txId); - } - else - { - roTxs.push_back(fpEntries[i].txId); } + else + { + roTxs.push_back(fpEntries[i].txId); } - // RW-RW conflicts + } + // RW-RW conflicts for (size_t i = 0; i < rwTxs.size(); ++i) - { + { for (size_t j = i + 1; j < rwTxs.size(); ++j) - { + { // In a rare case of hash collision within a transaction, we // might have the same transaction appear several times in the // same group. @@ -708,13 +702,13 @@ buildSurgePricedParallelSorobanPhase( } builderTxs[rwTxs[i]].mConflictTxs.set(rwTxs[j]); builderTxs[rwTxs[j]].mConflictTxs.set(rwTxs[i]); - } } - // RO-RW conflicts + } + // RO-RW conflicts for (size_t i = 0; i < roTxs.size(); ++i) - { + { for (size_t j = 0; j < rwTxs.size(); ++j) - { + { // In a rare case of hash collision within a transaction, we // might have the same transaction appear several times in the // same group. 
@@ -739,7 +733,7 @@ buildSurgePricedParallelSorobanPhase( std::sort(sortedTxOrder.begin(), sortedTxOrder.end(), [&txFrames, &txComparator](size_t a, size_t b) { return txComparator(txFrames[a], txFrames[b]); - }); + }); // Precompute per-transaction resources to avoid repeated virtual calls // and heap allocations across threads. @@ -768,12 +762,12 @@ buildSurgePricedParallelSorobanPhase( { size_t resultIndex = stageCount - cfg.SOROBAN_PHASE_MIN_STAGE_COUNT; threads.emplace_back([&sortedTxOrder, &txResources, &laneLimit, - &builderTxForTx, &txFrames, stageCount, - &sorobanCfg, resultIndex, &results]() { + &builderTxs, &txFrames, stageCount, &sorobanCfg, + resultIndex, &results]() { results.at(resultIndex) = buildSurgePricedParallelSorobanPhaseWithStageCount( - sortedTxOrder, txResources, laneLimit, builderTxForTx, - txFrames, stageCount, sorobanCfg); + sortedTxOrder, txResources, laneLimit, builderTxs, txFrames, + stageCount, sorobanCfg); }); } for (auto& thread : threads) diff --git a/src/herder/test/TxSetTests.cpp b/src/herder/test/TxSetTests.cpp index 888bae8e00..1b8d32bffa 100644 --- a/src/herder/test/TxSetTests.cpp +++ b/src/herder/test/TxSetTests.cpp @@ -3322,7 +3322,8 @@ TEST_CASE("parallel tx set building benchmark", << " ms" << std::endl; }; std::cout << "=== Parallel Tx Set Building Benchmark ===" << std::endl; - std::cout << "TX_COUNT=" << MEAN_INCLUDED_TX_COUNT * TX_COUNT_MEMPOOL_MULTIPLIER + std::cout << "TX_COUNT=" + << MEAN_INCLUDED_TX_COUNT * TX_COUNT_MEMPOOL_MULTIPLIER << " CLUSTER_COUNT=" << CLUSTER_COUNT << " STAGES=" << MIN_STAGE_COUNT << "-" << MAX_STAGE_COUNT << std::endl; From dbf437b90671d59bb49ac78cc5e19ed57faf77df Mon Sep 17 00:00:00 2001 From: Dmytro Kozhevin Date: Wed, 18 Feb 2026 19:13:24 -0500 Subject: [PATCH 10/11] !fixup comment update --- src/herder/ParallelTxSetBuilder.cpp | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/herder/ParallelTxSetBuilder.cpp b/src/herder/ParallelTxSetBuilder.cpp index a8377e56cf..103d01bac1 100644 --- a/src/herder/ParallelTxSetBuilder.cpp +++ b/src/herder/ParallelTxSetBuilder.cpp @@ -607,9 +607,6 @@ buildSurgePricedParallelSorobanPhase( // then scan for groups sharing the same key hash. This is significantly // faster in practice than using hash map lookups. // - // With 64-bit hashes and typical - // footprint sizes, collisions are exceedingly rare . - // // This also has the further optimization potential: we could populate the // key maps and even the conflicting transactions eagerly in tx queue, thus // amortizing the costs across the whole ledger duration. @@ -674,7 +671,7 @@ buildSurgePricedParallelSorobanPhase( continue; } - // Collect all entries matching. + // Collect all entries with the matching key hash. 
std::vector roTxs; std::vector rwTxs; for (size_t i = groupStart; i < groupEnd; ++i) From 15006d738edb9dd0a591c17dcad33793232665bf Mon Sep 17 00:00:00 2001 From: Dmytro Date: Wed, 18 Feb 2026 19:14:17 -0500 Subject: [PATCH 11/11] !fixup fix comparator for sort --- src/herder/SurgePricingUtils.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/herder/SurgePricingUtils.cpp b/src/herder/SurgePricingUtils.cpp index 744ba40320..fe9d0c17b8 100644 --- a/src/herder/SurgePricingUtils.cpp +++ b/src/herder/SurgePricingUtils.cpp @@ -82,7 +82,7 @@ bool TxFeeComparator::operator()(TransactionFrameBasePtr const& tx1, TransactionFrameBasePtr const& tx2) const { - return txLessThan(tx1, tx2) ^ mIsGreater; + return txLessThan(tx1, tx2); } bool @@ -115,7 +115,7 @@ TxFeeComparator::txLessThan(TransactionFrameBasePtr const& tx1, if (cmp3 != 0) { - return cmp3 < 0; + return mIsGreater ? cmp3 > 0 : cmp3 < 0; } #ifndef BUILD_TESTS // break tie with pointer arithmetic @@ -127,7 +127,7 @@ TxFeeComparator::txLessThan(TransactionFrameBasePtr const& tx1, auto lx = tx1->getFullHash(); auto rx = tx2->getFullHash(); #endif - return lx < rx; + return mIsGreater ? rx < lx : lx < rx; } SurgePricingPriorityQueue::SurgePricingPriorityQueue(