diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go
index 5e0a6626d0..aa5d541bc5 100644
--- a/cmd/prometheus/main.go
+++ b/cmd/prometheus/main.go
@@ -771,6 +771,7 @@ func main() {
 	// This is passed to ruleManager.Update().
 	externalURL := cfg.web.ExternalURL.String()
 
+	gcmAgentWriteSkipper := &writeSkipperForNoRWConfig{logger: logger, noRWEndpointConfigured: atomic.NewBool(true)}
 	reloaders := []reloader{
 		{
 			name: "db_storage",
@@ -781,6 +782,15 @@ func main() {
 		}, {
 			name:     "web_handler",
 			reloader: webHandler.ApplyConfig,
+		}, {
+			// NOTE(bwplotka): GMP forked logic.
+			name: "gmp_noopfornorwconfig_storage",
+			reloader: func(cfg *config.Config) error {
+				if agentMode {
+					return gcmAgentWriteSkipper.ApplyConfig(cfg)
+				}
+				return nil
+			},
 		}, {
 			name: "query_engine",
 			reloader: func(cfg *config.Config) error {
@@ -1115,7 +1125,7 @@ func main() {
 			func() error {
 				select {
 				case <-dbOpen:
-				// In case a shutdown is initiated before the dbOpen is released
+					// In case a shutdown is initiated before the dbOpen is released
 				case <-cancel:
 					reloadReady.Close()
 					return nil
@@ -1196,6 +1206,7 @@ func main() {
 	if agentMode {
 		// WAL storage.
 		opts := cfg.agent.ToAgentOptions()
+		opts.SkipWrite = gcmAgentWriteSkipper.noRWEndpointConfigured
 		cancel := make(chan struct{})
 		g.Add(
 			func() error {
@@ -1770,3 +1781,34 @@ func deleteStorageData(agentMode bool, dataPath string) error {
 	}
 	return nil
 }
+
+// writeSkipperForNoRWConfig tracks whether the loaded configuration has any
+// remote_write entries, so that agent storage can skip WAL appends when it
+// has none.
+//
+// NOTE(bwplotka): GMP forked logic.
+type writeSkipperForNoRWConfig struct {
+	logger log.Logger
+	// noRWEndpointConfigured is true while no remote_write entry is configured.
+	// main() hands this same pointer to the agent options as SkipWrite.
+	noRWEndpointConfigured *atomic.Bool
+}
+
+// ApplyConfig updates the flag from conf.RemoteWriteConfigs and logs only when
+// the state actually flips. It always returns nil.
+func (s *writeSkipperForNoRWConfig) ApplyConfig(conf *config.Config) error {
+	if len(conf.RemoteWriteConfigs) > 0 {
+		// Swap returns the previous value, so this logs only on true -> false.
+		// NOTE(review): series first appended while writes were skipped may
+		// lack WAL series records once writes resume -- TODO confirm.
+		if s.noRWEndpointConfigured.Swap(false) {
+			level.Info(s.logger).Log("msg", "gmp forked logic: enabling agent storage appending given a new remote_write config entry")
+		}
+	} else {
+		if !s.noRWEndpointConfigured.Swap(true) {
+			level.Info(s.logger).Log("msg", "gmp forked logic: disabling agent storage appending given no remote_write was configured; no need to utilize agent WAL.")
+			// TODO(bwplotka): Remove left-over from WAL?
+		}
+	}
+	return nil
+}
diff --git a/tsdb/agent/db.go b/tsdb/agent/db.go
index c0d5039af2..4bddb6b95b 100644
--- a/tsdb/agent/db.go
+++ b/tsdb/agent/db.go
@@ -82,6 +82,9 @@ type Options struct {
 
 	// NoLockfile disables creation and consideration of a lock file.
 	NoLockfile bool
+
+	// NOTE: GCM forked logic. SkipWrite, while true, makes appenders skip WAL writes on commit (GCM only mode).
+	SkipWrite *atomic.Bool
 }
 
 // DefaultOptions used for the WAL storage. They are reasonable for setups using
@@ -95,6 +98,7 @@ func DefaultOptions() *Options {
 		MinWALTime:        DefaultMinWALTime,
 		MaxWALTime:        DefaultMaxWALTime,
 		NoLockfile:        false,
+		SkipWrite:         atomic.NewBool(false),
 	}
 }
 
@@ -298,6 +302,7 @@ func Open(l log.Logger, reg prometheus.Registerer, rs *remote.Storage, dir strin
 			pendingFloatHistograms: make([]record.RefFloatHistogramSample, 0, 100),
 			pendingExamplars:       make([]record.RefExemplar, 0, 10),
 			exportExemplars:        make(map[storage.SeriesRef]record.RefExemplar, 10),
+			skipWrite:              opts.SkipWrite,
 		}
 	}
 
@@ -349,6 +354,9 @@ func validateOptions(opts *Options) *Options {
 	if t := int64(opts.TruncateFrequency / time.Millisecond); opts.MaxWALTime < t {
 		opts.MaxWALTime = t
 	}
+	if opts.SkipWrite == nil {
+		opts.SkipWrite = atomic.NewBool(false)
+	}
 	return opts
 }
 
@@ -621,6 +629,8 @@ Loop:
 			if ts < 0 {
 				ts = 0
 			}
+			// TODO(bwplotka): Debug, remove later.
+			level.Warn(db.logger).Log("msg", "gmp: truncating", "lowest", db.rs.LowestSentTimestamp(), "minTime", db.opts.MinWALTime, "result", ts)
 
 			// Network issues can prevent the result of getRemoteWriteTimestamp from
 			// changing. We don't want data in the WAL to grow forever, so we set a cap
@@ -629,8 +639,8 @@ Loop:
 			if maxTS := timestamp.FromTime(time.Now()) - db.opts.MaxWALTime; ts < maxTS {
 				ts = maxTS
 			}
-
-			level.Debug(db.logger).Log("msg", "truncating the WAL", "ts", ts)
+			// TODO(bwplotka): Move back to debug.
+			level.Warn(db.logger).Log("msg", "truncating the WAL", "ts", ts)
 			if err := db.truncate(ts); err != nil {
 				level.Warn(db.logger).Log("msg", "failed to truncate WAL", "err", err)
 			}
@@ -800,10 +810,13 @@ type appender struct {
 	// Series lock is not held on elements.
 	floatHistogramSeries []*memSeries
 
+	// NOTE: GCM forked logic
 	metadata gcm_export.MetadataFunc
-
-	// exemplars to be exported to GCM
+	// exemplars to be exported to GCM.
 	exportExemplars map[storage.SeriesRef]record.RefExemplar
+	// skipWrite, while true, makes Commit skip the WAL (no PRW configured; GCM export does
+	// not need that storage). It is Options.SkipWrite, read live so reloads take effect.
+	skipWrite *atomic.Bool
 }
 
 func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
@@ -831,7 +844,9 @@ func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v flo
 				Labels: l,
 			})
 
-			a.metrics.numActiveSeries.Inc()
+			if !a.skipWrite.Load() {
+				a.metrics.numActiveSeries.Inc()
+			}
 		}
 	}
 
@@ -961,7 +976,9 @@ func (a *appender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int
 				Labels: l,
 			})
 
-			a.metrics.numActiveSeries.Inc()
+			if !a.skipWrite.Load() {
+				a.metrics.numActiveSeries.Inc()
+			}
 		}
 	}
 
@@ -1006,6 +1023,13 @@ func (a *appender) Commit() error {
 	a.mtx.RLock()
 	defer a.mtx.RUnlock()
 
+	// NOTE: GCM forked logic. Export first; when skipping, drop pending data without touching the WAL.
+	gcm_exportsetup.Global().Export(a.metadata, a.pendingSamples, a.exportExemplars)
+	if a.skipWrite.Load() {
+		return a.Rollback()
+	}
+	// ----
+
 	var encoder record.Encoder
 	buf := a.bufPool.Get().([]byte)
 
@@ -1069,8 +1093,6 @@ func (a *appender) Commit() error {
 		}
 	}
 
-	gcm_exportsetup.Global().Export(a.metadata, a.pendingSamples, a.exportExemplars)
-
 	//nolint:staticcheck
 	a.bufPool.Put(buf)
 	return a.Rollback()