From 66ff8f531fc685c90beaec4f2019b6d703c45ca3 Mon Sep 17 00:00:00 2001
From: Max Amin
Date: Tue, 19 Oct 2021 13:41:31 +0000
Subject: [PATCH 1/2] Flush queues on collector shutdown

1. When shutdown is initiated, stop more messages from being sent
2. Drain the shards and send the batches in under 15 seconds
---
 pkg/export/export.go      | 80 ++++++++++++++++++++++++++++++++++++---
 pkg/export/export_test.go | 64 +++++++++++++++++++++++++++++++
 2 files changed, 139 insertions(+), 5 deletions(-)

diff --git a/pkg/export/export.go b/pkg/export/export.go
index b068044795..44f6e5426d 100644
--- a/pkg/export/export.go
+++ b/pkg/export/export.go
@@ -50,6 +50,10 @@ import (
 )
 
 var (
+	gcmExportCalledWhileDisabled = prometheus.NewCounter(prometheus.CounterOpts{
+		Name: "gcm_export_called_while_disabled_total",
+		Help: "Number of calls to export while metric exporting was disabled.",
+	})
 	samplesExported = prometheus.NewCounter(prometheus.CounterOpts{
 		Name: "gcm_export_samples_exported_total",
 		Help: "Number of samples exported at scrape time.",
@@ -132,6 +136,9 @@ type Exporter struct {
 	// be processed.
 	nextc chan struct{}
 
+	// Channel for signaling to exit the exporter. Used to indicate
+	// data is done exporting during unit tests.
+	exitc chan struct{}
 	// The external labels may be updated asynchronously by configuration changes
 	// and must be locked with mtx.
 	mtx sync.RWMutex
@@ -162,7 +169,9 @@ const (
 	// Time after an accumulating batch is flushed to GCM. This avoids data being
 	// held indefinititely if not enough new data flows in to fill up the batch.
 	batchDelayMax = 50 * time.Millisecond
-
+	// Time after context is cancelled that we use to flush the remaining buffered data.
+	// This avoids data loss on shutdown.
+	cancelTimeout = 15 * time.Second
 	// Prefix for GCM metric.
 	MetricTypePrefix = "prometheus.googleapis.com"
 )
@@ -396,6 +405,7 @@ func New(ctx context.Context, logger log.Logger, reg prometheus.Registerer, opts
 		pendingRequests,
 		projectsPerBatch,
 		samplesPerRPCBatch,
+		gcmExportCalledWhileDisabled,
 	)
 }
 
@@ -420,6 +430,7 @@ func New(ctx context.Context, logger log.Logger, reg prometheus.Registerer, opts
 		externalLabels:       createLabelSet(&config.Config{}, &opts),
 		newMetricClient:      defaultNewMetricClient,
 		nextc:                make(chan struct{}, 1),
+		exitc:                make(chan struct{}, 1),
 		shards:               make([]*shard, opts.Efficiency.ShardCount),
 		warnedUntypedMetrics: map[string]struct{}{},
 		lease:                lease,
@@ -546,17 +557,31 @@ func (e *Exporter) SetLabelsByIDFunc(f func(storage.SeriesRef) labels.Labels) {
 	e.seriesCache.getLabelsByRef = f
 }
 
+// isDisabled checks if Exporter is disabled.
+func (e *Exporter) isDisabled() bool {
+	e.mtx.Lock()
+	defer e.mtx.Unlock()
+	return e.opts.Disable
+}
+
+// disable stops requests from being sent to Monarch.
+func (e *Exporter) disable() {
+	e.mtx.Lock()
+	defer e.mtx.Unlock()
+	e.opts.Disable = true
+}
+
 // Export enqueues the samples and exemplars to be written to Cloud Monitoring.
 func (e *Exporter) Export(metadata MetadataFunc, batch []record.RefSample, exemplarMap map[storage.SeriesRef]record.RefExemplar) {
+	if e.isDisabled() {
+		gcmExportCalledWhileDisabled.Inc()
+		return
+	}
 	// Wether we're sending data or not, add batchsize of samples exported by
 	// Prometheus from appender commit.
 	batchSize := len(batch)
 	samplesExported.Add(float64(batchSize))
 
-	if e.opts.Disable {
-		return
-	}
-
 	metadata = e.wrapMetadata(metadata)
 
 	e.mtx.Lock()
@@ -765,6 +790,43 @@ func (e *Exporter) Run() error {
 			curBatch = newBatch(e.logger, opts.Efficiency.ShardCount, opts.Efficiency.BatchSize)
 	}
 
+	// Try to drain the remaining data before exiting or the timelimit (15 seconds) expires.
+	// A sleep timer is added after draining the shards to ensure it has time to be sent.
+	drainShardsBeforeExiting := func() {
+		level.Info(e.logger).Log("msg", "Exiting Exporter - will attempt to send remaining data in the next 15 seconds.")
+		exitTimer := time.NewTimer(cancelTimeout)
+		drained := make(chan struct{}, 1)
+		go func() {
+			for {
+				totalRemaining := 0
+				pending := false
+				for _, shard := range e.shards {
+					_, remaining := shard.fill(curBatch)
+					totalRemaining += remaining
+					pending = pending || shard.pending
+					if !curBatch.empty() {
+						send()
+					}
+				}
+				if totalRemaining == 0 && !pending {
+					// Wait for the final shards to send.
+					time.Sleep(100 * time.Millisecond)
+					drained <- struct{}{}
+				}
+			}
+		}()
+		for {
+			select {
+			case <-exitTimer.C:
+				level.Info(e.logger).Log("msg", "Exiting Exporter - Data wasn't sent within the timeout limit.")
+				samplesDropped.WithLabelValues("Data wasn't sent within the timeout limit.")
+				return
+			case <-drained:
+				return
+			}
+		}
+	}
+
 	for {
 		select {
 		// NOTE(freinartz): we will terminate once context is cancelled and not flush remaining
@@ -772,6 +834,14 @@ func (e *Exporter) Run() error {
 		// This is fine once we persist data submitted via Export() but for now there may be some
 		// data loss on shutdown.
 		case <-e.ctx.Done():
+			// On Termination:
+			// 1. Stop the exporter from receiving new data.
+			// 2. Try to drain the remaining shards within the CancelTimeout.
+			// This is done to prevent data loss during a shutdown.
+			e.disable()
+			drainShardsBeforeExiting()
+			// This channel is used for unit test case.
+			e.exitc <- struct{}{}
 			return nil
 		// This is activated for each new sample that arrives
 		case <-e.nextc:
diff --git a/pkg/export/export_test.go b/pkg/export/export_test.go
index abffdd5bf6..05743cb2f8 100644
--- a/pkg/export/export_test.go
+++ b/pkg/export/export_test.go
@@ -18,11 +18,13 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"net"
 	"os"
 	"sync"
 	"testing"
 	"time"
 
+	monitoring "cloud.google.com/go/monitoring/apiv3/v2"
 	monitoring_pb "cloud.google.com/go/monitoring/apiv3/v2/monitoringpb"
 	"github.com/go-kit/log"
 	"github.com/google/go-cmp/cmp"
@@ -32,6 +34,7 @@ import (
 	"github.com/prometheus/prometheus/model/textparse"
 	"github.com/prometheus/prometheus/storage"
 	"github.com/prometheus/prometheus/tsdb/record"
+	"google.golang.org/api/option"
 	monitoredres_pb "google.golang.org/genproto/googleapis/api/monitoredres"
 	timestamp_pb "google.golang.org/protobuf/types/known/timestamppb"
 	"k8s.io/apimachinery/pkg/util/wait"
@@ -526,3 +529,64 @@ func TestDisabledExporter(t *testing.T) {
 	// Allow samples to be sent to the void. If we don't panic, we're good.
 	time.Sleep(batchDelayMax)
 }
+
+func TestExporter_shutdown(t *testing.T) {
+	var (
+		srv          = grpc.NewServer()
+		listener     = bufconn.Listen(1e6)
+		metricServer = &testMetricService{}
+	)
+	monitoring_pb.RegisterMetricServiceServer(srv, metricServer)
+
+	go func() { srv.Serve(listener) }()
+	defer srv.Stop()
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	bufDialer := func(context.Context, string) (net.Conn, error) {
+		return listener.Dial()
+	}
+	metricClient, err := monitoring.NewMetricClient(ctx,
+		option.WithoutAuthentication(),
+		option.WithGRPCDialOption(grpc.WithInsecure()),
+		option.WithGRPCDialOption(grpc.WithContextDialer(bufDialer)),
+	)
+	if err != nil {
+		t.Fatalf("creating metric client failed: %s", err)
+	}
+
+	e, err := New(log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)), nil, ExporterOpts{DisableAuth: true})
+	if err != nil {
+		t.Fatalf("Creating Exporter failed: %s", err)
+	}
+	e.metricClient = metricClient
+
+	e.SetLabelsByIDFunc(func(i storage.SeriesRef) labels.Labels {
+		return labels.FromStrings("project_id", "test", "location", "test", fmt.Sprintf("label_%d", i), "test")
+	})
+
+	exportCtx, cancelExport := context.WithCancel(context.Background())
+
+	for i := 0; i < 50; i++ {
+		e.Export(nil, []record.RefSample{
+			{Ref: 1, T: int64(i), V: float64(i)},
+		}, nil)
+	}
+	go e.Run(exportCtx)
+
+	cancelExport()
+	// Time delay is added to ensure exporter is disabled.
+	time.Sleep(50 * time.Millisecond)
+
+	// These samples will be rejected since the exporter has been cancelled.
+	for i := 0; i < 10; i++ {
+		e.Export(nil, []record.RefSample{
+			{Ref: 1, T: int64(i), V: float64(i)},
+		}, nil)
+	}
+	// Wait for exporter to finish flushing shards.
+	<-e.exitc
+	// Check that we received all samples that went in.
+	if got, want := len(metricServer.samples), 50; got != want {
+		t.Fatalf("got %d, want %d", got, want)
+	}
+}

From d9695bf8b426e41038eed4cbdfa75cad17f75635 Mon Sep 17 00:00:00 2001
From: Ridwan Sharif
Date: Mon, 23 Sep 2024 17:58:48 +0000
Subject: [PATCH 2/2] feat(pkg/export): flush on export (rebase and fixup)

Signed-off-by: Ridwan Sharif
---
 pkg/export/export.go      | 43 +++++++++++------------
 pkg/export/export_test.go | 72 +++++++++++++++++----------------------
 2 files changed, 51 insertions(+), 64 deletions(-)

diff --git a/pkg/export/export.go b/pkg/export/export.go
index 44f6e5426d..1b433af849 100644
--- a/pkg/export/export.go
+++ b/pkg/export/export.go
@@ -172,6 +172,8 @@ const (
 	// Time after context is cancelled that we use to flush the remaining buffered data.
 	// This avoids data loss on shutdown.
 	cancelTimeout = 15 * time.Second
+	// Time after the final shards are drained before the exporter is closed on shutdown.
+	flushTimeout = 100 * time.Millisecond
 	// Prefix for GCM metric.
 	MetricTypePrefix = "prometheus.googleapis.com"
 )
@@ -557,23 +559,9 @@ func (e *Exporter) SetLabelsByIDFunc(f func(storage.SeriesRef) labels.Labels) {
 	e.seriesCache.getLabelsByRef = f
 }
 
-// isDisabled checks if Exporter is disabled.
-func (e *Exporter) isDisabled() bool {
-	e.mtx.Lock()
-	defer e.mtx.Unlock()
-	return e.opts.Disable
-}
-
-// disable stops requests from being sent to Monarch.
-func (e *Exporter) disable() {
-	e.mtx.Lock()
-	defer e.mtx.Unlock()
-	e.opts.Disable = true
-}
-
 // Export enqueues the samples and exemplars to be written to Cloud Monitoring.
 func (e *Exporter) Export(metadata MetadataFunc, batch []record.RefSample, exemplarMap map[storage.SeriesRef]record.RefExemplar) {
-	if e.isDisabled() {
+	if e.opts.Disable {
 		gcmExportCalledWhileDisabled.Inc()
 		return
 	}
@@ -767,6 +755,11 @@ func (e *Exporter) Run() error {
 	send := func() {
 		e.mtx.RLock()
 		opts := e.opts
+		if e.metricClient == nil {
+			// Flush timeout reached, runner is shut down.
+			e.mtx.RUnlock()
+			return
+		}
 		sendFunc := e.metricClient.CreateTimeSeries
 		e.mtx.RUnlock()
 
@@ -790,9 +783,10 @@ func (e *Exporter) Run() error {
 			curBatch = newBatch(e.logger, opts.Efficiency.ShardCount, opts.Efficiency.BatchSize)
 	}
 
-	// Try to drain the remaining data before exiting or the timelimit (15 seconds) expires.
+	// Try to drain the remaining data before exiting or the time limit (15 seconds) expires.
 	// A sleep timer is added after draining the shards to ensure it has time to be sent.
 	drainShardsBeforeExiting := func() {
+		//nolint:errcheck
 		level.Info(e.logger).Log("msg", "Exiting Exporter - will attempt to send remaining data in the next 15 seconds.")
 		exitTimer := time.NewTimer(cancelTimeout)
 		drained := make(chan struct{}, 1)
@@ -803,14 +797,18 @@ func (e *Exporter) Run() error {
 				for _, shard := range e.shards {
 					_, remaining := shard.fill(curBatch)
 					totalRemaining += remaining
+					shard.mtx.Lock()
 					pending = pending || shard.pending
+					shard.mtx.Unlock()
 					if !curBatch.empty() {
 						send()
 					}
 				}
 				if totalRemaining == 0 && !pending {
-					// Wait for the final shards to send.
-					time.Sleep(100 * time.Millisecond)
+					// NOTE(ridwanmsharif): the sending of the batches happens asynchronously
+					// and we only wait for a fixed amount of time after the final batch is sent
+					// before shutting down the exporter.
+					time.Sleep(flushTimeout)
 					drained <- struct{}{}
 				}
 			}
@@ -818,6 +816,7 @@ func (e *Exporter) Run() error {
 		for {
 			select {
 			case <-exitTimer.C:
+				//nolint:errcheck
 				level.Info(e.logger).Log("msg", "Exiting Exporter - Data wasn't sent within the timeout limit.")
 				samplesDropped.WithLabelValues("Data wasn't sent within the timeout limit.")
 				return
@@ -834,11 +833,8 @@ func (e *Exporter) Run() error {
 		// This is fine once we persist data submitted via Export() but for now there may be some
 		// data loss on shutdown.
 		case <-e.ctx.Done():
-			// On Termination:
-			// 1. Stop the exporter from receiving new data.
-			// 2. Try to drain the remaining shards within the CancelTimeout.
+			// On termination, try to drain the remaining shards within the CancelTimeout.
 			// This is done to prevent data loss during a shutdown.
-			e.disable()
 			drainShardsBeforeExiting()
 			// This channel is used for unit test case.
 			e.exitc <- struct{}{}
@@ -877,7 +873,8 @@ func (e *Exporter) close() {
 	e.mtx.Lock()
 	defer e.mtx.Unlock()
 	if err := e.metricClient.Close(); err != nil {
-		_ = e.logger.Log("msg", "error closing metric client", "err", err)
+		//nolint:errcheck
+		e.logger.Log("msg", "error closing metric client", "err", err)
 	}
 	e.metricClient = nil
 }
diff --git a/pkg/export/export_test.go b/pkg/export/export_test.go
index 05743cb2f8..e813cc08b1 100644
--- a/pkg/export/export_test.go
+++ b/pkg/export/export_test.go
@@ -18,13 +18,11 @@ import (
 	"context"
 	"errors"
 	"fmt"
-	"net"
 	"os"
 	"sync"
 	"testing"
 	"time"
 
-	monitoring "cloud.google.com/go/monitoring/apiv3/v2"
 	monitoring_pb "cloud.google.com/go/monitoring/apiv3/v2/monitoringpb"
 	"github.com/go-kit/log"
 	"github.com/google/go-cmp/cmp"
@@ -34,7 +32,6 @@ import (
 	"github.com/prometheus/prometheus/model/textparse"
 	"github.com/prometheus/prometheus/storage"
 	"github.com/prometheus/prometheus/tsdb/record"
-	"google.golang.org/api/option"
 	monitoredres_pb "google.golang.org/genproto/googleapis/api/monitoredres"
 	timestamp_pb "google.golang.org/protobuf/types/known/timestamppb"
 	"k8s.io/apimachinery/pkg/util/wait"
@@ -531,62 +528,55 @@ func TestDisabledExporter(t *testing.T) {
 }
 
 func TestExporter_shutdown(t *testing.T) {
-	var (
-		srv          = grpc.NewServer()
-		listener     = bufconn.Listen(1e6)
-		metricServer = &testMetricService{}
-	)
-	monitoring_pb.RegisterMetricServiceServer(srv, metricServer)
-
-	go func() { srv.Serve(listener) }()
-	defer srv.Stop()
-	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel()
-	bufDialer := func(context.Context, string) (net.Conn, error) {
-		return listener.Dial()
-	}
-	metricClient, err := monitoring.NewMetricClient(ctx,
-		option.WithoutAuthentication(),
-		option.WithGRPCDialOption(grpc.WithInsecure()),
-		option.WithGRPCDialOption(grpc.WithContextDialer(bufDialer)),
-	)
-	if err != nil {
-		t.Fatalf("creating metric client failed: %s", err)
-	}
+	eCtx, eCtxCancel := context.WithCancel(context.Background())
 
-	e, err := New(log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)), nil, ExporterOpts{DisableAuth: true})
+	exporterOpts := ExporterOpts{DisableAuth: true}
+	exporterOpts.DefaultUnsetFields()
+	e, err := New(eCtx, log.NewJSONLogger(log.NewSyncWriter(os.Stderr)), nil, exporterOpts, NopLease())
 	if err != nil {
 		t.Fatalf("Creating Exporter failed: %s", err)
 	}
-	e.metricClient = metricClient
+	metricServer := testMetricService{}
+	e.metricClient = &metricServer
 
-	e.SetLabelsByIDFunc(func(i storage.SeriesRef) labels.Labels {
-		return labels.FromStrings("project_id", "test", "location", "test", fmt.Sprintf("label_%d", i), "test")
+	e.SetLabelsByIDFunc(func(storage.SeriesRef) labels.Labels {
+		return labels.FromStrings("project_id", "test", "location", "test")
 	})
 
-	exportCtx, cancelExport := context.WithCancel(context.Background())
+	//nolint:errcheck
+	go e.Run()
 
-	for i := 0; i < 50; i++ {
+	// Fill a single shard with samples.
+	wantSamples := 50
+	for i := range wantSamples {
 		e.Export(nil, []record.RefSample{
 			{Ref: 1, T: int64(i), V: float64(i)},
 		}, nil)
 	}
-	go e.Run(exportCtx)
 
-	cancelExport()
-	// Time delay is added to ensure exporter is disabled.
-	time.Sleep(50 * time.Millisecond)
+	// Shut down the exporter.
+	eCtxCancel()
+
+	// Wait for exporter to finish flushing shards.
+	<-e.exitc
 
 	// These samples will be rejected since the exporter has been cancelled.
-	for i := 0; i < 10; i++ {
+	for i := range 10 {
 		e.Export(nil, []record.RefSample{
 			{Ref: 1, T: int64(i), V: float64(i)},
 		}, nil)
 	}
-	// Wait for exporter to finish flushing shards.
-	<-e.exitc
-	// Check that we received all samples that went in.
-	if got, want := len(metricServer.samples), 50; got != want {
-		t.Fatalf("got %d, want %d", got, want)
+
+	ctxTimeout, cancel := context.WithTimeout(context.Background(), 2*cancelTimeout)
+	defer cancel()
+
+	pollErr := wait.PollUntilContextCancel(ctxTimeout, time.Second, false, func(_ context.Context) (bool, error) {
+		return len(metricServer.samples) == wantSamples, nil
+	})
+	if pollErr != nil {
+		if wait.Interrupted(pollErr) && err != nil {
+			pollErr = err
+		}
+		t.Fatalf("did not get samples: %s", pollErr)
 	}
 }