Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
550 changes: 532 additions & 18 deletions cmd/prometheus/main.go

Large diffs are not rendered by default.

86 changes: 79 additions & 7 deletions pp-pkg/storage/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type Adapter struct {
hashdexLimits cppbridge.WALHashdexLimits
transparentState *cppbridge.StateV2
mergeOutOfOrderChunks func()
longtermIntervalMs int64

// stat
activeQuerierMetrics *querier.Metrics
Expand All @@ -41,12 +42,51 @@ type Adapter struct {
samplesAppended prometheus.Counter
}

// NewAdapter init new [Adapter].
// NewAdapter init new main [Adapter].
func NewAdapter(
clock clockwork.Clock,
proxy *pp_storage.Proxy,
mergeOutOfOrderChunks func(),
registerer prometheus.Registerer,
) *Adapter {
return newAdapter(
clock,
proxy,
mergeOutOfOrderChunks,
0,
querier.QueryableAppenderSource,
querier.QueryableStorageSource,
registerer,
)
}

// NewLongtermAdapter init new longterm [Adapter].
func NewLongtermAdapter(
clock clockwork.Clock,
proxy *pp_storage.Proxy,
mergeOutOfOrderChunks func(),
longtermIntervalMs int64,
registerer prometheus.Registerer,
) *Adapter {
return newAdapter(
clock,
proxy,
mergeOutOfOrderChunks,
longtermIntervalMs,
querier.QueryableLongtermAppenderSource,
querier.QueryableLongtermStorageSource,
registerer,
)
}

// newAdapter init new [Adapter].
func newAdapter(
clock clockwork.Clock,
proxy *pp_storage.Proxy,
mergeOutOfOrderChunks func(),
longtermIntervalMs int64,
activeSource, storageSource string,
registerer prometheus.Registerer,
) *Adapter {
factory := util.NewUnconflictRegisterer(registerer)
return &Adapter{
Expand All @@ -56,8 +96,9 @@ func NewAdapter(
hashdexLimits: cppbridge.DefaultWALHashdexLimits(),
transparentState: cppbridge.NewTransitionStateV2(),
mergeOutOfOrderChunks: mergeOutOfOrderChunks,
activeQuerierMetrics: querier.NewMetrics(registerer, querier.QueryableAppenderSource),
storageQuerierMetrics: querier.NewMetrics(registerer, querier.QueryableStorageSource),
longtermIntervalMs: longtermIntervalMs,
activeQuerierMetrics: querier.NewMetrics(registerer, activeSource),
storageQuerierMetrics: querier.NewMetrics(registerer, storageSource),
appendDuration: factory.NewHistogram(
prometheus.HistogramOpts{
Name: "prompp_adapter_append_duration",
Expand Down Expand Up @@ -219,7 +260,14 @@ func (ar *Adapter) ChunkQuerier(mint, maxt int64) (storage.ChunkQuerier, error)
ahead := ar.proxy.Get()
queriers = append(
queriers,
querier.NewChunkQuerier(ahead, querier.NewNoOpShardedDeduplicator, mint, maxt, nil),
querier.NewChunkQuerier(
ahead,
querier.NewNoOpShardedDeduplicator,
mint,
maxt,
ar.longtermIntervalMs,
nil,
),
)

for _, head := range ar.proxy.Heads() {
Expand All @@ -229,7 +277,14 @@ func (ar *Adapter) ChunkQuerier(mint, maxt int64) (storage.ChunkQuerier, error)

queriers = append(
queriers,
querier.NewChunkQuerier(head, querier.NewNoOpShardedDeduplicator, mint, maxt, nil),
querier.NewChunkQuerier(
head,
querier.NewNoOpShardedDeduplicator,
mint,
maxt,
ar.longtermIntervalMs,
nil,
),
)
}

Expand All @@ -254,6 +309,7 @@ func (ar *Adapter) HeadQuerier(mint, maxt int64) (storage.Querier, error) {
querier.NewNoOpShardedDeduplicator,
mint,
maxt,
ar.longtermIntervalMs,
nil,
ar.activeQuerierMetrics,
), nil
Expand Down Expand Up @@ -281,7 +337,15 @@ func (ar *Adapter) Querier(mint, maxt int64) (storage.Querier, error) {
ahead := ar.proxy.Get()
queriers = append(
queriers,
querier.NewQuerier(ahead, querier.NewNoOpShardedDeduplicator, mint, maxt, nil, ar.activeQuerierMetrics),
querier.NewQuerier(
ahead,
querier.NewNoOpShardedDeduplicator,
mint,
maxt,
ar.longtermIntervalMs,
nil,
ar.activeQuerierMetrics,
),
)

for _, head := range ar.proxy.Heads() {
Expand All @@ -291,7 +355,15 @@ func (ar *Adapter) Querier(mint, maxt int64) (storage.Querier, error) {

queriers = append(
queriers,
querier.NewQuerier(head, querier.NewNoOpShardedDeduplicator, mint, maxt, nil, ar.storageQuerierMetrics),
querier.NewQuerier(
head,
querier.NewNoOpShardedDeduplicator,
mint,
maxt,
ar.longtermIntervalMs,
nil,
ar.storageQuerierMetrics,
),
)
}

Expand Down
1 change: 1 addition & 0 deletions pp-pkg/tsdb/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package tsdb

import (
"context"

"github.com/go-kit/log"
"github.com/oklog/ulid"
"github.com/prometheus/client_golang/prometheus"
Expand Down
2 changes: 1 addition & 1 deletion pp/entrypoint/go_constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@
#define Sizeof_RoaringBitset 40
#define Sizeof_InnerSeries (Sizeof_SizeT + Sizeof_BareBonesVector + Sizeof_RoaringBitset)

#define Sizeof_SerializedDataIterator 192
#define Sizeof_SerializedDataIterator 200
16 changes: 12 additions & 4 deletions pp/entrypoint/head/serialization.h
Original file line number Diff line number Diff line change
@@ -1,27 +1,35 @@
#pragma once

#include "series_data/decoder/decorator/downsampling_decode_iterator.h"
#include "series_data/serialization/serialized_data.h"

namespace entrypoint::head {

using DecodeIterator = series_data::decoder::decorator::DownsamplingDecodeIterator<series_data::decoder::UniversalDecodeIterator>;
using SerializedDataIterator = series_data::serialization::SerializedDataView::SeriesIterator<DecodeIterator>;

class SerializedDataGo {
public:
explicit SerializedDataGo(const series_data::DataStorage& storage, const series_data::querier::QueriedChunkList& queried_chunks)
: data_{series_data::serialization::DataSerializer{storage}.serialize(queried_chunks)} {}
explicit SerializedDataGo(const series_data::DataStorage& storage,
const series_data::querier::QueriedChunkList& queried_chunks,
PromPP::Primitives::Timestamp downsampling_ms)
: data_{series_data::serialization::DataSerializer{storage}.serialize(queried_chunks)}, downsampling_ms_(downsampling_ms) {}

[[nodiscard]] PROMPP_ALWAYS_INLINE auto get_buffer_view() const noexcept { return data_view_.get_buffer_view(); }
[[nodiscard]] PROMPP_ALWAYS_INLINE auto get_chunks_view() const noexcept { return data_view_.get_chunks_view(); }

[[nodiscard]] PROMPP_ALWAYS_INLINE auto next() noexcept { return data_view_.next_series(); }
[[nodiscard]] PROMPP_ALWAYS_INLINE auto iterator(uint32_t chunk_id) const noexcept { return data_view_.create_series_iterator(chunk_id); }
[[nodiscard]] PROMPP_ALWAYS_INLINE SerializedDataIterator iterator(uint32_t chunk_id) const noexcept {
return data_view_.create_series_iterator<DecodeIterator>(chunk_id, DecodeIterator(downsampling_ms_));
}

private:
series_data::serialization::SerializedData data_;
series_data::serialization::SerializedDataView data_view_{data_};
PromPP::Primitives::Timestamp downsampling_ms_{};
};

using SerializedDataPtr = std::unique_ptr<SerializedDataGo>;
using SerializedDataIterator = series_data::serialization::SerializedDataView::SeriesIterator;

static_assert(sizeof(SerializedDataPtr) == sizeof(void*));

Expand Down
10 changes: 7 additions & 3 deletions pp/entrypoint/series_data/querier.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,11 @@ class RangeQuerierWithArgumentsWrapperV2 {
using BytesStream = PromPP::Primitives::Go::BytesStream;

public:
RangeQuerierWithArgumentsWrapperV2(DataStorage& storage, const Query& query, head::SerializedDataPtr* serialized_data)
: querier_(storage), query_(&query), serialized_data_(serialized_data) {}
RangeQuerierWithArgumentsWrapperV2(DataStorage& storage,
const Query& query,
head::SerializedDataPtr* serialized_data,
PromPP::Primitives::Timestamp downsampling_ms)
: querier_(storage), query_(&query), serialized_data_(serialized_data), downsampling_ms_(downsampling_ms) {}

void query() noexcept {
querier_.query(*query_);
Expand All @@ -118,9 +121,10 @@ class RangeQuerierWithArgumentsWrapperV2 {
::series_data::querier::Querier querier_;
const Query* query_;
head::SerializedDataPtr* serialized_data_;
PromPP::Primitives::Timestamp downsampling_ms_;

PROMPP_ALWAYS_INLINE void serialize_chunks() const noexcept {
std::construct_at(serialized_data_, std::make_unique<head::SerializedDataGo>(querier_.get_storage(), querier_.chunks()));
std::construct_at(serialized_data_, std::make_unique<head::SerializedDataGo>(querier_.get_storage(), querier_.chunks(), downsampling_ms_));
}
};

Expand Down
3 changes: 2 additions & 1 deletion pp/entrypoint/series_data_data_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ extern "C" void prompp_series_data_data_storage_query_v2(void* args, void* res)
struct Arguments {
DataStoragePtr data_storage;
Query query;
PromPP::Primitives::Timestamp downsampling_ms;
};

struct Result {
Expand All @@ -162,7 +163,7 @@ extern "C" void prompp_series_data_data_storage_query_v2(void* args, void* res)
const auto in = static_cast<Arguments*>(args);
const auto out = static_cast<Result*>(res);

RangeQuerierWithArgumentsWrapperV2 querier(*in->data_storage, in->query, out->serialized_data);
RangeQuerierWithArgumentsWrapperV2 querier(*in->data_storage, in->query, out->serialized_data, in->downsampling_ms);
querier.query();

if (querier.need_loading()) {
Expand Down
5 changes: 3 additions & 2 deletions pp/entrypoint/series_data_data_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,13 @@ void prompp_series_data_data_storage_query(void* args, void* res);
* @param args {
* dataStorage uintptr // pointer to constructed data storage
* query DataStorageQuery // query
* downsamplingMs int64 // downsampling interval in milliseconds (0 - downsampling is disabled)
* }
*
* @param res {
* Querier uintptr // pointer to constructed Querier if data loading is needed.
* querier uintptr // pointer to constructed Querier if data loading is needed.
* // If constructed (!= 0) it must be destroyed by calling prompp_series_data_data_storage_query_final.
* Status uint8 // status of a query (0 - Success, 1 - Data loading is needed)
* status uint8 // status of a query (0 - Success, 1 - Data loading is needed)
* serializedData uintptr // pointer to serialized data
* }
*/
Expand Down
9 changes: 5 additions & 4 deletions pp/go/cppbridge/entrypoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -1856,11 +1856,12 @@ type DataStorageQueryResult struct {
SerializedData *DataStorageSerializedData
}

func seriesDataDataStorageQueryV2(dataStorage uintptr, query HeadDataStorageQuery, serializedData *DataStorageSerializedData) (querier uintptr, status uint8) {
func seriesDataDataStorageQueryV2(dataStorage uintptr, query HeadDataStorageQuery, serializedData *DataStorageSerializedData, downsamplingMs int64) (querier uintptr, status uint8) {
args := struct {
dataStorage uintptr
query HeadDataStorageQuery
}{dataStorage, query}
dataStorage uintptr
query HeadDataStorageQuery
downsamplingMs int64
}{dataStorage, query, downsamplingMs}

res := struct {
Querier uintptr
Expand Down
7 changes: 4 additions & 3 deletions pp/go/cppbridge/entrypoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ void prompp_dump_memory_profile(void* args, void* res);
#define Sizeof_RoaringBitset 40
#define Sizeof_InnerSeries (Sizeof_SizeT + Sizeof_BareBonesVector + Sizeof_RoaringBitset)

#define Sizeof_SerializedDataIterator 192
#define Sizeof_SerializedDataIterator 200
#ifdef __cplusplus
extern "C" {
#endif
Expand Down Expand Up @@ -1349,12 +1349,13 @@ void prompp_series_data_data_storage_query(void* args, void* res);
* @param args {
* dataStorage uintptr // pointer to constructed data storage
* query DataStorageQuery // query
* downsamplingMs int64 // downsampling interval in milliseconds (0 - downsampling is disabled)
* }
*
* @param res {
* Querier uintptr // pointer to constructed Querier if data loading is needed.
* querier uintptr // pointer to constructed Querier if data loading is needed.
* // If constructed (!= 0) it must be destroyed by calling prompp_series_data_data_storage_query_final.
* Status uint8 // status of a query (0 - Success, 1 - Data loading is needed)
* status uint8 // status of a query (0 - Success, 1 - Data loading is needed)
* serializedData uintptr // pointer to serialized data
* }
*/
Expand Down
23 changes: 16 additions & 7 deletions pp/go/cppbridge/head.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ const (
NormalNaN uint64 = 0x7ff8000000000001

StaleNaN uint64 = 0x7ff0000000000002

NoDownsampling = 0
)

func IsStaleNaN(v float64) bool {
Expand Down Expand Up @@ -314,9 +316,9 @@ func (i HeadDataStorageSerializedChunkIndex) Chunks(r *HeadDataStorageSerialized
return res
}

func (ds *HeadDataStorage) Query(query HeadDataStorageQuery) DataStorageQueryResult {
func (ds *HeadDataStorage) Query(query HeadDataStorageQuery, downsamplingMs int64) DataStorageQueryResult {
sd := NewDataStorageSerializedData()
querier, status := seriesDataDataStorageQueryV2(ds.dataStorage, query, sd)
querier, status := seriesDataDataStorageQueryV2(ds.dataStorage, query, sd, downsamplingMs)
return DataStorageQueryResult{
Querier: querier,
Status: status,
Expand Down Expand Up @@ -356,10 +358,9 @@ func (sd *DataStorageSerializedData) Next() (uint32, uint32) {
}

type DataStorageSerializedDataIteratorControlBlock struct {
decoderVariant uint64
Timestamp int64
Value float64
remainingSamples uint8
decodedTimestamp int64
timestamp int64
value float64
}

type DataStorageSerializedDataIterator struct {
Expand All @@ -386,7 +387,15 @@ func (it *DataStorageSerializedDataIterator) Reset(serializedData *DataStorageSe
}

func (it *DataStorageSerializedDataIterator) HasData() bool {
return it.remainingSamples != 0
return it.decodedTimestamp != math.MinInt64
}

func (it *DataStorageSerializedDataIterator) Timestamp() int64 {
return it.timestamp
}

func (it *DataStorageSerializedDataIterator) Value() float64 {
return it.value
}

// UnloadedDataLoader is Go wrapper around series_data::Loader.
Expand Down
6 changes: 3 additions & 3 deletions pp/go/cppbridge/head_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,8 @@ func (s *HeadSuite) TestSerializedChunkRecoder() {
result := s.dataStorage.Query(cppbridge.HeadDataStorageQuery{
StartTimestampMs: timeInterval.MinT,
EndTimestampMs: timeInterval.MaxT,
LabelSetIDs: []uint32{0, 1}},
)
LabelSetIDs: []uint32{0, 1},
}, cppbridge.NoDownsampling)
recoder := cppbridge.NewSerializedChunkRecoder(result.SerializedData, timeInterval)

// Act
Expand Down Expand Up @@ -179,7 +179,7 @@ func (s *HeadSuite) TestInstantQuery() {
// Arrange
dataStorage := cppbridge.NewHeadDataStorage()
encoder := cppbridge.NewHeadEncoderWithDataStorage(dataStorage)
var series = []struct {
series := []struct {
SeriesID uint32
cppbridge.Sample
}{
Expand Down
2 changes: 1 addition & 1 deletion pp/go/storage/appender/appender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (s *AppenderSuite) getHeadData(labelSetIDs []uint32) headStorageData {
StartTimestampMs: 0,
EndTimestampMs: math.MaxInt64,
LabelSetIDs: labelSetIDs,
})
}, cppbridge.NoDownsampling)
data.dsResult = append(data.dsResult, dsResult)

data.shards = append(data.shards, storageData{
Expand Down
Loading
Loading