Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions discovery/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,16 @@ func Name(n string) func(*Manager) {
}
}

// SkipInitialWait sets the name of the manager. This is used in serverless flavours of OTel's prometheusreceiver
// which is sensitive to startup latencies.
func SkipInitialWait() func(*Manager) {
return func(m *Manager) {
m.mtx.Lock()
defer m.mtx.Unlock()
m.skipStartupWait = true
}
}

// HTTPClientOptions sets the list of HTTP client options to expose to
// Discoverers. It is up to Discoverers to choose to use the options provided.
func HTTPClientOptions(opts ...config.HTTPClientOption) func(*Manager) {
Expand Down Expand Up @@ -165,6 +175,11 @@ type Manager struct {
// should only be modified in unit tests.
updatert time.Duration

// skipStartupWait allows the discovery manager to skip the initial wait before sending updates
// to the channel. This is used in serverless flavours of OTel's prometheusreceiver
// which is sensitive to startup latencies.
skipStartupWait bool

// The triggerSend channel signals to the Manager that new updates have been received from providers.
triggerSend chan struct{}

Expand Down Expand Up @@ -344,6 +359,33 @@ func (m *Manager) sender() {
ticker := time.NewTicker(m.updatert)
defer ticker.Stop()

// Send the targets downstream as soon as you see them if skipStartupWait is
// set. If discovery receiver's channel is too busy, fall back to the
// regular loop.
if m.skipStartupWait {
select {
case <-m.triggerSend:
sentUpdates.WithLabelValues(m.name).Inc()
select {
case m.syncCh <- m.allGroups():
case <-ticker.C:
delayedUpdates.WithLabelValues(m.name).Inc()
level.Debug(m.logger).Log("msg", "Discovery receiver's channel was full so will retry the next cycle")
select {
case m.triggerSend <- struct{}{}:
default:
}
case <-m.ctx.Done():
return
}
case <-m.ctx.Done():
return
}

// We restart the ticker to ensure that no two updates are less than updatert apart.
ticker.Reset(m.updatert)
}

for {
select {
case <-m.ctx.Done():
Expand Down
25 changes: 25 additions & 0 deletions discovery/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -775,6 +775,31 @@ func pk(provider, setName string, n int) poolKey {
}
}

func TestTargetSetTargetGroupsPresentOnStartup(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
discoveryManager := NewManager(ctx, log.NewNopLogger(), SkipInitialWait())

// Set the updatert to a super long time so we can verify that the skip worked correctly.
discoveryManager.updatert = 100 * time.Hour
go discoveryManager.Run()

c := map[string]Configs{
"prometheus": {
staticConfig("foo:9090"),
},
}
discoveryManager.ApplyConfig(c)

syncedTargets := <-discoveryManager.SyncCh()
require.Equal(t, 1, len(syncedTargets))
verifySyncedPresence(t, syncedTargets, "prometheus", "{__address__=\"foo:9090\"}", true)
require.Equal(t, 1, len(syncedTargets["prometheus"]))
p := pk("static", "prometheus", 0)
verifyPresence(t, discoveryManager.targets, p, "{__address__=\"foo:9090\"}", true)
require.Equal(t, 1, len(discoveryManager.targets))
}

func TestTargetSetTargetGroupsPresentOnConfigReload(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down
19 changes: 18 additions & 1 deletion scrape/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,12 @@ type Options struct {
EnableProtobufNegotiation bool
// Option to increase the interval used by scrape manager to throttle target groups updates.
DiscoveryReloadInterval model.Duration
// Option to enable discovering targets immediately on start up as opposed
// to waiting for the interval defined in DiscoveryReloadInterval before
// initializing the scrape pools. Disabled by default. Useful for serverless
// flavors of OpenTelemetry contrib's prometheusreceiver where we're
// sensitive to start up delays.
DiscoveryReloadOnStartup bool

// Optional HTTP client options to use when scraping.
HTTPClientOptions []config_util.HTTPClientOption
Expand Down Expand Up @@ -198,8 +204,17 @@ func (m *Manager) reloader() {
reloadIntervalDuration = model.Duration(5 * time.Second)
}

ticker := time.NewTicker(time.Duration(reloadIntervalDuration))
// Skip the initial reload interval wait for the first reload.
if m.opts.DiscoveryReloadOnStartup {
select {
case <-m.triggerReload:
m.reload()
case <-m.graceShut:
return
}
}

ticker := time.NewTicker(time.Duration(reloadIntervalDuration))
defer ticker.Stop()

for {
Expand Down Expand Up @@ -289,6 +304,7 @@ func (m *Manager) StopAfterScrapeAttempt(minScrapeTime time.Time) {

var wg sync.WaitGroup
for _, p := range m.scrapePools {
p.mtx.Lock()
for _, l := range p.loops {
l := l
wg.Add(1)
Expand All @@ -297,6 +313,7 @@ func (m *Manager) StopAfterScrapeAttempt(minScrapeTime time.Time) {
wg.Done()
}()
}
p.mtx.Unlock()
}
wg.Wait()

Expand Down
38 changes: 38 additions & 0 deletions scrape/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -601,6 +601,44 @@ func TestManagerTargetsUpdates(t *testing.T) {
}
}

func TestManagerSkipInitialWait(t *testing.T) {
opts := Options{DiscoveryReloadOnStartup: true}
m := NewManager(&opts, nil, nil)

ts := make(chan map[string][]*targetgroup.Group, 1)
go m.Run(ts)
defer m.Stop()

tgSent := make(map[string][]*targetgroup.Group)
tgSent["test"] = []*targetgroup.Group{
{
Source: "test_source",
},
}

select {
case ts <- tgSent:
case <-time.After(10 * time.Millisecond):
t.Error("Scrape manager's channel remained blocked after the set threshold.")
}

// Give some time for the reloader to have picked this up.
time.Sleep(2 * time.Second)

m.mtxScrape.Lock()
tsetActual := m.targetSets
m.mtxScrape.Unlock()

// Make sure all updates have been received.
require.Equal(t, tgSent, tsetActual)

select {
case <-m.triggerReload:
t.Error("Reload should've already happened")
default:
}
}

func TestSetOffsetSeed(t *testing.T) {
getConfig := func(prometheus string) *config.Config {
cfgText := `
Expand Down