diff --git a/Builds/VisualStudio/stellar-core.vcxproj b/Builds/VisualStudio/stellar-core.vcxproj
index 0aa028f9d2..4fdc05cc75 100644
--- a/Builds/VisualStudio/stellar-core.vcxproj
+++ b/Builds/VisualStudio/stellar-core.vcxproj
@@ -144,7 +144,7 @@
-      $(MSBuildProjectDirectory)\build_rust.bat $(OutDir) debug next
+      $(MSBuildProjectDirectory)\build_rust.bat $(OutDir) debug next
@@ -207,7 +207,7 @@ exit /b 0
-      $(MSBuildProjectDirectory)\build_rust.bat $(OutDir) debug curr
+      $(MSBuildProjectDirectory)\build_rust.bat $(OutDir) debug curr
@@ -273,7 +273,7 @@ exit /b 0
-      $(MSBuildProjectDirectory)\build_rust.bat $(OutDir) debug next
+      $(MSBuildProjectDirectory)\build_rust.bat $(OutDir) debug next
@@ -337,7 +337,7 @@ exit /b 0
-      $(MSBuildProjectDirectory)\build_rust.bat $(OutDir) release next
+      $(MSBuildProjectDirectory)\build_rust.bat $(OutDir) release next
@@ -400,7 +400,7 @@ exit /b 0
-      $(MSBuildProjectDirectory)\build_rust.bat $(OutDir) release curr
+      $(MSBuildProjectDirectory)\build_rust.bat $(OutDir) release curr
@@ -577,6 +577,7 @@ exit /b 0
+
@@ -1045,6 +1046,7 @@ exit /b 0
+
diff --git a/Builds/VisualStudio/stellar-core.vcxproj.filters b/Builds/VisualStudio/stellar-core.vcxproj.filters
index 7782662115..26d095e903 100644
--- a/Builds/VisualStudio/stellar-core.vcxproj.filters
+++ b/Builds/VisualStudio/stellar-core.vcxproj.filters
@@ -1323,7 +1323,6 @@
       ledger
-
       main
@@ -1423,6 +1422,21 @@
       invariant
+
+      invariant
+
+      ledger
+
+      util
+
+      util
+
+      lib\tracy
+
@@ -2524,6 +2538,18 @@
       invariant
+
+      invariant
+
+      ledger
+
+      util
+
+      util
+
diff --git a/src/herder/ParallelTxSetBuilder.cpp b/src/herder/ParallelTxSetBuilder.cpp
index 96901eed16..103d01bac1 100644
--- a/src/herder/ParallelTxSetBuilder.cpp
+++ b/src/herder/ParallelTxSetBuilder.cpp
@@ -8,6 +8,8 @@
 #include "transactions/TransactionFrameBase.h"
 #include "util/BitSet.h"
 
+#include <memory>
+#include <numeric>
 #include <unordered_map>
 
 namespace stellar
@@ -92,7 +94,14 @@ struct Cluster
 class Stage
 {
   public:
-    Stage(ParallelPartitionConfig cfg) : mConfig(cfg)
+    Stage(const Stage&) = delete;
+    Stage& operator=(const Stage&) = delete;
+
+    Stage(Stage&&) = default;
+    Stage& operator=(Stage&&) = default;
+
+    Stage(ParallelPartitionConfig cfg, size_t txCount)
+        : mConfig(cfg), mTxToCluster(txCount, nullptr)
     {
         mBinPacking.resize(mConfig.mClustersPerStage);
         mBinInstructions.resize(mConfig.mClustersPerStage);
@@ -113,23 +122,45 @@
         // First, find all clusters that conflict with the new transaction.
         auto conflictingClusters = getConflictingClusters(tx);
 
-        // Then, try creating new clusters by merging the conflicting clusters
-        // together and adding the new transaction to the resulting cluster.
-        // Note, that the new cluster is guaranteed to be added at the end of
-        // `newClusters`.
-        auto newClusters = createNewClusters(tx, conflictingClusters);
-        // Fail fast if a new cluster will end up too large to fit into the
-        // stage and thus no new clusters could be created.
-        if (!newClusters)
+        // Check if the merged cluster would exceed the instruction limit.
+        uint64_t mergedInstructions = tx.mInstructions;
+        for (auto const* cluster : conflictingClusters)
+        {
+            mergedInstructions += cluster->mInstructions;
+        }
+        if (mergedInstructions > mConfig.mInstructionsPerCluster)
         {
             return false;
         }
+
+        // Create the merged cluster from the new transaction and all the
+        // conflicting clusters.
+        auto newCluster = std::make_unique<Cluster>(tx);
+        for (auto const* cluster : conflictingClusters)
+        {
+            newCluster->merge(*cluster);
+        }
+
+        // Mutate mClusters in-place: remove the conflicting clusters (saving
+        // them for a potential rollback) and append the new merged cluster.
+        std::vector<std::unique_ptr<Cluster>> savedClusters;
+        if (!conflictingClusters.empty())
+        {
+            savedClusters.reserve(conflictingClusters.size());
+            removeConflictingClusters(conflictingClusters, savedClusters);
+        }
+        mClusters.push_back(std::move(newCluster));
+
         // If it's possible to pack the newly-created cluster into one of the
         // bins 'in-place' without rebuilding the bin-packing, we do so.
-        if (inPlaceBinPacking(*newClusters->back(), conflictingClusters))
+        auto* addedCluster = mClusters.back().get();
+        if (inPlaceBinPacking(*addedCluster, conflictingClusters))
         {
-            mClusters = std::move(newClusters.value());
             mInstructions += tx.mInstructions;
+            // Update the global conflict mask so future lookups can
+            // fast-path when a tx has no conflicts with any cluster.
+            mAllConflictTxs.inplaceUnion(tx.mConflictTxs);
+            updateTxToCluster(*addedCluster);
             return true;
         }
@@ -162,25 +193,30 @@
     {
         if (mTriedCompactingBinPacking)
         {
+            rollbackClusters(addedCluster, savedClusters);
             return false;
         }
         mTriedCompactingBinPacking = true;
     }
 
     // Try to recompute the bin-packing from scratch with a more efficient
-    // heuristic.
+    // heuristic. binPacking() sorts mClusters in-place.
     std::vector<uint64_t> newBinInstructions;
-    auto newPacking = binPacking(*newClusters, newBinInstructions);
+    auto newPacking = binPacking(mClusters, newBinInstructions);
     // Even if the new cluster is below the limit, it may invalidate the
     // stage as a whole if we can no longer pack the clusters into the
     // required number of bins.
     if (!newPacking)
     {
+        rollbackClusters(addedCluster, savedClusters);
         return false;
     }
-    mClusters = std::move(newClusters.value());
     mBinPacking = std::move(newPacking.value());
     mInstructions += tx.mInstructions;
     mBinInstructions = newBinInstructions;
+    // Update the global conflict mask so future lookups can
+    // fast-path when a tx has no conflicts with any cluster.
+    mAllConflictTxs.inplaceUnion(tx.mConflictTxs);
+    updateTxToCluster(*addedCluster);
     return true;
 }
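The rewritten `tryAdd` above replaces the old copy-everything `createNewClusters` path with an up-front limit check followed by an in-place merge, so a rejected transaction never touches the stage. A minimal sketch of that merge-or-reject step, using a simplified stand-in `Cluster` (not the real stellar-core type):

```cpp
// Sketch only: sum the instruction counts of all conflicting clusters before
// mutating anything, so rejection is free and rollback stays cheap.
#include <cstdint>
#include <memory>
#include <unordered_set>
#include <vector>

struct Cluster
{
    uint64_t mInstructions = 0;
    std::vector<size_t> mTxIds;

    void
    merge(Cluster const& other)
    {
        mInstructions += other.mInstructions;
        mTxIds.insert(mTxIds.end(), other.mTxIds.begin(), other.mTxIds.end());
    }
};

// Returns nullptr when the merged cluster would exceed the per-cluster cap.
std::unique_ptr<Cluster>
tryMerge(uint64_t txInstructions, size_t txId,
         std::unordered_set<Cluster const*> const& conflicting,
         uint64_t instructionCap)
{
    uint64_t merged = txInstructions;
    for (auto const* c : conflicting)
    {
        merged += c->mInstructions;
    }
    if (merged > instructionCap)
    {
        return nullptr; // fail fast; nothing has been mutated yet
    }
    auto cluster = std::make_unique<Cluster>();
    cluster->mInstructions = txInstructions;
    cluster->mTxIds.push_back(txId);
    for (auto const* c : conflicting)
    {
        cluster->merge(*c);
    }
    return cluster;
}
```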
@@ -205,17 +241,41 @@ class Stage
     std::unordered_set<Cluster const*>
     getConflictingClusters(BuilderTx const& tx) const
     {
+        // Fast path: if the tx's id is not in any cluster's conflict set,
+        // there are no conflicting clusters.
+        if (!mAllConflictTxs.get(tx.mId))
+        {
+            return {};
+        }
+        // O(K) lookup: iterate the conflicting tx ids and find their
+        // clusters via the tx-to-cluster mapping, instead of scanning
+        // all clusters (which would be O(C)).
         std::unordered_set<Cluster const*> conflictingClusters;
-        for (auto const& cluster : mClusters)
+        size_t conflictTxId = 0;
+        while (tx.mConflictTxs.nextSet(conflictTxId))
         {
-            if (cluster->mConflictTxs.get(tx.mId))
+            auto const* cluster = mTxToCluster[conflictTxId];
+            if (cluster != nullptr)
             {
-                conflictingClusters.insert(cluster.get());
+                conflictingClusters.insert(cluster);
             }
+            ++conflictTxId;
         }
         return conflictingClusters;
     }
 
+    void
+    updateTxToCluster(Cluster const& cluster)
+    {
+        auto* clusterPtr = &cluster;
+        size_t txId = 0;
+        while (cluster.mTxIds.nextSet(txId))
+        {
+            mTxToCluster[txId] = clusterPtr;
+            ++txId;
+        }
+    }
+
     bool
     inPlaceBinPacking(
         Cluster const& newCluster,
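The lookup above pairs two structures: a `BitSet` union of every cluster's conflict set (a single bit test rules out any conflict at all) and a tx-id-to-cluster table (one indexed load per conflicting tx id). A self-contained sketch of the same two-level pattern, with `std::vector<bool>` standing in for stellar's `BitSet` and a hypothetical `ConflictIndex` wrapper:

```cpp
// Sketch only: BitSet::nextSet would jump straight to set bits; the plain
// scan below is O(txCount) for simplicity.
#include <cstddef>
#include <unordered_set>
#include <vector>

struct Cluster;

struct ConflictIndex
{
    std::vector<bool> mAllConflictTxs;        // union of all clusters' conflicts
    std::vector<Cluster const*> mTxToCluster; // tx id -> owning cluster

    ConflictIndex(size_t txCount)
        : mAllConflictTxs(txCount, false), mTxToCluster(txCount, nullptr)
    {
    }

    std::unordered_set<Cluster const*>
    lookup(size_t txId, std::vector<bool> const& txConflicts) const
    {
        std::unordered_set<Cluster const*> res;
        if (!mAllConflictTxs[txId])
        {
            return res; // fast path: no cluster conflicts with this tx
        }
        for (size_t id = 0; id < txConflicts.size(); ++id)
        {
            if (txConflicts[id] && mTxToCluster[id] != nullptr)
            {
                res.insert(mTxToCluster[id]);
            }
        }
        return res;
    }
};
```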
@@ -249,40 +309,53 @@ class Stage
         return false;
     }
 
-    std::optional<std::vector<std::shared_ptr<Cluster>>>
-    createNewClusters(
-        BuilderTx const& tx,
-        std::unordered_set<Cluster const*> const& txConflicts) const
+    // Remove the conflicting clusters from mClusters in-place, saving them
+    // in 'saved' for a potential rollback.
+    void
+    removeConflictingClusters(
+        std::unordered_set<Cluster const*> const& toRemove,
+        std::vector<std::unique_ptr<Cluster>>& saved)
     {
-        uint64_t newInstructions = tx.mInstructions;
-        for (auto const* cluster : txConflicts)
+        size_t writePos = 0;
+        for (size_t readPos = 0; readPos < mClusters.size(); ++readPos)
         {
-            newInstructions += cluster->mInstructions;
-        }
-
-        // Fast-fail condition to ensure that the new cluster doesn't exceed
-        // the instructions limit.
-        if (newInstructions > mConfig.mInstructionsPerCluster)
-        {
-            return std::nullopt;
-        }
-        auto newCluster = std::make_shared<Cluster>(tx);
-        for (auto const* cluster : txConflicts)
-        {
-            newCluster->merge(*cluster);
+            if (toRemove.find(mClusters[readPos].get()) != toRemove.end())
+            {
+                saved.push_back(std::move(mClusters[readPos]));
+            }
+            else
+            {
+                if (writePos != readPos)
+                {
+                    mClusters[writePos] = std::move(mClusters[readPos]);
+                }
+                ++writePos;
+            }
         }
+        mClusters.resize(writePos);
+    }
 
-        std::vector<std::shared_ptr<Cluster>> newClusters;
-        newClusters.reserve(mClusters.size() + 1 - txConflicts.size());
-        for (auto const& cluster : mClusters)
+    // Roll back an in-place mutation: find and remove the merged cluster,
+    // then restore the saved conflicting clusters.
+    void
+    rollbackClusters(Cluster const* mergedCluster,
+                     std::vector<std::unique_ptr<Cluster>>& savedClusters)
+    {
+        // Find and swap-pop the merged cluster.
+        for (size_t i = 0; i < mClusters.size(); ++i)
         {
-            if (txConflicts.find(cluster.get()) == txConflicts.end())
+            if (mClusters[i].get() == mergedCluster)
             {
-                newClusters.push_back(cluster);
+                mClusters[i] = std::move(mClusters.back());
+                mClusters.pop_back();
+                break;
             }
         }
-        newClusters.push_back(newCluster);
-        return std::make_optional(std::move(newClusters));
+        // Restore the saved conflicting clusters.
+        for (auto& saved : savedClusters)
+        {
+            mClusters.push_back(std::move(saved));
+        }
     }
 
     // Simple bin-packing first-fit-decreasing heuristic
@@ -290,7 +363,7 @@ class Stage
     // This has around 11/9 maximum approximation ratio, which probably has
     // the best complexity/performance tradeoff out of all the heuristics.
     std::optional<std::vector<BitSet>>
-    binPacking(std::vector<std::shared_ptr<Cluster>>& clusters,
+    binPacking(std::vector<std::unique_ptr<Cluster>>& clusters,
                std::vector<uint64_t>& binInsns) const
     {
         // We could consider dropping the sort here in order to save some time
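`binPacking` above is the classic first-fit-decreasing heuristic. In isolation it looks like the sketch below, where plain weights stand in for cluster instruction totals (the real code tracks cluster indices in `BitSet` bins rather than copying values):

```cpp
// Sketch only: sort weights descending, place each into the first bin with
// room, and fail when an item would need a bin beyond binCount.
#include <algorithm>
#include <cstdint>
#include <functional>
#include <optional>
#include <vector>

std::optional<std::vector<std::vector<uint64_t>>>
firstFitDecreasing(std::vector<uint64_t> weights, size_t binCount,
                   uint64_t binCapacity)
{
    std::sort(weights.begin(), weights.end(), std::greater<uint64_t>());
    std::vector<std::vector<uint64_t>> bins(binCount);
    std::vector<uint64_t> binLoad(binCount, 0);
    for (uint64_t w : weights)
    {
        bool placed = false;
        for (size_t b = 0; b < binCount; ++b)
        {
            if (binLoad[b] + w <= binCapacity)
            {
                binLoad[b] += w;
                bins[b].push_back(w);
                placed = true;
                break;
            }
        }
        if (!placed)
        {
            return std::nullopt; // packing into binCount bins failed
        }
    }
    return bins;
}
```

Placing the largest items first is what keeps the approximation ratio near 11/9: small items fill the slack left behind by the big ones instead of fragmenting the bins early.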
@@ -344,7 +417,7 @@ class Stage
     // Looked at another way: two clusters that _aren't_ merged by the end of
     // the process of forming clusters _are_ data-independent and _could_
     // potentially run in parallel.
-    std::vector<std::shared_ptr<Cluster>> mClusters;
+    std::vector<std::unique_ptr<Cluster>> mClusters;
     // The clusters formed by data dependency merging may, however,
     // significantly outnumber the maximum _allowed_ amount of parallelism in
     // the stage -- a number called `ledgerMaxDependentTxClusters` in CAP-0063
@@ -374,6 +447,14 @@ class Stage
     uint64_t mInstructions = 0;
     ParallelPartitionConfig mConfig;
     bool mTriedCompactingBinPacking = false;
+    // Union of all clusters' mConflictTxs. Used as a fast-path check in
+    // getConflictingClusters to avoid scanning the conflict ids when the
+    // transaction has no conflicts with any existing cluster.
+    BitSet mAllConflictTxs;
+    // Maps tx id -> cluster pointer for the O(K) conflict lookup.
+    // Sized to the total number of transactions; nullptr means the tx
+    // has not been added to this stage.
+    std::vector<Cluster const*> mTxToCluster;
 };
 
 struct ParallelPhaseBuildResult
@@ -385,53 +466,68 @@ struct ParallelPhaseBuildResult
 
 ParallelPhaseBuildResult
 buildSurgePricedParallelSorobanPhaseWithStageCount(
-    SurgePricingPriorityQueue queue,
-    std::unordered_map<TransactionFrameBaseConstPtr, BuilderTx const*> const&
-        builderTxForTx,
-    TxFrameList const& txFrames, uint32_t stageCount,
-    SorobanNetworkConfig const& sorobanCfg,
-    std::shared_ptr<SurgePricingLaneConfig> laneConfig, uint32_t ledgerVersion)
+    std::vector<size_t> const& sortedTxOrder,
+    std::vector<Resource> const& txResources, Resource const& laneLimit,
+    std::vector<BuilderTx> const& builderTxs, TxFrameList const& txFrames,
+    uint32_t stageCount, SorobanNetworkConfig const& sorobanCfg)
 {
     ZoneScoped;
     ParallelPartitionConfig partitionCfg(stageCount, sorobanCfg);
-    std::vector<Stage> stages(partitionCfg.mStageCount, partitionCfg);
+    std::vector<Stage> stages;
+    stages.reserve(partitionCfg.mStageCount);
+    for (uint32_t i = 0; i < partitionCfg.mStageCount; ++i)
+    {
+        stages.emplace_back(partitionCfg, txFrames.size());
+    }
+
+    // Iterate over the transactions in decreasing fee order and greedily try
+    // to pack them into one of the stages until the limits are reached.
+    // Transactions that don't fit into any of the stages are skipped and
+    // surge pricing will be triggered for the transaction set.
+    Resource laneLeft = laneLimit;
+    bool hadTxNotFittingLane = false;
+
+    for (size_t txIdx : sortedTxOrder)
+    {
+        auto const& txRes = txResources[txIdx];
 
-    // Visit the transactions in the surge pricing queue and try to add them to
-    // at least one of the stages.
-    auto visitor = [&stages,
-                    &builderTxForTx](TransactionFrameBaseConstPtr const& tx) {
+        // Check if the transaction fits within the remaining lane resource
+        // limits. This mirrors the anyGreater check in popTopTxs that skips
+        // transactions exceeding the resource limits.
+        if (anyGreater(txRes, laneLeft))
+        {
+            hadTxNotFittingLane = true;
+            continue;
+        }
+
+        // Try to add the transaction to one of the stages.
         bool added = false;
-        auto builderTxIt = builderTxForTx.find(tx);
-        releaseAssert(builderTxIt != builderTxForTx.end());
         for (auto& stage : stages)
        {
-            if (stage.tryAdd(*builderTxIt->second))
+            if (stage.tryAdd(builderTxs[txIdx]))
             {
                 added = true;
                 break;
             }
         }
+
         if (added)
         {
-            return SurgePricingPriorityQueue::VisitTxResult::PROCESSED;
+            // Transaction included in the stage; update the remaining lane
+            // resources.
+            laneLeft -= txRes;
         }
-        // If a transaction didn't fit into any of the stages, we consider it
-        // to have been excluded due to resource limits and thus notify the
-        // surge pricing queue that surge pricing should be triggered
-        // (REJECTED imitates the behavior for exceeding the resource limit
-        // within the queue itself).
-        return SurgePricingPriorityQueue::VisitTxResult::REJECTED;
-    };
+        else
+        {
+            // Transaction didn't fit into any of the stages; mark that the
+            // lane limits were exceeded to trigger surge pricing.
+            hadTxNotFittingLane = true;
+        }
+    }
 
     ParallelPhaseBuildResult result;
-    std::vector<Resource> laneLeftUntilLimitUnused;
-    queue.popTopTxs(/* allowGaps */ true, visitor, laneLeftUntilLimitUnused,
-                    result.mHadTxNotFittingLane, ledgerVersion);
-    // There is only a single fee lane for Soroban, so there is only a single
-    // flag that indicates whether there was a transaction that didn't fit into
-    // the lane (and thus all transactions are surge priced at once).
-    releaseAssert(result.mHadTxNotFittingLane.size() == 1);
+    result.mHadTxNotFittingLane = {hadTxNotFittingLane};
 
     // At this point the stages have been filled with transactions and we just
     // need to place the full transactions into the respective stages/clusters.
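Stripped of the Soroban-specific types, the loop above is a plain greedy packer: take candidates in priority order, skip what exceeds the remaining budget, and offer each candidate to the first stage that accepts it. A sketch with hypothetical stand-ins (a scalar budget instead of the multi-dimensional `Resource`, and `tryAdd` predicates instead of real stages):

```cpp
// Sketch only: any rejection, whether by budget or by every stage, sets the
// flag that triggers surge pricing upstream.
#include <cstdint>
#include <functional>
#include <vector>

struct GreedyResult
{
    uint64_t budgetLeft;
    bool hadRejection = false;
};

GreedyResult
packGreedily(std::vector<uint64_t> const& costsInPriorityOrder,
             uint64_t budget,
             std::vector<std::function<bool(uint64_t)>>& stages)
{
    GreedyResult res{budget};
    for (uint64_t cost : costsInPriorityOrder)
    {
        if (cost > res.budgetLeft)
        {
            res.hadRejection = true;
            continue;
        }
        bool added = false;
        for (auto& tryAdd : stages)
        {
            if (tryAdd(cost))
            {
                added = true;
                break;
            }
        }
        if (added)
        {
            res.budgetLeft -= cost;
        }
        else
        {
            res.hadRejection = true;
        }
    }
    return res;
}
```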
@@ -442,22 +538,20 @@ buildSurgePricedParallelSorobanPhaseWithStageCount(
         auto& resStage = result.mStages.emplace_back();
         resStage.reserve(partitionCfg.mClustersPerStage);
 
-        std::unordered_map<size_t, size_t> clusterIdToStageCluster;
+        std::vector<size_t> binToStageCluster(
+            partitionCfg.mClustersPerStage,
+            std::numeric_limits<size_t>::max());
 
-        stage.visitAllTransactions(
-            [&resStage, &txFrames, &clusterIdToStageCluster,
-             &totalInclusionFee](size_t clusterId, size_t txId) {
-                auto it = clusterIdToStageCluster.find(clusterId);
-                if (it == clusterIdToStageCluster.end())
-                {
-                    it = clusterIdToStageCluster
-                             .emplace(clusterId, resStage.size())
-                             .first;
-                    resStage.emplace_back();
-                }
-                totalInclusionFee += txFrames[txId]->getInclusionFee();
-                resStage[it->second].push_back(txFrames[txId]);
-            });
+        stage.visitAllTransactions([&resStage, &txFrames, &binToStageCluster,
+                                    &totalInclusionFee](size_t binId,
+                                                        size_t txId) {
+            if (binToStageCluster[binId] == std::numeric_limits<size_t>::max())
+            {
+                binToStageCluster[binId] = resStage.size();
+                resStage.emplace_back();
+            }
+            totalInclusionFee += txFrames[txId]->getInclusionFee();
+            resStage[binToStageCluster[binId]].push_back(txFrames[txId]);
+        });
         // Algorithm ensures that clusters are populated from first to last and
         // no empty clusters are generated.
         for (auto const& cluster : resStage)
@@ -498,89 +592,163 @@ buildSurgePricedParallelSorobanPhase(
     double const MAX_INCLUSION_FEE_TOLERANCE_FOR_STAGE_COUNT = 0.999;
 
     // Simplify the transactions to the minimum necessary amount of data.
-    std::unordered_map<TransactionFrameBaseConstPtr, BuilderTx const*>
-        builderTxForTx;
-    std::vector<std::unique_ptr<BuilderTx>> builderTxs;
+    std::vector<BuilderTx> builderTxs;
     builderTxs.reserve(txFrames.size());
     for (size_t i = 0; i < txFrames.size(); ++i)
     {
-        auto const& txFrame = txFrames[i];
-        builderTxs.emplace_back(std::make_unique<BuilderTx>(i, *txFrame));
-        builderTxForTx.emplace(txFrame, builderTxs.back().get());
+        builderTxs.emplace_back(i, *txFrames[i]);
     }
 
     // Before trying to include any transactions, find all the pairs of the
     // conflicting transactions and mark the conflicts in the builderTxs.
     //
-    // In order to find the conflicts, we build the maps from the footprint
-    // keys to transactions, then mark the conflicts between the transactions
-    // that share RW key, or between the transactions that share RO and RW key.
-    //
-    // The approach here is optimized towards the low number of conflicts,
-    // specifically when there are no conflicts at all, the complexity is just
-    // O(total_footprint_entry_count). The worst case is roughly
-    // O(max_tx_footprint_size * transaction_count ^ 2), which is equivalent
-    // to the complexity of the straightforward approach of iterating over all
-    // the transaction pairs.
+    // We use a sort-based approach: collect all footprint entries into a flat
+    // vector tagged with (key hash, tx id, RO/RW flag), sort it by hash, then
+    // scan for groups sharing the same key hash. This is significantly
+    // faster in practice than using hash map lookups.
     //
     // This also has further optimization potential: we could populate the
     // key maps and even the conflicting transactions eagerly in the tx queue,
     // thus amortizing the costs across the whole ledger duration.
-    UnorderedMap<LedgerKey, std::vector<size_t>> txsWithRoKey;
-    UnorderedMap<LedgerKey, std::vector<size_t>> txsWithRwKey;
+    struct FpEntry
+    {
+        size_t keyHash;
+        uint32_t txId;
+        bool isRW;
+    };
+
+    // Count the total footprint entries for a single allocation.
+    size_t totalFpEntries = 0;
+    for (auto const& txFrame : txFrames)
+    {
+        auto const& fp = txFrame->sorobanResources().footprint;
+        totalFpEntries += fp.readOnly.size() + fp.readWrite.size();
+    }
+
+    std::vector<FpEntry> fpEntries;
+    fpEntries.reserve(totalFpEntries);
+    std::hash<LedgerKey> keyHasher;
     for (size_t i = 0; i < txFrames.size(); ++i)
     {
-        auto const& txFrame = txFrames[i];
-        auto const& footprint = txFrame->sorobanResources().footprint;
+        auto const& footprint = txFrames[i]->sorobanResources().footprint;
         for (auto const& key : footprint.readOnly)
         {
-            txsWithRoKey[key].push_back(i);
+            fpEntries.push_back(
+                {keyHasher(key), static_cast<uint32_t>(i), false});
         }
         for (auto const& key : footprint.readWrite)
         {
-            txsWithRwKey[key].push_back(i);
+            fpEntries.push_back(
+                {keyHasher(key), static_cast<uint32_t>(i), true});
        }
     }
 
-    for (auto const& [key, rwTxIds] : txsWithRwKey)
+    // Sort by hash for cache-friendly grouping.
+    std::sort(fpEntries.begin(), fpEntries.end(),
+              [](FpEntry const& a, FpEntry const& b) {
+                  return a.keyHash < b.keyHash;
+              });
+
+    // Scan the sorted entries for groups sharing the same hash, then mark
+    // conflicts between transactions that share RW keys (RW-RW and RO-RW).
+    // Conservatively treat hash collisions as potential conflicts: collisions
+    // should generally be rare, and allocating collisions to the same cluster
+    // is guaranteed to be safe (while disambiguating the conflicts would be
+    // expensive and complex). The collision probability is really low
+    // (K^2 / 2^64).
+    for (size_t groupStart = 0; groupStart < fpEntries.size();)
     {
+        size_t groupEnd = groupStart + 1;
+        while (groupEnd < fpEntries.size() &&
+               fpEntries[groupEnd].keyHash == fpEntries[groupStart].keyHash)
+        {
+            ++groupEnd;
+        }
+
+        // Skip singleton groups: no conflicts are possible.
+        if (groupEnd - groupStart < 2)
+        {
+            groupStart = groupEnd;
+            continue;
+        }
+
+        // Split the entries with the matching key hash into RO and RW ids.
+        std::vector<uint32_t> roTxs;
+        std::vector<uint32_t> rwTxs;
+        for (size_t i = groupStart; i < groupEnd; ++i)
+        {
+            if (fpEntries[i].isRW)
+            {
+                rwTxs.push_back(fpEntries[i].txId);
+            }
+            else
+            {
+                roTxs.push_back(fpEntries[i].txId);
+            }
+        }
         // RW-RW conflicts
-        for (size_t i = 0; i < rwTxIds.size(); ++i)
+        for (size_t i = 0; i < rwTxs.size(); ++i)
         {
-            for (size_t j = i + 1; j < rwTxIds.size(); ++j)
+            for (size_t j = i + 1; j < rwTxs.size(); ++j)
             {
-                builderTxs[rwTxIds[i]]->mConflictTxs.set(rwTxIds[j]);
-                builderTxs[rwTxIds[j]]->mConflictTxs.set(rwTxIds[i]);
+                // In the rare case of a hash collision within a transaction,
+                // the same transaction may appear several times in the same
+                // group.
+                if (rwTxs[i] == rwTxs[j])
+                {
+                    continue;
+                }
+                builderTxs[rwTxs[i]].mConflictTxs.set(rwTxs[j]);
+                builderTxs[rwTxs[j]].mConflictTxs.set(rwTxs[i]);
             }
         }
         // RO-RW conflicts
-        auto roIt = txsWithRoKey.find(key);
-        if (roIt != txsWithRoKey.end())
+        for (size_t i = 0; i < roTxs.size(); ++i)
         {
-            auto const& roTxIds = roIt->second;
-            for (size_t i = 0; i < roTxIds.size(); ++i)
+            for (size_t j = 0; j < rwTxs.size(); ++j)
             {
-                for (size_t j = 0; j < rwTxIds.size(); ++j)
+                // In the rare case of a hash collision within a transaction,
+                // the same transaction may appear several times in the same
+                // group.
+                if (roTxs[i] == rwTxs[j])
                 {
-                    builderTxs[roTxIds[i]]->mConflictTxs.set(rwTxIds[j]);
-                    builderTxs[rwTxIds[j]]->mConflictTxs.set(roTxIds[i]);
+                    continue;
                 }
+                builderTxs[roTxs[i]].mConflictTxs.set(rwTxs[j]);
+                builderTxs[rwTxs[j]].mConflictTxs.set(roTxs[i]);
             }
         }
+
+        groupStart = groupEnd;
     }
 
-    // Process the transactions in the surge pricing (decreasing fee) order.
-    // This also automatically ensures that the resource limits are respected
-    // for all the dimensions besides instructions.
-    SurgePricingPriorityQueue queue(
-        /* isHighestPriority */ true, laneConfig,
+    // Sort the transactions in decreasing inclusion fee order.
+    TxFeeComparator txComparator(
+        /* isGreater */ true,
         stellar::rand_uniform<size_t>(0, std::numeric_limits<size_t>::max()));
+    std::vector<size_t> sortedTxOrder(txFrames.size());
+    std::iota(sortedTxOrder.begin(), sortedTxOrder.end(), 0);
+    std::sort(sortedTxOrder.begin(), sortedTxOrder.end(),
+              [&txFrames, &txComparator](size_t a, size_t b) {
+                  return txComparator(txFrames[a], txFrames[b]);
+              });
+
+    // Precompute the per-transaction resources to avoid repeated virtual
+    // calls and heap allocations across threads.
+    std::vector<Resource> txResources;
+    txResources.reserve(txFrames.size());
     for (auto const& tx : txFrames)
     {
-        queue.add(tx, ledgerVersion);
+        txResources.push_back(
+            tx->getResources(/* useByteLimitInClassic */ false,
+                             ledgerVersion));
     }
 
-    // Create a worker thread for each stage count.
+    // Get the lane limit. Soroban uses a single generic lane.
+    auto const& laneLimits = laneConfig->getLaneLimits();
+    releaseAssert(laneLimits.size() == 1);
+    auto const& laneLimit = laneLimits[0];
+
+    // Create a worker thread for each stage count. The sorted order and the
+    // precomputed resources are shared across all threads (read-only).
     std::vector<std::thread> threads;
     uint32_t stageCountOptions =
         cfg.SOROBAN_PHASE_MAX_STAGE_COUNT - cfg.SOROBAN_PHASE_MIN_STAGE_COUNT + 1;
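The conflict detection above relies on a sort-then-scan grouping of hashed footprint keys. The same pattern in miniature, with `std::string` keys in place of `LedgerKey` (a hash collision only over-reports a conflict, which is safe here):

```cpp
// Sketch only: emit every pair of distinct tx ids whose keys hash equal.
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <string>
#include <utility>
#include <vector>

struct Entry
{
    size_t keyHash;
    uint32_t txId;
};

std::vector<std::pair<uint32_t, uint32_t>>
findSharedKeyPairs(std::vector<std::pair<std::string, uint32_t>> const& keys)
{
    std::hash<std::string> hasher;
    std::vector<Entry> entries;
    entries.reserve(keys.size());
    for (auto const& [key, txId] : keys)
    {
        entries.push_back({hasher(key), txId});
    }
    std::sort(entries.begin(), entries.end(),
              [](Entry const& a, Entry const& b) {
                  return a.keyHash < b.keyHash;
              });

    std::vector<std::pair<uint32_t, uint32_t>> pairs;
    for (size_t start = 0; start < entries.size();)
    {
        size_t end = start + 1;
        while (end < entries.size() &&
               entries[end].keyHash == entries[start].keyHash)
        {
            ++end;
        }
        for (size_t i = start; i < end; ++i)
        {
            for (size_t j = i + 1; j < end; ++j)
            {
                if (entries[i].txId != entries[j].txId)
                {
                    pairs.emplace_back(entries[i].txId, entries[j].txId);
                }
            }
        }
        start = end;
    }
    return pairs;
}
```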
@@ -590,13 +758,13 @@ buildSurgePricedParallelSorobanPhase(
          stageCount <= cfg.SOROBAN_PHASE_MAX_STAGE_COUNT; ++stageCount)
     {
         size_t resultIndex = stageCount - cfg.SOROBAN_PHASE_MIN_STAGE_COUNT;
-        threads.emplace_back([queue, &builderTxForTx, txFrames, stageCount,
-                              sorobanCfg, laneConfig, resultIndex, &results,
-                              ledgerVersion]() {
+        threads.emplace_back([&sortedTxOrder, &txResources, &laneLimit,
+                              &builderTxs, &txFrames, stageCount, &sorobanCfg,
+                              resultIndex, &results]() {
             results.at(resultIndex) =
                 buildSurgePricedParallelSorobanPhaseWithStageCount(
-                    std::move(queue), builderTxForTx, txFrames, stageCount,
-                    sorobanCfg, laneConfig, ledgerVersion);
+                    sortedTxOrder, txResources, laneLimit, builderTxs,
+                    txFrames, stageCount, sorobanCfg);
         });
     }
     for (auto& thread : threads)
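The per-stage-count search above follows a simple fan-out pattern: one thread per candidate value, each writing only to its own preallocated slot while sharing read-only inputs by reference. A sketch with a placeholder `evaluate` function in place of the real build:

```cpp
// Sketch only: no locking is needed because each thread touches a distinct,
// preallocated results element and the shared input is read-only.
#include <cstdint>
#include <thread>
#include <vector>

uint64_t
evaluate(std::vector<uint64_t> const& sharedInput, uint32_t stageCount)
{
    uint64_t score = 0;
    for (auto v : sharedInput)
    {
        score += v % (stageCount + 1);
    }
    return score;
}

std::vector<uint64_t>
evaluateAllStageCounts(std::vector<uint64_t> const& sharedInput,
                       uint32_t minStages, uint32_t maxStages)
{
    std::vector<uint64_t> results(maxStages - minStages + 1);
    std::vector<std::thread> threads;
    threads.reserve(results.size());
    for (uint32_t sc = minStages; sc <= maxStages; ++sc)
    {
        size_t slot = sc - minStages;
        threads.emplace_back([&sharedInput, sc, slot, &results]() {
            results[slot] = evaluate(sharedInput, sc);
        });
    }
    for (auto& t : threads)
    {
        t.join();
    }
    return results;
}
```

Note that the results vector must be sized before any thread starts; resizing it concurrently would invalidate the slots the workers write into.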
diff --git a/src/herder/SurgePricingUtils.cpp b/src/herder/SurgePricingUtils.cpp
index 40514992dd..fe9d0c17b8 100644
--- a/src/herder/SurgePricingUtils.cpp
+++ b/src/herder/SurgePricingUtils.cpp
@@ -70,8 +70,7 @@ computeBetterFee(TransactionFrameBase const& tx, int64_t refFeeBid,
     return minFee;
 }
 
-SurgePricingPriorityQueue::TxComparator::TxComparator(bool isGreater,
-                                                      size_t _seed)
+TxFeeComparator::TxFeeComparator(bool isGreater, size_t _seed)
     : mIsGreater(isGreater)
 #ifndef BUILD_TESTS
     , mSeed(_seed)
@@ -80,47 +79,43 @@ SurgePricingPriorityQueue::TxComparator::TxComparator(bool isGreater,
 }
 
 bool
-SurgePricingPriorityQueue::TxComparator::operator()(
-    TransactionFrameBasePtr const& tx1,
-    TransactionFrameBasePtr const& tx2) const
+TxFeeComparator::operator()(TransactionFrameBasePtr const& tx1,
+                            TransactionFrameBasePtr const& tx2) const
 {
-    return txLessThan(tx1, tx2) ^ mIsGreater;
+    return txLessThan(tx1, tx2);
 }
 
 bool
-SurgePricingPriorityQueue::TxComparator::compareFeeOnly(
-    TransactionFrameBase const& tx1, TransactionFrameBase const& tx2) const
+TxFeeComparator::compareFeeOnly(TransactionFrameBase const& tx1,
+                                TransactionFrameBase const& tx2) const
 {
     return compareFeeOnly(tx1.getInclusionFee(), tx1.getNumOperations(),
                           tx2.getInclusionFee(), tx2.getNumOperations());
 }
 
 bool
-SurgePricingPriorityQueue::TxComparator::compareFeeOnly(int64_t tx1Bid,
-                                                        uint32_t tx1Ops,
-                                                        int64_t tx2Bid,
-                                                        uint32_t tx2Ops) const
+TxFeeComparator::compareFeeOnly(int64_t tx1Bid, uint32_t tx1Ops, int64_t tx2Bid,
+                                uint32_t tx2Ops) const
 {
     bool isLess = feeRate3WayCompare(tx1Bid, tx1Ops, tx2Bid, tx2Ops) < 0;
     return isLess ^ mIsGreater;
 }
 
 bool
-SurgePricingPriorityQueue::TxComparator::isGreater() const
+TxFeeComparator::isGreater() const
 {
     return mIsGreater;
 }
 
 bool
-SurgePricingPriorityQueue::TxComparator::txLessThan(
-    TransactionFrameBasePtr const& tx1,
-    TransactionFrameBasePtr const& tx2) const
+TxFeeComparator::txLessThan(TransactionFrameBasePtr const& tx1,
+                            TransactionFrameBasePtr const& tx2) const
 {
     auto cmp3 = feeRate3WayCompare(*tx1, *tx2);
     if (cmp3 != 0)
     {
-        return cmp3 < 0;
+        return mIsGreater ? cmp3 > 0 : cmp3 < 0;
     }
 #ifndef BUILD_TESTS
     // break tie with pointer arithmetic
@@ -132,7 +127,7 @@ SurgePricingPriorityQueue::TxComparator::txLessThan(
     auto lx = tx1->getFullHash();
     auto rx = tx2->getFullHash();
 #endif
-    return lx < rx;
+    return mIsGreater ? rx < lx : lx < rx;
 }
 
 SurgePricingPriorityQueue::SurgePricingPriorityQueue(
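Moving the `mIsGreater` flip from `operator()` (previously `txLessThan(tx1, tx2) ^ mIsGreater`) down into `txLessThan` is a correctness fix as much as a cleanup: XOR-ing the result appears to make the comparator return true when an element is compared against an equivalent one with `mIsGreater` set, which violates the strict weak ordering that `std::set` and `std::sort` require, and it also leaves the tie-break unmirrored. A minimal illustration of the fixed shape, with a simplified `Tx`:

```cpp
// Sketch only: both the primary key and the tie-break flip together, so the
// comparator stays irreflexive and descending order exactly mirrors
// ascending order.
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

struct Tx
{
    int64_t feeBid;
    uint64_t hash; // stand-in for the full tx hash used as a tie-break
};

struct FeeComparator
{
    bool isGreater;

    bool
    operator()(Tx const& a, Tx const& b) const
    {
        if (a.feeBid != b.feeBid)
        {
            return isGreater ? a.feeBid > b.feeBid : a.feeBid < b.feeBid;
        }
        return isGreater ? b.hash < a.hash : a.hash < b.hash;
    }
};

int
main()
{
    std::vector<Tx> txs = {{5, 1}, {5, 2}, {7, 3}};
    std::sort(txs.begin(), txs.end(), FeeComparator{/*isGreater=*/true});
    assert(txs.front().feeBid == 7);
    // {5, 2} sorts before {5, 1} in descending mode: the tie-break mirrors.
    assert(txs[1].hash == 2);
    return 0;
}
```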
diff --git a/src/herder/SurgePricingUtils.h b/src/herder/SurgePricingUtils.h
index 19927bc857..0c353dbf07 100644
--- a/src/herder/SurgePricingUtils.h
+++ b/src/herder/SurgePricingUtils.h
@@ -21,6 +21,36 @@ int feeRate3WayCompare(int64_t lFeeBid, uint32_t lNbOps, int64_t rFeeBid,
 int64_t computeBetterFee(TransactionFrameBase const& tx, int64_t refFeeBid,
                          uint32_t refNbOps);
 
+// Transaction comparator for transaction prioritization and surge pricing
+// based on transaction inclusion fees.
+// operator() is suitable for sorting the transactions while shuffling
+// transactions with the same fee rate into random order.
+// compareFeeOnly can be used to just determine whether one transaction has a
+// strictly higher fee rate than the other.
+class TxFeeComparator
+{
+  public:
+    TxFeeComparator(bool isGreater, size_t seed);
+
+    bool operator()(TransactionFrameBasePtr const& tx1,
+                    TransactionFrameBasePtr const& tx2) const;
+
+    bool compareFeeOnly(TransactionFrameBase const& tx1,
+                        TransactionFrameBase const& tx2) const;
+    bool compareFeeOnly(int64_t tx1Bid, uint32_t tx1Ops, int64_t tx2Bid,
+                        uint32_t tx2Ops) const;
+    bool isGreater() const;
+
+  private:
+    bool txLessThan(TransactionFrameBasePtr const& tx1,
+                    TransactionFrameBasePtr const& tx2) const;
+
+    bool const mIsGreater;
+#ifndef BUILD_TESTS
+    size_t mSeed;
+#endif
+};
+
 // Configuration for multi-lane transaction limiting and surge pricing.
 //
 // This configuration defines how many 'lanes' are there to compare and limit
@@ -225,31 +255,7 @@ class SurgePricingPriorityQueue
         std::nullopt);
 
   private:
-    class TxComparator
-    {
-      public:
-        TxComparator(bool isGreater, size_t seed);
-
-        bool operator()(TransactionFrameBasePtr const& tx1,
-                        TransactionFrameBasePtr const& tx2) const;
-
-        bool compareFeeOnly(TransactionFrameBase const& tx1,
-                            TransactionFrameBase const& tx2) const;
-        bool compareFeeOnly(int64_t tx1Bid, uint32_t tx1Ops, int64_t tx2Bid,
-                            uint32_t tx2Ops) const;
-        bool isGreater() const;
-
-      private:
-        bool txLessThan(TransactionFrameBasePtr const& tx1,
-                        TransactionFrameBasePtr const& tx2) const;
-
-        bool const mIsGreater;
-#ifndef BUILD_TESTS
-        size_t mSeed;
-#endif
-    };
-
-    using TxSortedSet = std::set<TransactionFrameBasePtr, TxComparator>;
+    using TxSortedSet = std::set<TransactionFrameBasePtr, TxFeeComparator>;
     using LaneIter = std::pair<size_t, TxSortedSet::iterator>;
 
     // Iterator for walking the queue from top to bottom, possibly restricted
@@ -285,7 +291,7 @@ class SurgePricingPriorityQueue
 
     Iterator getTop() const;
 
-    TxComparator const mComparator;
+    TxFeeComparator const mComparator;
     std::shared_ptr<SurgePricingLaneConfig> mLaneConfig;
     std::vector<Resource> const& mLaneLimits;
diff --git a/src/herder/TxSetFrame.cpp b/src/herder/TxSetFrame.cpp
index 602f0cf181..952345e607 100644
--- a/src/herder/TxSetFrame.cpp
+++ b/src/herder/TxSetFrame.cpp
@@ -940,15 +940,20 @@ makeTxSetFromTransactions(
                     static_cast(i));
             }
         }
-
+    if (!valid)
+    {
+        throw std::runtime_error("Created invalid tx set frame - shape is "
+                                 "mismatched after roundtrip.");
+    }
     // We already trimmed invalid transactions in an earlier call to
     // `trimInvalid`, so skip transaction validation here.
-    valid = valid && outputApplicableTxSet->checkValidInternal(
-                         app, lowerBoundCloseTimeOffset,
-                         upperBoundCloseTimeOffset, true);
-    if (!valid)
+    auto validationResult =
+        outputApplicableTxSet->checkValidInternalWithResult(
+            app, lowerBoundCloseTimeOffset, upperBoundCloseTimeOffset, true);
+    if (validationResult != TxSetValidationResult::VALID)
     {
-        throw std::runtime_error("Created invalid tx set frame");
+        throw std::runtime_error(fmt::format(
+            FMT_STRING(
+                "Created invalid tx set frame, validation result: {:s}"),
+            toString(validationResult)));
     }
 
     return std::make_pair(outputTxSet, std::move(outputApplicableTxSet));
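`makeTxSetFromTransactions` now reports why validation failed instead of collapsing everything into a bare bool. The shape of that change in miniature; the enum values below are illustrative only, not the real `TxSetValidationResult` set:

```cpp
// Sketch only, assuming hypothetical enum values and a toString helper.
#include <fmt/format.h>
#include <stdexcept>
#include <string>

enum class TxSetValidationResult
{
    VALID,
    SHAPE_MISMATCH, // illustrative value
    INVALID_TX      // illustrative value
};

std::string
toString(TxSetValidationResult r)
{
    switch (r)
    {
    case TxSetValidationResult::VALID:
        return "VALID";
    case TxSetValidationResult::SHAPE_MISMATCH:
        return "SHAPE_MISMATCH";
    case TxSetValidationResult::INVALID_TX:
        return "INVALID_TX";
    }
    return "UNKNOWN";
}

void
throwIfInvalid(TxSetValidationResult r)
{
    if (r != TxSetValidationResult::VALID)
    {
        // The formatted reason makes the failure diagnosable from logs alone.
        throw std::runtime_error(fmt::format(
            FMT_STRING("Created invalid tx set frame, validation result: {:s}"),
            toString(r)));
    }
}
```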
diff --git a/src/herder/test/TxSetTests.cpp b/src/herder/test/TxSetTests.cpp
index 376bfef3a1..1b8d32bffa 100644
--- a/src/herder/test/TxSetTests.cpp
+++ b/src/herder/test/TxSetTests.cpp
@@ -3321,12 +3321,32 @@ TEST_CASE("parallel tx set building benchmark",
                   << ", mean duration: " << 1e-6 * totalDuration / iterCount
                   << " ms" << std::endl;
     };
+    std::cout << "=== Parallel Tx Set Building Benchmark ===" << std::endl;
+    std::cout << "TX_COUNT="
+              << MEAN_INCLUDED_TX_COUNT * TX_COUNT_MEMPOOL_MULTIPLIER
+              << " CLUSTER_COUNT=" << CLUSTER_COUNT
+              << " STAGES=" << MIN_STAGE_COUNT << "-" << MAX_STAGE_COUNT
+              << std::endl;
+    std::cout << "---" << std::endl;
+    // Fully independent (no conflicts) - stresses bin packing & cluster scan
     runBenchmark(0, 0, 0);
+    // Very sparse conflicts - mostly independent fast path
+    runBenchmark(0.1, 5, 1);
+    // Mix of independent and small conflict clusters
+    runBenchmark(0.5, 2, 2);
+    // Rare conflicts with large RO fan-out
     runBenchmark(1, 1000, 1);
+    // RW-only small conflict groups
+    runBenchmark(5, 0, 3);
+    // Moderate conflicts, large RO fan-out
     runBenchmark(10, 40, 1);
+    // Heavy conflicts, large RO fan-out
     runBenchmark(20, 40, 1);
+    // Moderate balanced RO/RW conflicts
     runBenchmark(10, 10, 10);
+    // Very heavy conflicts
     runBenchmark(50, 50, 5);
+    std::cout << "===" << std::endl;
 }
 } // namespace
 } // namespace stellar