From 44e7d091ea600e48ed47341be6161c8212fb75eb Mon Sep 17 00:00:00 2001 From: Ridwan Sharif Date: Fri, 17 Nov 2023 20:02:44 +0000 Subject: [PATCH 1/2] scrape: add option to Manager to skip initial discovery reload This config option is quite useful on serverless environments where we are sensitive to the start up latencies of the scraper. The serverless instance might only last for a few seconds and may not be able to afford the minimum 5s reload interval before the scrape pools are created. Signed-off-by: Ridwan Sharif --- scrape/manager.go | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/scrape/manager.go b/scrape/manager.go index c2b434308e..86874bbd82 100644 --- a/scrape/manager.go +++ b/scrape/manager.go @@ -137,6 +137,12 @@ type Options struct { EnableProtobufNegotiation bool // Option to increase the interval used by scrape manager to throttle target groups updates. DiscoveryReloadInterval model.Duration + // Option to enable discovering targets immediately on start up as opposed + // to waiting for the interval defined in DiscoveryReloadInterval before + // initializing the scrape pools. Disabled by default. Useful for serverless + // flavors of OpenTelemetry contrib's prometheusreceiver where we're + // sensitive to start up delays. + DiscoveryReloadOnStartup bool // Optional HTTP client options to use when scraping. HTTPClientOptions []config_util.HTTPClientOption @@ -198,8 +204,17 @@ func (m *Manager) reloader() { reloadIntervalDuration = model.Duration(5 * time.Second) } - ticker := time.NewTicker(time.Duration(reloadIntervalDuration)) + // Skip the initial reload interval wait for the first reload. 
+ if m.opts.DiscoveryReloadOnStartup { + select { + case <-m.triggerReload: + m.reload() + case <-m.graceShut: + return + } + } + ticker := time.NewTicker(time.Duration(reloadIntervalDuration)) defer ticker.Stop() for { From e206020a3ea6951dfe4f3fd97a4d118b66028328 Mon Sep 17 00:00:00 2001 From: Ridwan Sharif Date: Sat, 18 Nov 2023 01:27:28 +0000 Subject: [PATCH 2/2] discovery: add optional skip ability for serverless environments This change adds an option that will allow users to skip the initial wait before sending target sets to the scrape manager. This is particularly useful in environments where the startup latency is required to be low just as in serverless deployments. Signed-off-by: Ridwan Sharif --- discovery/manager.go | 42 +++++++++++++++++++++++++++++++++++++++ discovery/manager_test.go | 25 +++++++++++++++++++++++ scrape/manager_test.go | 38 +++++++++++++++++++++++++++++++++++ 3 files changed, 105 insertions(+) diff --git a/discovery/manager.go b/discovery/manager.go index 8b304a0faf..6d10717712 100644 --- a/discovery/manager.go +++ b/discovery/manager.go @@ -134,6 +134,16 @@ func Name(n string) func(*Manager) { } } +// SkipInitialWait sets the name of the manager. This is used in serverless flavours of OTel's prometheusreceiver +// which is sensitive to startup latencies. +func SkipInitialWait() func(*Manager) { + return func(m *Manager) { + m.mtx.Lock() + defer m.mtx.Unlock() + m.skipStartupWait = true + } +} + // HTTPClientOptions sets the list of HTTP client options to expose to // Discoverers. It is up to Discoverers to choose to use the options provided. func HTTPClientOptions(opts ...config.HTTPClientOption) func(*Manager) { @@ -165,6 +175,11 @@ type Manager struct { // should only be modified in unit tests. updatert time.Duration + // skipStartupWait allows the discovery manager to skip the initial wait before sending updates + // to the channel. 
This is used in serverless flavours of OTel's prometheusreceiver,
+ // which are sensitive to startup latencies.
+ discoveryManager.updatert = 100 * time.Hour + go discoveryManager.Run() + + c := map[string]Configs{ + "prometheus": { + staticConfig("foo:9090"), + }, + } + discoveryManager.ApplyConfig(c) + + syncedTargets := <-discoveryManager.SyncCh() + require.Equal(t, 1, len(syncedTargets)) + verifySyncedPresence(t, syncedTargets, "prometheus", "{__address__=\"foo:9090\"}", true) + require.Equal(t, 1, len(syncedTargets["prometheus"])) + p := pk("static", "prometheus", 0) + verifyPresence(t, discoveryManager.targets, p, "{__address__=\"foo:9090\"}", true) + require.Equal(t, 1, len(discoveryManager.targets)) +} + func TestTargetSetTargetGroupsPresentOnConfigReload(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/scrape/manager_test.go b/scrape/manager_test.go index e49d5b4833..46c127f337 100644 --- a/scrape/manager_test.go +++ b/scrape/manager_test.go @@ -601,6 +601,44 @@ func TestManagerTargetsUpdates(t *testing.T) { } } +func TestManagerSkipInitialWait(t *testing.T) { + opts := Options{DiscoveryReloadOnStartup: true} + m := NewManager(&opts, nil, nil) + + ts := make(chan map[string][]*targetgroup.Group, 1) + go m.Run(ts) + defer m.Stop() + + tgSent := make(map[string][]*targetgroup.Group) + tgSent["test"] = []*targetgroup.Group{ + { + Source: "test_source", + }, + } + + select { + case ts <- tgSent: + case <-time.After(10 * time.Millisecond): + t.Error("Scrape manager's channel remained blocked after the set threshold.") + } + + // Give some time for the reloader to have picked this up. + time.Sleep(2 * time.Second) + + m.mtxScrape.Lock() + tsetActual := m.targetSets + m.mtxScrape.Unlock() + + // Make sure all updates have been received. + require.Equal(t, tgSent, tsetActual) + + select { + case <-m.triggerReload: + t.Error("Reload should've already happened") + default: + } +} + func TestSetOffsetSeed(t *testing.T) { getConfig := func(prometheus string) *config.Config { cfgText := `