diff --git a/pkg/export/export.go b/pkg/export/export.go
index b068044795..1b433af849 100644
--- a/pkg/export/export.go
+++ b/pkg/export/export.go
@@ -50,6 +50,10 @@ import (
 )
 
 var (
+	gcmExportCalledWhileDisabled = prometheus.NewCounter(prometheus.CounterOpts{
+		Name: "gcm_export_called_while_disabled_total",
+		Help: "Number of calls to export while metric exporting was disabled.",
+	})
 	samplesExported = prometheus.NewCounter(prometheus.CounterOpts{
 		Name: "gcm_export_samples_exported_total",
 		Help: "Number of samples exported at scrape time.",
@@ -132,6 +136,9 @@ type Exporter struct {
 	// be processed.
 	nextc chan struct{}
+	// Channel for signaling that the exporter has exited. Used by unit
+	// tests to detect when the remaining data has been flushed.
+	exitc chan struct{}
 
 	// The external labels may be updated asynchronously by configuration changes
 	// and must be locked with mtx.
 	mtx sync.RWMutex
@@ -162,7 +169,11 @@ const (
 	// Time after an accumulating batch is flushed to GCM. This avoids data being
 	// held indefinititely if not enough new data flows in to fill up the batch.
 	batchDelayMax = 50 * time.Millisecond
-
+	// Time allowed after the context is cancelled to flush the remaining buffered data.
+	// This avoids data loss on shutdown.
+	cancelTimeout = 15 * time.Second
+	// Time to wait after the final shards are drained before the exporter is closed on shutdown.
+	flushTimeout = 100 * time.Millisecond
 	// Prefix for GCM metric.
 	MetricTypePrefix = "prometheus.googleapis.com"
 )
@@ -396,6 +407,7 @@ func New(ctx context.Context, logger log.Logger, reg prometheus.Registerer, opts
 			pendingRequests,
 			projectsPerBatch,
 			samplesPerRPCBatch,
+			gcmExportCalledWhileDisabled,
 		)
 	}
 
@@ -420,6 +432,7 @@ func New(ctx context.Context, logger log.Logger, reg prometheus.Registerer, opts
 		externalLabels:       createLabelSet(&config.Config{}, &opts),
 		newMetricClient:      defaultNewMetricClient,
 		nextc:                make(chan struct{}, 1),
+		exitc:                make(chan struct{}, 1),
 		shards:               make([]*shard, opts.Efficiency.ShardCount),
 		warnedUntypedMetrics: map[string]struct{}{},
 		lease:                lease,
@@ -548,15 +561,15 @@ func (e *Exporter) SetLabelsByIDFunc(f func(storage.SeriesRef) labels.Labels) {
 
 // Export enqueues the samples and exemplars to be written to Cloud Monitoring.
 func (e *Exporter) Export(metadata MetadataFunc, batch []record.RefSample, exemplarMap map[storage.SeriesRef]record.RefExemplar) {
+	if e.opts.Disable {
+		gcmExportCalledWhileDisabled.Inc()
+		return
+	}
 	// Wether we're sending data or not, add batchsize of samples exported by
 	// Prometheus from appender commit.
 	batchSize := len(batch)
 	samplesExported.Add(float64(batchSize))
 
-	if e.opts.Disable {
-		return
-	}
-
 	metadata = e.wrapMetadata(metadata)
 
 	e.mtx.Lock()
@@ -742,6 +755,11 @@ func (e *Exporter) Run() error {
 	send := func() {
 		e.mtx.RLock()
 		opts := e.opts
+		if e.metricClient == nil {
+			// The metric client was already closed; the exporter has shut down.
+			e.mtx.RUnlock()
+			return
+		}
 		sendFunc := e.metricClient.CreateTimeSeries
 		e.mtx.RUnlock()
 
@@ -765,6 +783,49 @@ func (e *Exporter) Run() error {
 		curBatch = newBatch(e.logger, opts.Efficiency.ShardCount, opts.Efficiency.BatchSize)
 	}
 
+	// Try to drain the remaining data before exiting, or until the time limit (cancelTimeout) expires.
+	// A short sleep (flushTimeout) after the shards are drained gives the final batches time to be sent.
+	drainShardsBeforeExiting := func() {
+		//nolint:errcheck
+		level.Info(e.logger).Log("msg", "Exiting Exporter - will attempt to send remaining data in the next 15 seconds.")
+		exitTimer := time.NewTimer(cancelTimeout)
+		drained := make(chan struct{}, 1)
+		go func() {
+			for {
+				totalRemaining := 0
+				pending := false
+				for _, shard := range e.shards {
+					_, remaining := shard.fill(curBatch)
+					totalRemaining += remaining
+					shard.mtx.Lock()
+					pending = pending || shard.pending
+					shard.mtx.Unlock()
+					if !curBatch.empty() {
+						send()
+					}
+				}
+				if totalRemaining == 0 && !pending {
+					// NOTE(ridwanmsharif): batches are sent asynchronously, so wait a fixed
+					// amount of time after the final batch before signaling that we are done.
+					time.Sleep(flushTimeout)
+					drained <- struct{}{}
+					return
+				}
+			}
+		}()
+		for {
+			select {
+			case <-exitTimer.C:
+				//nolint:errcheck
+				level.Info(e.logger).Log("msg", "Exiting Exporter - Data wasn't sent within the timeout limit.")
+				samplesDropped.WithLabelValues("Data wasn't sent within the timeout limit.").Inc()
+				return
+			case <-drained:
+				return
+			}
+		}
+	}
+
 	for {
 		select {
 		// NOTE(freinartz): we will terminate once context is cancelled and not flush remaining
@@ -772,6 +833,11 @@
 		// This is fine once we persist data submitted via Export() but for now there may be some
 		// data loss on shutdown.
 		case <-e.ctx.Done():
+			// On termination, try to drain the remaining shards within cancelTimeout.
+			// This is done to prevent data loss during shutdown.
+			drainShardsBeforeExiting()
+			// Signal unit tests that the remaining data has been flushed.
+			e.exitc <- struct{}{}
 			return nil
 		// This is activated for each new sample that arrives
 		case <-e.nextc:
@@ -807,7 +873,8 @@ func (e *Exporter) close() {
 	e.mtx.Lock()
 	defer e.mtx.Unlock()
 	if err := e.metricClient.Close(); err != nil {
-		_ = e.logger.Log("msg", "error closing metric client", "err", err)
+		//nolint:errcheck
+		e.logger.Log("msg", "error closing metric client", "err", err)
 	}
 	e.metricClient = nil
 }
diff --git a/pkg/export/export_test.go b/pkg/export/export_test.go
index abffdd5bf6..e813cc08b1 100644
--- a/pkg/export/export_test.go
+++ b/pkg/export/export_test.go
@@ -526,3 +526,57 @@ func TestDisabledExporter(t *testing.T) {
 	// Allow samples to be sent to the void. If we don't panic, we're good.
 	time.Sleep(batchDelayMax)
 }
+
+func TestExporter_shutdown(t *testing.T) {
+	eCtx, eCtxCancel := context.WithCancel(context.Background())
+
+	exporterOpts := ExporterOpts{DisableAuth: true}
+	exporterOpts.DefaultUnsetFields()
+	e, err := New(eCtx, log.NewJSONLogger(log.NewSyncWriter(os.Stderr)), nil, exporterOpts, NopLease())
+	if err != nil {
+		t.Fatalf("Creating Exporter failed: %s", err)
+	}
+	metricServer := testMetricService{}
+	e.metricClient = &metricServer
+
+	e.SetLabelsByIDFunc(func(storage.SeriesRef) labels.Labels {
+		return labels.FromStrings("project_id", "test", "location", "test")
+	})
+
+	//nolint:errcheck
+	go e.Run()
+
+	// Fill a single shard with samples.
+	wantSamples := 50
+	for i := range wantSamples {
+		e.Export(nil, []record.RefSample{
+			{Ref: 1, T: int64(i), V: float64(i)},
+		}, nil)
+	}
+
+	// Shut down the exporter.
+	eCtxCancel()
+
+	// Wait for the exporter to finish flushing its shards.
+	<-e.exitc
+
+	// These samples should never reach the metric service since the exporter has shut down.
+	for i := range 10 {
+		e.Export(nil, []record.RefSample{
+			{Ref: 1, T: int64(i), V: float64(i)},
+		}, nil)
+	}
+
+	ctxTimeout, cancel := context.WithTimeout(context.Background(), 2*cancelTimeout)
+	defer cancel()
+
+	pollErr := wait.PollUntilContextCancel(ctxTimeout, time.Second, false, func(_ context.Context) (bool, error) {
+		return len(metricServer.samples) == wantSamples, nil
+	})
+	if pollErr != nil {
+		if wait.Interrupted(pollErr) {
+			t.Logf("got %d of %d expected samples before timing out", len(metricServer.samples), wantSamples)
+		}
+		t.Fatalf("did not get samples: %s", pollErr)
+	}
+}
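
For context, a minimal sketch of how a caller might rely on the new graceful shutdown: run the exporter until its context is cancelled, then wait for Run to return, which with this change only happens after the shards have been drained or cancelTimeout has elapsed. The import path, signal handling, and option values below are assumptions for illustration; only New, Run, Export, NopLease, ExporterOpts.DefaultUnsetFields, and the drain-on-cancel behaviour come from the change above.

package main

import (
	"context"
	"os"
	"os/signal"

	"github.com/GoogleCloudPlatform/prometheus-engine/pkg/export" // assumed import path
	"github.com/go-kit/log"
)

func main() {
	// Cancelling this context (here on SIGINT) is what triggers the drain on shutdown.
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
	defer stop()

	logger := log.NewJSONLogger(log.NewSyncWriter(os.Stderr))

	opts := export.ExporterOpts{} // placeholder options; set project/credentials as needed
	opts.DefaultUnsetFields()

	e, err := export.New(ctx, logger, nil, opts, export.NopLease())
	if err != nil {
		//nolint:errcheck
		logger.Log("msg", "creating exporter failed", "err", err)
		os.Exit(1)
	}

	// Run the export loop in the background; the storage layer feeds it via e.Export(...).
	done := make(chan error, 1)
	go func() { done <- e.Run() }()

	// ... wire e.SetLabelsByIDFunc and e.Export into the appender/storage layer here ...

	// On shutdown, Run now drains the remaining shards (bounded by cancelTimeout)
	// before returning, so waiting for it avoids dropping buffered samples.
	if err := <-done; err != nil {
		//nolint:errcheck
		logger.Log("msg", "exporter terminated with error", "err", err)
	}
}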