Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions Builds/VisualStudio/stellar-core.vcxproj
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,6 @@ exit /b 0
<ClCompile Include="..\..\src\bucket\BucketManager.cpp" />
<ClCompile Include="..\..\src\bucket\BucketMergeMap.cpp" />
<ClCompile Include="..\..\src\bucket\BucketOutputIterator.cpp" />
<ClCompile Include="..\..\src\bucket\BucketSnapshotManager.cpp" />
<ClCompile Include="..\..\src\bucket\BucketUtils.cpp" />
<ClCompile Include="..\..\src\bucket\DiskIndex.cpp" />
<ClCompile Include="..\..\src\bucket\FutureBucket.cpp" />
Expand Down Expand Up @@ -941,7 +940,6 @@ exit /b 0
<ClInclude Include="..\..\src\bucket\BucketManager.h" />
<ClInclude Include="..\..\src\bucket\BucketMergeMap.h" />
<ClInclude Include="..\..\src\bucket\BucketOutputIterator.h" />
<ClInclude Include="..\..\src\bucket\BucketSnapshotManager.h" />
<ClInclude Include="..\..\src\bucket\BucketUtils.h" />
<ClInclude Include="..\..\src\bucket\DiskIndex.h" />
<ClInclude Include="..\..\src\bucket\FutureBucket.h" />
Expand Down
6 changes: 0 additions & 6 deletions Builds/VisualStudio/stellar-core.vcxproj.filters
Original file line number Diff line number Diff line change
Expand Up @@ -1278,9 +1278,6 @@
<ClCompile Include="..\..\src\bucket\BucketListSnapshot.cpp">
<Filter>bucket</Filter>
</ClCompile>
<ClCompile Include="..\..\src\bucket\BucketSnapshotManager.cpp">
<Filter>bucket</Filter>
</ClCompile>
<ClCompile Include="..\..\src\overlay\test\TxAdvertsTests.cpp">
<Filter>overlay\tests</Filter>
</ClCompile>
Expand Down Expand Up @@ -2376,9 +2373,6 @@
<ClInclude Include="..\..\src\main\SettingsUpgradeUtils.h">
<Filter>main</Filter>
</ClInclude>
<ClInclude Include="..\..\src\bucket\BucketSnapshotManager.h">
<Filter>bucket</Filter>
</ClInclude>
<ClInclude Include="..\..\src\overlay\TxAdverts.h">
<Filter>overlay</Filter>
</ClInclude>
Expand Down
85 changes: 49 additions & 36 deletions src/bucket/BucketListSnapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ BucketListSnapshotData<BucketT>::Level::Level(

template <class BucketT>
BucketListSnapshotData<BucketT>::BucketListSnapshotData(
BucketListBase<BucketT> const& bl, LedgerHeader const& header)
BucketListBase<BucketT> const& bl)
: levels([&bl]() {
std::vector<Level> v;
v.reserve(BucketListBase<BucketT>::kNumLevels);
Expand All @@ -50,17 +50,9 @@ BucketListSnapshotData<BucketT>::BucketListSnapshotData(
}
return v;
}())
, header(header)
{
}

template <class BucketT>
uint32_t
BucketListSnapshotData<BucketT>::getLedgerSeq() const
{
return header.ledgerSeq;
}

//
// SearchableBucketListSnapshot
//
Expand All @@ -70,9 +62,11 @@ SearchableBucketListSnapshot<BucketT>::SearchableBucketListSnapshot(
MetricsRegistry& metrics,
std::shared_ptr<BucketListSnapshotData<BucketT> const> data,
std::map<uint32_t, std::shared_ptr<BucketListSnapshotData<BucketT> const>>
historicalSnapshots)
historicalSnapshots,
uint32_t ledgerSeq)
: mData(std::move(data))
, mHistoricalSnapshots(std::move(historicalSnapshots))
, mLedgerSeq(ledgerSeq)
, mMetrics(metrics)
, mBulkLoadMeter(
metrics.NewMeter({BucketT::METRIC_STRING, "query", "loads"}, "query"))
Expand All @@ -87,6 +81,39 @@ SearchableBucketListSnapshot<BucketT>::SearchableBucketListSnapshot(
}
}

template <class BucketT>
SearchableBucketListSnapshot<BucketT>::SearchableBucketListSnapshot(
SearchableBucketListSnapshot const& other)
: mData(other.mData)
, mHistoricalSnapshots(other.mHistoricalSnapshots)
, mLedgerSeq(other.mLedgerSeq)
// mStreams intentionally left empty — each copy gets its own stream cache
, mMetrics(other.mMetrics)
, mPointTimers(other.mPointTimers)
, mBulkTimers(other.mBulkTimers)
, mBulkLoadMeter(other.mBulkLoadMeter)
{
}

template <class BucketT>
SearchableBucketListSnapshot<BucketT>&
SearchableBucketListSnapshot<BucketT>::operator=(
SearchableBucketListSnapshot const& other)
{
if (this != &other)
{
mData = other.mData;
mHistoricalSnapshots = other.mHistoricalSnapshots;
mLedgerSeq = other.mLedgerSeq;
mStreams.clear();
mMetrics = other.mMetrics;
mPointTimers = other.mPointTimers;
mBulkTimers = other.mBulkTimers;
mBulkLoadMeter = other.mBulkLoadMeter;
}
return *this;
}

// File streams are fairly expensive to create, so they are lazily created and
// stored in mStreams.
template <class BucketT>
Expand Down Expand Up @@ -292,7 +319,7 @@ SearchableBucketListSnapshot<BucketT>::load(LedgerKey const& k) const

auto timerIter = mPointTimers.find(k.type());
releaseAssert(timerIter != mPointTimers.end());
auto timer = timerIter->second.TimeScope();
auto timer = timerIter->second.get().TimeScope();

std::shared_ptr<typename BucketT::LoadT const> result{};

Expand Down Expand Up @@ -336,7 +363,7 @@ SearchableBucketListSnapshot<BucketT>::loadKeysInternal(
return keys.empty() ? Loop::COMPLETE : Loop::INCOMPLETE;
};

if (!ledgerSeq || *ledgerSeq == mData->getLedgerSeq())
if (!ledgerSeq || *ledgerSeq == mLedgerSeq)
{
loopAllBuckets(loadKeysLoop, *mData);
}
Expand Down Expand Up @@ -370,34 +397,18 @@ SearchableBucketListSnapshot<BucketT>::getBulkLoadTimer(
{
if (numEntries != 0)
{
mBulkLoadMeter.Mark(numEntries);
mBulkLoadMeter.get().Mark(numEntries);
}

auto iter = mBulkTimers.find(label);
if (iter == mBulkTimers.end())
{
auto& metric =
mMetrics.NewTimer({BucketT::METRIC_STRING, "bulk", label});
mMetrics.get().NewTimer({BucketT::METRIC_STRING, "bulk", label});
iter = mBulkTimers.emplace(label, metric).first;
}

return iter->second;
}

template <class BucketT>
uint32_t
SearchableBucketListSnapshot<BucketT>::getLedgerSeq() const
{
releaseAssert(mData);
return mData->getLedgerSeq();
}

template <class BucketT>
LedgerHeader const&
SearchableBucketListSnapshot<BucketT>::getLedgerHeader() const
{
releaseAssert(mData);
return mData->header;
return iter->second.get();
}

template <class BucketT>
Expand All @@ -424,9 +435,10 @@ SearchableLiveBucketListSnapshot::SearchableLiveBucketListSnapshot(
std::shared_ptr<BucketListSnapshotData<LiveBucket> const> data,
std::map<uint32_t,
std::shared_ptr<BucketListSnapshotData<LiveBucket> const>>
historicalSnapshots)
: SearchableBucketListSnapshot<LiveBucket>(metrics, std::move(data),
std::move(historicalSnapshots))
historicalSnapshots,
uint32_t ledgerSeq)
: SearchableBucketListSnapshot<LiveBucket>(
metrics, std::move(data), std::move(historicalSnapshots), ledgerSeq)
{
}

Expand Down Expand Up @@ -855,9 +867,10 @@ SearchableHotArchiveBucketListSnapshot::SearchableHotArchiveBucketListSnapshot(
std::shared_ptr<BucketListSnapshotData<HotArchiveBucket> const> data,
std::map<uint32_t,
std::shared_ptr<BucketListSnapshotData<HotArchiveBucket> const>>
historicalSnapshots)
historicalSnapshots,
uint32_t ledgerSeq)
: SearchableBucketListSnapshot<HotArchiveBucket>(
metrics, std::move(data), std::move(historicalSnapshots))
metrics, std::move(data), std::move(historicalSnapshots), ledgerSeq)
{
}

Expand Down
62 changes: 40 additions & 22 deletions src/bucket/BucketListSnapshot.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
#include "util/UnorderedSet.h"
#include "util/XDRStream.h"
#include "xdr/Stellar-ledger-entries.h"
#include "xdr/Stellar-ledger.h"

#include <functional>
#include <list>
Expand All @@ -34,7 +33,6 @@ class Timer;
namespace stellar
{

class BucketSnapshotManager;
struct EvictionMetrics;
struct EvictionResultCandidates;
struct EvictionResultEntry;
Expand All @@ -43,10 +41,12 @@ struct StateArchivalSettings;
class EvictionStatistics;
template <class BucketT> class BucketListBase;
template <class BucketT> class BucketLevel;
class CompleteConstLedgerState;
class LedgerStateSnapshot;

// BucketListSnapshotData holds the immutable snapshot data that can be safely
// shared across threads. It contains bucket references and the ledger header,
// but no mutable state like file streams.
// BucketListSnapshotData holds the immutable bucket references that can be
// safely shared across threads. It contains only bucket pointers and no
// metadata (headers, sequence numbers, etc.) or mutable state (file streams).
template <class BucketT> struct BucketListSnapshotData
{
BUCKET_TYPE_ASSERT(BucketT);
Expand All @@ -62,12 +62,8 @@ template <class BucketT> struct BucketListSnapshotData
};

std::vector<Level> const levels;
LedgerHeader const header;

BucketListSnapshotData(BucketListBase<BucketT> const& bl,
LedgerHeader const& header);

uint32_t getLedgerSeq() const;
explicit BucketListSnapshotData(BucketListBase<BucketT> const& bl);
};

// SearchableBucketListSnapshot provides BucketList lookup functionality.
Expand All @@ -88,20 +84,27 @@ template <class BucketT> class SearchableBucketListSnapshot
std::map<uint32_t, std::shared_ptr<BucketListSnapshotData<BucketT> const>>
mHistoricalSnapshots;

// Ledger sequence number for this snapshot, used internally to route
// queries between current and historical data. Not exposed publicly;
// callers should get ledger metadata from CompleteConstLedgerState.
uint32_t mLedgerSeq;

// Per-snapshot mutable stream cache
mutable UnorderedMap<BucketT const*, std::unique_ptr<XDRInputFileStream>>
mStreams;

MetricsRegistry& mMetrics;
std::reference_wrapper<MetricsRegistry> mMetrics;

// Tracks load times for each LedgerEntryType. We use
// SimpleTimer since medida Timer overhead is too expensive for point loads.
UnorderedMap<LedgerEntryType, SimpleTimer&> mPointTimers;
UnorderedMap<LedgerEntryType, std::reference_wrapper<SimpleTimer>>
mPointTimers;

// Bulk load timers take significantly longer, so the timer overhead is
// comparatively negligible.
mutable UnorderedMap<std::string, medida::Timer&> mBulkTimers;
medida::Meter& mBulkLoadMeter;
mutable UnorderedMap<std::string, std::reference_wrapper<medida::Timer>>
mBulkTimers;
std::reference_wrapper<medida::Meter> mBulkLoadMeter;

// Returns (lazily-constructed) file stream for bucket file. Note
// this might be in some random position left over from a previous read --
Expand Down Expand Up @@ -151,9 +154,17 @@ template <class BucketT> class SearchableBucketListSnapshot
std::shared_ptr<BucketListSnapshotData<BucketT> const> data,
std::map<uint32_t,
std::shared_ptr<BucketListSnapshotData<BucketT> const>>
historicalSnapshots);
historicalSnapshots,
uint32_t ledgerSeq);

public:
// Copy: copies all state except mStreams, which is reset to empty.
// Each copy gets its own file stream cache, making copies safe to use
// from different threads.
SearchableBucketListSnapshot(SearchableBucketListSnapshot const& other);
SearchableBucketListSnapshot&
operator=(SearchableBucketListSnapshot const& other);

virtual ~SearchableBucketListSnapshot() = default;

// Point load, returns nullptr if not found
Expand All @@ -170,9 +181,6 @@ template <class BucketT> class SearchableBucketListSnapshot
loadKeysFromLedger(std::set<LedgerKey, LedgerEntryIdCmp> const& inKeys,
uint32_t ledgerSeq) const;

uint32_t getLedgerSeq() const;
LedgerHeader const& getLedgerHeader() const;

// Access to underlying data (for copying/refreshing)
std::shared_ptr<BucketListSnapshotData<BucketT> const> const&
getSnapshotData() const;
Expand All @@ -191,7 +199,8 @@ class SearchableLiveBucketListSnapshot
std::shared_ptr<BucketListSnapshotData<LiveBucket> const> data,
std::map<uint32_t,
std::shared_ptr<BucketListSnapshotData<LiveBucket> const>>
historicalSnapshots);
historicalSnapshots,
uint32_t ledgerSeq);

Loop scanForEvictionInBucket(
std::shared_ptr<LiveBucket const> const& bucket, EvictionIterator& iter,
Expand All @@ -200,6 +209,9 @@ class SearchableLiveBucketListSnapshot
UnorderedSet<LedgerKey>& keysInEvictableEntries) const;

public:
SearchableLiveBucketListSnapshot(SearchableLiveBucketListSnapshot const&) =
default;

std::vector<LedgerEntry>
loadKeys(std::set<LedgerKey, LedgerEntryIdCmp> const& inKeys,
std::string const& label) const;
Expand All @@ -222,7 +234,9 @@ class SearchableLiveBucketListSnapshot
LedgerEntryType type,
std::function<Loop(BucketEntry const&)> callback) const;

friend class BucketSnapshotManager;
friend class BucketSnapshotState;
friend class CompleteConstLedgerState;
friend class LedgerStateSnapshot;
};

// Hot archive bucket list snapshot
Expand All @@ -234,9 +248,13 @@ class SearchableHotArchiveBucketListSnapshot
std::shared_ptr<BucketListSnapshotData<HotArchiveBucket> const> data,
std::map<uint32_t, std::shared_ptr<
BucketListSnapshotData<HotArchiveBucket> const>>
historicalSnapshots);
historicalSnapshots,
uint32_t ledgerSeq);

public:
SearchableHotArchiveBucketListSnapshot(
SearchableHotArchiveBucketListSnapshot const&) = default;

std::vector<HotArchiveBucketEntry>
loadKeys(std::set<LedgerKey, LedgerEntryIdCmp> const& inKeys) const;

Expand All @@ -245,7 +263,7 @@ class SearchableHotArchiveBucketListSnapshot
void scanAllEntries(
std::function<Loop(HotArchiveBucketEntry const&)> callback) const;

friend class BucketSnapshotManager;
friend class LedgerStateSnapshot;
};

extern template struct BucketListSnapshotData<LiveBucket>;
Expand Down
Loading
Loading