diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index 73b4d77a0..e985c5ba0 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -113,6 +113,9 @@ var ( defaultRetentionString = "15d" defaultRetentionDuration model.Duration + defaultLongtermRetentionString = "30d" + defaultLongtermRetentionDuration model.Duration + agentMode bool agentOnlyFlags, serverOnlyFlags []string ) @@ -125,6 +128,11 @@ func init() { if err != nil { panic(err) } + + defaultLongtermRetentionDuration, err = model.ParseDuration(defaultLongtermRetentionString) + if err != nil { + panic(err) + } } // serverOnlyFlag creates server-only kingpin flag. @@ -172,6 +180,12 @@ type flagConfig struct { WalMaxSamplesPerSegment uint32 HeadRetentionTimeout model.Duration + // longterm + longtermStoragePath string + longtermTSBD tsdbOptions + longtermInterval model.Duration + longtermLookbackDelta model.Duration + featureList []string memlimitRatio float64 // These options are extracted from featureList @@ -208,6 +222,7 @@ func (c *flagConfig) setFeatureListOptions(logger log.Logger) error { level.Info(logger).Log("msg", "Experimental expand-external-labels enabled") case "exemplar-storage": c.tsdb.EnableExemplarStorage = true + c.longtermTSBD.EnableExemplarStorage = true level.Info(logger).Log("msg", "Experimental in-memory exemplar storage enabled") case "memory-snapshot-on-shutdown": c.tsdb.EnableMemorySnapshotOnShutdown = true @@ -244,6 +259,7 @@ func (c *flagConfig) setFeatureListOptions(logger log.Logger) error { level.Info(logger).Log("msg", "Experimental PromQL functions enabled.") case "native-histograms": c.tsdb.EnableNativeHistograms = true + c.longtermTSBD.EnableNativeHistograms = true c.scrape.EnableNativeHistogramsIngestion = true // Change relevant global variables. Hacky, but it's hard to pass a new option or default to unmarshallers. config.DefaultConfig.GlobalConfig.ScrapeProtocols = config.DefaultProtoFirstScrapeProtocols @@ -257,6 +273,7 @@ func (c *flagConfig) setFeatureListOptions(logger log.Logger) error { level.Info(logger).Log("msg", "Experimental created timestamp zero ingestion enabled. Changed default scrape_protocols to prefer PrometheusProto format.", "global.scrape_protocols", fmt.Sprintf("%v", config.DefaultGlobalConfig.ScrapeProtocols)) case "delayed-compaction": c.tsdb.EnableDelayedCompaction = true + c.longtermTSBD.EnableDelayedCompaction = true level.Info(logger).Log("msg", "Experimental delayed compaction is enabled.") case "promql-delayed-name-removal": c.promqlEnableDelayedNameRemoval = true @@ -277,6 +294,85 @@ func (c *flagConfig) setFeatureListOptions(logger log.Logger) error { return nil } +func (c *flagConfig) validateForLongterm(cfgFile *config.Config, logger log.Logger) error { + if !c.longtermActivated() { + // longterm is disable + return nil + } + + if c.longtermInterval <= cfgFile.GlobalConfig.ScrapeInterval { + return fmt.Errorf( + "the scratch interval(%s) should be less than the longterm interval(%s)", + cfgFile.GlobalConfig.ScrapeInterval, + c.longtermInterval, + ) + } + + if c.longtermTSBD.WALSegmentSize != 0 && + (c.longtermTSBD.WALSegmentSize < 10*1024*1024 || c.longtermTSBD.WALSegmentSize > 256*1024*1024) { + return errors.New("flag 'storage.longterm.wal-segment-size' must be set between 10MB and 256MB") + } + + if c.longtermTSBD.MaxBlockChunkSegmentSize != 0 && c.longtermTSBD.MaxBlockChunkSegmentSize < 1024*1024 { + return errors.New("flag 'storage.longterm.max-block-chunk-segment-size' must be set over 1MB") + } + + c.longtermTSBD.WALCompression = true + c.longtermTSBD.WALCompressionType = "snappy" + + if c.longtermTSBD.EnableExemplarStorage { + if cfgFile.StorageConfig.ExemplarsConfig == nil { + c.longtermTSBD.MaxExemplars = config.DefaultExemplarsConfig.MaxExemplars + } else { + c.longtermTSBD.MaxExemplars = cfgFile.StorageConfig.ExemplarsConfig.MaxExemplars + } + } + + if cfgFile.StorageConfig.TSDBConfig != nil { + c.longtermTSBD.OutOfOrderTimeWindow = cfgFile.StorageConfig.TSDBConfig.OutOfOrderTimeWindow + } + + if c.longtermTSBD.RetentionDuration == 0 && c.longtermTSBD.MaxBytes == 0 { + c.longtermTSBD.RetentionDuration = defaultLongtermRetentionDuration + level.Info(logger).Log( + "msg", "No time or size longterm retention was set so using the default time retention", + "duration", defaultLongtermRetentionDuration, + ) + } + + // Check for overflows. This limits our max retention to 100y. + if c.longtermTSBD.RetentionDuration < 0 { + y, err := model.ParseDuration("100y") + if err != nil { + return err + } + + c.longtermTSBD.RetentionDuration = y + level.Warn(logger).Log("msg", "Time longterm retention value is too high. Limiting to: "+y.String()) + } + + // Max block size settings. + if c.longtermTSBD.MaxBlockDuration == 0 { + maxBlockDuration, err := model.ParseDuration("31d") + if err != nil { + return err + } + + // When the time retention is set and not too big use to define the max block duration. + if c.longtermTSBD.RetentionDuration != 0 && c.longtermTSBD.RetentionDuration/10 < maxBlockDuration { + maxBlockDuration = c.longtermTSBD.RetentionDuration / 10 + } + + c.longtermTSBD.MaxBlockDuration = maxBlockDuration + } + + return nil +} + +func (c *flagConfig) longtermActivated() bool { + return c.longtermStoragePath != "" +} + func main() { if os.Getenv("DEBUG") != "" { runtime.SetBlockProfileRate(20) @@ -446,6 +542,63 @@ func main() { serverOnlyFlag(a, "storage.tsdb.samples-per-chunk", "Target number of samples per chunk."). Default("120").Hidden().IntVar(&cfg.tsdb.SamplesPerChunk) + // longterm + serverOnlyFlag(a, "storage.longterm.path", "Base path for metrics longterm-storage."). + StringVar(&cfg.longtermStoragePath) + + serverOnlyFlag( + a, + "storage.longterm.retention.time", + "How long to retain samples in longterm storage. If neither this flag nor \"storage.longterm.retention.time\""+ + "nor \"storage.longterm.retention.size\" is set, the retention time defaults to "+ + defaultLongtermRetentionString+". Units Supported: y, w, d, h, m, s, ms.", + ).SetValue(&cfg.longtermTSBD.RetentionDuration) + + serverOnlyFlag( + a, + "storage.longterm.corrupted-retention-duration", + "How long to retain corrupted blocks in longterm storage. Units Supported: y, w, d, h, m, s, ms.", + ).Default("4d").SetValue(&cfg.longtermTSBD.CorruptedRetentionDuration) + + serverOnlyFlag( + a, + "storage.longterm.retention.size", + "Maximum number of bytes that can be stored for blocks. "+ + "A unit is required, supported units: B, KB, MB, GB, TB, PB, EB. Ex: \"512MB\". "+ + "Based on powers-of-2, so 1KB is 1024B.", + ).BytesVar(&cfg.longtermTSBD.MaxBytes) + + serverOnlyFlag( + a, + "storage.longterm.no-lockfile", + "Do not create lockfile in data directory.", + ).Default("false").BoolVar(&cfg.longtermTSBD.NoLockfile) + + serverOnlyFlag( + a, + "storage.longterm.min-block-duration", + "Minimum duration of a data block before being persisted. For use in testing.", + ).Hidden().Default("2h").SetValue(&cfg.longtermTSBD.MinBlockDuration) + + serverOnlyFlag( + a, + "storage.longterm.max-block-duration", + "Maximum duration compacted blocks may span. For use in testing. (Defaults to 10% of the retention period.)", + ).Hidden().PlaceHolder("").SetValue(&cfg.longtermTSBD.MaxBlockDuration) + + serverOnlyFlag( + a, + "longterm.interval", + "Downsampling interval for longterm storage. Units Supported: y, w, d, h, m, s, ms.", + ).Default("5m").SetValue(&cfg.longtermInterval) + + serverOnlyFlag( + a, + "query.longterm.lookback-delta", + "The maximum lookback duration for retrieving metrics during expression evaluations and federation.", + ).Default("5m").SetValue(&cfg.longtermLookbackDelta) + + // agent agentOnlyFlag(a, "storage.agent.path", "Base path for metrics storage."). Default("data-agent/").StringVar(&cfg.agentStoragePath) @@ -636,6 +789,15 @@ func main() { cfg.tsdb.OutOfOrderTimeWindow = cfgFile.StorageConfig.TSDBConfig.OutOfOrderTimeWindow } + // longterm + if err := cfg.validateForLongterm(cfgFile, logger); err != nil { + level.Error(logger).Log( + "msg", "failed validate config for longterm", + "err", err, + ) + os.Exit(2) + } + // Now that the validity of the config is established, set the config // success metrics accordingly, although the config isn't really loaded // yet. This will happen later (including setting these metrics again), @@ -733,6 +895,13 @@ func main() { reloadBlocksTriggerNotifier := pp_storage.NewTriggerNotifier() cfg.tsdb.ReloadBlocksExternalTrigger = reloadBlocksTriggerNotifier + // longterm + longtermReloadBlocksTriggerNotifier := pp_storage.NewNoopTriggerNotifier() + if cfg.longtermActivated() { + longtermReloadBlocksTriggerNotifier = pp_storage.NewTriggerNotifier() + cfg.longtermTSBD.ReloadBlocksExternalTrigger = longtermReloadBlocksTriggerNotifier + } + dataDir, err := filepath.Abs(localStoragePath) if err != nil { level.Error(logger).Log("msg", "failed to calculate local storage path abs", "err", err) @@ -765,6 +934,7 @@ func main() { removedHeadTriggerNotifier := pp_storage.NewTriggerNotifier() hManagerReadyNotifier := ready.NewNotifiableNotifier() + _ = longtermReloadBlocksTriggerNotifier hManager, err := pp_storage.NewManager( &pp_storage.Options{ Seed: cfgFile.GlobalConfig.ExternalLabels.Hash(), @@ -772,6 +942,7 @@ func main() { CommitInterval: time.Duration(cfg.WalCommitInterval), MaxRetentionPeriod: time.Duration(cfg.tsdb.RetentionDuration), HeadRetentionPeriod: time.Duration(cfg.HeadRetentionTimeout), + LongtermIntervalMs: time.Duration(cfg.longtermInterval).Milliseconds(), KeeperCapacity: 2, DataDir: localStoragePath, MaxSegmentSize: cfg.WalMaxSamplesPerSegment, @@ -825,6 +996,26 @@ func main() { // PP_CHANGES.md: rebuild on cpp end ) + // longterm + var ( + longtermLocalStorage ReadyStorage = noopReadyStorage{} + longtermFanoutStorage storage.Storage = noopReadyStorage{} + ) + if cfg.longtermActivated() { + longtermLocalStorage = &readyStorage{stats: tsdb.NewDBStats()} + longtermFanoutStorage = storage.NewFanout( + logger, + pp_pkg_storage.NewLongtermAdapter( + clock, + hManager.Proxy(), + hManager.MergeOutOfOrderChunks, + time.Duration(cfg.longtermInterval).Milliseconds(), + prometheus.DefaultRegisterer, + ), + longtermLocalStorage, + ) + } + var ( ctxWeb, cancelWeb = context.WithCancel(context.Background()) ctxRule = context.Background() @@ -910,6 +1101,9 @@ func main() { queryEngine *promql.Engine ruleManager *rules.Manager + + // longterm + longtermQueryEngine *promql.Engine ) if cfg.enableAutoGOMAXPROCS { @@ -973,6 +1167,34 @@ func main() { return ruleQueryOffset }, }) + + longtermQueryEngine = queryEngine + if cfg.longtermActivated() { + longtermQueryEngine = promql.NewEngine( + promql.EngineOpts{ + Logger: log.With(logger, "component", "longterm_query_engine"), + Reg: prometheus.WrapRegistererWithPrefix( + "longterm_", + prometheus.DefaultRegisterer, + ), + MaxSamples: cfg.queryMaxSamples, + Timeout: time.Duration(cfg.queryTimeout), + ActiveQueryTracker: promql.NewActiveQueryTracker( + localStoragePath, + cfg.queryConcurrency, + log.With(logger, "component", "longterm_active_query_tracker"), + ), + LookbackDelta: time.Duration(cfg.longtermLookbackDelta), + NoStepSubqueryIntervalFn: noStepSubqueryInterval.Get, + // EnableAtModifier and EnableNegativeOffset have to be + // always on for regular PromQL as of Prometheus v2.33. + EnableAtModifier: true, + EnableNegativeOffset: true, + EnablePerStepStats: cfg.enablePerStepStats, + EnableDelayedNameRemoval: cfg.promqlEnableDelayedNameRemoval, + }, + ) + } } scraper.Set(scrapeManager) @@ -992,6 +1214,10 @@ func main() { cfg.web.IsAgent = agentMode cfg.web.AppName = modeAppName + // longterm + cfg.web.LongtermStorage = longtermFanoutStorage + cfg.web.LongtermQueryEngine = longtermQueryEngine + cfg.web.Version = &web.PrometheusVersion{ Version: version.Version, Revision: version.Revision, @@ -1031,7 +1257,10 @@ func main() { }, { // PP_CHANGES.md: rebuild on cpp end name: "db_storage", reloader: localStorage.ApplyConfig, - }, { // PP_CHANGES.md: rebuild on cpp start + }, { // PP_CHANGES.md: rebuild on cpp + name: "db_longterm_storage", + reloader: longtermLocalStorage.ApplyConfig, + }, { // PP_CHANGES.md: rebuild on cpp name: "remote_read", reloader: remoteRead.ApplyConfig, }, { // PP_CHANGES.md: rebuild on cpp end @@ -1127,7 +1356,10 @@ func main() { // Start all components while we wait for TSDB to open but only load // initial config and mark ourselves as ready after it completed. - dbOpen := make(chan struct{}) + dbOpen := newWaiter() + if cfg.longtermActivated() { + dbOpen.inc() + } // sync.Once is used to make sure we can close the channel at different execution stages(SIGTERM or when the config is loaded). type closeOnce struct { @@ -1298,11 +1530,11 @@ func main() { for { select { case <-hup: - if err := reloadConfig(cfg.configFile, cfg.enableExpandExternalLabels, cfg.tsdb.EnableExemplarStorage, logger, noStepSubqueryInterval, reloaders...); err != nil { + if err := reloadConfig(cfg, cfg.enableExpandExternalLabels, cfg.tsdb.EnableExemplarStorage, logger, noStepSubqueryInterval, reloaders...); err != nil { level.Error(logger).Log("msg", "Error reloading config", "err", err) } case rc := <-webHandler.Reload(): - if err := reloadConfig(cfg.configFile, cfg.enableExpandExternalLabels, cfg.tsdb.EnableExemplarStorage, logger, noStepSubqueryInterval, reloaders...); err != nil { + if err := reloadConfig(cfg, cfg.enableExpandExternalLabels, cfg.tsdb.EnableExemplarStorage, logger, noStepSubqueryInterval, reloaders...); err != nil { level.Error(logger).Log("msg", "Error reloading config", "err", err) rc <- err } else { @@ -1326,14 +1558,14 @@ func main() { g.Add( func() error { select { - case <-dbOpen: + case <-dbOpen.done(): // In case a shutdown is initiated before the dbOpen is released case <-cancel: reloadReady.Close() return nil } - if err := reloadConfig(cfg.configFile, cfg.enableExpandExternalLabels, cfg.tsdb.EnableExemplarStorage, logger, noStepSubqueryInterval, reloaders...); err != nil { + if err := reloadConfig(cfg, cfg.enableExpandExternalLabels, cfg.tsdb.EnableExemplarStorage, logger, noStepSubqueryInterval, reloaders...); err != nil { return fmt.Errorf("error loading config from %q: %w", cfg.configFile, err) } @@ -1368,7 +1600,13 @@ func main() { } } - db, err := openDBWithMetrics(localStoragePath, logger, prometheus.DefaultRegisterer, &opts, localStorage.getStats()) + db, err := openDBWithMetrics( + localStoragePath, + log.With(logger, "component", "tsdb"), + prometheus.DefaultRegisterer, + &opts, + localStorage.getStats(), + ) if err != nil { return fmt.Errorf("opening storage failed: %w", err) } @@ -1396,7 +1634,7 @@ func main() { startTimeMargin := int64(2 * time.Duration(cfg.tsdb.MinBlockDuration).Seconds() * 1000) localStorage.Set(db, startTimeMargin) // db.SetWriteNotified(remoteStorage) // PP_CHANGES.md: rebuild on cpp - close(dbOpen) + dbOpen.close() <-cancel return nil }, @@ -1407,6 +1645,66 @@ func main() { close(cancel) }, ) + + // longterm + if cfg.longtermActivated() { + longtermOpts := cfg.longtermTSBD.ToTSDBOptions() + longtermOpts.StripeSize = 1 // PP_CHANGES.md: rebuild on cpp + longtermCancel := make(chan struct{}) + g.Add( + func() error { + level.Info(logger).Log("msg", "Starting Longterm TSDB ...") + + longtermDB, err := openDBWithMetrics( + cfg.longtermStoragePath, + log.With(logger, "component", "longterm_tsdb"), + prometheus.WrapRegistererWithPrefix("longterm_", prometheus.DefaultRegisterer), + &longtermOpts, + longtermLocalStorage.getStats(), + ) + if err != nil { + return fmt.Errorf("opening longterm storage failed: %w", err) + } + + switch fsType := prom_runtime.Statfs(localStoragePath); fsType { + case "NFS_SUPER_MAGIC": + level.Warn(logger).Log( + "fs_type", fsType, + "msg", "This filesystem is not supported and may lead to data corruption and data loss. "+ + "Please carefully read https://prometheus.io/docs/prometheus/latest/storage/ "+ + "to learn more about supported filesystems.", + ) + default: + level.Info(logger).Log("fs_type", fsType) + } + + level.Info(logger).Log("msg", "Longterm TSDB started") + level.Debug(logger).Log("msg", "Longterm TSDB options", + "MinBlockDuration", cfg.longtermTSBD.MinBlockDuration, + "MaxBlockDuration", cfg.longtermTSBD.MaxBlockDuration, + "MaxBytes", cfg.longtermTSBD.MaxBytes, + "NoLockfile", cfg.longtermTSBD.NoLockfile, + "RetentionDuration", cfg.longtermTSBD.RetentionDuration, + "CorruptedRetentionDuration", cfg.longtermTSBD.CorruptedRetentionDuration, + "WALSegmentSize", cfg.longtermTSBD.WALSegmentSize, + "WALCompression", cfg.longtermTSBD.WALCompression, + ) + + startTimeMargin := int64(2 * time.Duration(cfg.longtermTSBD.MinBlockDuration).Seconds() * 1000) + longtermLocalStorage.Set(longtermDB, startTimeMargin) + dbOpen.close() + <-longtermCancel + + return nil + }, + func(err error) { + if err := longtermFanoutStorage.Close(); err != nil { + level.Error(logger).Log("msg", "Error stopping longterm storage", "err", err) + } + close(longtermCancel) + }, + ) + } } if agentMode { // WAL storage. @@ -1451,7 +1749,7 @@ func main() { localStorage.Set(db, 0) // db.SetWriteNotified(remoteStorage) // PP_CHANGES.md: rebuild on cpp - close(dbOpen) + dbOpen.close() <-cancel return nil }, @@ -1483,7 +1781,7 @@ func main() { g.Add( func() error { select { - case <-dbOpen: + case <-dbOpen.done(): // In case a shutdown is initiated before the dbOpen is released case <-cancel: return nil @@ -1577,10 +1875,16 @@ func main() { level.Info(logger).Log("msg", "See you next time!") } -func openDBWithMetrics(dir string, logger log.Logger, reg prometheus.Registerer, opts *tsdb.Options, stats *tsdb.DBStats) (*tsdb.DB, error) { +func openDBWithMetrics( + dir string, + logger log.Logger, + reg prometheus.Registerer, + opts *tsdb.Options, + stats *tsdb.DBStats, +) (*tsdb.DB, error) { db, err := tsdb.Open( dir, - log.With(logger, "component", "tsdb"), + logger, reg, opts, stats, @@ -1633,10 +1937,16 @@ type reloader struct { reloader func(*config.Config) error } -func reloadConfig(filename string, expandExternalLabels, enableExemplarStorage bool, logger log.Logger, noStepSuqueryInterval *safePromQLNoStepSubqueryInterval, rls ...reloader) (err error) { +func reloadConfig( + flagCfg flagConfig, + expandExternalLabels, enableExemplarStorage bool, + logger log.Logger, + noStepSuqueryInterval *safePromQLNoStepSubqueryInterval, + rls ...reloader, +) (err error) { start := time.Now() timings := []interface{}{} - level.Info(logger).Log("msg", "Loading configuration file", "filename", filename) + level.Info(logger).Log("msg", "Loading configuration file", "filename", flagCfg.configFile) defer func() { if err == nil { @@ -1647,9 +1957,18 @@ func reloadConfig(filename string, expandExternalLabels, enableExemplarStorage b } }() - conf, err := config.LoadFile(filename, agentMode, expandExternalLabels, logger) + conf, err := config.LoadFile(flagCfg.configFile, agentMode, expandExternalLabels, logger) if err != nil { - return fmt.Errorf("couldn't load configuration (--config.file=%q): %w", filename, err) + return fmt.Errorf("couldn't load configuration (--config.file=%q): %w", flagCfg.configFile, err) + } + + // longterm + if flagCfg.longtermActivated() && flagCfg.longtermInterval <= conf.GlobalConfig.ScrapeInterval { + return fmt.Errorf( + "the scratch interval(%s) should be less than the longterm interval(%s)", + conf.GlobalConfig.ScrapeInterval, + flagCfg.longtermInterval, + ) } if enableExemplarStorage { @@ -1668,7 +1987,10 @@ func reloadConfig(filename string, expandExternalLabels, enableExemplarStorage b timings = append(timings, rl.name, time.Since(rstart)) } if failed { - return fmt.Errorf("one or more errors occurred while applying the new configuration (--config.file=%q)", filename) + return fmt.Errorf( + "one or more errors occurred while applying the new configuration (--config.file=%q)", + flagCfg.configFile, + ) } oldGoGC := debug.SetGCPercent(conf.Runtime.GoGC) @@ -1683,7 +2005,11 @@ func reloadConfig(filename string, expandExternalLabels, enableExemplarStorage b } noStepSuqueryInterval.Set(conf.GlobalConfig.EvaluationInterval) - l := []interface{}{"msg", "Completed loading of configuration file", "filename", filename, "totalDuration", time.Since(start)} + l := []any{ + "msg", "Completed loading of configuration file", + "filename", flagCfg.configFile, + "totalDuration", time.Since(start), + } level.Info(logger).Log(append(l, timings...)...) return nil } @@ -2189,3 +2515,191 @@ func readPromPPFeatures(logger log.Logger) { } } } + +// +// waiter +// + +type waiter struct { + ch chan struct{} + waiters int + locker sync.Mutex +} + +func newWaiter() *waiter { + return &waiter{ + ch: make(chan struct{}), + waiters: 1, + locker: sync.Mutex{}, + } +} + +func (w *waiter) inc() { + w.locker.Lock() + w.waiters++ + w.locker.Unlock() +} + +func (w *waiter) close() { + w.locker.Lock() + w.waiters-- + if w.waiters == 0 { + close(w.ch) + } + w.locker.Unlock() +} + +func (w *waiter) done() <-chan struct{} { + return w.ch +} + +// +// ReadyStorage +// + +// ReadyStorage implements the [storage.Storage] and TSDBAdminStats interface. +type ReadyStorage interface { + Appender(ctx context.Context) storage.Appender + ApplyConfig(conf *config.Config) error + ChunkQuerier(mint, maxt int64) (storage.ChunkQuerier, error) + CleanTombstones() error + Close() error + Delete(ctx context.Context, mint, maxt int64, ms ...*labels.Matcher) error + ExemplarQuerier(ctx context.Context) (storage.ExemplarQuerier, error) + Querier(mint, maxt int64) (storage.Querier, error) + Set(db storage.Storage, startTimeMargin int64) + Snapshot(dir string, withHead bool) error + StartTime() (int64, error) + Stats(statsByLabelName string, limit int) (*tsdb.Stats, error) + WALReplayStatus() (tsdb.WALReplayStatus, error) + getStats() *tsdb.DBStats +} + +// +// noopReadyStorage +// + +// errlongtermNotActivated error when longterm is not activated. +var errlongtermNotActivated = errors.New("longterm is not activated") + +// noopReadyStorage not operations implements the [ReadyStorage] interface. +type noopReadyStorage struct{} + +// Appender implements the Storage interface. +func (noopReadyStorage) Appender(context.Context) storage.Appender { + return longtermAppender{} +} + +// Appender implements the [ReadyStorage] interface. +func (noopReadyStorage) ApplyConfig(*config.Config) error { + return nil +} + +// ChunkQuerier implements the Storage interface. +func (noopReadyStorage) ChunkQuerier(int64, int64) (storage.ChunkQuerier, error) { + return nil, errlongtermNotActivated +} + +// CleanTombstones implements the api_v1.TSDBAdminStats and api_v2.TSDBAdmin interfaces. +func (noopReadyStorage) CleanTombstones() error { + return nil +} + +// Close implements the Storage interface. +func (noopReadyStorage) Close() error { + return nil +} + +// Delete implements the api_v1.TSDBAdminStats and api_v2.TSDBAdmin interfaces. +func (noopReadyStorage) Delete(context.Context, int64, int64, ...*labels.Matcher) error { + return nil +} + +// ExemplarQuerier implements the Storage interface. +func (noopReadyStorage) ExemplarQuerier(context.Context) (storage.ExemplarQuerier, error) { + return nil, errlongtermNotActivated +} + +// Querier implements the Storage interface. +func (noopReadyStorage) Querier(int64, int64) (storage.Querier, error) { + return nil, errlongtermNotActivated +} + +// Set implements the [ReadyStorage] interface. +func (noopReadyStorage) Set(storage.Storage, int64) {} + +// Snapshot implements the api_v1.TSDBAdminStats and api_v2.TSDBAdmin interfaces. +func (noopReadyStorage) Snapshot(dir string, withHead bool) error { + return errlongtermNotActivated +} + +// StartTime implements the Storage interface. +func (noopReadyStorage) StartTime() (int64, error) { + return math.MaxInt64, nil +} + +// Stats implements the api_v1.TSDBAdminStats interface. +func (noopReadyStorage) Stats(string, int) (*tsdb.Stats, error) { + return nil, errlongtermNotActivated +} + +// WALReplayStatus implements the api_v1.TSDBStats interface. +func (noopReadyStorage) WALReplayStatus() (tsdb.WALReplayStatus, error) { + return tsdb.WALReplayStatus{}, errlongtermNotActivated +} + +// GetStats implements the [ReadyStorage] interface. +func (noopReadyStorage) getStats() *tsdb.DBStats { + return tsdb.NewDBStats() +} + +// +// longtermAppender +// + +// errCannotAppend when cannot append data to this appender. +var errCannotAppend = errors.New("cannot append data to this appender") + +// longtermAppender the appender that data cannot be added to, implements the [storage.Appender] interface. +type longtermAppender struct{} + +// Append implements the [storage.Appender] interface. +func (longtermAppender) Append(storage.SeriesRef, labels.Labels, int64, float64) (storage.SeriesRef, error) { + return 0, errCannotAppend +} + +// AppendExemplar implements the [storage.Appender] interface. +func (longtermAppender) AppendExemplar( + storage.SeriesRef, + labels.Labels, + exemplar.Exemplar, +) (storage.SeriesRef, error) { + return 0, errCannotAppend +} + +// AppendHistogram implements the [storage.Appender] interface. +func (longtermAppender) AppendHistogram( + storage.SeriesRef, + labels.Labels, + int64, + *histogram.Histogram, + *histogram.FloatHistogram, +) (storage.SeriesRef, error) { + return 0, errCannotAppend +} + +// UpdateMetadata implements the [storage.Appender] interface. +func (longtermAppender) UpdateMetadata(storage.SeriesRef, labels.Labels, metadata.Metadata) (storage.SeriesRef, error) { + return 0, errCannotAppend +} + +// AppendCTZeroSample implements the [storage.Appender] interface. +func (longtermAppender) AppendCTZeroSample(storage.SeriesRef, labels.Labels, int64, int64) (storage.SeriesRef, error) { + return 0, errCannotAppend +} + +// Commit implements the [storage.Appender] interface. +func (longtermAppender) Commit() error { return errCannotAppend } + +// Rollback implements the [storage.Appender] interface. +func (longtermAppender) Rollback() error { return errCannotAppend } diff --git a/pp-pkg/storage/adapter.go b/pp-pkg/storage/adapter.go index b4f85ec32..a1ea562ae 100644 --- a/pp-pkg/storage/adapter.go +++ b/pp-pkg/storage/adapter.go @@ -33,6 +33,7 @@ type Adapter struct { hashdexLimits cppbridge.WALHashdexLimits transparentState *cppbridge.StateV2 mergeOutOfOrderChunks func() + longtermIntervalMs int64 // stat activeQuerierMetrics *querier.Metrics @@ -41,12 +42,51 @@ type Adapter struct { samplesAppended prometheus.Counter } -// NewAdapter init new [Adapter]. +// NewAdapter init new main [Adapter]. func NewAdapter( clock clockwork.Clock, proxy *pp_storage.Proxy, mergeOutOfOrderChunks func(), registerer prometheus.Registerer, +) *Adapter { + return newAdapter( + clock, + proxy, + mergeOutOfOrderChunks, + 0, + querier.QueryableAppenderSource, + querier.QueryableStorageSource, + registerer, + ) +} + +// NewLongtermAdapter init new longterm [Adapter]. +func NewLongtermAdapter( + clock clockwork.Clock, + proxy *pp_storage.Proxy, + mergeOutOfOrderChunks func(), + longtermIntervalMs int64, + registerer prometheus.Registerer, +) *Adapter { + return newAdapter( + clock, + proxy, + mergeOutOfOrderChunks, + longtermIntervalMs, + querier.QueryableLongtermAppenderSource, + querier.QueryableLongtermStorageSource, + registerer, + ) +} + +// newAdapter init new [Adapter]. +func newAdapter( + clock clockwork.Clock, + proxy *pp_storage.Proxy, + mergeOutOfOrderChunks func(), + longtermIntervalMs int64, + activeSource, storageSource string, + registerer prometheus.Registerer, ) *Adapter { factory := util.NewUnconflictRegisterer(registerer) return &Adapter{ @@ -56,8 +96,9 @@ func NewAdapter( hashdexLimits: cppbridge.DefaultWALHashdexLimits(), transparentState: cppbridge.NewTransitionStateV2(), mergeOutOfOrderChunks: mergeOutOfOrderChunks, - activeQuerierMetrics: querier.NewMetrics(registerer, querier.QueryableAppenderSource), - storageQuerierMetrics: querier.NewMetrics(registerer, querier.QueryableStorageSource), + longtermIntervalMs: longtermIntervalMs, + activeQuerierMetrics: querier.NewMetrics(registerer, activeSource), + storageQuerierMetrics: querier.NewMetrics(registerer, storageSource), appendDuration: factory.NewHistogram( prometheus.HistogramOpts{ Name: "prompp_adapter_append_duration", @@ -219,7 +260,14 @@ func (ar *Adapter) ChunkQuerier(mint, maxt int64) (storage.ChunkQuerier, error) ahead := ar.proxy.Get() queriers = append( queriers, - querier.NewChunkQuerier(ahead, querier.NewNoOpShardedDeduplicator, mint, maxt, nil), + querier.NewChunkQuerier( + ahead, + querier.NewNoOpShardedDeduplicator, + mint, + maxt, + ar.longtermIntervalMs, + nil, + ), ) for _, head := range ar.proxy.Heads() { @@ -229,7 +277,14 @@ func (ar *Adapter) ChunkQuerier(mint, maxt int64) (storage.ChunkQuerier, error) queriers = append( queriers, - querier.NewChunkQuerier(head, querier.NewNoOpShardedDeduplicator, mint, maxt, nil), + querier.NewChunkQuerier( + head, + querier.NewNoOpShardedDeduplicator, + mint, + maxt, + ar.longtermIntervalMs, + nil, + ), ) } @@ -254,6 +309,7 @@ func (ar *Adapter) HeadQuerier(mint, maxt int64) (storage.Querier, error) { querier.NewNoOpShardedDeduplicator, mint, maxt, + ar.longtermIntervalMs, nil, ar.activeQuerierMetrics, ), nil @@ -281,7 +337,15 @@ func (ar *Adapter) Querier(mint, maxt int64) (storage.Querier, error) { ahead := ar.proxy.Get() queriers = append( queriers, - querier.NewQuerier(ahead, querier.NewNoOpShardedDeduplicator, mint, maxt, nil, ar.activeQuerierMetrics), + querier.NewQuerier( + ahead, + querier.NewNoOpShardedDeduplicator, + mint, + maxt, + ar.longtermIntervalMs, + nil, + ar.activeQuerierMetrics, + ), ) for _, head := range ar.proxy.Heads() { @@ -291,7 +355,15 @@ func (ar *Adapter) Querier(mint, maxt int64) (storage.Querier, error) { queriers = append( queriers, - querier.NewQuerier(head, querier.NewNoOpShardedDeduplicator, mint, maxt, nil, ar.storageQuerierMetrics), + querier.NewQuerier( + head, + querier.NewNoOpShardedDeduplicator, + mint, + maxt, + ar.longtermIntervalMs, + nil, + ar.storageQuerierMetrics, + ), ) } diff --git a/pp-pkg/tsdb/compactor.go b/pp-pkg/tsdb/compactor.go index 15a41da44..82ea36058 100644 --- a/pp-pkg/tsdb/compactor.go +++ b/pp-pkg/tsdb/compactor.go @@ -2,6 +2,7 @@ package tsdb import ( "context" + "github.com/go-kit/log" "github.com/oklog/ulid" "github.com/prometheus/client_golang/prometheus" diff --git a/pp/entrypoint/go_constants.h b/pp/entrypoint/go_constants.h index ec74157b1..dd239f1b5 100644 --- a/pp/entrypoint/go_constants.h +++ b/pp/entrypoint/go_constants.h @@ -4,4 +4,4 @@ #define Sizeof_RoaringBitset 40 #define Sizeof_InnerSeries (Sizeof_SizeT + Sizeof_BareBonesVector + Sizeof_RoaringBitset) -#define Sizeof_SerializedDataIterator 192 +#define Sizeof_SerializedDataIterator 200 diff --git a/pp/entrypoint/head/serialization.h b/pp/entrypoint/head/serialization.h index 3168a8ff0..fec3769a0 100644 --- a/pp/entrypoint/head/serialization.h +++ b/pp/entrypoint/head/serialization.h @@ -1,27 +1,35 @@ #pragma once +#include "series_data/decoder/decorator/downsampling_decode_iterator.h" #include "series_data/serialization/serialized_data.h" namespace entrypoint::head { +using DecodeIterator = series_data::decoder::decorator::DownsamplingDecodeIterator; +using SerializedDataIterator = series_data::serialization::SerializedDataView::SeriesIterator; + class SerializedDataGo { public: - explicit SerializedDataGo(const series_data::DataStorage& storage, const series_data::querier::QueriedChunkList& queried_chunks) - : data_{series_data::serialization::DataSerializer{storage}.serialize(queried_chunks)} {} + explicit SerializedDataGo(const series_data::DataStorage& storage, + const series_data::querier::QueriedChunkList& queried_chunks, + PromPP::Primitives::Timestamp downsampling_ms) + : data_{series_data::serialization::DataSerializer{storage}.serialize(queried_chunks)}, downsampling_ms_(downsampling_ms) {} [[nodiscard]] PROMPP_ALWAYS_INLINE auto get_buffer_view() const noexcept { return data_view_.get_buffer_view(); } [[nodiscard]] PROMPP_ALWAYS_INLINE auto get_chunks_view() const noexcept { return data_view_.get_chunks_view(); } [[nodiscard]] PROMPP_ALWAYS_INLINE auto next() noexcept { return data_view_.next_series(); } - [[nodiscard]] PROMPP_ALWAYS_INLINE auto iterator(uint32_t chunk_id) const noexcept { return data_view_.create_series_iterator(chunk_id); } + [[nodiscard]] PROMPP_ALWAYS_INLINE SerializedDataIterator iterator(uint32_t chunk_id) const noexcept { + return data_view_.create_series_iterator(chunk_id, DecodeIterator(downsampling_ms_)); + } private: series_data::serialization::SerializedData data_; series_data::serialization::SerializedDataView data_view_{data_}; + PromPP::Primitives::Timestamp downsampling_ms_{}; }; using SerializedDataPtr = std::unique_ptr; -using SerializedDataIterator = series_data::serialization::SerializedDataView::SeriesIterator; static_assert(sizeof(SerializedDataPtr) == sizeof(void*)); diff --git a/pp/entrypoint/series_data/querier.h b/pp/entrypoint/series_data/querier.h index f67ca505e..72fe35467 100644 --- a/pp/entrypoint/series_data/querier.h +++ b/pp/entrypoint/series_data/querier.h @@ -98,8 +98,11 @@ class RangeQuerierWithArgumentsWrapperV2 { using BytesStream = PromPP::Primitives::Go::BytesStream; public: - RangeQuerierWithArgumentsWrapperV2(DataStorage& storage, const Query& query, head::SerializedDataPtr* serialized_data) - : querier_(storage), query_(&query), serialized_data_(serialized_data) {} + RangeQuerierWithArgumentsWrapperV2(DataStorage& storage, + const Query& query, + head::SerializedDataPtr* serialized_data, + PromPP::Primitives::Timestamp downsampling_ms) + : querier_(storage), query_(&query), serialized_data_(serialized_data), downsampling_ms_(downsampling_ms) {} void query() noexcept { querier_.query(*query_); @@ -118,9 +121,10 @@ class RangeQuerierWithArgumentsWrapperV2 { ::series_data::querier::Querier querier_; const Query* query_; head::SerializedDataPtr* serialized_data_; + PromPP::Primitives::Timestamp downsampling_ms_; PROMPP_ALWAYS_INLINE void serialize_chunks() const noexcept { - std::construct_at(serialized_data_, std::make_unique(querier_.get_storage(), querier_.chunks())); + std::construct_at(serialized_data_, std::make_unique(querier_.get_storage(), querier_.chunks(), downsampling_ms_)); } }; diff --git a/pp/entrypoint/series_data_data_storage.cpp b/pp/entrypoint/series_data_data_storage.cpp index 0cb19dd77..29fec462e 100644 --- a/pp/entrypoint/series_data_data_storage.cpp +++ b/pp/entrypoint/series_data_data_storage.cpp @@ -151,6 +151,7 @@ extern "C" void prompp_series_data_data_storage_query_v2(void* args, void* res) struct Arguments { DataStoragePtr data_storage; Query query; + PromPP::Primitives::Timestamp downsampling_ms; }; struct Result { @@ -162,7 +163,7 @@ extern "C" void prompp_series_data_data_storage_query_v2(void* args, void* res) const auto in = static_cast(args); const auto out = static_cast(res); - RangeQuerierWithArgumentsWrapperV2 querier(*in->data_storage, in->query, out->serialized_data); + RangeQuerierWithArgumentsWrapperV2 querier(*in->data_storage, in->query, out->serialized_data, in->downsampling_ms); querier.query(); if (querier.need_loading()) { diff --git a/pp/entrypoint/series_data_data_storage.h b/pp/entrypoint/series_data_data_storage.h index fcc4474de..a436e003c 100644 --- a/pp/entrypoint/series_data_data_storage.h +++ b/pp/entrypoint/series_data_data_storage.h @@ -114,12 +114,13 @@ void prompp_series_data_data_storage_query(void* args, void* res); * @param args { * dataStorage uintptr // pointer to constructed data storage * query DataStorageQuery // query + * downsamplingMs int64 // downsampling interval in milliseconds (0 - downsampling is disabled) * } * * @param res { - * Querier uintptr // pointer to constructed Querier if data loading is needed. + * querier uintptr // pointer to constructed Querier if data loading is needed. * // If constructed (!= 0) it must be destroyed by calling prompp_series_data_data_storage_query_final. - * Status uint8 // status of a query (0 - Success, 1 - Data loading is needed) + * status uint8 // status of a query (0 - Success, 1 - Data loading is needed) * serializedData uintptr // pointer to serialized data * } */ diff --git a/pp/go/cppbridge/entrypoint.go b/pp/go/cppbridge/entrypoint.go index 939c09e4e..d3c086aa1 100644 --- a/pp/go/cppbridge/entrypoint.go +++ b/pp/go/cppbridge/entrypoint.go @@ -1856,11 +1856,12 @@ type DataStorageQueryResult struct { SerializedData *DataStorageSerializedData } -func seriesDataDataStorageQueryV2(dataStorage uintptr, query HeadDataStorageQuery, serializedData *DataStorageSerializedData) (querier uintptr, status uint8) { +func seriesDataDataStorageQueryV2(dataStorage uintptr, query HeadDataStorageQuery, serializedData *DataStorageSerializedData, downsamplingMs int64) (querier uintptr, status uint8) { args := struct { - dataStorage uintptr - query HeadDataStorageQuery - }{dataStorage, query} + dataStorage uintptr + query HeadDataStorageQuery + downsamplingMs int64 + }{dataStorage, query, downsamplingMs} res := struct { Querier uintptr diff --git a/pp/go/cppbridge/entrypoint.h b/pp/go/cppbridge/entrypoint.h index 2d1b7061a..cca4e2a14 100755 --- a/pp/go/cppbridge/entrypoint.h +++ b/pp/go/cppbridge/entrypoint.h @@ -39,7 +39,7 @@ void prompp_dump_memory_profile(void* args, void* res); #define Sizeof_RoaringBitset 40 #define Sizeof_InnerSeries (Sizeof_SizeT + Sizeof_BareBonesVector + Sizeof_RoaringBitset) -#define Sizeof_SerializedDataIterator 192 +#define Sizeof_SerializedDataIterator 200 #ifdef __cplusplus extern "C" { #endif @@ -1349,12 +1349,13 @@ void prompp_series_data_data_storage_query(void* args, void* res); * @param args { * dataStorage uintptr // pointer to constructed data storage * query DataStorageQuery // query + * downsamplingMs int64 // downsampling interval in milliseconds (0 - downsampling is disabled) * } * * @param res { - * Querier uintptr // pointer to constructed Querier if data loading is needed. + * querier uintptr // pointer to constructed Querier if data loading is needed. * // If constructed (!= 0) it must be destroyed by calling prompp_series_data_data_storage_query_final. - * Status uint8 // status of a query (0 - Success, 1 - Data loading is needed) + * status uint8 // status of a query (0 - Success, 1 - Data loading is needed) * serializedData uintptr // pointer to serialized data * } */ diff --git a/pp/go/cppbridge/head.go b/pp/go/cppbridge/head.go index ae8d67c2e..5cc6848fc 100644 --- a/pp/go/cppbridge/head.go +++ b/pp/go/cppbridge/head.go @@ -10,6 +10,8 @@ const ( NormalNaN uint64 = 0x7ff8000000000001 StaleNaN uint64 = 0x7ff0000000000002 + + NoDownsampling = 0 ) func IsStaleNaN(v float64) bool { @@ -314,9 +316,9 @@ func (i HeadDataStorageSerializedChunkIndex) Chunks(r *HeadDataStorageSerialized return res } -func (ds *HeadDataStorage) Query(query HeadDataStorageQuery) DataStorageQueryResult { +func (ds *HeadDataStorage) Query(query HeadDataStorageQuery, downsamplingMs int64) DataStorageQueryResult { sd := NewDataStorageSerializedData() - querier, status := seriesDataDataStorageQueryV2(ds.dataStorage, query, sd) + querier, status := seriesDataDataStorageQueryV2(ds.dataStorage, query, sd, downsamplingMs) return DataStorageQueryResult{ Querier: querier, Status: status, @@ -356,10 +358,9 @@ func (sd *DataStorageSerializedData) Next() (uint32, uint32) { } type DataStorageSerializedDataIteratorControlBlock struct { - decoderVariant uint64 - Timestamp int64 - Value float64 - remainingSamples uint8 + decodedTimestamp int64 + timestamp int64 + value float64 } type DataStorageSerializedDataIterator struct { @@ -386,7 +387,15 @@ func (it *DataStorageSerializedDataIterator) Reset(serializedData *DataStorageSe } func (it *DataStorageSerializedDataIterator) HasData() bool { - return it.remainingSamples != 0 + return it.decodedTimestamp != math.MinInt64 +} + +func (it *DataStorageSerializedDataIterator) Timestamp() int64 { + return it.timestamp +} + +func (it *DataStorageSerializedDataIterator) Value() float64 { + return it.value } // UnloadedDataLoader is Go wrapper around series_data::Loader. diff --git a/pp/go/cppbridge/head_test.go b/pp/go/cppbridge/head_test.go index a9cd2942c..3fb96324e 100644 --- a/pp/go/cppbridge/head_test.go +++ b/pp/go/cppbridge/head_test.go @@ -121,8 +121,8 @@ func (s *HeadSuite) TestSerializedChunkRecoder() { result := s.dataStorage.Query(cppbridge.HeadDataStorageQuery{ StartTimestampMs: timeInterval.MinT, EndTimestampMs: timeInterval.MaxT, - LabelSetIDs: []uint32{0, 1}}, - ) + LabelSetIDs: []uint32{0, 1}, + }, cppbridge.NoDownsampling) recoder := cppbridge.NewSerializedChunkRecoder(result.SerializedData, timeInterval) // Act @@ -179,7 +179,7 @@ func (s *HeadSuite) TestInstantQuery() { // Arrange dataStorage := cppbridge.NewHeadDataStorage() encoder := cppbridge.NewHeadEncoderWithDataStorage(dataStorage) - var series = []struct { + series := []struct { SeriesID uint32 cppbridge.Sample }{ diff --git a/pp/go/storage/appender/appender_test.go b/pp/go/storage/appender/appender_test.go index 1a9b0607a..ee3bdf297 100644 --- a/pp/go/storage/appender/appender_test.go +++ b/pp/go/storage/appender/appender_test.go @@ -123,7 +123,7 @@ func (s *AppenderSuite) getHeadData(labelSetIDs []uint32) headStorageData { StartTimestampMs: 0, EndTimestampMs: math.MaxInt64, LabelSetIDs: labelSetIDs, - }) + }, cppbridge.NoDownsampling) data.dsResult = append(data.dsResult, dsResult) data.shards = append(data.shards, storageData{ diff --git a/pp/go/storage/head/services/persistener.go b/pp/go/storage/head/services/persistener.go index 8218af077..92f0c0659 100644 --- a/pp/go/storage/head/services/persistener.go +++ b/pp/go/storage/head/services/persistener.go @@ -35,6 +35,7 @@ type Persistener[ clock clockwork.Clock tsdbRetentionPeriod time.Duration retentionPeriod time.Duration + longtermIntervalMs int64 // stat events prometheus.Counter headPersistenceDuration prometheus.Histogram @@ -53,6 +54,7 @@ func NewPersistener[ clock clockwork.Clock, tsdbRetentionPeriod time.Duration, retentionPeriod time.Duration, + longtermIntervalMs int64, registerer prometheus.Registerer, ) *Persistener[TTask, TShard, TGoShard, THeadBlockWriter, THead] { factory := util.NewUnconflictRegisterer(registerer) @@ -63,6 +65,7 @@ func NewPersistener[ clock: clock, tsdbRetentionPeriod: tsdbRetentionPeriod, retentionPeriod: retentionPeriod, + longtermIntervalMs: longtermIntervalMs, events: factory.NewCounter( prometheus.CounterOpts{ Name: "prompp_head_event_count", @@ -230,6 +233,7 @@ func NewPersistenerService[ mediator Mediator, tsdbRetentionPeriod time.Duration, retentionPeriod time.Duration, + longtermIntervalMs int64, registerer prometheus.Registerer, ) *PersistenerService[TTask, TShard, TGoShard, THeadBlockWriter, THead, TProxyHead, TLoader] { return &PersistenerService[TTask, TShard, TGoShard, THeadBlockWriter, THead, TProxyHead, TLoader]{ @@ -240,6 +244,7 @@ func NewPersistenerService[ clock, tsdbRetentionPeriod, retentionPeriod, + longtermIntervalMs, registerer, ), proxy: proxy, diff --git a/pp/go/storage/head/services/persistener_test.go b/pp/go/storage/head/services/persistener_test.go index a1fc2b1d1..7ae2a43ec 100644 --- a/pp/go/storage/head/services/persistener_test.go +++ b/pp/go/storage/head/services/persistener_test.go @@ -114,7 +114,7 @@ func (s *PersistenerSuite) SetupTest() { *shard.PerGoroutineShard, *mock.HeadBlockWriterMock[*shard.Shard], *storage.Head, - ](s.catalog, s.blockWriter, s.writeNotifier, s.clock, tsdbRetentionPeriod, retentionPeriod, nil) + ](s.catalog, s.blockWriter, s.writeNotifier, s.clock, tsdbRetentionPeriod, retentionPeriod, cppbridge.NoDownsampling, nil) } func TestPersistenerSuite(t *testing.T) { @@ -329,6 +329,7 @@ func (s *PersistenerServiceSuite) SetupTest() { nil, tsdbRetentionPeriod, retentionPeriod, + 0, nil, ) } diff --git a/pp/go/storage/head/shard/data_storage.go b/pp/go/storage/head/shard/data_storage.go index 0cb18e2ca..aa955cba4 100644 --- a/pp/go/storage/head/shard/data_storage.go +++ b/pp/go/storage/head/shard/data_storage.go @@ -73,9 +73,9 @@ func (ds *DataStorage) MergeOutOfOrderChunks() { ds.locker.Unlock() } -func (ds *DataStorage) Query(query cppbridge.HeadDataStorageQuery) cppbridge.DataStorageQueryResult { +func (ds *DataStorage) Query(query cppbridge.HeadDataStorageQuery, downsamplingMs int64) cppbridge.DataStorageQueryResult { ds.locker.RLock() - result := ds.dataStorage.Query(query) + result := ds.dataStorage.Query(query, downsamplingMs) ds.locker.RUnlock() return result } diff --git a/pp/go/storage/loader_test.go b/pp/go/storage/loader_test.go index b8819400d..9cf4019a4 100644 --- a/pp/go/storage/loader_test.go +++ b/pp/go/storage/loader_test.go @@ -211,7 +211,7 @@ func (s *HeadLoadSuite) TestLoadWithDisabledDataUnloading() { StartTimestampMs: 0, EndTimestampMs: 2, LabelSetIDs: []uint32{0}, - }) + }, cppbridge.NoDownsampling) err := loadedHead.Close() // Assert @@ -263,7 +263,7 @@ func (s *HeadLoadSuite) TestAppendAfterLoad() { StartTimestampMs: 0, EndTimestampMs: 4, LabelSetIDs: []uint32{0}, - }) + }, cppbridge.NoDownsampling) err := loadedHead.Close() diff --git a/pp/go/storage/manager.go b/pp/go/storage/manager.go index 68152197a..2842c8899 100644 --- a/pp/go/storage/manager.go +++ b/pp/go/storage/manager.go @@ -73,6 +73,7 @@ type Options struct { CommitInterval time.Duration MaxRetentionPeriod time.Duration HeadRetentionPeriod time.Duration + LongtermIntervalMs int64 KeeperCapacity int DataDir string MaxSegmentSize uint32 @@ -294,6 +295,7 @@ func (m *Manager) initServices( persistenerMediator, o.MaxRetentionPeriod, o.HeadRetentionPeriod, + o.LongtermIntervalMs, r, ).Execute() @@ -454,6 +456,11 @@ func NewTriggerNotifier() *TriggerNotifier { return &TriggerNotifier{c: make(chan struct{}, 1)} } +// NewTriggerNotifier init new noop [TriggerNotifier]. +func NewNoopTriggerNotifier() *TriggerNotifier { + return &TriggerNotifier{} +} + // Chan returns channel with notifications. func (tn *TriggerNotifier) Chan() <-chan struct{} { return tn.c diff --git a/pp/go/storage/manager_test.go b/pp/go/storage/manager_test.go index de944b3fe..55d6f3281 100644 --- a/pp/go/storage/manager_test.go +++ b/pp/go/storage/manager_test.go @@ -2,8 +2,16 @@ package storage_test import ( "fmt" + "os" + "path/filepath" + "testing" + "time" + "github.com/jonboulle/clockwork" "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + "github.com/prometheus/prometheus/pp/go/cppbridge" "github.com/prometheus/prometheus/pp/go/storage" "github.com/prometheus/prometheus/pp/go/storage/block" @@ -11,19 +19,11 @@ import ( "github.com/prometheus/prometheus/pp/go/storage/head/shard/wal" "github.com/prometheus/prometheus/pp/go/storage/head/shard/wal/writer" "github.com/prometheus/prometheus/pp/go/storage/storagetest" - "github.com/stretchr/testify/require" - "github.com/stretchr/testify/suite" - "os" - "path/filepath" - "testing" - "time" ) -var ( - defaultSortCatalogRecordsFunc = func(lhs, rhs *catalog.Record) bool { - return lhs.CreatedAt() < rhs.CreatedAt() - } -) +var defaultSortCatalogRecordsFunc = func(lhs, rhs *catalog.Record) bool { + return lhs.CreatedAt() < rhs.CreatedAt() +} type UploadOrBuildHeadSuite struct { suite.Suite @@ -117,7 +117,7 @@ func (s *UploadOrBuildHeadSuite) TestUploadOrBuildHeadCorrupted() { func (s *UploadOrBuildHeadSuite) fixWalEncoderVersion(headDir string, numberOfShards uint16, encoderVersion uint8) { for i := uint16(0); i < numberOfShards; i++ { - file, err := os.OpenFile(filepath.Join(headDir, fmt.Sprintf("shard_%d.wal", i)), os.O_RDWR|os.O_TRUNC, 0666) + file, err := os.OpenFile(filepath.Join(headDir, fmt.Sprintf("shard_%d.wal", i)), os.O_RDWR|os.O_TRUNC, 0o666) require.NoError(s.T(), err) _, err = writer.WriteHeader(file, wal.FileFormatVersion, encoderVersion) require.NoError(s.T(), err) diff --git a/pp/go/storage/querier/chunk_querier.go b/pp/go/storage/querier/chunk_querier.go index 7e637fd0f..6763de72b 100644 --- a/pp/go/storage/querier/chunk_querier.go +++ b/pp/go/storage/querier/chunk_querier.go @@ -33,11 +33,12 @@ type ChunkQuerier[ TShard Shard[TDataStorage, TLSS], THead Head[TTask, TDataStorage, TLSS, TShard], ] struct { - head THead - deduplicatorCtor deduplicatorCtor - mint int64 - maxt int64 - closer func() error + head THead + deduplicatorCtor deduplicatorCtor + mint int64 + maxt int64 + longtermIntervalMs int64 + closer func() error } // NewChunkQuerier init new [ChunkQuerier]. @@ -50,15 +51,16 @@ func NewChunkQuerier[ ]( head THead, deduplicatorCtor deduplicatorCtor, - mint, maxt int64, + mint, maxt, longtermIntervalMs int64, closer func() error, ) *ChunkQuerier[TTask, TDataStorage, TLSS, TShard, THead] { return &ChunkQuerier[TTask, TDataStorage, TLSS, TShard, THead]{ - head: head, - deduplicatorCtor: deduplicatorCtor, - mint: mint, - maxt: maxt, - closer: closer, + head: head, + deduplicatorCtor: deduplicatorCtor, + mint: mint, + maxt: maxt, + longtermIntervalMs: longtermIntervalMs, + closer: closer, } } @@ -143,7 +145,14 @@ func (q *ChunkQuerier[TTask, TDataStorage, TLSS, TShard, THead]) Select( return storage.ErrChunkSeriesSet(err) } - shardedSerializedData := queryDataStorage(dsQueryChunkQuerier, q.head, lssQueryResults, q.mint, q.maxt) + shardedSerializedData := queryDataStorage( + dsQueryChunkQuerier, + q.head, + lssQueryResults, + q.mint, + q.maxt, + q.longtermIntervalMs, + ) chunkSeriesSets := make([]storage.ChunkSeriesSet, q.head.NumberOfShards()) for shardID, serializedData := range shardedSerializedData { if serializedData == nil { diff --git a/pp/go/storage/querier/interface.go b/pp/go/storage/querier/interface.go index d51fe2ed1..69e34117e 100644 --- a/pp/go/storage/querier/interface.go +++ b/pp/go/storage/querier/interface.go @@ -47,9 +47,7 @@ type DataStorage interface { ) ([]cppbridge.Sample, cppbridge.DataStorageQueryResult) // Query returns serialized chunks from data storage. - Query( - query cppbridge.HeadDataStorageQuery, - ) cppbridge.DataStorageQueryResult + Query(query cppbridge.HeadDataStorageQuery, downsamplingMs int64) cppbridge.DataStorageQueryResult // WithRLock calls fn on raw [cppbridge.HeadDataStorage] with read lock. WithRLock(fn func(ds *cppbridge.HeadDataStorage) error) error diff --git a/pp/go/storage/querier/metrics.go b/pp/go/storage/querier/metrics.go index 71e968890..7bc9a1e9a 100644 --- a/pp/go/storage/querier/metrics.go +++ b/pp/go/storage/querier/metrics.go @@ -8,8 +8,15 @@ import ( const ( // QueryableAppenderSource metrics source for Appender. QueryableAppenderSource = "queryable_appender" + // QueryableStorageSource metrics source for Storage. QueryableStorageSource = "queryable_storage" + + // QueryableLongtermAppenderSource metrics source for longterm Appender. + QueryableLongtermAppenderSource = "queryable_longterm_appender" + + // QueryableLongtermStorageSource metrics source for longterm Storage. + QueryableLongtermStorageSource = "queryable_longterm_storage" ) // Metrics for [Querier]. diff --git a/pp/go/storage/querier/querier.go b/pp/go/storage/querier/querier.go index cacbd8094..9fc9f0125 100644 --- a/pp/go/storage/querier/querier.go +++ b/pp/go/storage/querier/querier.go @@ -49,12 +49,13 @@ type Querier[ TShard Shard[TDataStorage, TLSS], THead Head[TTask, TDataStorage, TLSS, TShard], ] struct { - mint int64 - maxt int64 - head THead - deduplicatorCtor deduplicatorCtor - closer func() error - metrics *Metrics + mint int64 + maxt int64 + longtermIntervalMs int64 + head THead + deduplicatorCtor deduplicatorCtor + closer func() error + metrics *Metrics } // NewQuerier init new [Querier]. @@ -67,17 +68,18 @@ func NewQuerier[ ]( head THead, deduplicatorCtor deduplicatorCtor, - mint, maxt int64, + mint, maxt, longtermIntervalMs int64, closer func() error, metrics *Metrics, ) *Querier[TTask, TDataStorage, TLSS, TShard, THead] { return &Querier[TTask, TDataStorage, TLSS, TShard, THead]{ - mint: mint, - maxt: maxt, - head: head, - deduplicatorCtor: deduplicatorCtor, - closer: closer, - metrics: metrics, + mint: mint, + maxt: maxt, + longtermIntervalMs: longtermIntervalMs, + head: head, + deduplicatorCtor: deduplicatorCtor, + closer: closer, + metrics: metrics, } } @@ -263,7 +265,14 @@ func (q *Querier[TTask, TDataStorage, TLSS, TShard, THead]) selectRange( return storage.ErrSeriesSet(err) } - shardedSerializedData := queryDataStorage(dsQueryRangeQuerier, q.head, lssQueryResults, q.mint, q.maxt) + shardedSerializedData := queryDataStorage( + dsQueryRangeQuerier, + q.head, + lssQueryResults, + q.mint, + q.maxt, + q.longtermIntervalMs, + ) seriesSets := make([]storage.SeriesSet, q.head.NumberOfShards()) for shardID, serializedData := range shardedSerializedData { if serializedData != nil { @@ -301,7 +310,7 @@ func queryDataStorage[ taskName string, head THead, lssQueryResults []*cppbridge.LSSQueryResult, - mint, maxt int64, + mint, maxt, longtermIntervalMs int64, ) []*cppbridge.DataStorageSerializedData { shardedSerializedData := make([]*cppbridge.DataStorageSerializedData, head.NumberOfShards()) loadAndQueryWaiter := NewLoadAndQueryWaiter[TTask, TDataStorage, TLSS, TShard, THead](head) @@ -315,11 +324,14 @@ func queryDataStorage[ } var result cppbridge.DataStorageQueryResult - result = s.DataStorage().Query(cppbridge.HeadDataStorageQuery{ - StartTimestampMs: mint, - EndTimestampMs: maxt, - LabelSetIDs: lssQueryResult.IDs(), - }) + result = s.DataStorage().Query( + cppbridge.HeadDataStorageQuery{ + StartTimestampMs: mint, + EndTimestampMs: maxt, + LabelSetIDs: lssQueryResult.IDs(), + }, + longtermIntervalMs, + ) if result.Status == cppbridge.DataStorageQueryStatusNeedDataLoad { loadAndQueryWaiter.Add(s, result.Querier) } diff --git a/pp/go/storage/querier/querier_test.go b/pp/go/storage/querier/querier_test.go index b45da5872..4f235399e 100644 --- a/pp/go/storage/querier/querier_test.go +++ b/pp/go/storage/querier/querier_test.go @@ -114,7 +114,7 @@ func (s *QuerierSuite) TestRangeQuery() { *shard.LSS, *shard.PerGoroutineShard, *storage.Head, - ](s.head, querier.NewNoOpShardedDeduplicator, 0, 2, nil, nil) + ](s.head, querier.NewNoOpShardedDeduplicator, 0, 2, cppbridge.NoDownsampling, nil, nil) defer func() { _ = q.Close() }() matcher, _ := labels.NewMatcher(labels.MatchEqual, "__name__", "metric") @@ -142,7 +142,7 @@ func (s *QuerierSuite) TestRangeQueryWithoutMatching() { *shard.LSS, *shard.PerGoroutineShard, *storage.Head, - ](s.head, querier.NewNoOpShardedDeduplicator, 0, 2, nil, nil) + ](s.head, querier.NewNoOpShardedDeduplicator, 0, 2, cppbridge.NoDownsampling, nil, nil) defer func() { _ = q.Close() }() matcher, _ := labels.NewMatcher(labels.MatchEqual, "__name__", "unknown_metric") @@ -195,7 +195,7 @@ func (s *QuerierSuite) TestRangeQueryWithDataStorageLoading() { *shard.LSS, *shard.PerGoroutineShard, *storage.Head, - ](s.head, querier.NewNoOpShardedDeduplicator, 0, 3, nil, nil) + ](s.head, querier.NewNoOpShardedDeduplicator, 0, 3, cppbridge.NoDownsampling, nil, nil) defer func() { _ = q.Close() }() matcher, _ := labels.NewMatcher(labels.MatchEqual, "__name__", "metric") @@ -233,7 +233,7 @@ func (s *QuerierSuite) TestInstantQuery() { *shard.LSS, *shard.PerGoroutineShard, *storage.Head, - ](s.head, querier.NewNoOpShardedDeduplicator, 0, 0, nil, nil) + ](s.head, querier.NewNoOpShardedDeduplicator, 0, 0, cppbridge.NoDownsampling, nil, nil) defer func() { _ = q.Close() }() matcher, _ := labels.NewMatcher(labels.MatchEqual, "__name__", "metric") @@ -286,7 +286,7 @@ func (s *QuerierSuite) TestInstantQueryWithDataStorageLoading() { *shard.LSS, *shard.PerGoroutineShard, *storage.Head, - ](s.head, querier.NewNoOpShardedDeduplicator, 0, 0, nil, nil) + ](s.head, querier.NewNoOpShardedDeduplicator, 0, 0, cppbridge.NoDownsampling, nil, nil) defer func() { _ = q.Close() }() matcher, _ := labels.NewMatcher(labels.MatchEqual, "__name__", "metric") @@ -330,7 +330,7 @@ func (s *QuerierSuite) TestLabelNames() { } s.appendTimeSeries(timeSeries) - q := querier.NewQuerier(s.head, querier.NewNoOpShardedDeduplicator, 0, 2, nil, nil) + q := querier.NewQuerier(s.head, querier.NewNoOpShardedDeduplicator, 0, 2, cppbridge.NoDownsampling, nil, nil) defer func() { _ = q.Close() }() matcher, err := labels.NewMatcher(labels.MatchEqual, "__name__", "metric0") s.Require().NoError(err) @@ -363,7 +363,7 @@ func (s *QuerierSuite) TestLabelNamesWithLimit() { } s.appendTimeSeries(timeSeries) - q := querier.NewQuerier(s.head, querier.NewNoOpShardedDeduplicator, 0, 2, nil, nil) + q := querier.NewQuerier(s.head, querier.NewNoOpShardedDeduplicator, 0, 2, cppbridge.NoDownsampling, nil, nil) defer func() { _ = q.Close() }() matcher, err := labels.NewMatcher(labels.MatchEqual, "__name__", "metric0") s.Require().NoError(err) @@ -396,7 +396,7 @@ func (s *QuerierSuite) TestLabelNamesNoMatches() { } s.appendTimeSeries(timeSeries) - q := querier.NewQuerier(s.head, querier.NewNoOpShardedDeduplicator, 0, 2, nil, nil) + q := querier.NewQuerier(s.head, querier.NewNoOpShardedDeduplicator, 0, 2, cppbridge.NoDownsampling, nil, nil) defer func() { _ = q.Close() }() matcher, err := labels.NewMatcher(labels.MatchEqual, "__name__", "metric3") s.Require().NoError(err) @@ -429,7 +429,7 @@ func (s *QuerierSuite) TestLabelValues() { } s.appendTimeSeries(timeSeries) - q := querier.NewQuerier(s.head, querier.NewNoOpShardedDeduplicator, 0, 2, nil, nil) + q := querier.NewQuerier(s.head, querier.NewNoOpShardedDeduplicator, 0, 2, cppbridge.NoDownsampling, nil, nil) defer func() { _ = q.Close() }() matcher, err := labels.NewMatcher(labels.MatchRegexp, "__name__", "metric.*") s.Require().NoError(err) @@ -462,7 +462,7 @@ func (s *QuerierSuite) TestLabelValuesNoMatches() { } s.appendTimeSeries(timeSeries) - q := querier.NewQuerier(s.head, querier.NewNoOpShardedDeduplicator, 0, 2, nil, nil) + q := querier.NewQuerier(s.head, querier.NewNoOpShardedDeduplicator, 0, 2, cppbridge.NoDownsampling, nil, nil) defer func() { _ = q.Close() }() matcher, err := labels.NewMatcher(labels.MatchEqual, "__name__", "metric2") s.Require().NoError(err) @@ -495,7 +495,7 @@ func (s *QuerierSuite) TestLabelValuesNoMatchesOnName() { } s.appendTimeSeries(timeSeries) - q := querier.NewQuerier(s.head, querier.NewNoOpShardedDeduplicator, 0, 2, nil, nil) + q := querier.NewQuerier(s.head, querier.NewNoOpShardedDeduplicator, 0, 2, cppbridge.NoDownsampling, nil, nil) defer func() { _ = q.Close() }() matcher, err := labels.NewMatcher(labels.MatchRegexp, "__name__", "metric.*") s.Require().NoError(err) diff --git a/pp/go/storage/querier/series.go b/pp/go/storage/querier/series.go index 3ec070619..ee4da3e49 100644 --- a/pp/go/storage/querier/series.go +++ b/pp/go/storage/querier/series.go @@ -30,7 +30,7 @@ func NewChunkIterator(serializedData *cppbridge.DataStorageSerializedData, chunk maxt: maxt, } - if it.chunkIterator.Timestamp < mint { + if it.chunkIterator.Timestamp() < mint { it.chunkIterator.Seek(mint) } @@ -44,7 +44,7 @@ func (it *ChunkIterator) Reset(serializedData *cppbridge.DataStorageSerializedDa it.isInitialized = false it.chunkIterator.Reset(serializedData, chunkRef) - if it.chunkIterator.Timestamp < mint { + if it.chunkIterator.Timestamp() < mint { it.chunkIterator.Seek(mint) } } @@ -53,7 +53,7 @@ func (it *ChunkIterator) Reset(serializedData *cppbridge.DataStorageSerializedDa // //nolint:gocritic // unnamedResult not need func (it *ChunkIterator) At() (int64, float64) { - return it.chunkIterator.Timestamp, it.chunkIterator.Value + return it.chunkIterator.Timestamp(), it.chunkIterator.Value() } // AtFloatHistogram returns the current timestamp/value pair if the value is a histogram with floating-point counts. @@ -68,7 +68,7 @@ func (it *ChunkIterator) AtHistogram(h *histogram.Histogram) (int64, *histogram. // AtT returns the current timestamp. func (it *ChunkIterator) AtT() int64 { - return it.chunkIterator.Timestamp + return it.chunkIterator.Timestamp() } // Err returns the current error. diff --git a/pp/go/storage/querier/series_bench_test.go b/pp/go/storage/querier/series_bench_test.go index 7b48b70c0..e5d6f9d4f 100644 --- a/pp/go/storage/querier/series_bench_test.go +++ b/pp/go/storage/querier/series_bench_test.go @@ -28,7 +28,7 @@ func iterateSeriesSet(seriesSet storage.SeriesSet) { } } -func queryOpt(t testing.TB, lss *shard.LSS, ds *shard.DataStorage, start, end int64, matchers ...model.LabelMatcher) *querier.SeriesSet { +func queryOpt(t testing.TB, lss *shard.LSS, ds *shard.DataStorage, start, end, downsamplingMs int64, matchers ...model.LabelMatcher) *querier.SeriesSet { selector, snapshot, err := lss.QuerySelector(0, matchers) require.NoError(t, err) if selector == 0 || snapshot == nil { @@ -44,7 +44,7 @@ func queryOpt(t testing.TB, lss *shard.LSS, ds *shard.DataStorage, start, end in StartTimestampMs: start, EndTimestampMs: end, LabelSetIDs: lssQueryResult.IDs(), - }) + }, downsamplingMs) require.Equal(t, cppbridge.DataStorageQueryStatusSuccess, dsQueryResult.Status) return querier.NewSeriesSet(start, end, lssQueryResult, snapshot, dsQueryResult.SerializedData) @@ -76,14 +76,14 @@ func BenchmarkSeriesSetOpt(b *testing.B) { } var start int64 = 0 - var end = int64(size) + end := int64(size) lss := shard.NewLSS() ds := shard.NewDataStorage() prepareData(lss, ds, size) seriesSets := make([]*querier.SeriesSet, 0, b.N) for i := 0; i < b.N; i++ { - seriesSets = append(seriesSets, queryOpt(b, lss, ds, start, end, matcher)) + seriesSets = append(seriesSets, queryOpt(b, lss, ds, start, end, cppbridge.NoDownsampling, matcher)) } b.ReportAllocs() @@ -119,7 +119,6 @@ func prepareInstantData(lss *shard.LSS, ds *shard.DataStorage, timeStamps []int6 }, }) } - } storagetest.MustAppendTimeSeriesToLSSAndDataStorage(lss, ds, timeSeries...) } diff --git a/pp/go/storage/querier/series_test.go b/pp/go/storage/querier/series_test.go index 8b018d972..6557ef35f 100644 --- a/pp/go/storage/querier/series_test.go +++ b/pp/go/storage/querier/series_test.go @@ -71,7 +71,7 @@ func (s *SeriesSetTestSuite) SetupTest() { } } -func (s *SeriesSetTestSuite) query(lss *shard.LSS, ds *shard.DataStorage, start, end int64, matchers ...model.LabelMatcher) *querier.SeriesSet { +func (s *SeriesSetTestSuite) query(lss *shard.LSS, ds *shard.DataStorage, start, end, downsamplingMs int64, matchers ...model.LabelMatcher) *querier.SeriesSet { selector, snapshot, err := lss.QuerySelector(0, matchers) require.NoError(s.T(), err) if selector == 0 || snapshot == nil { @@ -87,7 +87,7 @@ func (s *SeriesSetTestSuite) query(lss *shard.LSS, ds *shard.DataStorage, start, StartTimestampMs: start, EndTimestampMs: end, LabelSetIDs: lssQueryResult.IDs(), - }) + }, downsamplingMs) require.Equal(s.T(), cppbridge.DataStorageQueryStatusSuccess, dsQueryResult.Status) return querier.NewSeriesSet(start, end, lssQueryResult, snapshot, dsQueryResult.SerializedData) @@ -118,7 +118,7 @@ func (s *SeriesSetTestSuite) TestQueryAllValues() { storagetest.MustAppendTimeSeriesToLSSAndDataStorage(s.lss, s.ds, s.timeSeries...) // Act - seriesSet := s.query(s.lss, s.ds, start, end, matcher) + seriesSet := s.query(s.lss, s.ds, start, end, cppbridge.NoDownsampling, matcher) // Assert require.Equal(s.T(), expected, storagetest.TimeSeriesFromSeriesSet(seriesSet, false)) @@ -139,7 +139,7 @@ func (s *SeriesSetTestSuite) TestQueryNoValues() { storagetest.MustAppendTimeSeriesToLSSAndDataStorage(s.lss, s.ds, s.timeSeries...) // Act - seriesSet := s.query(s.lss, s.ds, start, end, matcher) + seriesSet := s.query(s.lss, s.ds, start, end, cppbridge.NoDownsampling, matcher) // Assert require.Equal(s.T(), expected, storagetest.TimeSeriesFromSeriesSet(seriesSet, false)) @@ -160,7 +160,7 @@ func (s *SeriesSetTestSuite) TestQuerySingleSeries() { storagetest.MustAppendTimeSeriesToLSSAndDataStorage(s.lss, s.ds, s.timeSeries...) // Act - seriesSet := s.query(s.lss, s.ds, start, end, matcher) + seriesSet := s.query(s.lss, s.ds, start, end, cppbridge.NoDownsampling, matcher) // Assert require.Equal(s.T(), expected, storagetest.TimeSeriesFromSeriesSet(seriesSet, false)) @@ -181,7 +181,7 @@ func (s *SeriesSetTestSuite) TestQuerySingleSample() { storagetest.MustAppendTimeSeriesToLSSAndDataStorage(s.lss, s.ds, s.timeSeries...) // Act - seriesSet := s.query(s.lss, s.ds, start, end, matcher) + seriesSet := s.query(s.lss, s.ds, start, end, cppbridge.NoDownsampling, matcher) // Assert require.Equal(s.T(), expected, storagetest.TimeSeriesFromSeriesSet(seriesSet, false)) @@ -202,7 +202,7 @@ func (s *SeriesSetTestSuite) TestQueryCutByUpperLimit() { storagetest.MustAppendTimeSeriesToLSSAndDataStorage(s.lss, s.ds, s.timeSeries...) // Act - seriesSet := s.query(s.lss, s.ds, start, end, matcher) + seriesSet := s.query(s.lss, s.ds, start, end, cppbridge.NoDownsampling, matcher) // Assert require.Equal(s.T(), expected, storagetest.TimeSeriesFromSeriesSet(seriesSet, false)) @@ -223,7 +223,7 @@ func (s *SeriesSetTestSuite) TestQueryCutByLowerLimit() { storagetest.MustAppendTimeSeriesToLSSAndDataStorage(s.lss, s.ds, s.timeSeries...) // Act - seriesSet := s.query(s.lss, s.ds, start, end, matcher) + seriesSet := s.query(s.lss, s.ds, start, end, cppbridge.NoDownsampling, matcher) // Assert require.Equal(s.T(), expected, storagetest.TimeSeriesFromSeriesSet(seriesSet, false)) @@ -253,7 +253,7 @@ func (s *SeriesSetTestSuite) TestQueryLargeChunks() { storagetest.MustAppendTimeSeriesToLSSAndDataStorage(s.lss, s.ds, timeSeries...) // Act - seriesSet := s.query(s.lss, s.ds, start, end, matcher) + seriesSet := s.query(s.lss, s.ds, start, end, cppbridge.NoDownsampling, matcher) // Assert require.Equal(s.T(), expected, storagetest.TimeSeriesFromSeriesSet(seriesSet, false)) @@ -272,7 +272,7 @@ func (s *SeriesSetTestSuite) TestQueryEmptyStorage() { expected := []storagetest.TimeSeries{} // Act - seriesSet := s.query(s.lss, s.ds, start, end, matcher) + seriesSet := s.query(s.lss, s.ds, start, end, cppbridge.NoDownsampling, matcher) // Assert require.Equal(s.T(), expected, storagetest.TimeSeriesFromSeriesSet(seriesSet, false)) @@ -327,8 +327,8 @@ func (s *SeriesSetTestSuite) TestQueryMergedSeriesSets() { storagetest.MustAppendTimeSeriesToLSSAndDataStorage(anotherLss, anotherDs, timeSeries2...) expected := append(timeSeries1, timeSeries2...) // Act - seriesSet1 := s.query(s.lss, s.ds, start, end, matcher) - seriesSet2 := s.query(anotherLss, anotherDs, start, end, matcher) + seriesSet1 := s.query(s.lss, s.ds, start, end, cppbridge.NoDownsampling, matcher) + seriesSet2 := s.query(anotherLss, anotherDs, start, end, cppbridge.NoDownsampling, matcher) // Assert require.Equal( @@ -352,7 +352,7 @@ func (s *SeriesSetTestSuite) TestSeriesSeek() { expected := s.timeSeries[:4] storagetest.MustAppendTimeSeriesToLSSAndDataStorage(s.lss, s.ds, s.timeSeries[:4]...) - seriesSet := s.query(s.lss, s.ds, start, end, matcher) + seriesSet := s.query(s.lss, s.ds, start, end, cppbridge.NoDownsampling, matcher) require.True(s.T(), seriesSet.Next()) series := seriesSet.At() require.Equal(s.T(), expected[0].Labels, series.Labels()) @@ -389,7 +389,7 @@ func (s *SeriesSetTestSuite) TestSeriesSeekOutOfRange() { var end int64 = 1000 storagetest.MustAppendTimeSeriesToLSSAndDataStorage(s.lss, s.ds, s.timeSeries[:4]...) - seriesSet := s.query(s.lss, s.ds, start, end, matcher) + seriesSet := s.query(s.lss, s.ds, start, end, cppbridge.NoDownsampling, matcher) require.True(s.T(), seriesSet.Next()) series := seriesSet.At() var iterator chunkenc.Iterator @@ -416,7 +416,7 @@ func (s *SeriesSetTestSuite) TestSeriesParallelRead() { expected := s.timeSeries storagetest.MustAppendTimeSeriesToLSSAndDataStorage(s.lss, s.ds, s.timeSeries...) - seriesSet := s.query(s.lss, s.ds, start, end, matcher) + seriesSet := s.query(s.lss, s.ds, start, end, cppbridge.NoDownsampling, matcher) seriesSlice := make([]storage.Series, 0, 2) require.True(s.T(), seriesSet.Next()) seriesSlice = append(seriesSlice, seriesSet.At()) @@ -439,7 +439,7 @@ func (s *SeriesSetTestSuite) TestSeriesResetIterator() { var start int64 = 0 var end int64 = 50 - seriesSet := s.query(s.lss, s.ds, start, end, model.LabelMatcher{ + seriesSet := s.query(s.lss, s.ds, start, end, cppbridge.NoDownsampling, model.LabelMatcher{ Name: "__name__", Value: "metric", MatcherType: model.MatcherTypeExactMatch, @@ -466,7 +466,7 @@ func (s *SeriesSetTestSuite) TestSeriesResetIteratorWithMinTimestamp() { var start int64 = 12 var end int64 = 50 - seriesSet := s.query(s.lss, s.ds, start, end, model.LabelMatcher{ + seriesSet := s.query(s.lss, s.ds, start, end, cppbridge.NoDownsampling, model.LabelMatcher{ Name: "__name__", Value: "metric", MatcherType: model.MatcherTypeExactMatch, @@ -485,3 +485,41 @@ func (s *SeriesSetTestSuite) TestSeriesResetIteratorWithMinTimestamp() { s.Equal(cppbridge.Sample{Timestamp: 12, Value: 2}, s.nextSample(iterator)) s.Equal(chunkenc.ValNone, iterator.Next()) } + +func (s *SeriesSetTestSuite) TestDownsampling() { + // Arrange + const Downsampling = 100 + storagetest.MustAppendTimeSeriesToLSSAndDataStorage(s.lss, s.ds, []storagetest.TimeSeries{ + { + Labels: labels.FromStrings("__name__", "metric", "job", "test"), + Samples: []cppbridge.Sample{ + {Timestamp: 123, Value: 1.0}, + {Timestamp: 152, Value: 1.0}, + {Timestamp: 180, Value: 1.0}, + {Timestamp: 215, Value: 1.0}, + {Timestamp: 242, Value: 1.0}, + {Timestamp: 275, Value: 1.0}, + {Timestamp: 303, Value: 1.0}, + }, + }, + }...) + + // Act + seriesSet := s.query(s.lss, s.ds, 0, 400, Downsampling, model.LabelMatcher{ + Name: "__name__", + Value: "metric", + MatcherType: model.MatcherTypeExactMatch, + }) + + // Assert + require.Equal(s.T(), []storagetest.TimeSeries{ + { + Labels: labels.FromStrings("__name__", "metric", "job", "test"), + Samples: []cppbridge.Sample{ + {Timestamp: 180, Value: 1.0}, + {Timestamp: 275, Value: 1.0}, + {Timestamp: 303, Value: 1.0}, + }, + }, + }, storagetest.TimeSeriesFromSeriesSet(seriesSet, true)) +} diff --git a/pp/go/storage/storagetest/fixtures.go b/pp/go/storage/storagetest/fixtures.go index c241dd66a..c615d3c7c 100644 --- a/pp/go/storage/storagetest/fixtures.go +++ b/pp/go/storage/storagetest/fixtures.go @@ -120,7 +120,7 @@ func GetSamplesFromSerializedData(serializedData *cppbridge.DataStorageSerialize break } - result[seriesID] = append(result[seriesID], cppbridge.Sample{Timestamp: iterator.Timestamp, Value: iterator.Value}) + result[seriesID] = append(result[seriesID], cppbridge.Sample{Timestamp: iterator.Timestamp(), Value: iterator.Value()}) iterator.Next() } } diff --git a/pp/series_data/decoder/asc_integer.h b/pp/series_data/decoder/asc_integer.h index 21036091c..addf5d4ff 100644 --- a/pp/series_data/decoder/asc_integer.h +++ b/pp/series_data/decoder/asc_integer.h @@ -5,7 +5,7 @@ namespace series_data::decoder { -class AscIntegerDecodeIterator : public SeparatedTimestampValueDecodeIteratorTrait { +class AscIntegerDecodeIterator : public SeparatedTimestampValueDecodeIteratorTrait { public: using Decoder = encoder::ZigZagTimestampDecoder; @@ -16,14 +16,15 @@ class AscIntegerDecodeIterator : public SeparatedTimestampValueDecodeIteratorTra const BareBones::BitSequenceReader& values_reader, bool is_last_stalenan) : SeparatedTimestampValueDecodeIteratorTrait(samples_count, timestamp_reader, 0.0, is_last_stalenan), reader_(values_reader) { - if (remaining_samples_ > 0) { + if (remaining_samples_ > 0) [[likely]] { decode_value(); + update_sample_value(); } } PROMPP_ALWAYS_INLINE AscIntegerDecodeIterator& operator++() noexcept { - if (decode_timestamp()) { - decode_value(); + if (decode()) [[likely]] { + update_sample(); } return *this; } @@ -35,13 +36,38 @@ class AscIntegerDecodeIterator : public SeparatedTimestampValueDecodeIteratorTra } private: + friend Base; + using GorillaState = BareBones::Encoding::Gorilla::GorillaState; Decoder decoder_; BareBones::BitSequenceReader reader_; GorillaState gorilla_state_{GorillaState::kFirstPoint}; + encoder::ValueType value_type_{encoder::ValueType::kValue}; + + PROMPP_ALWAYS_INLINE bool decode() noexcept { + if (decode_timestamp()) [[likely]] { + decode_value(); + return true; + } + + return false; + } + + PROMPP_ALWAYS_INLINE void decode_value() noexcept { value_type_ = decoder_.decode(reader_, gorilla_state_); } + + PROMPP_ALWAYS_INLINE void update_sample() noexcept { + sample_.timestamp = decoded_timestamp(); + update_sample_value(); + } - PROMPP_ALWAYS_INLINE void decode_value() noexcept { decoder_.decode(reader_, gorilla_state_, sample_.value); } + PROMPP_ALWAYS_INLINE void update_sample_value() noexcept { + if (value_type_ == encoder::ValueType::kStaleNan) [[unlikely]] { + sample_.value = BareBones::Encoding::Gorilla::STALE_NAN; + } else { + sample_.value = static_cast(decoder_.timestamp()); + } + } }; } // namespace series_data::decoder diff --git a/pp/series_data/decoder/asc_integer_then_values_gorilla.h b/pp/series_data/decoder/asc_integer_then_values_gorilla.h index 53530f928..739bea317 100644 --- a/pp/series_data/decoder/asc_integer_then_values_gorilla.h +++ b/pp/series_data/decoder/asc_integer_then_values_gorilla.h @@ -5,7 +5,7 @@ namespace series_data::decoder { -class AscIntegerThenValuesGorillaDecodeIterator : public SeparatedTimestampValueDecodeIteratorTrait { +class AscIntegerThenValuesGorillaDecodeIterator : public SeparatedTimestampValueDecodeIteratorTrait { public: AscIntegerThenValuesGorillaDecodeIterator(const encoder::BitSequenceWithItemsCount& timestamp_stream, const BareBones::BitSequenceReader& reader, @@ -16,14 +16,15 @@ class AscIntegerThenValuesGorillaDecodeIterator : public SeparatedTimestampValue const BareBones::BitSequenceReader& values_reader, bool is_last_stalenan) : SeparatedTimestampValueDecodeIteratorTrait(samples_count, timestamp_reader, 0.0, is_last_stalenan), reader_(values_reader) { - if (remaining_samples_ > 0) { + if (remaining_samples_ > 0) [[likely]] { decode_value(); + update_sample_value(); } } PROMPP_ALWAYS_INLINE AscIntegerThenValuesGorillaDecodeIterator& operator++() noexcept { - if (decode_timestamp()) { - decode_value(); + if (decode()) [[likely]] { + update_sample(); } return *this; } @@ -35,6 +36,8 @@ class AscIntegerThenValuesGorillaDecodeIterator : public SeparatedTimestampValue } private: + friend Base; + enum class DecoderType : uint8_t { kAscInteger, kValuesGorilla, @@ -54,23 +57,47 @@ class AscIntegerThenValuesGorillaDecodeIterator : public SeparatedTimestampValue ValuesGorillaState values_gorilla_; }; BareBones::BitSequenceReader reader_; - DecoderType decoder_type_{DecoderType::kAscInteger}; + encoder::ValueType value_type_{encoder::ValueType::kValue}; + + PROMPP_ALWAYS_INLINE bool decode() noexcept { + if (decode_timestamp()) [[likely]] { + decode_value(); + return true; + } + + return false; + } + + PROMPP_ALWAYS_INLINE void update_sample() noexcept { + sample_.timestamp = decoded_timestamp(); + update_sample_value(); + } + + PROMPP_ALWAYS_INLINE void update_sample_value() noexcept { + if (value_type_ == encoder::ValueType::kStaleNan) [[unlikely]] { + sample_.value = BareBones::Encoding::Gorilla::STALE_NAN; + } else if (value_type_ == encoder::ValueType::kSwitchToValuesGorillaMark) { + sample_.value = values_gorilla_.decoder.value(); + } else { + sample_.value = static_cast(asc_integer_.decoder.timestamp()); + } + } PROMPP_ALWAYS_INLINE void decode_value() noexcept { - if (decoder_type_ == DecoderType::kAscInteger) { - using enum encoder::ValueType; - if (asc_integer_.decoder.decode(reader_, asc_integer_.gorilla_state, sample_.value) == kSwitchToValuesGorillaMark) [[unlikely]] { + using enum encoder::ValueType; + + if (value_type_ != kSwitchToValuesGorillaMark) { + if (value_type_ = asc_integer_.decoder.decode(reader_, asc_integer_.gorilla_state); value_type_ == kSwitchToValuesGorillaMark) [[unlikely]] { switch_to_values_gorilla(); } } else { - sample_.value = ValuesGorillaDecodeIterator::decode_value(values_gorilla_.decoder, reader_); + ValuesGorillaDecodeIterator::decode_value(values_gorilla_.decoder, reader_); } } PROMPP_ALWAYS_INLINE void switch_to_values_gorilla() noexcept { std::construct_at(&values_gorilla_); - sample_.value = ValuesGorillaDecodeIterator::decode_value(values_gorilla_.decoder, reader_); - decoder_type_ = DecoderType::kValuesGorilla; + ValuesGorillaDecodeIterator::decode_value(values_gorilla_.decoder, reader_); } }; diff --git a/pp/series_data/decoder/constant.h b/pp/series_data/decoder/constant.h index daf2a895a..9f740d046 100644 --- a/pp/series_data/decoder/constant.h +++ b/pp/series_data/decoder/constant.h @@ -4,7 +4,7 @@ namespace series_data::decoder { -class ConstantDecodeIterator : public SeparatedTimestampValueDecodeIteratorTrait { +class ConstantDecodeIterator : public SeparatedTimestampValueDecodeIteratorTrait { public: ConstantDecodeIterator(const encoder::BitSequenceWithItemsCount& timestamp_stream, double value, bool is_last_stalenan) : SeparatedTimestampValueDecodeIteratorTrait(timestamp_stream, value, is_last_stalenan) {} @@ -13,9 +13,7 @@ class ConstantDecodeIterator : public SeparatedTimestampValueDecodeIteratorTrait PROMPP_ALWAYS_INLINE ConstantDecodeIterator& operator++() noexcept { decode_timestamp(); - if (remaining_samples_ == 1 && last_stalenan_) [[unlikely]] { - sample_.value = BareBones::Encoding::Gorilla::STALE_NAN; - } + update_sample(); return *this; } @@ -24,6 +22,18 @@ class ConstantDecodeIterator : public SeparatedTimestampValueDecodeIteratorTrait ++*this; return result; } + + protected: + friend Base; + + PROMPP_ALWAYS_INLINE bool decode() noexcept { return decode_timestamp(); } + + PROMPP_ALWAYS_INLINE void update_sample() noexcept { + sample_.timestamp = decoded_timestamp(); + if (remaining_samples_ == 1 && last_stalenan_) [[unlikely]] { + sample_.value = BareBones::Encoding::Gorilla::STALE_NAN; + } + } }; } // namespace series_data::decoder diff --git a/pp/series_data/decoder/decorator/downsampling_decode_iterator.h b/pp/series_data/decoder/decorator/downsampling_decode_iterator.h new file mode 100644 index 000000000..dfbbb2755 --- /dev/null +++ b/pp/series_data/decoder/decorator/downsampling_decode_iterator.h @@ -0,0 +1,92 @@ +#pragma once + +#include "series_data/decoder/traits.h" +#include "series_data/encoder/sample.h" + +namespace series_data::decoder::decorator { + +template +class DownsamplingDecodeIterator { + public: + using Timestamp = PromPP::Primitives::Timestamp; + + enum class SampleType : uint8_t { + kFirst = 0, + kOther, + }; + + DECODE_ITERATOR_TYPE_TRAITS(); + + explicit DownsamplingDecodeIterator(Timestamp interval) : DownsamplingDecodeIterator(DecodeIterator{}, interval) {} + DownsamplingDecodeIterator(DecodeIterator&& iterator, Timestamp interval) : iterator_(std::move(iterator)), interval_(interval) { + advance_to_next_sample(); + } + + PROMPP_ALWAYS_INLINE DownsamplingDecodeIterator& operator=(DecodeIterator&& iterator) noexcept { + iterator_ = std::move(iterator); + timestamp_ = {}; + advance_to_next_sample(); + return *this; + } + + PROMPP_ALWAYS_INLINE const encoder::Sample& operator*() const noexcept { return iterator_.operator*(); } + PROMPP_ALWAYS_INLINE const encoder::Sample* operator->() const noexcept { return iterator_.operator->(); } + + PROMPP_ALWAYS_INLINE bool operator==(const DecodeIteratorSentinel&) const noexcept { return timestamp_ == kInvalidTimestamp; } + + PROMPP_ALWAYS_INLINE DownsamplingDecodeIterator& operator++() noexcept { + advance_to_next_sample(); + return *this; + } + + PROMPP_ALWAYS_INLINE DownsamplingDecodeIterator operator++(int) noexcept { + const auto result = *this; + ++*this; + return result; + } + + private: + static constexpr Timestamp kInvalidTimestamp = std::numeric_limits::min(); + static constexpr Timestamp kNoDownsampling = 0; + + Timestamp timestamp_{}; + DecodeIterator iterator_; + Timestamp interval_; + + PROMPP_ALWAYS_INLINE static Timestamp round_up_to_step(Timestamp timestamp, Timestamp step) noexcept { + const auto result = timestamp + step - 1; + return result - result % step; + } + + template + PROMPP_ALWAYS_INLINE void advance_to_next_sample() noexcept { + if (interval_ == kNoDownsampling) { + if constexpr (Type == SampleType::kOther) { + if (++iterator_ == DecodeIteratorSentinel{}) [[unlikely]] { + timestamp_ = kInvalidTimestamp; + } + } + return; + } + + advance_to_last_sample_in_interval(); + } + + PROMPP_ALWAYS_INLINE void advance_to_last_sample_in_interval() noexcept { + timestamp_ = kInvalidTimestamp; + + iterator_.seek([this](Timestamp timestamp) noexcept { + if (timestamp > timestamp_) { + if (timestamp_ != kInvalidTimestamp) [[likely]] { + return SeekResult::kStop; + } + + timestamp_ = round_up_to_step(timestamp, interval_); + } + + return SeekResult::kUpdateSample; + }); + } +}; + +} // namespace series_data::decoder::decorator \ No newline at end of file diff --git a/pp/series_data/decoder/decorator/interval_decode_iterator.h b/pp/series_data/decoder/decorator/interval_decode_iterator.h deleted file mode 100644 index 4238558f8..000000000 --- a/pp/series_data/decoder/decorator/interval_decode_iterator.h +++ /dev/null @@ -1,87 +0,0 @@ -#pragma once - -#include "series_data/decoder/traits.h" -#include "series_data/encoder/sample.h" - -namespace series_data::decoder::decorator { - -template -class IntervalDecodeIterator : public DecodeIteratorTypeTrait { - public: - using Timestamp = PromPP::Primitives::Timestamp; - - IntervalDecodeIterator(DecodeIterator&& iterator, DecodeIteratorSentinel&& end, Timestamp interval, Timestamp lookback) - : iterator_(std::move(iterator)), iterator_end_(std::move(end)), interval_(std::max(interval, kMinInterval)), lookback_(lookback) { - if (iterator_ != iterator_end_) { - timestamp_ = round_up_to_step(iterator_->timestamp, interval_); - advance_to_next_sample(); - } - } - - PROMPP_ALWAYS_INLINE const encoder::Sample& operator*() const noexcept { return sample_; } - PROMPP_ALWAYS_INLINE const encoder::Sample* operator->() const noexcept { return &sample_; } - - PROMPP_ALWAYS_INLINE bool operator==(const DecodeIteratorSentinel&) const noexcept { return iterator_ == iterator_end_ && sample_.timestamp == kNoSample; } - - PROMPP_ALWAYS_INLINE IntervalDecodeIterator& operator++() noexcept { - advance_to_next_sample(); - return *this; - } - - PROMPP_ALWAYS_INLINE IntervalDecodeIterator operator++(int) noexcept { - const auto result = *this; - ++*this; - return result; - } - - private: - static constexpr auto kNoSample = std::numeric_limits::min(); - static constexpr Timestamp kMinInterval = 1; - - DecodeIterator iterator_; - [[no_unique_address]] DecodeIteratorSentinel iterator_end_; - Timestamp interval_; - Timestamp lookback_; - Timestamp timestamp_{}; - encoder::Sample sample_{.timestamp = kNoSample}; - - PROMPP_ALWAYS_INLINE static Timestamp round_up_to_step(Timestamp timestamp, Timestamp step) noexcept { - const auto result = timestamp + step - 1; - return result - result % step; - } - - PROMPP_ALWAYS_INLINE void advance_to_next_sample() noexcept { - if (iterator_ == iterator_end_) [[unlikely]] { - sample_.timestamp = kNoSample; - return; - } - - Timestamp previous_timestamp; - do { - advance_to_last_sample_in_interval(); - previous_timestamp = std::exchange(timestamp_, timestamp_ + interval_); - } while (!in_lookback_interval(sample_.timestamp, previous_timestamp) && iterator_ != iterator_end_); - } - - PROMPP_ALWAYS_INLINE void advance_to_last_sample_in_interval() noexcept { - for (; iterator_ != iterator_end_ && timestamp_ >= iterator_->timestamp; ++iterator_) { - if (in_lookback_interval(iterator_->timestamp, timestamp_)) [[likely]] { - decode_sample(); - } - } - } - - [[nodiscard]] PROMPP_ALWAYS_INLINE bool in_lookback_interval(Timestamp timestamp, Timestamp deadline) const noexcept { - return deadline <= lookback_ + timestamp; - } - - PROMPP_ALWAYS_INLINE void decode_sample() noexcept { - if (!BareBones::Encoding::Gorilla::isstalenan(iterator_->value)) [[likely]] { - sample_ = *iterator_; - } else { - sample_.timestamp = kNoSample; - } - } -}; - -} // namespace series_data::decoder::decorator \ No newline at end of file diff --git a/pp/series_data/decoder/decorator/stale_nan_deduplicate_iterator.h b/pp/series_data/decoder/decorator/stale_nan_deduplicate_iterator.h index 3f52122bf..029328c12 100644 --- a/pp/series_data/decoder/decorator/stale_nan_deduplicate_iterator.h +++ b/pp/series_data/decoder/decorator/stale_nan_deduplicate_iterator.h @@ -6,8 +6,10 @@ namespace series_data::decoder::decorator { template -class StaleNanDeduplicateIterator : public DecodeIteratorTypeTrait { +class StaleNanDeduplicateIterator { public: + DECODE_ITERATOR_TYPE_TRAITS(); + StaleNanDeduplicateIterator(DecodeIterator&& iterator, DecodeIteratorSentinel&& end) : iterator_(std::move(iterator)), iterator_end_(std::move(end)) {} PROMPP_ALWAYS_INLINE const encoder::Sample& operator*() const noexcept { return *iterator_; } diff --git a/pp/series_data/decoder/gorilla.h b/pp/series_data/decoder/gorilla.h index 9b64c672c..6c3d2701d 100644 --- a/pp/series_data/decoder/gorilla.h +++ b/pp/series_data/decoder/gorilla.h @@ -6,20 +6,24 @@ namespace series_data::decoder { template -class GorillaDecodeIteratorGeneral : public DecodeIteratorTrait { - using Base = DecodeIteratorTrait; +class GorillaDecodeIteratorGeneral : public DecodeIteratorTrait, SampleCountType> { + using Base = DecodeIteratorTrait; public: explicit GorillaDecodeIteratorGeneral(const encoder::CompactBitSequence& stream, bool is_last_stalenan) : GorillaDecodeIteratorGeneral(encoder::BitSequenceWithItemsCount::count(stream), encoder::BitSequenceWithItemsCount::reader(stream), is_last_stalenan) {} GorillaDecodeIteratorGeneral(SampleCountType samples_count, const BareBones::BitSequenceReader& reader, bool is_last_stalenan) : Base(0.0, samples_count, is_last_stalenan), reader_(reader) { - decode(); + if (Base::remaining_samples_ > 0) [[likely]] { + decoder_.decode(reader_, reader_); + update_sample(); + } } PROMPP_ALWAYS_INLINE GorillaDecodeIteratorGeneral& operator++() noexcept { - --Base::remaining_samples_; - decode(); + if (decode()) [[likely]] { + update_sample(); + } return *this; } @@ -30,17 +34,27 @@ class GorillaDecodeIteratorGeneral : public DecodeIteratorTrait } private: + friend Base; + using Decoder = BareBones::Encoding::Gorilla::ValuesDecoder; BareBones::BitSequenceReader reader_; BareBones::Encoding::Gorilla::StreamDecoder, BareBones::Encoding::Gorilla::ValuesDecoder> decoder_; - PROMPP_ALWAYS_INLINE void decode() noexcept { - if (Base::remaining_samples_ > 0) { + [[nodiscard]] PROMPP_ALWAYS_INLINE PromPP::Primitives::Timestamp decoded_timestamp() const noexcept { return decoder_.last_timestamp(); } + + PROMPP_ALWAYS_INLINE bool decode() noexcept { + if (--Base::remaining_samples_ > 0) [[likely]] { decoder_.decode(reader_, reader_); - Base::sample_.value = decoder_.last_value(); - Base::sample_.timestamp = decoder_.last_timestamp(); + return true; } + + return false; + } + + PROMPP_ALWAYS_INLINE void update_sample() noexcept { + Base::sample_.value = decoder_.last_value(); + Base::sample_.timestamp = decoder_.last_timestamp(); } }; diff --git a/pp/series_data/decoder/traits.h b/pp/series_data/decoder/traits.h index e75bed221..c018b2431 100644 --- a/pp/series_data/decoder/traits.h +++ b/pp/series_data/decoder/traits.h @@ -9,18 +9,31 @@ namespace series_data::decoder { class DecodeIteratorSentinel {}; -class DecodeIteratorTypeTrait { - public: - using iterator_category = std::forward_iterator_tag; - using value_type = encoder::Sample; - using difference_type = ptrdiff_t; - using pointer = encoder::Sample*; - using reference = encoder::Sample&; +#define DECODE_ITERATOR_TYPE_TRAITS() \ + using iterator_category = std::forward_iterator_tag; \ + using value_type = encoder::Sample; \ + using difference_type = ptrdiff_t; \ + using pointer = encoder::Sample*; \ + using reference = encoder::Sample& + +enum class SeekResult : uint8_t { + kUpdateSample = 0, + kNext, + kStop, }; -template -class DecodeIteratorTrait : public DecodeIteratorTypeTrait { +template +concept Seekable = requires(Iterator iterator, const Iterator const_iterator) { + { const_iterator.decoded_timestamp() } -> std::same_as; + { iterator.update_sample() }; + { iterator.decode() }; +}; + +template +class DecodeIteratorTrait { public: + DECODE_ITERATOR_TYPE_TRAITS(); + explicit DecodeIteratorTrait(SampleCountType count) : remaining_samples_{count} {} explicit DecodeIteratorTrait(double value, SampleCountType count) : sample_{.value = value}, remaining_samples_{count} {} explicit DecodeIteratorTrait(double value, SampleCountType count, bool last_stalenan) @@ -32,18 +45,40 @@ class DecodeIteratorTrait : public DecodeIteratorTypeTrait { PROMPP_ALWAYS_INLINE bool operator==(const DecodeIteratorSentinel&) const noexcept { return remaining_samples_ == 0; } [[nodiscard]] PROMPP_ALWAYS_INLINE SampleCountType remaining_samples() const noexcept { return remaining_samples_; } + template + requires Seekable + PROMPP_ALWAYS_INLINE void seek(SeekHandler&& handler) { + if (remaining_samples_ == 0) [[unlikely]] { + return; + } + + do { + if (const SeekResult result = handler(derived()->decoded_timestamp()); result == SeekResult::kUpdateSample) [[likely]] { + derived()->update_sample(); + } else if (result == SeekResult::kStop) { + break; + } + } while (derived()->decode()); + } + protected: encoder::Sample sample_; SampleCountType remaining_samples_{}; bool last_stalenan_{false}; + + private: + [[nodiscard]] PROMPP_ALWAYS_INLINE Derived* derived() noexcept { return static_cast(this); } }; -class SeparatedTimestampValueDecodeIteratorTrait : public DecodeIteratorTrait { +template +class SeparatedTimestampValueDecodeIteratorTrait : public DecodeIteratorTrait { public: + using Base = DecodeIteratorTrait; + SeparatedTimestampValueDecodeIteratorTrait(uint8_t samples_count, const BareBones::BitSequenceReader& timestamp_reader, double value, bool last_stalenan) - : DecodeIteratorTrait(value, samples_count, last_stalenan), timestamp_decoder_(timestamp_reader) { - if (remaining_samples_ > 0) { - sample_.timestamp = timestamp_decoder_.decode(); + : Base(value, samples_count, last_stalenan), timestamp_decoder_(timestamp_reader) { + if (Base::remaining_samples_ > 0) [[likely]] { + Base::sample_.timestamp = timestamp_decoder_.decode(); } } explicit SeparatedTimestampValueDecodeIteratorTrait(const encoder::BitSequenceWithItemsCount& timestamp_stream) @@ -53,17 +88,21 @@ class SeparatedTimestampValueDecodeIteratorTrait : public DecodeIteratorTrait; + + encoder::timestamp::TimestampDecoder timestamp_decoder_; + PROMPP_ALWAYS_INLINE bool decode_timestamp() noexcept { - if (--remaining_samples_ > 0) { - sample_.timestamp = timestamp_decoder_.decode(); + if (--Base::remaining_samples_ > 0) [[likely]] { + std::ignore = timestamp_decoder_.decode(); return true; } return false; } - protected: - encoder::timestamp::TimestampDecoder timestamp_decoder_; + [[nodiscard]] PROMPP_ALWAYS_INLINE PromPP::Primitives::Timestamp decoded_timestamp() const noexcept { return timestamp_decoder_.timestamp(); } }; } // namespace series_data::decoder \ No newline at end of file diff --git a/pp/series_data/decoder/two_double_constant.h b/pp/series_data/decoder/two_double_constant.h index dbf204080..5eb9d2618 100644 --- a/pp/series_data/decoder/two_double_constant.h +++ b/pp/series_data/decoder/two_double_constant.h @@ -5,7 +5,7 @@ namespace series_data::decoder { -class TwoDoubleConstantDecodeIterator : public SeparatedTimestampValueDecodeIteratorTrait { +class TwoDoubleConstantDecodeIterator : public SeparatedTimestampValueDecodeIteratorTrait { public: TwoDoubleConstantDecodeIterator(const encoder::BitSequenceWithItemsCount& timestamp_stream, const encoder::value::TwoDoubleConstantEncoder& encoder, @@ -25,14 +25,8 @@ class TwoDoubleConstantDecodeIterator : public SeparatedTimestampValueDecodeIter value1_count_(encoder.value1_count()) {} PROMPP_ALWAYS_INLINE TwoDoubleConstantDecodeIterator& operator++() noexcept { - if (decode_timestamp()) { - ++count_; - - if (remaining_samples_ == 1 && last_stalenan_) [[unlikely]] { - sample_.value = BareBones::Encoding::Gorilla::STALE_NAN; - } else [[likely]] { - sample_.value = count_ <= value1_count_ ? value1_ : value2_; - } + if (decode()) [[likely]] { + update_sample(); } return *this; } @@ -44,10 +38,27 @@ class TwoDoubleConstantDecodeIterator : public SeparatedTimestampValueDecodeIter } private: + friend Base; + double value1_; double value2_; uint8_t value1_count_; uint8_t count_{1}; + + PROMPP_ALWAYS_INLINE bool decode() noexcept { + ++count_; + return decode_timestamp(); + } + + PROMPP_ALWAYS_INLINE void update_sample() noexcept { + sample_.timestamp = decoded_timestamp(); + + if (remaining_samples_ == 1 && last_stalenan_) [[unlikely]] { + sample_.value = BareBones::Encoding::Gorilla::STALE_NAN; + } else [[likely]] { + sample_.value = count_ <= value1_count_ ? value1_ : value2_; + } + } }; } // namespace series_data::decoder diff --git a/pp/series_data/decoder/universal_decode_iterator.h b/pp/series_data/decoder/universal_decode_iterator.h index 4d24ad9a6..873d30b12 100644 --- a/pp/series_data/decoder/universal_decode_iterator.h +++ b/pp/series_data/decoder/universal_decode_iterator.h @@ -11,33 +11,42 @@ namespace series_data::decoder { -class UniversalDecodeIterator : public DecodeIteratorTypeTrait { +class UniversalDecodeIterator { public: + DECODE_ITERATOR_TYPE_TRAITS(); + + UniversalDecodeIterator() : iterator_(std::in_place_type, 0, BareBones::BitSequenceReader(nullptr, 0), 0.0, false) {} + template explicit UniversalDecodeIterator(InPlaceType in_place_type, Args&&... args) : iterator_(in_place_type, std::forward(args)...) {} - PROMPP_ALWAYS_INLINE const encoder::Sample& operator*() const noexcept { + PROMPP_ALWAYS_INLINE const encoder::Sample& operator*() const { return std::visit([](auto& iterator) PROMPP_LAMBDA_INLINE -> auto const& { return *iterator; }, iterator_); } - PROMPP_ALWAYS_INLINE const encoder::Sample* operator->() const noexcept { + PROMPP_ALWAYS_INLINE const encoder::Sample* operator->() const { return std::visit([](auto& iterator) PROMPP_LAMBDA_INLINE -> auto const* { return iterator.operator->(); }, iterator_); } - PROMPP_ALWAYS_INLINE bool operator==(const DecodeIteratorSentinel& sentinel) const noexcept { + PROMPP_ALWAYS_INLINE bool operator==(const DecodeIteratorSentinel& sentinel) const { return std::visit([&sentinel](const auto& iterator) PROMPP_LAMBDA_INLINE { return iterator == sentinel; }, iterator_); } - PROMPP_ALWAYS_INLINE UniversalDecodeIterator& operator++() noexcept { + PROMPP_ALWAYS_INLINE UniversalDecodeIterator& operator++() { std::visit([](auto& iterator) PROMPP_LAMBDA_INLINE { ++iterator; }, iterator_); return *this; } - PROMPP_ALWAYS_INLINE UniversalDecodeIterator operator++(int) noexcept { + PROMPP_ALWAYS_INLINE UniversalDecodeIterator operator++(int) { const auto result = *this; ++*this; return result; } + template + PROMPP_ALWAYS_INLINE void seek(SeekHandler&& handler) { + std::visit([&](auto& iterator) PROMPP_LAMBDA_INLINE { iterator.seek(std::forward(handler)); }, iterator_); + } + private: using IteratorVariant = std::variant { public: using Decoder = BareBones::Encoding::Gorilla::ValuesDecoder; + enum class SampleType : uint8_t { + kFirst = 0, + kOther, + }; + ValuesGorillaDecodeIterator(const encoder::BitSequenceWithItemsCount& timestamp_stream, const BareBones::BitSequenceReader& reader, bool is_last_stalenan) : ValuesGorillaDecodeIterator(timestamp_stream.count(), timestamp_stream.reader(), reader, is_last_stalenan) {} ValuesGorillaDecodeIterator(uint8_t samples_count, @@ -16,14 +21,15 @@ class ValuesGorillaDecodeIterator : public SeparatedTimestampValueDecodeIterator const BareBones::BitSequenceReader& values_reader, bool is_last_stalenan) : SeparatedTimestampValueDecodeIteratorTrait(samples_count, timestamp_reader, 0.0, is_last_stalenan), reader_(values_reader) { - if (remaining_samples_ > 0) { - decode_value(); + if (remaining_samples_ > 0) [[likely]] { + decode_value(); + sample_.value = decoder_.value(); } } PROMPP_ALWAYS_INLINE ValuesGorillaDecodeIterator& operator++() noexcept { - if (decode_timestamp()) { - decode_value(); + if (decode()) [[likely]] { + update_sample(); } return *this; } @@ -34,24 +40,38 @@ class ValuesGorillaDecodeIterator : public SeparatedTimestampValueDecodeIterator return result; } - template - PROMPP_ALWAYS_INLINE static double decode_value(Decoder& decoder, BareBones::BitSequenceReader& reader) noexcept { - if constexpr (first) { + template + PROMPP_ALWAYS_INLINE static void decode_value(Decoder& decoder, BareBones::BitSequenceReader& reader) noexcept { + if constexpr (Type == SampleType::kFirst) { decoder.decode_first(reader); } else { decoder.decode(reader); } - - return decoder.value(); } private: + friend Base; + BareBones::BitSequenceReader reader_; Decoder decoder_; - template + PROMPP_ALWAYS_INLINE bool decode() noexcept { + if (decode_timestamp()) [[likely]] { + decode_value(); + return true; + } + + return false; + } + + PROMPP_ALWAYS_INLINE void update_sample() noexcept { + sample_.timestamp = decoded_timestamp(); + sample_.value = decoder_.value(); + } + + template PROMPP_ALWAYS_INLINE void decode_value() noexcept { - sample_.value = decode_value(decoder_, reader_); + decode_value(decoder_, reader_); } }; diff --git a/pp/series_data/encoder/timestamp/encoder.h b/pp/series_data/encoder/timestamp/encoder.h index 827337fc9..63f9bc4ba 100644 --- a/pp/series_data/encoder/timestamp/encoder.h +++ b/pp/series_data/encoder/timestamp/encoder.h @@ -54,6 +54,8 @@ class TimestampDecoder { return values; } + [[nodiscard]] PROMPP_ALWAYS_INLINE int64_t timestamp() const noexcept { return decoder_.timestamp(); } + private: using GorillaState = BareBones::Encoding::Gorilla::GorillaState; diff --git a/pp/series_data/encoder/zig_zag_timestamp_gorilla.h b/pp/series_data/encoder/zig_zag_timestamp_gorilla.h index 385cbde7b..cf5fc4127 100644 --- a/pp/series_data/encoder/zig_zag_timestamp_gorilla.h +++ b/pp/series_data/encoder/zig_zag_timestamp_gorilla.h @@ -101,7 +101,7 @@ class PROMPP_ATTRIBUTE_PACKED ZigZagTimestampDecoder : public BareBones::Encodin return ValueType::kValue; } - ValueType decode(BareBones::BitSequenceReader& reader, BareBones::Encoding::Gorilla::GorillaState& state, double& value) noexcept { + ValueType decode(BareBones::BitSequenceReader& reader, BareBones::Encoding::Gorilla::GorillaState& state) noexcept { using enum BareBones::Encoding::Gorilla::GorillaState; if (state == kFirstPoint) [[unlikely]] { @@ -111,15 +111,9 @@ class PROMPP_ATTRIBUTE_PACKED ZigZagTimestampDecoder : public BareBones::Encodin decode_delta(reader); state = kOtherPoint; } else { - if (const auto type = decode_delta_of_delta_with_stale_nan(reader); type == ValueType::kStaleNan) [[unlikely]] { - value = BareBones::Encoding::Gorilla::STALE_NAN; - return ValueType::kStaleNan; - } else if (type == ValueType::kSwitchToValuesGorillaMark) [[unlikely]] { - return ValueType::kSwitchToValuesGorillaMark; - } + return decode_delta_of_delta_with_stale_nan(reader); } - value = static_cast(timestamp()); return ValueType::kValue; } }; diff --git a/pp/series_data/serialization/serialized_data.h b/pp/series_data/serialization/serialized_data.h index 7d815f5b9..89a2b9cb6 100644 --- a/pp/series_data/serialization/serialized_data.h +++ b/pp/series_data/serialization/serialized_data.h @@ -202,11 +202,17 @@ class DataSerializer { const DataStorage& storage_; }; +template +concept AssignableFromUniversaleDecodeIterator = requires(DecodeIterator iterator) { + { iterator = decoder::UniversalDecodeIterator{} }; +}; + class SerializedDataView { public: using series_id_inner_chunk_id_t = std::pair; static constexpr uint32_t kNoMoreSeries = std::numeric_limits::max(); + template class SeriesIterator { public: using iterator_category = std::forward_iterator_tag; @@ -215,8 +221,8 @@ class SerializedDataView { using pointer = value_type*; using reference = value_type&; - SeriesIterator(std::span buffer, chunk::SerializedChunkSpan chunks, uint32_t chunk_id) - : decode_iter_(std::in_place_type, 0, BareBones::BitSequenceReader(nullptr, 0), 0, false), + SeriesIterator(DecodeIterator&& decode_iterator, std::span buffer, chunk::SerializedChunkSpan chunks, uint32_t chunk_id) + : decode_iter_(std::move(decode_iterator)), chunk_iter_(chunks.begin() + chunk_id), series_id_(chunk_iter_->label_set_id), buffer_(buffer), @@ -230,8 +236,7 @@ class SerializedDataView { [[nodiscard]] PROMPP_ALWAYS_INLINE const encoder::Sample* operator->() const noexcept { return decode_iter_.operator->(); } PROMPP_ALWAYS_INLINE SeriesIterator& operator++() noexcept { - ++decode_iter_; - if (decode_iter_ == decoder::DecodeIteratorSentinel{}) [[unlikely]] { + if (++decode_iter_ == decoder::DecodeIteratorSentinel{}) [[unlikely]] { if (std::next(chunk_iter_) != chunks_.end() && series_id_ == std::next(chunk_iter_)->label_set_id) { ++chunk_iter_; Decoder::create_decode_iterator(buffer_, *chunk_iter_, [&](Iterator&& begin, auto&&) { @@ -265,7 +270,7 @@ class SerializedDataView { } private: - decoder::UniversalDecodeIterator decode_iter_; + DecodeIterator decode_iter_; chunk::SerializedChunkSpan::const_iterator chunk_iter_; uint32_t series_id_; @@ -306,9 +311,14 @@ class SerializedDataView { return {chunks[series_first_chunk_id_].label_set_id, series_first_chunk_id_}; } - [[nodiscard]] SeriesIterator create_current_series_iterator() const noexcept { return {get_buffer_view(), get_chunks_view(), series_first_chunk_id_}; } - [[nodiscard]] SeriesIterator create_series_iterator(uint32_t series_first_chunk_id) const noexcept { - return {get_buffer_view(), get_chunks_view(), series_first_chunk_id}; + template + [[nodiscard]] SeriesIterator create_current_series_iterator(DecodeIterator&& decode_iterator = DecodeIterator{}) const noexcept { + return {std::forward(decode_iterator), get_buffer_view(), get_chunks_view(), series_first_chunk_id_}; + } + template + [[nodiscard]] SeriesIterator create_series_iterator(uint32_t series_first_chunk_id, + DecodeIterator&& decode_iterator = DecodeIterator{}) const noexcept { + return {std::forward(decode_iterator), get_buffer_view(), get_chunks_view(), series_first_chunk_id}; } private: diff --git a/pp/series_data/tests/decoder/decorator/interval_decode_iterator_tests.cpp b/pp/series_data/tests/decoder/decorator/downsampling_decode_iterator_tests.cpp similarity index 65% rename from pp/series_data/tests/decoder/decorator/interval_decode_iterator_tests.cpp rename to pp/series_data/tests/decoder/decorator/downsampling_decode_iterator_tests.cpp index a2c94c3c5..d57ddf4c2 100644 --- a/pp/series_data/tests/decoder/decorator/interval_decode_iterator_tests.cpp +++ b/pp/series_data/tests/decoder/decorator/downsampling_decode_iterator_tests.cpp @@ -1,43 +1,79 @@ #include -#include "series_data/decoder/decorator/interval_decode_iterator.h" +#include "series_data/decoder.h" +#include "series_data/decoder/decorator/downsampling_decode_iterator.h" +#include "series_data/decoder/universal_decode_iterator.h" +#include "series_data/encoder.h" namespace { using BareBones::Encoding::Gorilla::STALE_NAN; -using series_data::decoder::decorator::IntervalDecodeIterator; +using series_data::DataStorage; +using series_data::Decoder; +using series_data::Encoder; +using series_data::chunk::DataChunk; +using series_data::decoder::DecodeIteratorSentinel; +using series_data::decoder::UniversalDecodeIterator; +using series_data::decoder::decorator::DownsamplingDecodeIterator; using series_data::encoder::Sample; struct IntervalDecodeIteratorCase { std::vector samples; PromPP::Primitives::Timestamp interval; - PromPP::Primitives::Timestamp lookback{1000}; std::vector expected{}; }; -class IntervalDecodeIteratorFixture : public ::testing::TestWithParam {}; +class DownsamplingDecodeIteratorFixture : public ::testing::TestWithParam { + protected: + DataStorage storage_; -TEST_P(IntervalDecodeIteratorFixture, Test) { + void SetUp() override { + Encoder encoder(storage_); + for (const auto& sample : GetParam().samples) { + encoder.encode(0, sample.timestamp, sample.value); + } + } +}; + +TEST_P(DownsamplingDecodeIteratorFixture, Test) { + // Arrange + std::vector actual_samples; + + // Act + Decoder::create_decode_iterator(storage_, storage_.open_chunks[0], [&actual_samples](Iterator&& begin, auto&&) { + std::ranges::copy(DownsamplingDecodeIterator(UniversalDecodeIterator{std::in_place_type, std::forward(begin)}, GetParam().interval), + DecodeIteratorSentinel{}, std::back_inserter(actual_samples)); + }); + + // Assert + EXPECT_EQ(GetParam().expected, actual_samples); +} + +TEST_P(DownsamplingDecodeIteratorFixture, TestReset) { // Arrange std::vector actual_samples; // Act - std::ranges::copy(IntervalDecodeIterator(GetParam().samples.begin(), GetParam().samples.end(), GetParam().interval, GetParam().lookback), - GetParam().samples.end(), std::back_inserter(actual_samples)); + Decoder::create_decode_iterator(storage_, storage_.open_chunks[0], [&actual_samples](Iterator&& begin, auto&&) { + DownsamplingDecodeIterator iterator(UniversalDecodeIterator{std::in_place_type, std::forward(begin)}, GetParam().interval); + std::advance(iterator, GetParam().samples.size()); + + iterator = UniversalDecodeIterator{std::in_place_type, std::forward(begin)}; + std::ranges::copy(iterator, DecodeIteratorSentinel{}, std::back_inserter(actual_samples)); + }); // Assert EXPECT_EQ(GetParam().expected, actual_samples); } -INSTANTIATE_TEST_SUITE_P(Empty, IntervalDecodeIteratorFixture, testing::Values(IntervalDecodeIteratorCase{})); INSTANTIATE_TEST_SUITE_P( OneSample, - IntervalDecodeIteratorFixture, + DownsamplingDecodeIteratorFixture, testing::Values( IntervalDecodeIteratorCase{.samples{Sample{.timestamp = 100, .value = 1.0}}, .interval = 100, .expected{Sample{.timestamp = 100, .value = 1.0}}}, IntervalDecodeIteratorCase{.samples{Sample{.timestamp = 300, .value = 1.0}}, .interval = 400, .expected{Sample{.timestamp = 300, .value = 1.0}}})); INSTANTIATE_TEST_SUITE_P(ManySamples, - IntervalDecodeIteratorFixture, + DownsamplingDecodeIteratorFixture, testing::Values(IntervalDecodeIteratorCase{.samples{ Sample{.timestamp = 100, .value = 1.0}, Sample{.timestamp = 200, .value = 1.0}, @@ -85,81 +121,24 @@ INSTANTIATE_TEST_SUITE_P(ManySamples, .expected{ Sample{.timestamp = 180, .value = 1.0}, Sample{.timestamp = 275, .value = 1.0}, - Sample{.timestamp = 275, .value = 1.0}, - Sample{.timestamp = 275, .value = 1.0}, Sample{.timestamp = 503, .value = 1.0}, Sample{.timestamp = 604, .value = 1.0}, }})); -INSTANTIATE_TEST_SUITE_P(UseMinInterval, - IntervalDecodeIteratorFixture, - testing::Values(IntervalDecodeIteratorCase{.samples{ - Sample{.timestamp = 0, .value = 1.0}, - Sample{.timestamp = 1, .value = 1.0}, - Sample{.timestamp = 2, .value = 1.0}, - }, - .interval = 0, - .expected{ - Sample{.timestamp = 0, .value = 1.0}, - Sample{.timestamp = 1, .value = 1.0}, - Sample{.timestamp = 2, .value = 1.0}, - }})); -INSTANTIATE_TEST_SUITE_P(LookbackDelta, - IntervalDecodeIteratorFixture, - testing::Values(IntervalDecodeIteratorCase{.samples{ - Sample{.timestamp = 180, .value = 1.0}, - Sample{.timestamp = 275, .value = 1.0}, - Sample{.timestamp = 503, .value = 1.0}, - Sample{.timestamp = 603, .value = 1.0}, - }, +INSTANTIATE_TEST_SUITE_P(StaleNan, + DownsamplingDecodeIteratorFixture, + testing::Values(IntervalDecodeIteratorCase{.samples{Sample{.timestamp = 100, .value = STALE_NAN}}, .interval = 100, - .lookback = 125, .expected{ - Sample{.timestamp = 180, .value = 1.0}, - Sample{.timestamp = 275, .value = 1.0}, - Sample{.timestamp = 275, .value = 1.0}, - Sample{.timestamp = 503, .value = 1.0}, - Sample{.timestamp = 603, .value = 1.0}, + Sample{.timestamp = 100, .value = STALE_NAN}, }}, IntervalDecodeIteratorCase{.samples{ - Sample{.timestamp = 180, .value = 1.0}, - Sample{.timestamp = 275, .value = 1.0}, - Sample{.timestamp = 503, .value = 1.0}, - Sample{.timestamp = 603, .value = 1.0}, + Sample{.timestamp = 99, .value = 1.0}, + Sample{.timestamp = 100, .value = STALE_NAN}, }, .interval = 100, - .lookback = 124, .expected{ - Sample{.timestamp = 180, .value = 1.0}, - Sample{.timestamp = 275, .value = 1.0}, - Sample{.timestamp = 503, .value = 1.0}, - Sample{.timestamp = 603, .value = 1.0}, - }}, - IntervalDecodeIteratorCase{.samples{ - Sample{.timestamp = 1, .value = 1.0}, - }, - .interval = 101, - .lookback = 100, - .expected{ - Sample{.timestamp = 1, .value = 1.0}, - }})); -INSTANTIATE_TEST_SUITE_P(NoSamples, - IntervalDecodeIteratorFixture, - testing::Values(IntervalDecodeIteratorCase{.samples{Sample{.timestamp = 1, .value = 1.0}}, .interval = 100, .lookback = 98}, - IntervalDecodeIteratorCase{.samples{ - Sample{.timestamp = 1, .value = 1.0}, - Sample{.timestamp = 2, .value = 1.0}, - Sample{.timestamp = 3, .value = 1.0}, - }, - .interval = 100, - .lookback = 96})); -INSTANTIATE_TEST_SUITE_P(StaleNan, - IntervalDecodeIteratorFixture, - testing::Values(IntervalDecodeIteratorCase{.samples{Sample{.timestamp = 100, .value = STALE_NAN}}, .interval = 100}, - IntervalDecodeIteratorCase{.samples{ - Sample{.timestamp = 99, .value = 1.0}, Sample{.timestamp = 100, .value = STALE_NAN}, - }, - .interval = 100}, + }}, IntervalDecodeIteratorCase{.samples{ Sample{.timestamp = 98, .value = 1.0}, Sample{.timestamp = 99, .value = STALE_NAN}, @@ -171,13 +150,31 @@ INSTANTIATE_TEST_SUITE_P(StaleNan, }}, IntervalDecodeIteratorCase{.samples{ Sample{.timestamp = 100, .value = STALE_NAN}, + Sample{.timestamp = 101, .value = 1.0}, Sample{.timestamp = 200, .value = STALE_NAN}, + Sample{.timestamp = 201, .value = 1.0}, Sample{.timestamp = 300, .value = STALE_NAN}, Sample{.timestamp = 400, .value = 1.0}, }, .interval = 100, .expected{ + Sample{.timestamp = 100, .value = STALE_NAN}, + Sample{.timestamp = 200, .value = STALE_NAN}, + Sample{.timestamp = 300, .value = STALE_NAN}, Sample{.timestamp = 400, .value = 1.0}, }})); +INSTANTIATE_TEST_SUITE_P(NoDownsampling, + DownsamplingDecodeIteratorFixture, + testing::Values(IntervalDecodeIteratorCase{.samples{ + Sample{.timestamp = 98, .value = 1.0}, + Sample{.timestamp = 99, .value = STALE_NAN}, + Sample{.timestamp = 100, .value = 1.0}, + }, + .interval = 0, + .expected{ + Sample{.timestamp = 98, .value = 1.0}, + Sample{.timestamp = 99, .value = STALE_NAN}, + Sample{.timestamp = 100, .value = 1.0}, + }})); } // namespace \ No newline at end of file diff --git a/web/api/v1/api.go b/web/api/v1/api.go index 837d0e924..e2ea0decf 100644 --- a/web/api/v1/api.go +++ b/web/api/v1/api.go @@ -194,8 +194,10 @@ type API struct { QueryEngine promql.QueryEngine ExemplarQueryable storage.ExemplarQueryable - adapter handler.Adapter // PP_CHANGES.md: rebuild on cpp - opHandler *handler.PPHandler // PP_CHANGES.md: rebuild on cpp + adapter handler.Adapter // PP_CHANGES.md: rebuild on cpp + longtermQueryable storage.SampleAndChunkQueryable // PP_CHANGES.md: rebuild on cpp + longtermQueryEngine promql.QueryEngine // PP_CHANGES.md: rebuild on cpp + opHandler *handler.PPHandler // PP_CHANGES.md: rebuild on cpp scrapePoolsRetriever func(context.Context) ScrapePoolsRetriever targetRetriever func(context.Context) TargetRetriever @@ -233,6 +235,8 @@ func NewAPI( eq storage.ExemplarQueryable, adapter handler.Adapter, // PP_CHANGES.md: rebuild on cpp + longtermQueryable storage.SampleAndChunkQueryable, // PP_CHANGES.md: rebuild on cpp + longtermQE promql.QueryEngine, // PP_CHANGES.md: rebuild on cpp spsr func(context.Context) ScrapePoolsRetriever, tr func(context.Context) TargetRetriever, @@ -265,7 +269,9 @@ func NewAPI( Queryable: q, ExemplarQueryable: eq, - adapter: adapter, // PP_CHANGES.md: rebuild on cpp + adapter: adapter, // PP_CHANGES.md: rebuild on cpp + longtermQueryable: longtermQueryable, // PP_CHANGES.md: rebuild on cpp + longtermQueryEngine: longtermQE, // PP_CHANGES.md: rebuild on cpp scrapePoolsRetriever: spsr, targetRetriever: tr, @@ -453,9 +459,24 @@ func (api *API) query(r *http.Request) (result apiFuncResult) { if err != nil { return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil} } - qry, err := api.QueryEngine.NewInstantQuery(ctx, api.Queryable, opts, r.FormValue("query"), ts) - if err != nil { - return invalidParamError(err, "query") + + var qry promql.Query + if isLongterm(r) { + qry, err = api.longtermQueryEngine.NewInstantQuery( + ctx, + api.longtermQueryable, + opts, + r.FormValue("query"), + ts, + ) + if err != nil { + return invalidParamError(err, "query") + } + } else { + qry, err = api.QueryEngine.NewInstantQuery(ctx, api.Queryable, opts, r.FormValue("query"), ts) + if err != nil { + return invalidParamError(err, "query") + } } // From now on, we must only return with a finalizer in the result (to @@ -556,10 +577,28 @@ func (api *API) queryRange(r *http.Request) (result apiFuncResult) { if err != nil { return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil} } - qry, err := api.QueryEngine.NewRangeQuery(ctx, api.Queryable, opts, r.FormValue("query"), start, end, step) - if err != nil { - return invalidParamError(err, "query") + + var qry promql.Query + if isLongterm(r) { + qry, err = api.longtermQueryEngine.NewRangeQuery( + ctx, + api.longtermQueryable, + opts, + r.FormValue("query"), + start, + end, + step, + ) + if err != nil { + return invalidParamError(err, "query") + } + } else { + qry, err = api.QueryEngine.NewRangeQuery(ctx, api.Queryable, opts, r.FormValue("query"), start, end, step) + if err != nil { + return invalidParamError(err, "query") + } } + // From now on, we must only return with a finalizer in the result (to // be called by the caller) or call qry.Close ourselves (which is // required in the case of a panic). @@ -676,9 +715,17 @@ func (api *API) labelNames(r *http.Request) apiFuncResult { Limit: toHintLimit(limit), } - q, err := api.Queryable.Querier(timestamp.FromTime(start), timestamp.FromTime(end)) - if err != nil { - return apiFuncResult{nil, returnAPIError(err), nil, nil} + var q storage.Querier + if isLongterm(r) { + q, err = api.longtermQueryable.Querier(timestamp.FromTime(start), timestamp.FromTime(end)) + if err != nil { + return apiFuncResult{nil, returnAPIError(err), nil, nil} + } + } else { + q, err = api.Queryable.Querier(timestamp.FromTime(start), timestamp.FromTime(end)) + if err != nil { + return apiFuncResult{nil, returnAPIError(err), nil, nil} + } } defer q.Close() @@ -760,9 +807,17 @@ func (api *API) labelValues(r *http.Request) (result apiFuncResult) { Limit: toHintLimit(limit), } - q, err := api.Queryable.Querier(timestamp.FromTime(start), timestamp.FromTime(end)) - if err != nil { - return apiFuncResult{nil, &apiError{errorExec, err}, nil, nil} + var q storage.Querier + if isLongterm(r) { + q, err = api.longtermQueryable.Querier(timestamp.FromTime(start), timestamp.FromTime(end)) + if err != nil { + return apiFuncResult{nil, &apiError{errorExec, err}, nil, nil} + } + } else { + q, err = api.Queryable.Querier(timestamp.FromTime(start), timestamp.FromTime(end)) + if err != nil { + return apiFuncResult{nil, &apiError{errorExec, err}, nil, nil} + } } // From now on, we must only return with a finalizer in the result (to // be called by the caller) or call q.Close ourselves (which is required @@ -865,9 +920,17 @@ func (api *API) series(r *http.Request) (result apiFuncResult) { return invalidParamError(err, "match[]") } - q, err := api.Queryable.Querier(timestamp.FromTime(start), timestamp.FromTime(end)) - if err != nil { - return apiFuncResult{nil, returnAPIError(err), nil, nil} + var q storage.Querier + if isLongterm(r) { + q, err = api.longtermQueryable.Querier(timestamp.FromTime(start), timestamp.FromTime(end)) + if err != nil { + return apiFuncResult{nil, returnAPIError(err), nil, nil} + } + } else { + q, err = api.Queryable.Querier(timestamp.FromTime(start), timestamp.FromTime(end)) + if err != nil { + return apiFuncResult{nil, returnAPIError(err), nil, nil} + } } // From now on, we must only return with a finalizer in the result (to // be called by the caller) or call q.Close ourselves (which is required @@ -1956,3 +2019,8 @@ func toHintLimit(limit int) int { } return limit } + +// isLongterm check Request on data source. +func isLongterm(r *http.Request) bool { + return r.Header.Get("longterm") == "true" +} diff --git a/web/api/v1/errors_test.go b/web/api/v1/errors_test.go index 978cb3109..37de0ef61 100644 --- a/web/api/v1/errors_test.go +++ b/web/api/v1/errors_test.go @@ -117,7 +117,9 @@ func createPrometheusAPI(t *testing.T, q storage.SampleAndChunkQueryable) *route q, nil, nil, - nil, + nil, // adapter + nil, // longtermQueryable + nil, // longtermQE func(context.Context) ScrapePoolsRetriever { return &DummyScrapePoolsRetriever{} }, func(context.Context) TargetRetriever { return &DummyTargetRetriever{} }, func(context.Context) AlertmanagerRetriever { return &DummyAlertmanagerRetriever{} }, diff --git a/web/web.go b/web/web.go index 90c80b84c..74f1e32b4 100644 --- a/web/web.go +++ b/web/web.go @@ -245,6 +245,10 @@ type Options struct { Version *PrometheusVersion Flags map[string]string + // longterm + LongtermStorage storage.Storage + LongtermQueryEngine *promql.Engine + ListenAddresses []string CORSOrigin *regexp.Regexp ReadTimeout time.Duration @@ -334,6 +338,8 @@ func New(logger log.Logger, o *Options, adapter handler.Adapter) *Handler { // P app, h.exemplarStorage, adapter, + h.options.LongtermStorage, + h.options.LongtermQueryEngine, factorySPr, factoryTr, factoryAr,