diff --git a/generator/filegen/filegen.go b/generator/filegen/filegen.go index 6d63d40..ac8d340 100644 --- a/generator/filegen/filegen.go +++ b/generator/filegen/filegen.go @@ -315,45 +315,45 @@ func (g *FileLogGenerator) worker(id int, writer output.Writer, files []string) // Distribute files among workers fileIdx := id - backoff := backoff.NewExponentialBackOff() - backoff.InitialInterval = g.rate + + // Use exponential backoff with configured rate as initial interval + backoffConfig := backoff.NewExponentialBackOff() + backoffConfig.InitialInterval = g.rate + backoffConfig.MaxInterval = 5 * time.Second + backoffConfig.MaxElapsedTime = 0 // Never stop retrying + + backoffTicker := backoff.NewTicker(backoffConfig) + defer backoffTicker.Stop() for { select { case <-g.stopCh: g.logger.Debug("Worker received stop signal", zap.Int("id", id)) return - default: - } - - if fileIdx >= len(files) { - // Cycle back to the beginning - fileIdx = 0 - } + case <-backoffTicker.C: + if fileIdx >= len(files) { + // Cycle back to the beginning + fileIdx = 0 + } - file := files[fileIdx] - err := g.readAndWriteFile(file, writer) - if err != nil { - g.logger.Error("Error reading file", zap.String("file", file), zap.Error(err)) - g.writeErrors.Add(context.Background(), 1, - metric.WithAttributeSet( - attribute.NewSet( - attribute.String("component", "generator_file"), + file := files[fileIdx] + err := g.readAndWriteFile(file, writer) + if err != nil { + g.logger.Error("Error reading file", zap.String("file", file), zap.Error(err)) + g.writeErrors.Add(context.Background(), 1, + metric.WithAttributeSet( + attribute.NewSet( + attribute.String("component", "generator_file"), + ), ), - ), - ) - } - - fileIdx += g.workers + ) + // On error, backoff will automatically handle retry timing + continue + } - // Apply backoff for rate limiting - ticker := time.NewTicker(backoff.NextBackOff()) - select { - case <-g.stopCh: - ticker.Stop() - return - case <-ticker.C: - ticker.Stop() + // On success, reset backoff to configured rate + backoffConfig.Reset() + fileIdx += g.workers } } }