Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 30 additions & 30 deletions generator/filegen/filegen.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,45 +315,45 @@ func (g *FileLogGenerator) worker(id int, writer output.Writer, files []string)

// Distribute files among workers
fileIdx := id
backoff := backoff.NewExponentialBackOff()
backoff.InitialInterval = g.rate

// Use exponential backoff with configured rate as initial interval
backoffConfig := backoff.NewExponentialBackOff()
backoffConfig.InitialInterval = g.rate
backoffConfig.MaxInterval = 5 * time.Second
backoffConfig.MaxElapsedTime = 0 // Never stop retrying

backoffTicker := backoff.NewTicker(backoffConfig)
defer backoffTicker.Stop()

for {
select {
case <-g.stopCh:
g.logger.Debug("Worker received stop signal", zap.Int("id", id))
return
default:
}

if fileIdx >= len(files) {
// Cycle back to the beginning
fileIdx = 0
}
case <-backoffTicker.C:
if fileIdx >= len(files) {
// Cycle back to the beginning
fileIdx = 0
}

file := files[fileIdx]
err := g.readAndWriteFile(file, writer)
if err != nil {
g.logger.Error("Error reading file", zap.String("file", file), zap.Error(err))
g.writeErrors.Add(context.Background(), 1,
metric.WithAttributeSet(
attribute.NewSet(
attribute.String("component", "generator_file"),
file := files[fileIdx]
err := g.readAndWriteFile(file, writer)
if err != nil {
g.logger.Error("Error reading file", zap.String("file", file), zap.Error(err))
g.writeErrors.Add(context.Background(), 1,
metric.WithAttributeSet(
attribute.NewSet(
attribute.String("component", "generator_file"),
),
),
),
)
}

fileIdx += g.workers
)
// On error, backoff will automatically handle retry timing
continue
}

// Apply backoff for rate limiting
ticker := time.NewTicker(backoff.NextBackOff())
select {
case <-g.stopCh:
ticker.Stop()
return
case <-ticker.C:
ticker.Stop()
// On success, reset backoff to configured rate
backoffConfig.Reset()
fileIdx += g.workers
}
}
}
Expand Down