Merged
2 changes: 2 additions & 0 deletions cmd/blitz/main.go
Original file line number Diff line number Diff line change
@@ -381,6 +381,8 @@ func run(cmd *cobra.Command, args []string) error {
cfg.Generator.Filegen.Workers,
cfg.Generator.Filegen.Rate,
cfg.Generator.Filegen.Source,
cfg.Generator.Filegen.CacheEnabled,
cfg.Generator.Filegen.CacheTTL,
)
if err != nil {
logger.Error("Failed to create File generator", zap.Error(err))
65 changes: 58 additions & 7 deletions docs/generator/filegen.md
@@ -1,6 +1,6 @@
# File Generator (filegen)

The File generator reads log entries from files on disk and automatically processes timestamp directives. It supports reading from a single file, reading from all files in a directory, or reading from a pre-distributed package of sample logs. Timestamp directives in log entries (like `%c`, `%Y-%m-%dT%H:%M:%SZ`, etc.) are replaced with actual formatted timestamps on each log generation.
The File generator reads log entries from files on disk and, on each generation cycle, selects a random line from the file being processed. Timestamp directives on the selected line (such as `%c` or `%Y-%m-%dT%H:%M:%SZ`) are replaced with actual formatted timestamps when the line is selected. It supports reading from a single file, from all files in a directory, or from a pre-distributed package of sample logs.

## Features

@@ -59,6 +59,8 @@ The File generator automatically processes timestamp directives in log files, re
| `generator.filegen.workers` | `--generator-filegen-workers` | `BLITZ_GENERATOR_FILEGEN_WORKERS` | `1` | Number of worker goroutines (must be ≥ 1) |
| `generator.filegen.rate` | `--generator-filegen-rate` | `BLITZ_GENERATOR_FILEGEN_RATE` | `1s` | Rate at which logs are written per worker (duration format) |
| `generator.filegen.source` | `--generator-filegen-source` | `BLITZ_GENERATOR_FILEGEN_SOURCE` | `` | File path, directory path, or glob pattern (auto-detected) |
| `generator.filegen.cache-enabled` | `--generator-filegen-cache-enabled` | `BLITZ_GENERATOR_FILEGEN_CACHE_ENABLED` | `true` | Enable in-memory file caching (true/false) |
| `generator.filegen.cache-ttl` | `--generator-filegen-cache-ttl` | `BLITZ_GENERATOR_FILEGEN_CACHE_TTL` | `0` | Cache entry time-to-live in duration format (0 = never expire) |

## Example Configurations

@@ -136,15 +138,24 @@ blitz --generator-type=filegen --generator-filegen-source='data_library/*'

### Worker Distribution

Workers read files sequentially and cycle back to the beginning when all files are exhausted. Each worker processes files in order, with rate limiting applied between each log line write.
Workers are assigned files in round-robin fashion. On each rate cycle, each worker:
1. Reads all lines from its assigned file
2. Selects one line at random from the file (skipping empty lines)
3. Writes the selected line to the output
4. Waits for the configured rate period before processing the next file

Workers cycle back to the beginning of the file list when all files are exhausted.
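
The round-robin assignment and random-line selection described above can be sketched in Go. This is a minimal, self-contained illustration under stated assumptions, not blitz's actual implementation; `assignFiles` and `pickLine` are hypothetical names introduced here:

```go
package main

import (
	"fmt"
	"math/rand"
)

// assignFiles distributes files across workers in round-robin order,
// so worker i receives files at indices i, i+workers, i+2*workers, ...
func assignFiles(files []string, workers int) [][]string {
	assigned := make([][]string, workers)
	for i, f := range files {
		w := i % workers
		assigned[w] = append(assigned[w], f)
	}
	return assigned
}

// pickLine selects one non-empty line at random (step 2 above),
// reporting false when the file has no non-empty lines.
func pickLine(lines []string) (string, bool) {
	var nonEmpty []string
	for _, l := range lines {
		if l != "" {
			nonEmpty = append(nonEmpty, l)
		}
	}
	if len(nonEmpty) == 0 {
		return "", false
	}
	return nonEmpty[rand.Intn(len(nonEmpty))], true
}

func main() {
	groups := assignFiles([]string{"a.log", "b.log", "c.log"}, 2)
	fmt.Println(groups) // [[a.log c.log] [b.log]]
}
```

In the real generator the rate limiter runs between cycles (step 4), which this sketch omits.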

### Log Line Processing

Each line in a file is treated atomically:
1. The line is read from the file
2. Empty lines are skipped
3. The line is written to the output with the configured rate limit applied
4. If a write fails, an error is recorded and the next line is processed
On each work cycle:
1. A file is selected from the worker's assigned files
2. The entire file is read into memory
3. All non-empty lines are collected
4. A random line is selected from the collected lines
5. Timestamp directives in the selected line are processed
6. The line is written to the output with the configured rate limit applied
7. If a write fails, an error is recorded and the next file cycle begins

### Rate Limiting

@@ -178,6 +189,46 @@ Thu Jan 13 15:30:45 2026 # %c format
2026-01-13 15:30:45 # %Y-%m-%d %H:%M:%S format
Jan 13 15:30:45 # %b %d %H:%M:%S format
```
## Caching

The File generator implements an in-memory cache to avoid reading files from disk on every log generation cycle. This significantly improves performance when working with large numbers of files or files with many lines.

**Cache Behavior:**
- Caching is **enabled by default** (`cache_enabled: true`)
- Each file's lines are cached in memory after the first read
- Cache entries can have an optional time-to-live (TTL) for automatic invalidation
- **Default TTL is 0** (cache entries never expire)
- Setting `cache_ttl` to a value (e.g., `1m`) will invalidate entries older than that duration
- Cache is **thread-safe** and allows concurrent access from multiple worker goroutines
- Cache uses LRU (Least Recently Used) eviction with a 1000-file limit
- Each file maintains its own cache entry independently

**Disabling Cache:**
- Set `cache_enabled: false` to disable caching entirely
- Useful when dealing with very large files or when file contents change frequently

**Example with Cache TTL (1 minute):**
```yaml
generator:
type: filegen
filegen:
workers: 4
rate: 100ms
source: /var/log/app.log
cache_enabled: true
cache_ttl: 1m
```

**Example with Caching Disabled:**
```yaml
generator:
type: filegen
filegen:
workers: 2
rate: 500ms
source: /large/files/directory
cache_enabled: false
```

## Metrics

172 changes: 128 additions & 44 deletions generator/filegen/filegen.go
@@ -5,13 +5,15 @@ import (
"context"
"fmt"
"io/fs"
"math/rand"
"os"
"path/filepath"
"regexp"
"sync"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/hashicorp/golang-lru/v2/expirable"
"github.com/observiq/blitz/internal/generator/ctime"
"github.com/observiq/blitz/output"
"go.opentelemetry.io/otel"
@@ -20,6 +22,52 @@ import (
"go.uber.org/zap"
)

// Cache provides thread-safe access to file line caches with optional TTL
type Cache struct {
lruCache *expirable.LRU[string, []string]
enabled bool
}

// NewCache creates a new Cache with the given size limit and TTL
// ttl of 0 means entries never expire
func NewCache(enabled bool, ttl time.Duration, maxSize int) (*Cache, error) {
if !enabled {
return &Cache{enabled: false}, nil
}

// For 0 TTL (never expire), use a very large duration
// expirable.LRU requires a TTL, so we use a large value for "never expire"
effectiveTTL := ttl
if ttl == 0 {
effectiveTTL = 24 * 365 * time.Hour // ~1 year (effectively never)
}

lruCache := expirable.NewLRU[string, []string](maxSize, nil, effectiveTTL)

return &Cache{
lruCache: lruCache,
enabled: true,
}, nil
}

// Get retrieves a cache entry if it exists and hasn't expired
func (c *Cache) Get(key string) ([]string, bool) {
if !c.enabled {
return nil, false
}

return c.lruCache.Get(key)
}

// Set stores a cache entry
func (c *Cache) Set(key string, lines []string) {
if !c.enabled {
return
}

c.lruCache.Add(key, lines)
}

// FileLogGenerator generates log data by reading from files
type FileLogGenerator struct {
logger *zap.Logger
@@ -34,10 +82,13 @@ type FileLogGenerator struct {
logsGenerated metric.Int64Counter
activeWorkers metric.Int64Gauge
writeErrors metric.Int64Counter

// File cache
cache *Cache
}

// New creates a new File log generator
func New(logger *zap.Logger, workers int, rate time.Duration, source string) (*FileLogGenerator, error) {
func New(logger *zap.Logger, workers int, rate time.Duration, source string, cacheEnabled bool, cacheTTL time.Duration) (*FileLogGenerator, error) {
if logger == nil {
return nil, fmt.Errorf("logger cannot be nil")
}
@@ -80,6 +131,11 @@ func New(logger *zap.Logger, workers int, rate time.Duration, source string) (*F
return nil, fmt.Errorf("create write errors counter: %w", err)
}

cache, err := NewCache(cacheEnabled, cacheTTL, 1000) // 1000 is max size for LRU
if err != nil {
return nil, fmt.Errorf("create cache: %w", err)
}

return &FileLogGenerator{
logger: logger,
workers: workers,
Expand All @@ -90,6 +146,7 @@ func New(logger *zap.Logger, workers int, rate time.Duration, source string) (*F
logsGenerated: logsGenerated,
activeWorkers: activeWorkers,
writeErrors: writeErrors,
cache: cache,
}, nil
}

@@ -301,66 +358,93 @@ func (g *FileLogGenerator) worker(id int, writer output.Writer, files []string)
}
}

// readAndWriteFile reads a file line by line and writes each line to the writer
// readAndWriteFile reads a file, selects a random non-empty line, and writes it to the writer
func (g *FileLogGenerator) readAndWriteFile(filename string, writer output.Writer) error {
// #nosec G304 - filename is controlled by the application, either from explicit config or from walking data library directory
file, err := os.Open(filename)
if err != nil {
return fmt.Errorf("open file: %w", err)
}
defer file.Close()

scanner := bufio.NewScanner(file)
for scanner.Scan() {
select {
case <-g.stopCh:
return nil
default:
}

line := scanner.Bytes()
if len(line) == 0 {
continue
// Check cache first
var lines []string
if cachedLines, found := g.cache.Get(filename); found {
lines = cachedLines
} else {
// Cache miss, read from disk
var err error
lines, err = g.readFileLines(filename)
if err != nil {
return err
}

// Process timestamp directives in the line
processedLine := g.processTimestamps(string(line))
// Update cache
g.cache.Set(filename, lines)
}

// Write with timeout
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
err := writer.Write(ctx, output.LogRecord{
Message: processedLine,
Metadata: output.LogRecordMetadata{
Timestamp: time.Now(),
},
})
cancel()
// If no non-empty lines found, return without error
if len(lines) == 0 {
return nil
}

if err != nil {
g.writeErrors.Add(context.Background(), 1,
metric.WithAttributeSet(
attribute.NewSet(
attribute.String("component", "generator_file"),
),
),
)
return fmt.Errorf("write: %w", err)
}
// Select a random line
// #nosec G404 - using weak random is acceptable for log generation, not cryptographic purposes
randomIdx := rand.Intn(len(lines))
selectedLine := lines[randomIdx]

// Process timestamp directives in the line
processedLine := g.processTimestamps(selectedLine)

// Write with timeout
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
err := writer.Write(ctx, output.LogRecord{
Message: processedLine,
Metadata: output.LogRecordMetadata{
Timestamp: time.Now(),
},
})
cancel()

g.logsGenerated.Add(context.Background(), 1,
if err != nil {
g.writeErrors.Add(context.Background(), 1,
metric.WithAttributeSet(
attribute.NewSet(
attribute.String("component", "generator_file"),
),
),
)
return fmt.Errorf("write: %w", err)
}

g.logsGenerated.Add(context.Background(), 1,
metric.WithAttributeSet(
attribute.NewSet(
attribute.String("component", "generator_file"),
),
),
)

return nil
}

// readFileLines reads all non-empty lines from a file
func (g *FileLogGenerator) readFileLines(filename string) ([]string, error) {
// #nosec G304 - filename is controlled by the application, either from explicit config or from walking data library directory
file, err := os.Open(filename)
if err != nil {
return nil, fmt.Errorf("open file: %w", err)
}
defer file.Close()

// Read all non-empty lines from the file
var lines []string
scanner := bufio.NewScanner(file)
for scanner.Scan() {
line := scanner.Bytes()
if len(line) > 0 {
lines = append(lines, string(line))
}
}

if err := scanner.Err(); err != nil {
return fmt.Errorf("scanner error: %w", err)
return nil, fmt.Errorf("scanner error: %w", err)
}

return nil
return lines, nil
}

// processTimestamps replaces timestamp directives in the line with actual formatted timestamps