diff --git a/discovery/manager.go b/discovery/manager.go index 8b304a0faf..6d10717712 100644 --- a/discovery/manager.go +++ b/discovery/manager.go @@ -134,6 +134,16 @@ func Name(n string) func(*Manager) { } } +// SkipInitialWait sets the name of the manager. This is used in serverless flavours of OTel's prometheusreceiver +// which is sensitive to startup latencies. +func SkipInitialWait() func(*Manager) { + return func(m *Manager) { + m.mtx.Lock() + defer m.mtx.Unlock() + m.skipStartupWait = true + } +} + // HTTPClientOptions sets the list of HTTP client options to expose to // Discoverers. It is up to Discoverers to choose to use the options provided. func HTTPClientOptions(opts ...config.HTTPClientOption) func(*Manager) { @@ -165,6 +175,11 @@ type Manager struct { // should only be modified in unit tests. updatert time.Duration + // skipStartupWait allows the discovery manager to skip the initial wait before sending updates + // to the channel. This is used in serverless flavours of OTel's prometheusreceiver + // which is sensitive to startup latencies. + skipStartupWait bool + // The triggerSend channel signals to the Manager that new updates have been received from providers. triggerSend chan struct{} @@ -344,6 +359,33 @@ func (m *Manager) sender() { ticker := time.NewTicker(m.updatert) defer ticker.Stop() + // Send the targets downstream as soon as you see them if skipStartupWait is + // set. If discovery receiver's channel is too busy, fall back to the + // regular loop. + if m.skipStartupWait { + select { + case <-m.triggerSend: + sentUpdates.WithLabelValues(m.name).Inc() + select { + case m.syncCh <- m.allGroups(): + case <-ticker.C: + delayedUpdates.WithLabelValues(m.name).Inc() + level.Debug(m.logger).Log("msg", "Discovery receiver's channel was full so will retry the next cycle") + select { + case m.triggerSend <- struct{}{}: + default: + } + case <-m.ctx.Done(): + return + } + case <-m.ctx.Done(): + return + } + + // We restart the ticker to ensure that no two updates are less than updatert apart. + ticker.Reset(m.updatert) + } + for { select { case <-m.ctx.Done(): diff --git a/discovery/manager_test.go b/discovery/manager_test.go index 5371608112..3c843a9cee 100644 --- a/discovery/manager_test.go +++ b/discovery/manager_test.go @@ -775,6 +775,31 @@ func pk(provider, setName string, n int) poolKey { } } +func TestTargetSetTargetGroupsPresentOnStartup(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + discoveryManager := NewManager(ctx, log.NewNopLogger(), SkipInitialWait()) + + // Set the updatert to a super long time so we can verify that the skip worked correctly. + discoveryManager.updatert = 100 * time.Hour + go discoveryManager.Run() + + c := map[string]Configs{ + "prometheus": { + staticConfig("foo:9090"), + }, + } + discoveryManager.ApplyConfig(c) + + syncedTargets := <-discoveryManager.SyncCh() + require.Equal(t, 1, len(syncedTargets)) + verifySyncedPresence(t, syncedTargets, "prometheus", "{__address__=\"foo:9090\"}", true) + require.Equal(t, 1, len(syncedTargets["prometheus"])) + p := pk("static", "prometheus", 0) + verifyPresence(t, discoveryManager.targets, p, "{__address__=\"foo:9090\"}", true) + require.Equal(t, 1, len(discoveryManager.targets)) +} + func TestTargetSetTargetGroupsPresentOnConfigReload(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/scrape/manager.go b/scrape/manager.go index c2b434308e..9b22ed039c 100644 --- a/scrape/manager.go +++ b/scrape/manager.go @@ -137,6 +137,12 @@ type Options struct { EnableProtobufNegotiation bool // Option to increase the interval used by scrape manager to throttle target groups updates. DiscoveryReloadInterval model.Duration + // Option to enable discovering targets immediately on start up as opposed + // to waiting for the interval defined in DiscoveryReloadInterval before + // initializing the scrape pools. Disabled by default. Useful for serverless + // flavors of OpenTelemetry contrib's prometheusreceiver where we're + // sensitive to start up delays. + DiscoveryReloadOnStartup bool // Optional HTTP client options to use when scraping. HTTPClientOptions []config_util.HTTPClientOption @@ -198,8 +204,17 @@ func (m *Manager) reloader() { reloadIntervalDuration = model.Duration(5 * time.Second) } - ticker := time.NewTicker(time.Duration(reloadIntervalDuration)) + // Skip the initial reload interval wait for the first reload. + if m.opts.DiscoveryReloadOnStartup { + select { + case <-m.triggerReload: + m.reload() + case <-m.graceShut: + return + } + } + ticker := time.NewTicker(time.Duration(reloadIntervalDuration)) defer ticker.Stop() for { @@ -289,6 +304,7 @@ func (m *Manager) StopAfterScrapeAttempt(minScrapeTime time.Time) { var wg sync.WaitGroup for _, p := range m.scrapePools { + p.mtx.Lock() for _, l := range p.loops { l := l wg.Add(1) @@ -297,6 +313,7 @@ func (m *Manager) StopAfterScrapeAttempt(minScrapeTime time.Time) { wg.Done() }() } + p.mtx.Unlock() } wg.Wait() diff --git a/scrape/manager_test.go b/scrape/manager_test.go index e49d5b4833..46c127f337 100644 --- a/scrape/manager_test.go +++ b/scrape/manager_test.go @@ -601,6 +601,44 @@ func TestManagerTargetsUpdates(t *testing.T) { } } +func TestManagerSkipInitialWait(t *testing.T) { + opts := Options{DiscoveryReloadOnStartup: true} + m := NewManager(&opts, nil, nil) + + ts := make(chan map[string][]*targetgroup.Group, 1) + go m.Run(ts) + defer m.Stop() + + tgSent := make(map[string][]*targetgroup.Group) + tgSent["test"] = []*targetgroup.Group{ + { + Source: "test_source", + }, + } + + select { + case ts <- tgSent: + case <-time.After(10 * time.Millisecond): + t.Error("Scrape manager's channel remained blocked after the set threshold.") + } + + // Give some time for the reloader to have picked this up. + time.Sleep(2 * time.Second) + + m.mtxScrape.Lock() + tsetActual := m.targetSets + m.mtxScrape.Unlock() + + // Make sure all updates have been received. + require.Equal(t, tgSent, tsetActual) + + select { + case <-m.triggerReload: + t.Error("Reload should've already happened") + default: + } +} + func TestSetOffsetSeed(t *testing.T) { getConfig := func(prometheus string) *config.Config { cfgText := `