From d36c98ecaf6c0ac3097e50e0d11cce3b679add5c Mon Sep 17 00:00:00 2001 From: Dylan Myers Date: Thu, 29 Jan 2026 15:39:14 -0500 Subject: [PATCH 1/7] fix(filegen): select random line on each run --- docs/generator/filegen.md | 23 +++++++---- generator/filegen/filegen.go | 74 +++++++++++++++++++----------------- 2 files changed, 56 insertions(+), 41 deletions(-) diff --git a/docs/generator/filegen.md b/docs/generator/filegen.md index 53c856e..f3ab7e4 100644 --- a/docs/generator/filegen.md +++ b/docs/generator/filegen.md @@ -1,6 +1,6 @@ # File Generator (filegen) -The File generator reads log entries from files on disk and automatically processes timestamp directives. It supports reading from a single file, reading from all files in a directory, or reading from a pre-distributed package of sample logs. Timestamp directives in log entries (like `%c`, `%Y-%m-%dT%H:%M:%SZ`, etc.) are replaced with actual formatted timestamps on each log generation. +The File generator reads log entries from files on disk and selects a random line from each file on each run. It automatically processes timestamp directives on the selected line. It supports reading from a single file, reading from all files in a directory, or reading from a pre-distributed package of sample logs. Timestamp directives in log entries (like `%c`, `%Y-%m-%dT%H:%M:%SZ`, etc.) are replaced with actual formatted timestamps when the line is selected. ## Features @@ -136,15 +136,24 @@ blitz --generator-type=filegen --generator-filegen-source='data_library/*' ### Worker Distribution -Workers read files sequentially and cycle back to the beginning when all files are exhausted. Each worker processes files in order, with rate limiting applied between each log line write. +Workers are assigned files in round-robin fashion. On each rate cycle, each worker: +1. Reads all lines from its assigned file +2. Selects one line at random from the file (skipping empty lines) +3. Writes the selected line to the output +4. 
Waits for the configured rate period before processing the next file + +Workers cycle back to the beginning of the file list when all files are exhausted. ### Log Line Processing -Each line in a file is treated atomically: -1. The line is read from the file -2. Empty lines are skipped -3. The line is written to the output with the configured rate limit applied -4. If a write fails, an error is recorded and the next line is processed +On each work cycle: +1. A file is selected from the worker's assigned files +2. The entire file is read into memory +3. All non-empty lines are collected +4. A random line is selected from the collected lines +5. Timestamp directives in the selected line are processed +6. The line is written to the output with the configured rate limit applied +7. If a write fails, an error is recorded and the next file cycle begins ### Rate Limiting diff --git a/generator/filegen/filegen.go b/generator/filegen/filegen.go index adb8983..b8cb866 100644 --- a/generator/filegen/filegen.go +++ b/generator/filegen/filegen.go @@ -5,6 +5,7 @@ import ( "context" "fmt" "io/fs" + "math/rand" "os" "path/filepath" "regexp" @@ -301,7 +302,7 @@ func (g *FileLogGenerator) worker(id int, writer output.Writer, files []string) } } -// readAndWriteFile reads a file line by line and writes each line to the writer +// readAndWriteFile reads a file, selects a random non-empty line, and writes it to the writer func (g *FileLogGenerator) readAndWriteFile(filename string, writer output.Writer) error { // #nosec G304 - filename is controlled by the application, either from explicit config or from walking data library directory file, err := os.Open(filename) @@ -310,55 +311,60 @@ func (g *FileLogGenerator) readAndWriteFile(filename string, writer output.Write } defer file.Close() + // Read all non-empty lines from the file + var lines []string scanner := bufio.NewScanner(file) for scanner.Scan() { - select { - case <-g.stopCh: - return nil - default: - } - line := 
scanner.Bytes() - if len(line) == 0 { - continue + if len(line) > 0 { + lines = append(lines, string(line)) } + } - // Process timestamp directives in the line - processedLine := g.processTimestamps(string(line)) + if err := scanner.Err(); err != nil { + return fmt.Errorf("scanner error: %w", err) + } - // Write with timeout - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - err := writer.Write(ctx, output.LogRecord{ - Message: processedLine, - Metadata: output.LogRecordMetadata{ - Timestamp: time.Now(), - }, - }) - cancel() + // If no non-empty lines found, return without error + if len(lines) == 0 { + return nil + } - if err != nil { - g.writeErrors.Add(context.Background(), 1, - metric.WithAttributeSet( - attribute.NewSet( - attribute.String("component", "generator_file"), - ), - ), - ) - return fmt.Errorf("write: %w", err) - } + // Select a random line + randomIdx := rand.Intn(len(lines)) + selectedLine := lines[randomIdx] + + // Process timestamp directives in the line + processedLine := g.processTimestamps(selectedLine) - g.logsGenerated.Add(context.Background(), 1, + // Write with timeout + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + err = writer.Write(ctx, output.LogRecord{ + Message: processedLine, + Metadata: output.LogRecordMetadata{ + Timestamp: time.Now(), + }, + }) + cancel() + + if err != nil { + g.writeErrors.Add(context.Background(), 1, metric.WithAttributeSet( attribute.NewSet( attribute.String("component", "generator_file"), ), ), ) + return fmt.Errorf("write: %w", err) } - if err := scanner.Err(); err != nil { - return fmt.Errorf("scanner error: %w", err) - } + g.logsGenerated.Add(context.Background(), 1, + metric.WithAttributeSet( + attribute.NewSet( + attribute.String("component", "generator_file"), + ), + ), + ) return nil } From c2bbf653fcacd293d6efb5d00c4bfe42be52dc8f Mon Sep 17 00:00:00 2001 From: Dylan Myers Date: Thu, 29 Jan 2026 15:40:44 -0500 Subject: [PATCH 2/7] perf(filegen): add 
file caching with 1-minute TTL --- docs/generator/filegen.md | 12 ++++++ generator/filegen/filegen.go | 78 ++++++++++++++++++++++++++++-------- 2 files changed, 73 insertions(+), 17 deletions(-) diff --git a/docs/generator/filegen.md b/docs/generator/filegen.md index f3ab7e4..535890f 100644 --- a/docs/generator/filegen.md +++ b/docs/generator/filegen.md @@ -187,6 +187,18 @@ Thu Jan 13 15:30:45 2026 # %c format 2026-01-13 15:30:45 # %Y-%m-%d %H:%M:%S format Jan 13 15:30:45 # %b %d %H:%M:%S format ``` +## Caching + +The File generator caches file contents in memory to avoid reading files from disk on every log generation cycle. This significantly improves performance, especially when working with large numbers of files or files with many lines. + +**Cache Behavior:** +- Files are cached in memory when first read +- Each cache entry is refreshed automatically after 1 minute of inactivity +- If a file's cache has expired, it is automatically re-read on the next access +- Cache is thread-safe and allows concurrent access from multiple worker goroutines +- Each file maintains its own cache entry independently + +This caching strategy balances performance optimization with the ability to pick up file changes within a reasonable time window. 
## Metrics diff --git a/generator/filegen/filegen.go b/generator/filegen/filegen.go index b8cb866..cb29182 100644 --- a/generator/filegen/filegen.go +++ b/generator/filegen/filegen.go @@ -21,6 +21,12 @@ import ( "go.uber.org/zap" ) +// cacheEntry holds the cached lines of a file and when it was cached +type cacheEntry struct { + lines []string + cachedAt time.Time +} + // FileLogGenerator generates log data by reading from files type FileLogGenerator struct { logger *zap.Logger @@ -35,6 +41,11 @@ type FileLogGenerator struct { logsGenerated metric.Int64Counter activeWorkers metric.Int64Gauge writeErrors metric.Int64Counter + + // File cache (refreshed every minute) + cacheMu sync.RWMutex + fileCache map[string]*cacheEntry + cacheAge time.Duration // time after which cache expires (1 minute) } // New creates a new File log generator @@ -91,6 +102,8 @@ func New(logger *zap.Logger, workers int, rate time.Duration, source string) (*F logsGenerated: logsGenerated, activeWorkers: activeWorkers, writeErrors: writeErrors, + fileCache: make(map[string]*cacheEntry), + cacheAge: 1 * time.Minute, }, nil } @@ -304,25 +317,30 @@ func (g *FileLogGenerator) worker(id int, writer output.Writer, files []string) // readAndWriteFile reads a file, selects a random non-empty line, and writes it to the writer func (g *FileLogGenerator) readAndWriteFile(filename string, writer output.Writer) error { - // #nosec G304 - filename is controlled by the application, either from explicit config or from walking data library directory - file, err := os.Open(filename) - if err != nil { - return fmt.Errorf("open file: %w", err) - } - defer file.Close() - - // Read all non-empty lines from the file + // Check cache first var lines []string - scanner := bufio.NewScanner(file) - for scanner.Scan() { - line := scanner.Bytes() - if len(line) > 0 { - lines = append(lines, string(line)) + g.cacheMu.RLock() + cacheEntry, found := g.fileCache[filename] + g.cacheMu.RUnlock() + + if found && 
time.Since(cacheEntry.cachedAt) < g.cacheAge { + // Cache hit and still fresh + lines = cacheEntry.lines + } else { + // Cache miss or stale, read from disk + var err error + lines, err = g.readFileLines(filename) + if err != nil { + return err } - } - if err := scanner.Err(); err != nil { - return fmt.Errorf("scanner error: %w", err) + // Update cache + g.cacheMu.Lock() + g.fileCache[filename] = &cacheEntry{ + lines: lines, + cachedAt: time.Now(), + } + g.cacheMu.Unlock() } // If no non-empty lines found, return without error @@ -339,7 +357,7 @@ func (g *FileLogGenerator) readAndWriteFile(filename string, writer output.Write // Write with timeout ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - err = writer.Write(ctx, output.LogRecord{ + err := writer.Write(ctx, output.LogRecord{ Message: processedLine, Metadata: output.LogRecordMetadata{ Timestamp: time.Now(), @@ -369,6 +387,32 @@ func (g *FileLogGenerator) readAndWriteFile(filename string, writer output.Write return nil } +// readFileLines reads all non-empty lines from a file +func (g *FileLogGenerator) readFileLines(filename string) ([]string, error) { + // #nosec G304 - filename is controlled by the application, either from explicit config or from walking data library directory + file, err := os.Open(filename) + if err != nil { + return nil, fmt.Errorf("open file: %w", err) + } + defer file.Close() + + // Read all non-empty lines from the file + var lines []string + scanner := bufio.NewScanner(file) + for scanner.Scan() { + line := scanner.Bytes() + if len(line) > 0 { + lines = append(lines, string(line)) + } + } + + if err := scanner.Err(); err != nil { + return nil, fmt.Errorf("scanner error: %w", err) + } + + return lines, nil +} + // processTimestamps replaces timestamp directives in the line with actual formatted timestamps func (g *FileLogGenerator) processTimestamps(line string) string { now := time.Now() From 7a2609f709b76d8d06bc8251f932d3a67ab06cc6 Mon Sep 17 00:00:00 2001 
From: Dylan Myers Date: Thu, 29 Jan 2026 15:45:17 -0500 Subject: [PATCH 3/7] fix(filegen): export CacheEntry type and regenerate completions --- generator/filegen/filegen.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/generator/filegen/filegen.go b/generator/filegen/filegen.go index cb29182..6b51101 100644 --- a/generator/filegen/filegen.go +++ b/generator/filegen/filegen.go @@ -21,8 +21,8 @@ import ( "go.uber.org/zap" ) -// cacheEntry holds the cached lines of a file and when it was cached -type cacheEntry struct { +// CacheEntry holds the cached lines of a file and when it was cached +type CacheEntry struct { lines []string cachedAt time.Time } @@ -44,7 +44,7 @@ type FileLogGenerator struct { // File cache (refreshed every minute) cacheMu sync.RWMutex - fileCache map[string]*cacheEntry + fileCache map[string]*CacheEntry cacheAge time.Duration // time after which cache expires (1 minute) } @@ -102,7 +102,7 @@ func New(logger *zap.Logger, workers int, rate time.Duration, source string) (*F logsGenerated: logsGenerated, activeWorkers: activeWorkers, writeErrors: writeErrors, - fileCache: make(map[string]*cacheEntry), + fileCache: make(map[string]*CacheEntry), cacheAge: 1 * time.Minute, }, nil } @@ -336,7 +336,7 @@ func (g *FileLogGenerator) readAndWriteFile(filename string, writer output.Write // Update cache g.cacheMu.Lock() - g.fileCache[filename] = &cacheEntry{ + g.fileCache[filename] = &CacheEntry{ lines: lines, cachedAt: time.Now(), } From 43a68875b03b7ac178fe365d30df6fa2d1d043d2 Mon Sep 17 00:00:00 2001 From: Dylan Myers Date: Thu, 29 Jan 2026 15:57:29 -0500 Subject: [PATCH 4/7] fix(filegen): add nosec comment for weak random usage --- generator/filegen/filegen.go | 1 + 1 file changed, 1 insertion(+) diff --git a/generator/filegen/filegen.go b/generator/filegen/filegen.go index 6b51101..f63824b 100644 --- a/generator/filegen/filegen.go +++ b/generator/filegen/filegen.go @@ -349,6 +349,7 @@ func (g *FileLogGenerator) 
readAndWriteFile(filename string, writer output.Write } // Select a random line + // #nosec G404 - using weak random is acceptable for log generation, not cryptographic purposes randomIdx := rand.Intn(len(lines)) selectedLine := lines[randomIdx] From 74bc77c786ce96d66cd8b43e79ffe864c4dbcdfa Mon Sep 17 00:00:00 2001 From: Dylan Myers Date: Thu, 29 Jan 2026 16:25:07 -0500 Subject: [PATCH 5/7] refactor(filegen): use golang-lru for configurable cache with TTL --- cmd/blitz/main.go | 2 + docs/generator/filegen.md | 42 +++++++++-- generator/filegen/filegen.go | 88 ++++++++++++++++++----- generator/filegen/filegen_test.go | 113 +++++++++++++++++++++++++++--- go.mod | 1 + go.sum | 2 + internal/config/generator_file.go | 4 ++ internal/config/override.go | 2 + package/completions/blitz.bash | 9 +++ 9 files changed, 229 insertions(+), 34 deletions(-) diff --git a/cmd/blitz/main.go b/cmd/blitz/main.go index ea116fa..bf70e0d 100644 --- a/cmd/blitz/main.go +++ b/cmd/blitz/main.go @@ -381,6 +381,8 @@ func run(cmd *cobra.Command, args []string) error { cfg.Generator.Filegen.Workers, cfg.Generator.Filegen.Rate, cfg.Generator.Filegen.Source, + cfg.Generator.Filegen.CacheEnabled, + cfg.Generator.Filegen.CacheTTL, ) if err != nil { logger.Error("Failed to create File generator", zap.Error(err)) diff --git a/docs/generator/filegen.md b/docs/generator/filegen.md index 535890f..8195f0f 100644 --- a/docs/generator/filegen.md +++ b/docs/generator/filegen.md @@ -59,6 +59,8 @@ The File generator automatically processes timestamp directives in log files, re | `generator.filegen.workers` | `--generator-filegen-workers` | `BLITZ_GENERATOR_FILEGEN_WORKERS` | `1` | Number of worker goroutines (must be ≥ 1) | | `generator.filegen.rate` | `--generator-filegen-rate` | `BLITZ_GENERATOR_FILEGEN_RATE` | `1s` | Rate at which logs are written per worker (duration format) | | `generator.filegen.source` | `--generator-filegen-source` | `BLITZ_GENERATOR_FILEGEN_SOURCE` | `` | File path, directory path, or 
glob pattern (auto-detected) | +| `generator.filegen.cache-enabled` | `--generator-filegen-cache-enabled` | `BLITZ_GENERATOR_FILEGEN_CACHE_ENABLED` | `true` | Enable in-memory file caching (true/false) | +| `generator.filegen.cache-ttl` | `--generator-filegen-cache-ttl` | `BLITZ_GENERATOR_FILEGEN_CACHE_TTL` | `0` | Cache entry time-to-live in duration format (0 = never expire) | ## Example Configurations @@ -189,16 +191,44 @@ Jan 13 15:30:45 # %b %d %H:%M:%S format ``` ## Caching -The File generator caches file contents in memory to avoid reading files from disk on every log generation cycle. This significantly improves performance, especially when working with large numbers of files or files with many lines. +The File generator implements an in-memory cache to avoid reading files from disk on every log generation cycle. This significantly improves performance when working with large numbers of files or files with many lines. **Cache Behavior:** -- Files are cached in memory when first read -- Each cache entry is refreshed automatically after 1 minute of inactivity -- If a file's cache has expired, it is automatically re-read on the next access -- Cache is thread-safe and allows concurrent access from multiple worker goroutines +- Caching is **enabled by default** (`cache-enabled: true`) +- Each file's lines are cached in memory after the first read +- Cache entries can have an optional time-to-live (TTL) for automatic invalidation + - **Default TTL is 0** (cache entries never expire) + - Setting `cache-ttl` to a value (e.g., `1m`) will invalidate entries older than that duration +- Cache is **thread-safe** and allows concurrent access from multiple worker goroutines +- Cache uses LRU (Least Recently Used) eviction with a 1000-file limit - Each file maintains its own cache entry independently -This caching strategy balances performance optimization with the ability to pick up file changes within a reasonable time window. 
+**Disabling Cache:** +- Set `cache-enabled: false` to disable caching entirely +- Useful when dealing with very large files or when file contents change frequently + +**Example with Cache TTL (1 minute):** +```yaml +generator: + type: filegen + filegen: + workers: 4 + rate: 100ms + source: /var/log/app.log + cache-enabled: true + cache-ttl: 1m +``` + +**Example with Caching Disabled:** +```yaml +generator: + type: filegen + filegen: + workers: 2 + rate: 500ms + source: /large/files/directory + cache-enabled: false +``` ## Metrics diff --git a/generator/filegen/filegen.go b/generator/filegen/filegen.go index f63824b..201de78 100644 --- a/generator/filegen/filegen.go +++ b/generator/filegen/filegen.go @@ -13,6 +13,7 @@ import ( "time" "github.com/cenkalti/backoff/v4" + lru "github.com/hashicorp/golang-lru/v2" "github.com/observiq/blitz/internal/generator/ctime" "github.com/observiq/blitz/output" "go.opentelemetry.io/otel" @@ -27,6 +28,61 @@ type CacheEntry struct { cachedAt time.Time } +// Cache provides thread-safe access to file line caches with optional TTL +type Cache struct { + lruCache *lru.Cache[string, *CacheEntry] + ttl time.Duration // 0 means never expire + enabled bool +} + +// NewCache creates a new Cache with the given size limit and TTL +func NewCache(enabled bool, ttl time.Duration, maxSize int) (*Cache, error) { + if !enabled { + return &Cache{enabled: false}, nil + } + + lruCache, err := lru.New[string, *CacheEntry](maxSize) + if err != nil { + return nil, fmt.Errorf("create LRU cache: %w", err) + } + + return &Cache{ + lruCache: lruCache, + ttl: ttl, + enabled: true, + }, nil +} + +// Get retrieves a cache entry if it exists and hasn't expired +func (c *Cache) Get(key string) (*CacheEntry, bool) { + if !c.enabled { + return nil, false + } + + entry, found := c.lruCache.Get(key) + if !found { + return nil, false + } + + // Check TTL (0 means never expire) + if c.ttl > 0 && time.Since(entry.cachedAt) > c.ttl { + // Entry has expired, remove it + 
c.lruCache.Remove(key) + return nil, false + } + + return entry, true +} + +// Set stores a cache entry +func (c *Cache) Set(key string, entry *CacheEntry) { + if !c.enabled { + return + } + + c.lruCache.Add(key, entry) +} + // FileLogGenerator generates log data by reading from files type FileLogGenerator struct { logger *zap.Logger @@ -42,14 +98,12 @@ type FileLogGenerator struct { activeWorkers metric.Int64Gauge writeErrors metric.Int64Counter - // File cache (refreshed every minute) - cacheMu sync.RWMutex - fileCache map[string]*CacheEntry - cacheAge time.Duration // time after which cache expires (1 minute) + // File cache + cache *Cache } // New creates a new File log generator -func New(logger *zap.Logger, workers int, rate time.Duration, source string) (*FileLogGenerator, error) { +func New(logger *zap.Logger, workers int, rate time.Duration, source string, cacheEnabled bool, cacheTTL time.Duration) (*FileLogGenerator, error) { if logger == nil { return nil, fmt.Errorf("logger cannot be nil") } @@ -92,6 +146,11 @@ func New(logger *zap.Logger, workers int, rate time.Duration, source string) (*F return nil, fmt.Errorf("create write errors counter: %w", err) } + cache, err := NewCache(cacheEnabled, cacheTTL, 1000) // 1000 is max size for LRU + if err != nil { + return nil, fmt.Errorf("create cache: %w", err) + } + return &FileLogGenerator{ logger: logger, workers: workers, @@ -102,8 +161,7 @@ func New(logger *zap.Logger, workers int, rate time.Duration, source string) (*F logsGenerated: logsGenerated, activeWorkers: activeWorkers, writeErrors: writeErrors, - fileCache: make(map[string]*CacheEntry), - cacheAge: 1 * time.Minute, + cache: cache, }, nil } @@ -319,15 +377,11 @@ func (g *FileLogGenerator) worker(id int, writer output.Writer, files []string) func (g *FileLogGenerator) readAndWriteFile(filename string, writer output.Writer) error { // Check cache first var lines []string - g.cacheMu.RLock() - cacheEntry, found := g.fileCache[filename] - 
g.cacheMu.RUnlock() - - if found && time.Since(cacheEntry.cachedAt) < g.cacheAge { - // Cache hit and still fresh + cacheEntry, found := g.cache.Get(filename) + if found { lines = cacheEntry.lines } else { - // Cache miss or stale, read from disk + // Cache miss, read from disk var err error lines, err = g.readFileLines(filename) if err != nil { @@ -335,12 +389,10 @@ func (g *FileLogGenerator) readAndWriteFile(filename string, writer output.Write } // Update cache - g.cacheMu.Lock() - g.fileCache[filename] = &CacheEntry{ + g.cache.Set(filename, &CacheEntry{ lines: lines, cachedAt: time.Now(), - } - g.cacheMu.Unlock() + }) } // If no non-empty lines found, return without error diff --git a/generator/filegen/filegen_test.go b/generator/filegen/filegen_test.go index 3972f1c..24a5c2d 100644 --- a/generator/filegen/filegen_test.go +++ b/generator/filegen/filegen_test.go @@ -104,7 +104,7 @@ func TestNewFileLogGenerator(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - gen, err := New(logger, tt.workers, tt.rate, tt.source) + gen, err := New(logger, tt.workers, tt.rate, tt.source, true, 0) if tt.wantErr { assert.Error(t, err) assert.Nil(t, gen) @@ -136,7 +136,7 @@ func TestFileLogGeneratorFileMode(t *testing.T) { } tmpfile.Close() - gen, err := New(logger, 1, 10*time.Millisecond, tmpfile.Name()) + gen, err := New(logger, 1, 10*time.Millisecond, tmpfile.Name(), true, 0) require.NoError(t, err) writer := newMockWriter() @@ -181,7 +181,7 @@ func TestFileLogGeneratorDirectoryMode(t *testing.T) { } // Test with directory mode (auto-detected) - gen, err := New(logger, 1, 10*time.Millisecond, tmpdir) + gen, err := New(logger, 1, 10*time.Millisecond, tmpdir, true, 0) require.NoError(t, err) writer := newMockWriter() @@ -204,7 +204,7 @@ func TestFileLogGeneratorDirectoryMode(t *testing.T) { func TestFileLogGeneratorNonexistentFile(t *testing.T) { logger := zaptest.NewLogger(t) - gen, err := New(logger, 1, 100*time.Millisecond, 
"/nonexistent/path/file.log") + gen, err := New(logger, 1, 100*time.Millisecond, "/nonexistent/path/file.log", true, 0) require.NoError(t, err) writer := newMockWriter() @@ -225,7 +225,7 @@ func TestFileLogGeneratorStop(t *testing.T) { require.NoError(t, err) tmpfile.Close() - gen, err := New(logger, 1, 100*time.Millisecond, tmpfile.Name()) + gen, err := New(logger, 1, 100*time.Millisecond, tmpfile.Name(), true, 0) require.NoError(t, err) writer := newMockWriter() @@ -251,7 +251,7 @@ func TestFileLogGeneratorWriteError(t *testing.T) { require.NoError(t, err) tmpfile.Close() - gen, err := New(logger, 1, 10*time.Millisecond, tmpfile.Name()) + gen, err := New(logger, 1, 10*time.Millisecond, tmpfile.Name(), true, 0) require.NoError(t, err) writer := newMockWriter() @@ -281,7 +281,7 @@ func TestFileLogGeneratorMultipleWorkers(t *testing.T) { } tmpfile.Close() - gen, err := New(logger, 3, 10*time.Millisecond, tmpfile.Name()) + gen, err := New(logger, 3, 10*time.Millisecond, tmpfile.Name(), true, 0) require.NoError(t, err) writer := newMockWriter() @@ -308,7 +308,7 @@ func TestTimestampProcessing(t *testing.T) { require.NoError(t, err) tmpFile.Close() - gen, err := New(logger, 1, 100*time.Millisecond, tmpFile.Name()) + gen, err := New(logger, 1, 100*time.Millisecond, tmpFile.Name(), true, 0) require.NoError(t, err) testCases := []struct { @@ -413,7 +413,7 @@ func TestFileLogGeneratorGlobPatterns(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - gen, err := New(logger, 1, 10*time.Millisecond, tc.pattern) + gen, err := New(logger, 1, 10*time.Millisecond, tc.pattern, true, 0) require.NoError(t, err) writer := newMockWriter() @@ -485,7 +485,7 @@ func TestFileLogGeneratorGlobDirectories(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - gen, err := New(logger, 1, 10*time.Millisecond, tc.pattern) + gen, err := New(logger, 1, 10*time.Millisecond, tc.pattern, true, 0) require.NoError(t, err) writer := 
newMockWriter() @@ -507,3 +507,96 @@ func TestFileLogGeneratorGlobDirectories(t *testing.T) { }) } } + +// TestCacheGetSet tests the Cache Get/Set methods +func TestCacheGetSet(t *testing.T) { + // Test with cache enabled and no TTL + cache, err := NewCache(true, 0, 10) + require.NoError(t, err) + + entry := &CacheEntry{ + lines: []string{"line1", "line2", "line3"}, + cachedAt: time.Now(), + } + + // Test Set + cache.Set("key1", entry) + + // Test Get - should find the entry + retrievedEntry, found := cache.Get("key1") + require.True(t, found, "entry should be found in cache") + require.Equal(t, entry.lines, retrievedEntry.lines) + + // Test Get - non-existent key + _, found = cache.Get("nonexistent") + require.False(t, found, "non-existent key should not be found") +} + +// TestCacheDisabled tests that cache operations are no-ops when disabled +func TestCacheDisabled(t *testing.T) { + cache, err := NewCache(false, 0, 10) + require.NoError(t, err) + + entry := &CacheEntry{ + lines: []string{"line1", "line2"}, + cachedAt: time.Now(), + } + + // Set should be a no-op + cache.Set("key1", entry) + + // Get should always return false + _, found := cache.Get("key1") + require.False(t, found, "Get should return false when cache is disabled") +} + +// TestCacheTTLExpiration tests that entries expire after the TTL +func TestCacheTTLExpiration(t *testing.T) { + // Create cache with 10ms TTL + cache, err := NewCache(true, 10*time.Millisecond, 10) + require.NoError(t, err) + + entry := &CacheEntry{ + lines: []string{"line1", "line2"}, + cachedAt: time.Now(), + } + + cache.Set("key1", entry) + + // Should be found immediately + _, found := cache.Get("key1") + require.True(t, found, "entry should be found before TTL expires") + + // Wait for TTL to expire + time.Sleep(15 * time.Millisecond) + + // Should not be found after TTL + _, found = cache.Get("key1") + require.False(t, found, "entry should not be found after TTL expires") +} + +// TestCacheLRUEviction tests that cache 
respects max size limit +func TestCacheLRUEviction(t *testing.T) { + // Create cache with max size of 2 + cache, err := NewCache(true, 0, 2) + require.NoError(t, err) + + entry1 := &CacheEntry{lines: []string{"1"}, cachedAt: time.Now()} + entry2 := &CacheEntry{lines: []string{"2"}, cachedAt: time.Now()} + entry3 := &CacheEntry{lines: []string{"3"}, cachedAt: time.Now()} + + cache.Set("key1", entry1) + cache.Set("key2", entry2) + cache.Set("key3", entry3) // This should evict key1 (LRU) + + // key1 should be evicted + _, found := cache.Get("key1") + require.False(t, found, "key1 should be evicted due to LRU") + + // key2 and key3 should still be present + _, found = cache.Get("key2") + require.True(t, found, "key2 should still be in cache") + + _, found = cache.Get("key3") + require.True(t, found, "key3 should still be in cache") +} diff --git a/go.mod b/go.mod index edea24e..f324548 100644 --- a/go.mod +++ b/go.mod @@ -57,6 +57,7 @@ require ( github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.2 // indirect github.com/hashicorp/go-version v1.7.0 // indirect + github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/mattn/go-colorable v0.1.14 // indirect github.com/mattn/go-isatty v0.0.20 // indirect diff --git a/go.sum b/go.sum index d39335d..d829b55 100644 --- a/go.sum +++ b/go.sum @@ -70,6 +70,8 @@ github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.2 h1:8Tjv8EJ+pM1xP8mK6egEbD1OgnV github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.2/go.mod h1:pkJQ2tZHJ0aFOVEEot6oZmaVEZcRme73eIFmhiVuRWs= github.com/hashicorp/go-version v1.7.0 h1:5tqGy27NaOTB8yJKUZELlFAS/LTKJkrmONwQKeRZfjY= github.com/hashicorp/go-version v1.7.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= +github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= +github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod 
h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= diff --git a/internal/config/generator_file.go b/internal/config/generator_file.go index a7a84aa..b510a8f 100644 --- a/internal/config/generator_file.go +++ b/internal/config/generator_file.go @@ -13,6 +13,10 @@ type FileGeneratorConfig struct { Rate time.Duration `yaml:"rate,omitempty" mapstructure:"rate,omitempty"` // Source is the file path, directory path, or glob pattern (auto-detected) Source string `yaml:"source,omitempty" mapstructure:"source,omitempty"` + // CacheEnabled enables in-memory file caching (default true) + CacheEnabled bool `yaml:"cache-enabled,omitempty" mapstructure:"cache-enabled,omitempty"` + // CacheTTL is the time-to-live for cached file entries (0 = never expire, default = 0) + CacheTTL time.Duration `yaml:"cache-ttl,omitempty" mapstructure:"cache-ttl,omitempty"` } // Validate validates the File generator configuration diff --git a/internal/config/override.go b/internal/config/override.go index 2214f79..1e448df 100644 --- a/internal/config/override.go +++ b/internal/config/override.go @@ -271,6 +271,8 @@ func DefaultOverrides() []*Override { NewOverride("generator.filegen.workers", "number of File generator workers", 1), NewOverride("generator.filegen.rate", "rate at which File logs are generated per worker", 1*time.Second), NewOverride("generator.filegen.source", "file path, directory path, or glob pattern (auto-detected)", ""), + NewOverride("generator.filegen.cache-enabled", "enable in-memory file caching", true), + NewOverride("generator.filegen.cache-ttl", "file cache time-to-live (0 = never expire)", time.Duration(0)), NewOverride("output.type", "output type. 
One of: nop|stdout|tcp|udp|syslog|otlp-grpc|file", OutputTypeNop), NewOverride("output.udp.host", "UDP output target host", ""), NewOverride("output.udp.port", "UDP output target port", 0), diff --git a/package/completions/blitz.bash b/package/completions/blitz.bash index 4025abc..6827eb4 100644 --- a/package/completions/blitz.bash +++ b/package/completions/blitz.bash @@ -388,6 +388,9 @@ _blitz_help() two_word_flags+=("--generator-apache-error-rate") flags+=("--generator-apache-error-workers=") two_word_flags+=("--generator-apache-error-workers") + flags+=("--generator-filegen-cache-enabled") + flags+=("--generator-filegen-cache-ttl=") + two_word_flags+=("--generator-filegen-cache-ttl") flags+=("--generator-filegen-rate=") two_word_flags+=("--generator-filegen-rate") flags+=("--generator-filegen-source=") @@ -588,6 +591,9 @@ _blitz_version() two_word_flags+=("--generator-apache-error-rate") flags+=("--generator-apache-error-workers=") two_word_flags+=("--generator-apache-error-workers") + flags+=("--generator-filegen-cache-enabled") + flags+=("--generator-filegen-cache-ttl=") + two_word_flags+=("--generator-filegen-cache-ttl") flags+=("--generator-filegen-rate=") two_word_flags+=("--generator-filegen-rate") flags+=("--generator-filegen-source=") @@ -789,6 +795,9 @@ _blitz_root_command() two_word_flags+=("--generator-apache-error-rate") flags+=("--generator-apache-error-workers=") two_word_flags+=("--generator-apache-error-workers") + flags+=("--generator-filegen-cache-enabled") + flags+=("--generator-filegen-cache-ttl=") + two_word_flags+=("--generator-filegen-cache-ttl") flags+=("--generator-filegen-rate=") two_word_flags+=("--generator-filegen-rate") flags+=("--generator-filegen-source=") From b9f494c6fafc586f19d32af192ebda0aa8a9e871 Mon Sep 17 00:00:00 2001 From: Dylan Myers Date: Thu, 29 Jan 2026 16:34:06 -0500 Subject: [PATCH 6/7] refactor(filegen): simplify cache implementation using golang-lru expirable TTL --- generator/filegen/filegen.go | 54 
+++++++++++-------------------- generator/filegen/filegen_test.go | 37 ++++++++------------- internal/config/override_test.go | 28 ++++++++++------ 3 files changed, 51 insertions(+), 68 deletions(-) diff --git a/generator/filegen/filegen.go b/generator/filegen/filegen.go index 201de78..7ed52db 100644 --- a/generator/filegen/filegen.go +++ b/generator/filegen/filegen.go @@ -13,7 +13,7 @@ import ( "time" "github.com/cenkalti/backoff/v4" - lru "github.com/hashicorp/golang-lru/v2" + "github.com/hashicorp/golang-lru/v2/expirable" "github.com/observiq/blitz/internal/generator/ctime" "github.com/observiq/blitz/output" "go.opentelemetry.io/otel" @@ -22,65 +22,51 @@ import ( "go.uber.org/zap" ) -// CacheEntry holds the cached lines of a file and when it was cached -type CacheEntry struct { - lines []string - cachedAt time.Time -} - // Cache provides thread-safe access to file line caches with optional TTL type Cache struct { - lruCache *lru.Cache[string, *CacheEntry] - ttl time.Duration // 0 means never expire + lruCache *expirable.LRU[string, []string] enabled bool } // NewCache creates a new Cache with the given size limit and TTL +// ttl of 0 means entries never expire func NewCache(enabled bool, ttl time.Duration, maxSize int) (*Cache, error) { if !enabled { return &Cache{enabled: false}, nil } - lruCache, err := lru.New[string, *CacheEntry](maxSize) - if err != nil { - return nil, fmt.Errorf("create LRU cache: %w", err) + // For 0 TTL (never expire), use a very large duration + // expirable.LRU requires a TTL, so we use a large value for "never expire" + effectiveTTL := ttl + if ttl == 0 { + effectiveTTL = 24 * 365 * time.Hour // ~1 year (effectively never) } + lruCache := expirable.NewLRU[string, []string](maxSize, nil, effectiveTTL) + return &Cache{ lruCache: lruCache, - ttl: ttl, enabled: true, }, nil } // Get retrieves a cache entry if it exists and hasn't expired -func (c *Cache) Get(key string) (*CacheEntry, bool) { +func (c *Cache) Get(key string) ([]string, 
bool) { if !c.enabled { return nil, false } - entry, found := c.lruCache.Get(key) - if !found { - return nil, false - } - - // Check TTL (0 means never expire) - if c.ttl > 0 && time.Since(entry.cachedAt) > c.ttl { - // Entry has expired, remove it - c.lruCache.Remove(key) - return nil, false - } - - return entry, true + lines, found := c.lruCache.Get(key) + return lines, found } // Set stores a cache entry -func (c *Cache) Set(key string, entry *CacheEntry) { +func (c *Cache) Set(key string, lines []string) { if !c.enabled { return } - c.lruCache.Add(key, entry) + c.lruCache.Add(key, lines) } // FileLogGenerator generates log data by reading from files @@ -377,9 +363,8 @@ func (g *FileLogGenerator) worker(id int, writer output.Writer, files []string) func (g *FileLogGenerator) readAndWriteFile(filename string, writer output.Writer) error { // Check cache first var lines []string - cacheEntry, found := g.cache.Get(filename) - if found { - lines = cacheEntry.lines + if cachedLines, found := g.cache.Get(filename); found { + lines = cachedLines } else { // Cache miss, read from disk var err error @@ -389,10 +374,7 @@ func (g *FileLogGenerator) readAndWriteFile(filename string, writer output.Write } // Update cache - g.cache.Set(filename, &CacheEntry{ - lines: lines, - cachedAt: time.Now(), - }) + g.cache.Set(filename, lines) } // If no non-empty lines found, return without error diff --git a/generator/filegen/filegen_test.go b/generator/filegen/filegen_test.go index 24a5c2d..6b19728 100644 --- a/generator/filegen/filegen_test.go +++ b/generator/filegen/filegen_test.go @@ -514,18 +514,15 @@ func TestCacheGetSet(t *testing.T) { cache, err := NewCache(true, 0, 10) require.NoError(t, err) - entry := &CacheEntry{ - lines: []string{"line1", "line2", "line3"}, - cachedAt: time.Now(), - } + lines := []string{"line1", "line2", "line3"} // Test Set - cache.Set("key1", entry) + cache.Set("key1", lines) // Test Get - should find the entry - retrievedEntry, found := 
cache.Get("key1") + retrievedLines, found := cache.Get("key1") require.True(t, found, "entry should be found in cache") - require.Equal(t, entry.lines, retrievedEntry.lines) + require.Equal(t, lines, retrievedLines) // Test Get - non-existent key _, found = cache.Get("nonexistent") @@ -537,13 +534,10 @@ func TestCacheDisabled(t *testing.T) { cache, err := NewCache(false, 0, 10) require.NoError(t, err) - entry := &CacheEntry{ - lines: []string{"line1", "line2"}, - cachedAt: time.Now(), - } + lines := []string{"line1", "line2"} // Set should be a no-op - cache.Set("key1", entry) + cache.Set("key1", lines) // Get should always return false _, found := cache.Get("key1") @@ -556,12 +550,9 @@ func TestCacheTTLExpiration(t *testing.T) { cache, err := NewCache(true, 10*time.Millisecond, 10) require.NoError(t, err) - entry := &CacheEntry{ - lines: []string{"line1", "line2"}, - cachedAt: time.Now(), - } + lines := []string{"line1", "line2"} - cache.Set("key1", entry) + cache.Set("key1", lines) // Should be found immediately _, found := cache.Get("key1") @@ -581,13 +572,13 @@ func TestCacheLRUEviction(t *testing.T) { cache, err := NewCache(true, 0, 2) require.NoError(t, err) - entry1 := &CacheEntry{lines: []string{"1"}, cachedAt: time.Now()} - entry2 := &CacheEntry{lines: []string{"2"}, cachedAt: time.Now()} - entry3 := &CacheEntry{lines: []string{"3"}, cachedAt: time.Now()} + lines1 := []string{"1"} + lines2 := []string{"2"} + lines3 := []string{"3"} - cache.Set("key1", entry1) - cache.Set("key2", entry2) - cache.Set("key3", entry3) // This should evict key1 (LRU) + cache.Set("key1", lines1) + cache.Set("key2", lines2) + cache.Set("key3", lines3) // This should evict key1 (LRU) // key1 should be evicted _, found := cache.Get("key1") diff --git a/internal/config/override_test.go b/internal/config/override_test.go index 926bd13..83c7869 100644 --- a/internal/config/override_test.go +++ b/internal/config/override_test.go @@ -47,6 +47,8 @@ func getTestOverrideFlagsArgs() 
[]string { "--generator-filegen-workers", "20", "--generator-filegen-rate", "50ms", "--generator-filegen-source", "/var/log", + "--generator-filegen-cache-enabled=false", + "--generator-filegen-cache-ttl", "0", "--output-type", "otlp-grpc", "--output-udp-host", "udp.example.com", "--output-udp-port", "1514", @@ -137,6 +139,8 @@ func getTestOverrideEnvs() map[string]string { "BLITZ_GENERATOR_FILEGEN_WORKERS": "21", "BLITZ_GENERATOR_FILEGEN_RATE": "45ms", "BLITZ_GENERATOR_FILEGEN_SOURCE": "syslog_generic", + "BLITZ_GENERATOR_FILEGEN_CACHE_ENABLED": "false", + "BLITZ_GENERATOR_FILEGEN_CACHE_TTL": "0", "BLITZ_OUTPUT_TYPE": "file", "BLITZ_OUTPUT_UDP_HOST": "udp.env.example", "BLITZ_OUTPUT_UDP_PORT": "5514", @@ -263,9 +267,11 @@ func TestOverrideDefaults(t *testing.T) { Format: "cri-o", }, Filegen: FileGeneratorConfig{ - Workers: 1, - Rate: 1 * time.Second, - Source: "", + Workers: 1, + Rate: 1 * time.Second, + Source: "", + CacheEnabled: true, + CacheTTL: 0, }, }, Output: Output{ @@ -414,9 +420,11 @@ func TestOverrideFlags(t *testing.T) { Format: "cri-o", }, Filegen: FileGeneratorConfig{ - Workers: 20, - Rate: 50 * time.Millisecond, - Source: "/var/log", + Workers: 20, + Rate: 50 * time.Millisecond, + Source: "/var/log", + CacheEnabled: false, + CacheTTL: 0, }, }, Output: Output{ @@ -568,9 +576,11 @@ func TestOverrideEnvs(t *testing.T) { Format: "cri-o", }, Filegen: FileGeneratorConfig{ - Workers: 21, - Rate: 45 * time.Millisecond, - Source: "syslog_generic", + Workers: 21, + Rate: 45 * time.Millisecond, + Source: "syslog_generic", + CacheEnabled: false, + CacheTTL: 0, }, }, Output: Output{ From 9b702b1d4ffcbe0ed9bb0be5c1a449c9399bd366 Mon Sep 17 00:00:00 2001 From: Dylan Myers Date: Thu, 29 Jan 2026 16:44:41 -0500 Subject: [PATCH 7/7] Eliminate unneeded lines Co-authored-by: Joseph Sirianni --- generator/filegen/filegen.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/generator/filegen/filegen.go b/generator/filegen/filegen.go index 
7ed52db..6d63d40 100644 --- a/generator/filegen/filegen.go +++ b/generator/filegen/filegen.go @@ -56,8 +56,7 @@ func (c *Cache) Get(key string) ([]string, bool) { return nil, false } - lines, found := c.lruCache.Get(key) - return lines, found + return c.lruCache.Get(key) } // Set stores a cache entry