79 changes: 73 additions & 6 deletions pkg/export/export.go
@@ -50,6 +50,10 @@ import (
)

var (
gcmExportCalledWhileDisabled = prometheus.NewCounter(prometheus.CounterOpts{
Name: "gcm_export_called_while_disabled_total",
Help: "Number of calls to export while metric exporting was disabled.",
})
samplesExported = prometheus.NewCounter(prometheus.CounterOpts{
Name: "gcm_export_samples_exported_total",
Help: "Number of samples exported at scrape time.",
@@ -132,6 +136,9 @@ type Exporter struct {
// be processed.
nextc chan struct{}

// Channel for signaling to exit the exporter. Used to indicate
// data is done exporting during unit tests.
exitc chan struct{}
// The external labels may be updated asynchronously by configuration changes
// and must be locked with mtx.
mtx sync.RWMutex
@@ -162,7 +169,11 @@ const (
// Time after an accumulating batch is flushed to GCM. This avoids data being
// held indefinitely if not enough new data flows in to fill up the batch.
batchDelayMax = 50 * time.Millisecond

// Time after context is cancelled that we use to flush the remaining buffered data.
// This avoids data loss on shutdown.
cancelTimeout = 15 * time.Second
// Time after the final shards are drained before the exporter is closed on shutdown.
flushTimeout = 100 * time.Millisecond
Collaborator: is 100ms making any difference?

// Prefix for GCM metric.
MetricTypePrefix = "prometheus.googleapis.com"
)
@@ -396,6 +407,7 @@ func New(ctx context.Context, logger log.Logger, reg prometheus.Registerer, opts
pendingRequests,
projectsPerBatch,
samplesPerRPCBatch,
gcmExportCalledWhileDisabled,
)
}

@@ -420,6 +432,7 @@ func New(ctx context.Context, logger log.Logger, reg prometheus.Registerer, opts
externalLabels: createLabelSet(&config.Config{}, &opts),
newMetricClient: defaultNewMetricClient,
nextc: make(chan struct{}, 1),
exitc: make(chan struct{}, 1),
shards: make([]*shard, opts.Efficiency.ShardCount),
warnedUntypedMetrics: map[string]struct{}{},
lease: lease,
@@ -548,15 +561,15 @@ func (e *Exporter) SetLabelsByIDFunc(f func(storage.SeriesRef) labels.Labels) {

// Export enqueues the samples and exemplars to be written to Cloud Monitoring.
func (e *Exporter) Export(metadata MetadataFunc, batch []record.RefSample, exemplarMap map[storage.SeriesRef]record.RefExemplar) {
if e.opts.Disable {
gcmExportCalledWhileDisabled.Inc()
Collaborator: That's unrelated to the main PR goal, I see Max did this, let's at least update the description.

return
}
// Whether we're sending data or not, record the batch size of samples exported by
// Prometheus from the appender commit.
batchSize := len(batch)
samplesExported.Add(float64(batchSize))

if e.opts.Disable {
return
}

metadata = e.wrapMetadata(metadata)

e.mtx.Lock()
@@ -742,6 +755,11 @@ func (e *Exporter) Run() error {
send := func() {
e.mtx.RLock()
opts := e.opts
if e.metricClient == nil {
// Flush timeout reached, runner is shut down.
e.mtx.RUnlock()
return
}
sendFunc := e.metricClient.CreateTimeSeries
e.mtx.RUnlock()

@@ -765,13 +783,61 @@
curBatch = newBatch(e.logger, opts.Efficiency.ShardCount, opts.Efficiency.BatchSize)
}

// Try to drain the remaining data before exiting or before the time limit (cancelTimeout, 15 seconds) expires.
// A short sleep is added after draining the shards to give the final batch time to be sent.
drainShardsBeforeExiting := func() {
Collaborator: nit: Not a must, but it might be easier to iterate on if we move send and drain to separate Exporter private methods 🤔
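One possible shape for that refactor (a sketch only; the names drainShards and sendBatch are illustrative, not part of this PR, and the batch/shard types are taken from the surrounding diff):

// Sketch: lift the closures onto the Exporter so they can be tested in isolation.
func (e *Exporter) drainShards(b *batch) (remaining int, pending bool) {
	for _, s := range e.shards {
		_, r := s.fill(b)
		remaining += r
		s.mtx.Lock()
		pending = pending || s.pending
		s.mtx.Unlock()
		if !b.empty() {
			e.sendBatch(b) // hypothetical method wrapping today's send() closure
		}
	}
	return remaining, pending
}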

//nolint:errcheck
level.Info(e.logger).Log("msg", "Exiting Exporter - will attempt to send remaining data in the next 15 seconds.")
exitTimer := time.NewTimer(cancelTimeout)
drained := make(chan struct{}, 1)
go func() {
for {
totalRemaining := 0
pending := false
for _, shard := range e.shards {
_, remaining := shard.fill(curBatch)
totalRemaining += remaining
shard.mtx.Lock()
pending = pending || shard.pending
shard.mtx.Unlock()
if !curBatch.empty() {
send()
}
}
if totalRemaining == 0 && !pending {
// NOTE(ridwanmsharif): the sending of the batches happens asynchronously
Collaborator: Yeah, can we improve this? And get that result state back from send (or create a separate send)?

// and we only wait for a fixed amount of time after the final batch is sent
// before shutting down the exporter.
time.Sleep(flushTimeout)
drained <- struct{}{}
}
}
}()
for {
Collaborator: I would suggest we don't do another goroutine and do it all in this for loop? Send is async anyway.
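A sketch of that single-loop variant (assuming the draining can run inline; the deadline handling and log message are illustrative):

// Sketch: drain in one loop with a deadline instead of a goroutine plus a
// signalling channel. Reuses shard.fill, curBatch and send from the code above.
deadline := time.After(cancelTimeout)
for {
	select {
	case <-deadline:
		//nolint:errcheck
		level.Info(e.logger).Log("msg", "Exiting Exporter - data wasn't sent within the timeout limit.")
		return
	default:
	}
	totalRemaining, pending := 0, false
	for _, shard := range e.shards {
		_, remaining := shard.fill(curBatch)
		totalRemaining += remaining
		shard.mtx.Lock()
		pending = pending || shard.pending
		shard.mtx.Unlock()
		if !curBatch.empty() {
			send()
		}
	}
	if totalRemaining == 0 && !pending {
		time.Sleep(flushTimeout) // still a fixed grace period for the async send
		return
	}
}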

select {
case <-exitTimer.C:
//nolint:errcheck
level.Info(e.logger).Log("msg", "Exiting Exporter - Data wasn't sent within the timeout limit.")
samplesDropped.WithLabelValues("Data wasn't sent within the timeout limit.")
return
case <-drained:
return
}
}
}

for {
select {
// NOTE(freinartz): we will terminate once context is cancelled and not flush remaining
Collaborator: Let's update this commentary, it's incorrect with the new logic 🤗

// buffered data. In-flight requests will be aborted as well.
// This is fine once we persist data submitted via Export() but for now there may be some
// data loss on shutdown.
case <-e.ctx.Done():
// on termination, try to drain the remaining shards within the CancelTimeout.
Collaborator (suggested change):
- // on termination, try to drain the remaining shards within the CancelTimeout.
+ // On termination, try to drain the remaining shards within the CancelTimeout.

// This is done to prevent data loss during a shutdown.
drainShardsBeforeExiting()
Collaborator: Are we sure this works? All sends normally take the e.ctx context, so they will be cancelled? I assume here you (intentionally?) accept that and drain the buffer with new sends using a separate context.

Would it be cleaner to have a custom, separate context from the beginning? 🤔

Collaborator: In fact we always use e.ctx even in drain, if I look correctly; are we sure this code works? 🤔
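A sketch of the separate-context idea (the wiring below is an assumption about how it could look, not what the PR does):

// Sketch: derive a send context that is not cancelled together with e.ctx and
// only cancel it after the drain grace period, so drain-time RPCs are not
// aborted the moment the caller shuts us down.
sendCtx, cancelSend := context.WithCancel(context.Background())
go func() {
	<-e.ctx.Done()            // caller requested shutdown
	time.Sleep(cancelTimeout) // give the drain loop time to flush
	cancelSend()              // then abort any still in-flight requests
}()
// CreateTimeSeries calls would then take sendCtx instead of e.ctx.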

// This channel is only used by unit tests.
e.exitc <- struct{}{}
Collaborator: Ideally we don't have testing-specific code in the critical, production flow. However, if we make sure sending propagates some result back to us, we could have nice production logic that emits a log line telling us everything was flushed successfully; can we do this?
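One way that could look (a sketch; flushRemaining is a hypothetical drain-and-wait helper, today the batches are sent asynchronously and no result reaches Run()):

// Sketch: have the shutdown path receive a result from sending, then log the
// outcome instead of signalling a test-only channel.
if err := e.flushRemaining(e.ctx); err != nil {
	//nolint:errcheck
	level.Warn(e.logger).Log("msg", "shutdown flush incomplete, some data may be lost", "err", err)
} else {
	//nolint:errcheck
	level.Info(e.logger).Log("msg", "Exporter shut down, all buffered data was flushed.")
}
return nil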

return nil
// This is activated for each new sample that arrives
case <-e.nextc:
@@ -807,7 +873,8 @@ func (e *Exporter) close() {
e.mtx.Lock()
defer e.mtx.Unlock()
if err := e.metricClient.Close(); err != nil {
_ = e.logger.Log("msg", "error closing metric client", "err", err)
//nolint:errcheck
e.logger.Log("msg", "error closing metric client", "err", err)
}
e.metricClient = nil
}
54 changes: 54 additions & 0 deletions pkg/export/export_test.go
@@ -526,3 +526,57 @@ func TestDisabledExporter(t *testing.T) {
// Allow samples to be sent to the void. If we don't panic, we're good.
time.Sleep(batchDelayMax)
}

func TestExporter_shutdown(t *testing.T) {
eCtx, eCtxCancel := context.WithCancel(context.Background())

exporterOpts := ExporterOpts{DisableAuth: true}
exporterOpts.DefaultUnsetFields()
e, err := New(eCtx, log.NewJSONLogger(log.NewSyncWriter(os.Stderr)), nil, exporterOpts, NopLease())
if err != nil {
t.Fatalf("Creating Exporter failed: %s", err)
}
metricServer := testMetricService{}
e.metricClient = &metricServer

e.SetLabelsByIDFunc(func(storage.SeriesRef) labels.Labels {
return labels.FromStrings("project_id", "test", "location", "test")
})

//nolint:errcheck
go e.Run()

// Fill a single shard with samples.
wantSamples := 50
for i := range wantSamples {
e.Export(nil, []record.RefSample{
{Ref: 1, T: int64(i), V: float64(i)},
}, nil)
}

// Shut down the exporter.
eCtxCancel()

// Wait for exporter to finish flushing shards.
<-e.exitc

// These samples will be rejected since the exporter has been cancelled.
for i := range 10 {
e.Export(nil, []record.RefSample{
{Ref: 1, T: int64(i), V: float64(i)},
}, nil)
}

ctxTimeout, cancel := context.WithTimeout(context.Background(), 2*cancelTimeout)
defer cancel()

pollErr := wait.PollUntilContextCancel(ctxTimeout, time.Second, false, func(_ context.Context) (bool, error) {
return len(metricServer.samples) == wantSamples, nil
})
if pollErr != nil {
if wait.Interrupted(pollErr) && err != nil {
pollErr = err
}
t.Fatalf("did not get samples: %s", pollErr)
}
}