From dcdf8b5677950e070dd391b8b051549a1db1a58e Mon Sep 17 00:00:00 2001 From: SungJin1212 Date: Mon, 22 Dec 2025 20:32:04 +0900 Subject: [PATCH] ruler: Refactor query execution and tenant factory logic Signed-off-by: SungJin1212 --- pkg/ruler/compat.go | 143 ++++++++++++++++++++++++++++---------------- 1 file changed, 90 insertions(+), 53 deletions(-) diff --git a/pkg/ruler/compat.go b/pkg/ruler/compat.go index 55b458a6f0a..74aec5a61f0 100644 --- a/pkg/ruler/compat.go +++ b/pkg/ruler/compat.go @@ -166,10 +166,51 @@ type RulesLimits interface { RulerExternalLabels(userID string) labels.Labels } -// EngineQueryFunc returns a new engine query function validating max queryLength. -// Modified from Prometheus rules.EngineQueryFunc -// https://github.com/prometheus/prometheus/blob/v2.39.1/rules/manager.go#L189. +type QueryExecutor func(ctx context.Context, qs string, t time.Time) (promql.Vector, error) + func EngineQueryFunc(engine promql.QueryEngine, frontendClient *frontendClient, q storage.Queryable, overrides RulesLimits, userID string, lookbackDelta time.Duration) rules.QueryFunc { + var executor QueryExecutor + + if frontendClient != nil { + // query to query frontend + executor = frontendClient.InstantQuery + } else { + // query to engine + executor = func(ctx context.Context, qs string, t time.Time) (promql.Vector, error) { + return executeQuery(ctx, engine, q, qs, t) + } + } + + return wrapWithMiddleware(executor, overrides, userID, lookbackDelta) +} + +func executeQuery(ctx context.Context, engine promql.QueryEngine, q storage.Queryable, qs string, t time.Time) (promql.Vector, error) { + qry, err := engine.NewInstantQuery(ctx, q, nil, qs, t) + if err != nil { + return nil, err + } + defer qry.Close() + + res := qry.Exec(ctx) + if res.Err != nil { + return nil, res.Err + } + + switch v := res.Value.(type) { + case promql.Vector: + return v, nil + case promql.Scalar: + return promql.Vector{promql.Sample{ + T: v.T, + F: v.V, + Metric: labels.Labels{}, + }}, nil + default: + return nil, errors.New("rule result is not a vector or scalar") + } +} + +func wrapWithMiddleware(next QueryExecutor, overrides RulesLimits, userID string, lookbackDelta time.Duration) rules.QueryFunc { return func(ctx context.Context, qs string, t time.Time) (promql.Vector, error) { // Enforce the max query length. maxQueryLength := overrides.MaxQueryLength(userID) @@ -192,35 +233,7 @@ func EngineQueryFunc(engine promql.QueryEngine, frontendClient *frontendClient, } ctx = requestmeta.ContextWithRequestSource(ctx, requestmeta.SourceRuler) - if frontendClient != nil { - v, err := frontendClient.InstantQuery(ctx, qs, t) - if err != nil { - return nil, err - } - - return v, nil - } else { - q, err := engine.NewInstantQuery(ctx, q, nil, qs, t) - if err != nil { - return nil, err - } - res := q.Exec(ctx) - if res.Err != nil { - return nil, res.Err - } - switch v := res.Value.(type) { - case promql.Vector: - return v, nil - case promql.Scalar: - return promql.Vector{promql.Sample{ - T: v.T, - F: v.V, - Metric: labels.Labels{}, - }}, nil - default: - return nil, errors.New("rule result is not a vector or scalar") - } - } + return next(ctx, qs, t) } } @@ -337,36 +350,25 @@ func DefaultTenantManagerFactory(cfg Config, p Pusher, q storage.Queryable, engi // and errors returned by PromQL engine. Errors from Queryable can be either caused by user (limits) or internal errors. // Errors from PromQL are always "user" errors. q = querier.NewErrorTranslateQueryableWithFn(q, WrapQueryableErrors) - return func(ctx context.Context, userID string, notifier *notifier.Manager, logger log.Logger, frontendPool *client.Pool, reg prometheus.Registerer) (RulesManager, error) { - var client *frontendClient - failedQueries := evalMetrics.FailedQueriesVec.WithLabelValues(userID) - totalQueries := evalMetrics.TotalQueriesVec.WithLabelValues(userID) - totalWrites := evalMetrics.TotalWritesVec.WithLabelValues(userID) - failedWrites := evalMetrics.FailedWritesVec.WithLabelValues(userID) - - if cfg.FrontendAddress != "" { - c, err := frontendPool.GetClientFor(cfg.FrontendAddress) - if err != nil { - return nil, err - } - client = c.(*frontendClient) + qfeClient, err := resolveFrontendClient(cfg.FrontendAddress, frontendPool) + if err != nil { + return nil, err } - var queryFunc rules.QueryFunc - engineQueryFunc := EngineQueryFunc(engine, client, q, overrides, userID, cfg.LookbackDelta) - metricsQueryFunc := MetricsQueryFunc(engineQueryFunc, totalQueries, failedQueries) - if cfg.EnableQueryStats { - queryFunc = RecordAndReportRuleQueryMetrics(metricsQueryFunc, userID, evalMetrics, logger) - } else { - queryFunc = metricsQueryFunc + + if qfeClient == nil && engine == nil { + return nil, fmt.Errorf("neither engine nor frontend client is configured for user %s", userID) } + queryFunc := buildQueryFunc(engine, qfeClient, q, overrides, userID, cfg, evalMetrics, logger) // We let the Prometheus rules manager control the context so that there is a chance // for graceful shutdown of rules that are still in execution even in case the cortex context is canceled. prometheusContext := user.InjectOrgID(context.WithoutCancel(ctx), userID) return rules.NewManager(&rules.ManagerOptions{ - Appendable: NewPusherAppendable(p, userID, overrides, totalWrites, failedWrites), + Appendable: NewPusherAppendable(p, userID, overrides, + evalMetrics.TotalWritesVec.WithLabelValues(userID), + evalMetrics.FailedWritesVec.WithLabelValues(userID)), Queryable: q, QueryFunc: queryFunc, Context: prometheusContext, @@ -387,6 +389,41 @@ func DefaultTenantManagerFactory(cfg Config, p Pusher, q storage.Queryable, engi } } +func resolveFrontendClient(addr string, pool *client.Pool) (*frontendClient, error) { + if addr == "" { + return nil, nil + } + c, err := pool.GetClientFor(addr) + if err != nil { + return nil, err + } + return c.(*frontendClient), nil +} + +func buildQueryFunc( + engine promql.QueryEngine, + client *frontendClient, + q storage.Queryable, + overrides RulesLimits, + userID string, + cfg Config, + metrics *RuleEvalMetrics, + logger log.Logger, +) rules.QueryFunc { + baseQueryFunc := EngineQueryFunc(engine, client, q, overrides, userID, cfg.LookbackDelta) + + // apply metric middleware + totalQueries := metrics.TotalQueriesVec.WithLabelValues(userID) + failedQueries := metrics.FailedQueriesVec.WithLabelValues(userID) + metricsFunc := MetricsQueryFunc(baseQueryFunc, totalQueries, failedQueries) + + // apply statistic middleware + if cfg.EnableQueryStats { + return RecordAndReportRuleQueryMetrics(metricsFunc, userID, metrics, logger) + } + return metricsFunc +} + type QueryableError struct { err error }