From 613e34e096343d0fc7e8eb4df06a342450c976a5 Mon Sep 17 00:00:00 2001
From: Alexandr Yudin
Date: Fri, 14 Nov 2025 09:59:31 +0000
Subject: [PATCH 1/3] concurrency executer

---
 rules/concurrency_executer.go  |  89 +++++++++++++++++
 rules/group.go                 |  70 +++++++-------
 rules/manager.go               | 170 ++++++++++++++++++---------------
 rules/manager_test.go          |  26 ++---
 storage/pp_fanout_queryable.go |  39 ++++++--
 storage/pp_merge.go            |  33 +++++++
 storage/secondary.go           |   8 +-
 7 files changed, 299 insertions(+), 136 deletions(-)
 create mode 100644 rules/concurrency_executer.go
 create mode 100644 storage/pp_merge.go

diff --git a/rules/concurrency_executer.go b/rules/concurrency_executer.go
new file mode 100644
index 0000000000..56eb0fb412
--- /dev/null
+++ b/rules/concurrency_executer.go
@@ -0,0 +1,89 @@
+package rules
+
+import (
+	"sync"
+)
+
+// ConcurrencyExecuter executes rule evaluations in parallel on pre-launched goroutines.
+type ConcurrencyExecuter interface {
+	// Execute hands fn to a pre-launched goroutine via the queue for parallel evaluation.
+	Execute(fn func())
+
+	// Run starts the worker goroutines.
+	Run()
+
+	// Stop signals the launched goroutines to stop and waits until they all exit.
+	Stop()
+}
+
+// ConcurrentRuleEvalExecuter executes rule evaluations in parallel on pre-launched goroutines;
+// if no goroutine is free, the evaluation runs on the calling goroutine.
+type ConcurrentRuleEvalExecuter struct {
+	queue          chan func()
+	stop           chan struct{}
+	wg             sync.WaitGroup
+	maxConcurrency int
+}
+
+// NewConcurrentRuleEvalExecuter initializes a new [ConcurrentRuleEvalExecuter].
+func NewConcurrentRuleEvalExecuter(maxConcurrency int) *ConcurrentRuleEvalExecuter {
+	return &ConcurrentRuleEvalExecuter{
+		queue:          make(chan func()),
+		stop:           make(chan struct{}),
+		wg:             sync.WaitGroup{},
+		maxConcurrency: maxConcurrency,
+	}
+}
+
+// Execute hands fn to a free worker goroutine via the queue, or runs it on the calling goroutine if none is free.
+func (e *ConcurrentRuleEvalExecuter) Execute(fn func()) {
+	select {
+	case e.queue <- fn:
+	default:
+		fn()
+	}
+}
+
+// Run starts the worker goroutines.
+func (e *ConcurrentRuleEvalExecuter) Run() {
+	if e.isStopped() {
+		return
+	}
+
+	e.wg.Add(e.maxConcurrency)
+	for range e.maxConcurrency {
+		go e.workerLoop()
+	}
+}
+
+// Stop signals the launched goroutines to stop and waits until they all exit.
+func (e *ConcurrentRuleEvalExecuter) Stop() {
+	close(e.stop)
+	e.wg.Wait()
+}
+
+// isStopped reports whether Stop has been called.
+func (e *ConcurrentRuleEvalExecuter) isStopped() bool {
+	select {
+	case <-e.stop:
+		return true
+
+	default:
+		return false
+	}
+}
+
+// workerLoop is the main loop of each worker goroutine.
+func (e *ConcurrentRuleEvalExecuter) workerLoop() {
+	defer e.wg.Done()
+
+	for {
+		select {
+		case <-e.stop:
+			return
+
+		case fn := <-e.queue:
+			fn()
+		}
+	}
+}
diff --git a/rules/group.go b/rules/group.go
index 33e3c655ef..1eb8ee838d 100644
--- a/rules/group.go
+++ b/rules/group.go
@@ -74,8 +74,12 @@ type Group struct {
 	// defaults to DefaultEvalIterationFunc.
 	evalIterationFunc GroupEvalIterationFunc
 
-	// concurrencyController controls the rules evaluation concurrency.
-	concurrencyController RuleConcurrencyController
+	// // concurrencyController controls the rules evaluation concurrency.
+	// concurrencyController RuleConcurrencyController
+
+	// concurrencyExecuter controls the rules evaluation concurrency.
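+	// A nil executer makes Eval fall back to the sequential path.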
+ concurrencyExecuter ConcurrencyExecuter } // GroupEvalIterationFunc is used to implement and extend rule group @@ -120,29 +123,30 @@ func NewGroup(o GroupOptions) *Group { evalIterationFunc = DefaultEvalIterationFunc } - concurrencyController := o.Opts.RuleConcurrencyController - if concurrencyController == nil { - concurrencyController = sequentialRuleEvalController{} - } + // concurrencyController := o.Opts.RuleConcurrencyController + // if concurrencyController == nil { + // concurrencyController = sequentialRuleEvalController{} + // } return &Group{ - name: o.Name, - file: o.File, - interval: o.Interval, - queryOffset: o.QueryOffset, - limit: o.Limit, - rules: o.Rules, - shouldRestore: o.ShouldRestore, - opts: o.Opts, - seriesInPreviousEval: make([]map[string]labels.Labels, len(o.Rules)), - seriesInCurrentEval: make([]map[string]labels.Labels, len(o.Rules)), - done: make(chan struct{}), - managerDone: o.done, - terminated: make(chan struct{}), - logger: log.With(o.Opts.Logger, "file", o.File, "group", o.Name), - metrics: metrics, - evalIterationFunc: evalIterationFunc, - concurrencyController: concurrencyController, + name: o.Name, + file: o.File, + interval: o.Interval, + queryOffset: o.QueryOffset, + limit: o.Limit, + rules: o.Rules, + shouldRestore: o.ShouldRestore, + opts: o.Opts, + seriesInPreviousEval: make([]map[string]labels.Labels, len(o.Rules)), + seriesInCurrentEval: make([]map[string]labels.Labels, len(o.Rules)), + done: make(chan struct{}), + managerDone: o.done, + terminated: make(chan struct{}), + logger: log.With(o.Opts.Logger, "file", o.File, "group", o.Name), + metrics: metrics, + evalIterationFunc: evalIterationFunc, + // concurrencyController: concurrencyController, + concurrencyExecuter: o.Opts.ConcurrencyExecuter, } } @@ -495,7 +499,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) { bs = g.opts.Batcher.BatchStorage() ) - if g.concurrencyController.IsConcurrent() { + if g.opts.ConcurrencyExecuter != nil { samplesTotal = g.concurrencyEval(ctx, ts, bs) } else { samplesTotal = g.sequentiallyEval(ctx, ts, g.rules, bs) @@ -1106,17 +1110,14 @@ func (g *Group) concurrencyEval(ctx context.Context, ts time.Time, bs storage.Ba default: } - if ctrl := g.concurrencyController; ctrl.Allow(ctx, g, rule) { - wg.Add(1) - - go concurrencyEval(i, rule, func() { - wg.Done() - ctrl.Done(ctx) - }) - sequentiallyRules[i] = nil // placeholder for the series - } else { + if !rule.NoDependencyRules() { sequentiallyRules[i] = rule + continue } + + wg.Add(1) + g.concurrencyExecuter.Execute(func() { concurrencyEval(i, g.rules[i], func() { wg.Done() }) }) + sequentiallyRules[i] = nil // placeholder for the series } wg.Wait() @@ -1124,11 +1125,10 @@ func (g *Group) concurrencyEval(ctx context.Context, ts time.Time, bs storage.Ba if err := concurrencyApp.Commit(); err != nil { groupKey := GroupKey(g.File(), g.Name()) for i := range g.rules { - if ctrl := g.concurrencyController; ctrl.Allow(ctx, g, g.rules[i]) { + if g.rules[i].NoDependencyRules() { g.rules[i].SetHealth(HealthBad) g.rules[i].SetLastError(err) g.metrics.EvalFailures.WithLabelValues(groupKey).Inc() - ctrl.Done(ctx) } } diff --git a/rules/manager.go b/rules/manager.go index b5a83df53b..e913a032d6 100644 --- a/rules/manager.go +++ b/rules/manager.go @@ -26,7 +26,6 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/prometheus/client_golang/prometheus" - "golang.org/x/sync/semaphore" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/rulefmt" @@ -117,17 
+116,18 @@ type ManagerOptions struct { EngineQueryCtor func(engine promql.QueryEngine, q storage.Queryable) QueryFunc // PP_CHANGES.md: rebuild on cpp Batcher storage.Batcher // PP_CHANGES.md: rebuild on cpp - Logger log.Logger - Registerer prometheus.Registerer - OutageTolerance time.Duration - ForGracePeriod time.Duration - ResendDelay time.Duration - GroupLoader GroupLoader - DefaultRuleQueryOffset func() time.Duration - MaxConcurrentEvals int64 - ConcurrentEvalsEnabled bool - RuleConcurrencyController RuleConcurrencyController - RuleDependencyController RuleDependencyController + Logger log.Logger + Registerer prometheus.Registerer + OutageTolerance time.Duration + ForGracePeriod time.Duration + ResendDelay time.Duration + GroupLoader GroupLoader + DefaultRuleQueryOffset func() time.Duration + MaxConcurrentEvals int64 + ConcurrentEvalsEnabled bool + ConcurrencyExecuter ConcurrencyExecuter + RuleDependencyController RuleDependencyController + // RuleConcurrencyController RuleConcurrencyController Metrics *Metrics } @@ -143,12 +143,16 @@ func NewManager(o *ManagerOptions) *Manager { o.GroupLoader = FileLoader{} } - if o.RuleConcurrencyController == nil { - if o.ConcurrentEvalsEnabled { - o.RuleConcurrencyController = newRuleConcurrencyController(o.MaxConcurrentEvals) - } else { - o.RuleConcurrencyController = sequentialRuleEvalController{} - } + // if o.RuleConcurrencyController == nil { + // if o.ConcurrentEvalsEnabled { + // o.RuleConcurrencyController = newRuleConcurrencyController(o.MaxConcurrentEvals) + // } else { + // o.RuleConcurrencyController = sequentialRuleEvalController{} + // } + // } + + if o.ConcurrentEvalsEnabled && o.ConcurrencyExecuter == nil { + o.ConcurrencyExecuter = NewConcurrentRuleEvalExecuter(int(o.MaxConcurrentEvals)) } if o.RuleDependencyController == nil { @@ -169,6 +173,9 @@ func NewManager(o *ManagerOptions) *Manager { // Run starts processing of the rule manager. It is blocking. func (m *Manager) Run() { level.Info(m.logger).Log("msg", "Starting rule manager...") + if m.opts.ConcurrencyExecuter != nil { + m.opts.ConcurrencyExecuter.Run() + } m.start() <-m.done } @@ -192,6 +199,10 @@ func (m *Manager) Stop() { // staleness markers. close(m.done) + if m.opts.ConcurrencyExecuter != nil { + m.opts.ConcurrencyExecuter.Stop() + } + level.Info(m.logger).Log("msg", "Rule manager stopped") } @@ -460,64 +471,65 @@ func (c ruleDependencyController) AnalyseRules(rules []Rule) { } } -// RuleConcurrencyController controls concurrency for rules that are safe to be evaluated concurrently. -// Its purpose is to bound the amount of concurrency in rule evaluations to avoid overwhelming the Prometheus -// server with additional query load. Concurrency is controlled globally, not on a per-group basis. -type RuleConcurrencyController interface { - // Allow determines if the given rule is allowed to be evaluated concurrently. - // If Allow() returns true, then Done() must be called to release the acquired slot and corresponding cleanup is done. - // It is important that both *Group and Rule are not retained and only be used for the duration of the call. - Allow(ctx context.Context, group *Group, rule Rule) bool - - // Done releases a concurrent evaluation slot. - Done(ctx context.Context) - - // IsConcurrent returns true if the controller is a concurrent controller, false if it is a sequential controller. - IsConcurrent() bool -} - -// concurrentRuleEvalController holds a weighted semaphore which controls the concurrent evaluation of rules. 
-type concurrentRuleEvalController struct { - sema *semaphore.Weighted -} - -func newRuleConcurrencyController(maxConcurrency int64) RuleConcurrencyController { - return &concurrentRuleEvalController{ - sema: semaphore.NewWeighted(maxConcurrency), - } -} - -func (c *concurrentRuleEvalController) Allow(ctx context.Context, _ *Group, rule Rule) bool { - // To allow a rule to be executed concurrently, we need 3 conditions: - // 1. The rule must not have any rules that depend on it. - // 2. The rule itself must not depend on any other rules. - // 3. If 1 & 2 are true, then and only then we should try to acquire the concurrency slot. - if rule.NoDependencyRules() { - return c.sema.Acquire(ctx, 1) == nil - } - - return false -} - -func (c *concurrentRuleEvalController) Done(_ context.Context) { - c.sema.Release(1) -} - -// IsConcurrent returns true if the controller is a concurrent controller, false if it is a sequential controller. -func (*concurrentRuleEvalController) IsConcurrent() bool { - return true -} - -// sequentialRuleEvalController is a RuleConcurrencyController that runs every rule sequentially. -type sequentialRuleEvalController struct{} - -func (c sequentialRuleEvalController) Allow(_ context.Context, _ *Group, _ Rule) bool { - return false -} - -func (c sequentialRuleEvalController) Done(_ context.Context) {} - -// IsConcurrent returns false if the controller is a sequential controller, true if it is a concurrent controller. -func (c sequentialRuleEvalController) IsConcurrent() bool { - return false -} +// the old mechanism is not used. +// // RuleConcurrencyController controls concurrency for rules that are safe to be evaluated concurrently. +// // Its purpose is to bound the amount of concurrency in rule evaluations to avoid overwhelming the Prometheus +// // server with additional query load. Concurrency is controlled globally, not on a per-group basis. +// type RuleConcurrencyController interface { +// // Allow determines if the given rule is allowed to be evaluated concurrently. +// // If Allow() returns true, then Done() must be called to release the acquired slot and corresponding cleanup is done. +// // It is important that both *Group and Rule are not retained and only be used for the duration of the call. +// Allow(ctx context.Context, group *Group, rule Rule) bool + +// // Done releases a concurrent evaluation slot. +// Done(ctx context.Context) + +// // IsConcurrent returns true if the controller is a concurrent controller, false if it is a sequential controller. +// IsConcurrent() bool +// } + +// // concurrentRuleEvalController holds a weighted semaphore which controls the concurrent evaluation of rules. +// type concurrentRuleEvalController struct { +// sema *semaphore.Weighted +// } + +// func newRuleConcurrencyController(maxConcurrency int64) RuleConcurrencyController { +// return &concurrentRuleEvalController{ +// sema: semaphore.NewWeighted(maxConcurrency), +// } +// } + +// func (c *concurrentRuleEvalController) Allow(ctx context.Context, _ *Group, rule Rule) bool { +// // To allow a rule to be executed concurrently, we need 3 conditions: +// // 1. The rule must not have any rules that depend on it. +// // 2. The rule itself must not depend on any other rules. +// // 3. If 1 & 2 are true, then and only then we should try to acquire the concurrency slot. 
+// if rule.NoDependencyRules() { +// return c.sema.Acquire(ctx, 1) == nil +// } + +// return false +// } + +// func (c *concurrentRuleEvalController) Done(_ context.Context) { +// c.sema.Release(1) +// } + +// // IsConcurrent returns true if the controller is a concurrent controller, false if it is a sequential controller. +// func (*concurrentRuleEvalController) IsConcurrent() bool { +// return true +// } + +// // sequentialRuleEvalController is a RuleConcurrencyController that runs every rule sequentially. +// type sequentialRuleEvalController struct{} + +// func (c sequentialRuleEvalController) Allow(_ context.Context, _ *Group, _ Rule) bool { +// return false +// } + +// func (c sequentialRuleEvalController) Done(_ context.Context) {} + +// // IsConcurrent returns false if the controller is a sequential controller, true if it is a concurrent controller. +// func (c sequentialRuleEvalController) IsConcurrent() bool { +// return false +// } diff --git a/rules/manager_test.go b/rules/manager_test.go index 9fd59f6cd6..e16ed7b594 100644 --- a/rules/manager_test.go +++ b/rules/manager_test.go @@ -780,13 +780,11 @@ func TestDeletedRuleMarkedStale(t *testing.T) { rules: []Rule{}, seriesInPreviousEval: []map[string]labels.Labels{}, opts: &ManagerOptions{ - FanoutQueryable: fanoutStorage, // PP_CHANGES.md: rebuild on cpp - EngineQueryCtor: EngineQueryFunc, // PP_CHANGES.md: rebuild on cpp - Batcher: adapter, // PP_CHANGES.md: rebuild on cpp - RuleConcurrencyController: sequentialRuleEvalController{}, + FanoutQueryable: fanoutStorage, // PP_CHANGES.md: rebuild on cpp + EngineQueryCtor: EngineQueryFunc, // PP_CHANGES.md: rebuild on cpp + Batcher: adapter, // PP_CHANGES.md: rebuild on cpp }, - concurrencyController: sequentialRuleEvalController{}, - metrics: NewGroupMetrics(nil), + metrics: NewGroupMetrics(nil), } newGroup.CopyState(oldGroup) @@ -2140,8 +2138,10 @@ func TestAsyncRuleEvaluation(t *testing.T) { // Configure concurrency settings. opts.ConcurrentEvalsEnabled = true opts.MaxConcurrentEvals = 2 - opts.RuleConcurrencyController = nil + // opts.RuleConcurrencyController = nil ruleManager := NewManager(opts) + opts.ConcurrencyExecuter.Run() + defer opts.ConcurrencyExecuter.Stop() groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, []string{"fixtures/rules_multiple.yaml"}...) require.Empty(t, errs) @@ -2154,7 +2154,7 @@ func TestAsyncRuleEvaluation(t *testing.T) { group.Eval(ctx, start) // Max inflight can be 1 synchronous eval and up to MaxConcurrentEvals concurrent evals. - require.EqualValues(t, opts.MaxConcurrentEvals, maxInflight.Load()) + require.EqualValues(t, opts.MaxConcurrentEvals+1, maxInflight.Load()) // Some rules should execute concurrently so should complete quicker. require.Less(t, time.Since(start).Seconds(), (time.Duration(ruleCount) * artificialDelay).Seconds()) // Each rule produces one vector. @@ -2190,8 +2190,9 @@ func TestAsyncRuleEvaluation(t *testing.T) { // Configure concurrency settings. opts.ConcurrentEvalsEnabled = true opts.MaxConcurrentEvals = 2 - opts.RuleConcurrencyController = nil ruleManager := NewManager(opts) + opts.ConcurrencyExecuter.Run() + defer opts.ConcurrencyExecuter.Stop() groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, []string{"fixtures/rules_multiple_independent.yaml"}...) require.Empty(t, errs) @@ -2204,7 +2205,7 @@ func TestAsyncRuleEvaluation(t *testing.T) { group.Eval(ctx, start) // Max inflight can be 1 synchronous eval and up to MaxConcurrentEvals concurrent evals. 
-		require.EqualValues(t, opts.MaxConcurrentEvals, maxInflight.Load())
+		require.EqualValues(t, opts.MaxConcurrentEvals+1, maxInflight.Load())
 		// Some rules should execute concurrently so should complete quicker.
 		require.Less(t, time.Since(start).Seconds(), (time.Duration(ruleCount) * artificialDelay).Seconds())
 		// Each rule produces one vector.
@@ -2240,8 +2241,9 @@ func TestAsyncRuleEvaluation(t *testing.T) {
 		// Configure concurrency settings.
 		opts.ConcurrentEvalsEnabled = true
 		opts.MaxConcurrentEvals = int64(ruleCount) * 2
-		opts.RuleConcurrencyController = nil
 		ruleManager := NewManager(opts)
+		opts.ConcurrencyExecuter.Run()
+		defer opts.ConcurrencyExecuter.Stop()
 
 		groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, []string{"fixtures/rules_multiple_independent.yaml"}...)
 		require.Empty(t, errs)
@@ -2255,7 +2257,7 @@
 		group.Eval(ctx, start)
 
 		// Max inflight can be up to MaxConcurrentEvals concurrent evals, since there is sufficient concurrency to run all rules at once.
-		require.LessOrEqual(t, int64(maxInflight.Load()), opts.MaxConcurrentEvals)
+		require.LessOrEqual(t, int64(maxInflight.Load()), opts.MaxConcurrentEvals+1)
 		// Some rules should execute concurrently so should complete quicker.
 		require.Less(t, time.Since(start).Seconds(), (time.Duration(ruleCount) * artificialDelay).Seconds())
 		// Each rule produces one vector.
diff --git a/storage/pp_fanout_queryable.go b/storage/pp_fanout_queryable.go
index c0c7195a5a..1e13e45920 100644
--- a/storage/pp_fanout_queryable.go
+++ b/storage/pp_fanout_queryable.go
@@ -1,6 +1,8 @@
 package storage
 
-import tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
+import (
+	tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
+)
 
 //
 // fanoutQueryable
 //
@@ -8,16 +10,19 @@
 
 // fanoutQueryable handles queries against a storage.
 type fanoutQueryable struct {
-	primary     Queryable
+	primaries   []Queryable
 	secondaries []Queryable
 }
 
 // NewFanoutQueryable init new [fanoutQueryable] as [Queryable].
 func NewFanoutQueryable(primary Queryable, secondaries ...Queryable) Queryable {
+	primaries := make([]Queryable, 0, 2)
+	primaries = append(primaries, primary)
+
 	sq := make([]Queryable, 0, len(secondaries))
 	for _, q := range secondaries {
 		if f, ok := q.(*fanout); ok {
-			sq = append(sq, f.primary)
+			primaries = append(primaries, f.primary)
 			for _, s := range f.secondaries {
 				sq = append(sq, s)
 			}
@@ -29,16 +34,29 @@ func NewFanoutQueryable(primary Queryable, secondaries ...Queryable) Queryable {
 	}
 	return &fanoutQueryable{
-		primary:     primary,
+		primaries:   primaries,
 		secondaries: sq,
 	}
 }
 
 // Querier calls f() with the given parameters. Returns a merged [Querier].
 func (fq *fanoutQueryable) Querier(mint, maxt int64) (Querier, error) {
-	primary, err := fq.primary.Querier(mint, maxt)
-	if err != nil {
-		return nil, err
+	primaries := make([]Querier, 0, len(fq.primaries))
+	for _, q := range fq.primaries {
+		querier, err := q.Querier(mint, maxt)
+		if err != nil {
+			// Close already open Queriers, append potential errors to returned error.
+			errs := tsdb_errors.NewMulti(err)
+			for _, q := range primaries {
+				errs.Add(q.Close())
+			}
+			return nil, errs.Err()
+		}
+		if _, ok := querier.(noopQuerier); ok {
+			continue
+		}
+
+		primaries = append(primaries, querier)
 	}
 
 	secondaries := make([]Querier, 0, len(fq.secondaries))
@@ -46,7 +64,10 @@
 		querier, err := q.Querier(mint, maxt)
 		if err != nil {
 			// Close already open Queriers, append potential errors to returned error.
-			errs := tsdb_errors.NewMulti(err, primary.Close())
+			errs := tsdb_errors.NewMulti(err)
+			for _, q := range primaries {
+				errs.Add(q.Close())
+			}
 			for _, q := range secondaries {
 				errs.Add(q.Close())
 			}
@@ -59,5 +80,5 @@
 		secondaries = append(secondaries, querier)
 	}
 
-	return NewMergeQuerier([]Querier{primary}, secondaries, ChainedSeriesMerge), nil
+	return NewMergeQuerierConcurrent(primaries, secondaries, ChainedSeriesMerge), nil
 }
diff --git a/storage/pp_merge.go b/storage/pp_merge.go
new file mode 100644
index 0000000000..b08be6f452
--- /dev/null
+++ b/storage/pp_merge.go
@@ -0,0 +1,33 @@
+package storage
+
+// NewMergeQuerierConcurrent returns a new Querier that merges the results of the given primary and secondary queriers.
+// Selects run concurrently when more than one querier is involved.
+func NewMergeQuerierConcurrent(primaries, secondaries []Querier, mergeFn VerticalSeriesMergeFunc) Querier {
+	primaries = filterQueriers(primaries)
+	secondaries = filterQueriers(secondaries)
+
+	switch {
+	case len(primaries) == 0 && len(secondaries) == 0:
+		return noopQuerier{}
+	case len(primaries) == 1 && len(secondaries) == 0:
+		return primaries[0]
+	case len(primaries) == 0 && len(secondaries) == 1:
+		return &querierAdapter{newSecondaryQuerierFrom(secondaries[0])}
+	}
+
+	queriers := make([]genericQuerier, 0, len(primaries)+len(secondaries))
+	for _, q := range primaries {
+		queriers = append(queriers, newGenericQuerierFrom(q))
+	}
+	for _, q := range secondaries {
+		queriers = append(queriers, newSecondaryQuerierFrom(q))
+	}
+
+	concurrentSelect := len(secondaries) > 0 || len(primaries) > 1
+
+	return &querierAdapter{&mergeGenericQuerier{
+		mergeFn:          (&seriesMergerAdapter{VerticalSeriesMergeFunc: mergeFn}).Merge,
+		queriers:         queriers,
+		concurrentSelect: concurrentSelect,
+	}}
+}
diff --git a/storage/secondary.go b/storage/secondary.go
index 1cf8024b65..1e161afaa8 100644
--- a/storage/secondary.go
+++ b/storage/secondary.go
@@ -15,6 +15,8 @@ package storage
 
 import (
 	"context"
+	"fmt"
+	"runtime/debug"
 	"sync"
 
 	"github.com/prometheus/prometheus/model/labels"
@@ -67,7 +69,11 @@ func (s *secondaryQuerier) LabelNames(ctx context.Context, hints *LabelHints, ma
 func (s *secondaryQuerier) Select(ctx context.Context, sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) genericSeriesSet {
 	if s.done {
-		panic("secondaryQuerier: Select invoked after first Next of any returned SeriesSet was done")
+		debug.PrintStack()
+		panic(fmt.Sprintf(
+			"secondaryQuerier: Select invoked after first Next of any returned SeriesSet was done: %v",
+			matchers,
+		))
 	}
 
 	s.asyncSets = append(s.asyncSets, s.genericQuerier.Select(ctx, sortSeries, hints, matchers...))

From c8516d796c7f0534ec192d31b267486cd13522aa Mon Sep 17 00:00:00 2001
From: Alexandr Yudin
Date: Fri, 14 Nov 2025 16:43:12 +0000
Subject: [PATCH 2/3] some fixes

---
 pp/go/cppbridge/head.go        | 10 ++++------
 storage/pp_fanout_queryable.go |  8 ++++----
 2 files changed, 8 insertions(+), 10 deletions(-)

diff --git 
a/pp/go/cppbridge/head.go b/pp/go/cppbridge/head.go index 02453faf7d..2c8ebd7931 100644 --- a/pp/go/cppbridge/head.go +++ b/pp/go/cppbridge/head.go @@ -56,17 +56,15 @@ type Sample struct { // HeadDataStorage is Go wrapper around series_data::Data_storage. type HeadDataStorage struct { - dataStorage uintptr - gcDestroyDetector *uint64 - timeInterval atomic.Pointer[TimeInterval] + dataStorage uintptr + timeInterval atomic.Pointer[TimeInterval] } // NewHeadDataStorage - constructor. func NewHeadDataStorage() *HeadDataStorage { ds := &HeadDataStorage{ - dataStorage: seriesDataDataStorageCtor(), - gcDestroyDetector: &gcDestroyDetector, - timeInterval: atomic.Pointer[TimeInterval]{}, + dataStorage: seriesDataDataStorageCtor(), + timeInterval: atomic.Pointer[TimeInterval]{}, } ds.timeInterval.Store(newInvalidTimeIntervalPtr()) diff --git a/storage/pp_fanout_queryable.go b/storage/pp_fanout_queryable.go index 1e13e45920..39c47b53d1 100644 --- a/storage/pp_fanout_queryable.go +++ b/storage/pp_fanout_queryable.go @@ -42,8 +42,8 @@ func NewFanoutQueryable(primary Queryable, secondaries ...Queryable) Queryable { // Querier calls f() with the given parameters. Returns a merged [Querier]. func (fq *fanoutQueryable) Querier(mint, maxt int64) (Querier, error) { primaries := make([]Querier, 0, len(fq.primaries)) - for _, q := range fq.primaries { - querier, err := q.Querier(mint, maxt) + for _, qe := range fq.primaries { + querier, err := qe.Querier(mint, maxt) if err != nil { // Close already open Queriers, append potential errors to returned error. errs := tsdb_errors.NewMulti(err) @@ -60,8 +60,8 @@ func (fq *fanoutQueryable) Querier(mint, maxt int64) (Querier, error) { } secondaries := make([]Querier, 0, len(fq.secondaries)) - for _, q := range fq.secondaries { - querier, err := q.Querier(mint, maxt) + for _, qe := range fq.secondaries { + querier, err := qe.Querier(mint, maxt) if err != nil { // Close already open Queriers, append potential errors to returned error. errs := tsdb_errors.NewMulti(err) From 0da434268e8dff406990f2cf94f9583cc72614ff Mon Sep 17 00:00:00 2001 From: Alexandr Yudin Date: Mon, 17 Nov 2025 06:21:54 +0000 Subject: [PATCH 3/3] revert --- storage/secondary.go | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/storage/secondary.go b/storage/secondary.go index 1e161afaa8..1cf8024b65 100644 --- a/storage/secondary.go +++ b/storage/secondary.go @@ -15,8 +15,6 @@ package storage import ( "context" - "fmt" - "runtime/debug" "sync" "github.com/prometheus/prometheus/model/labels" @@ -69,11 +67,7 @@ func (s *secondaryQuerier) LabelNames(ctx context.Context, hints *LabelHints, ma func (s *secondaryQuerier) Select(ctx context.Context, sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) genericSeriesSet { if s.done { - debug.PrintStack() - panic(fmt.Sprintf( - "secondaryQuerier: Select invoked after first Next of any returned SeriesSet was done: %v", - matchers, - )) + panic("secondaryQuerier: Select invoked after first Next of any returned SeriesSet was done") } s.asyncSets = append(s.asyncSets, s.genericQuerier.Select(ctx, sortSeries, hints, matchers...))
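
---

Usage sketch for the new executer API from PATCH 1/3 (NewConcurrentRuleEvalExecuter,
Run, Execute, Stop), assuming the patched prometheus module. The manager normally
drives the executer itself when ConcurrentEvalsEnabled is set, as the Run()/Stop()
hooks in rules/manager.go show; the sketch calls the same API directly.

	package main

	import (
		"fmt"
		"sync"

		"github.com/prometheus/prometheus/rules"
	)

	func main() {
		// Two pre-launched workers; when both are busy, Execute falls back to
		// running fn on the calling goroutine instead of blocking.
		exec := rules.NewConcurrentRuleEvalExecuter(2)
		exec.Run()

		var wg sync.WaitGroup
		for i := range 5 {
			wg.Add(1)
			exec.Execute(func() {
				defer wg.Done()
				fmt.Println("evaluated rule", i)
			})
		}
		wg.Wait()

		// Stop closes the stop channel and waits for every worker to exit.
		exec.Stop()
	}

The inline fallback is why the tests above bound in-flight evaluations by
MaxConcurrentEvals+1: MaxConcurrentEvals workers plus the one evaluation that may
run synchronously on the calling goroutine. Note also that Execute never blocks,
so an Execute issued after Stop() runs its fn inline as well, since no worker is
left receiving from the queue.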