diff --git a/go/prom_proxy/BUILD.bazel b/go/prom_proxy/BUILD.bazel index 3d7de209..e5277d6f 100644 --- a/go/prom_proxy/BUILD.bazel +++ b/go/prom_proxy/BUILD.bazel @@ -4,6 +4,7 @@ load("//bazel/rules:oci.bzl", "linux_oci_go") go_library( name = "prom_proxy_lib", srcs = [ + "cache.go", "handlers.go", "models.go", "prometheus_client.go", @@ -22,6 +23,7 @@ go_test( name = "prom_proxy_test", size = "small", srcs = [ + "cache_test.go", "handlers_test.go", "models_test.go", "prometheus_client_test.go", diff --git a/go/prom_proxy/cache.go b/go/prom_proxy/cache.go new file mode 100644 index 00000000..452d2b62 --- /dev/null +++ b/go/prom_proxy/cache.go @@ -0,0 +1,255 @@ +package prom_proxy + +import ( + "context" + "log" + "sync" + "time" +) + +// CacheEntry represents a cached response with timestamp +type CacheEntry[T any] struct { + Data T + Timestamp time.Time +} + +// MetricsCache holds all cached responses +type MetricsCache struct { + mu sync.RWMutex + + // Point-in-time metrics + systemMetrics *CacheEntry[*SystemMetrics] + portraitMetrics *CacheEntry[*PortraitMetrics] + + // Timeseries data for each range + systemTimeseries map[TimeRange]*CacheEntry[*TimeSeriesResponse] + portraitTimeseries map[TimeRange]*CacheEntry[*TimeSeriesResponse] + + // Background refresh control + ctx context.Context + cancel context.CancelFunc + promClient PrometheusQuerier + refreshInterval time.Duration +} + +// NewMetricsCache creates a new cache instance and starts background refresh +func NewMetricsCache(promClient PrometheusQuerier, refreshInterval time.Duration) *MetricsCache { + ctx, cancel := context.WithCancel(context.Background()) + + cache := &MetricsCache{ + systemTimeseries: make(map[TimeRange]*CacheEntry[*TimeSeriesResponse]), + portraitTimeseries: make(map[TimeRange]*CacheEntry[*TimeSeriesResponse]), + ctx: ctx, + cancel: cancel, + promClient: promClient, + refreshInterval: refreshInterval, + } + + // Initialize timeseries maps for all valid time ranges + for _, tr := range 
[]TimeRange{Last30Minutes, LastDay, LastWeek} { + cache.systemTimeseries[tr] = nil + cache.portraitTimeseries[tr] = nil + } + + // Start background refresh goroutine + go cache.refreshLoop() + + return cache +} + +// Close stops the background refresh goroutine +func (c *MetricsCache) Close() { + c.cancel() +} + +// refreshLoop runs background cache updates +func (c *MetricsCache) refreshLoop() { + // Initial population + c.refreshAll() + + ticker := time.NewTicker(c.refreshInterval) + defer ticker.Stop() + + for { + select { + case <-c.ctx.Done(): + return + case <-ticker.C: + c.refreshAll() + } + } +} + +// refreshAll updates all cache entries +func (c *MetricsCache) refreshAll() { + log.Printf("Refreshing metrics cache...") + start := time.Now() + + // Create a context with timeout for all operations + ctx, cancel := context.WithTimeout(c.ctx, 30*time.Second) + defer cancel() + + // Create a temporary handler to use existing logic + handler := &MetricsHandler{promClient: c.promClient} + + // Refresh point-in-time metrics concurrently + var wg sync.WaitGroup + + // System metrics + wg.Add(1) + go func() { + defer wg.Done() + if systemMetrics, err := handler.fetchSystemMetrics(ctx); err == nil { + c.setSystemMetrics(systemMetrics) + } else { + log.Printf("Failed to refresh system metrics: %v", err) + } + }() + + // Portrait metrics + wg.Add(1) + go func() { + defer wg.Done() + if portraitMetrics, err := handler.fetchPortraitMetrics(ctx); err == nil { + c.setPortraitMetrics(portraitMetrics) + } else { + log.Printf("Failed to refresh portrait metrics: %v", err) + } + }() + + // System timeseries for each range + for _, tr := range []TimeRange{Last30Minutes, LastDay, LastWeek} { + wg.Add(1) + go func(timeRange TimeRange) { + defer wg.Done() + if tsResponse, err := handler.fetchSystemMetricsTimeSeries(ctx, timeRange); err == nil { + c.setSystemTimeseries(timeRange, tsResponse) + } else { + log.Printf("Failed to refresh system timeseries for %s: %v", timeRange, err) + 
} + }(tr) + } + + // Portrait timeseries for each range + for _, tr := range []TimeRange{Last30Minutes, LastDay, LastWeek} { + wg.Add(1) + go func(timeRange TimeRange) { + defer wg.Done() + if tsResponse, err := handler.fetchPortraitMetricsTimeSeries(ctx, timeRange); err == nil { + c.setPortraitTimeseries(timeRange, tsResponse) + } else { + log.Printf("Failed to refresh portrait timeseries for %s: %v", timeRange, err) + } + }(tr) + } + + wg.Wait() + log.Printf("Cache refresh completed in %v", time.Since(start)) +} + +// Getters with read locks +func (c *MetricsCache) GetSystemMetrics() *SystemMetrics { + c.mu.RLock() + defer c.mu.RUnlock() + if c.systemMetrics != nil { + return c.systemMetrics.Data + } + return nil +} + +func (c *MetricsCache) GetPortraitMetrics() *PortraitMetrics { + c.mu.RLock() + defer c.mu.RUnlock() + if c.portraitMetrics != nil { + return c.portraitMetrics.Data + } + return nil +} + +func (c *MetricsCache) GetSystemTimeseries(timeRange TimeRange) *TimeSeriesResponse { + c.mu.RLock() + defer c.mu.RUnlock() + if entry := c.systemTimeseries[timeRange]; entry != nil { + return entry.Data + } + return nil +} + +func (c *MetricsCache) GetPortraitTimeseries(timeRange TimeRange) *TimeSeriesResponse { + c.mu.RLock() + defer c.mu.RUnlock() + if entry := c.portraitTimeseries[timeRange]; entry != nil { + return entry.Data + } + return nil +} + +// Setters with write locks +func (c *MetricsCache) setSystemMetrics(data *SystemMetrics) { + c.mu.Lock() + defer c.mu.Unlock() + c.systemMetrics = &CacheEntry[*SystemMetrics]{ + Data: data, + Timestamp: time.Now(), + } +} + +func (c *MetricsCache) setPortraitMetrics(data *PortraitMetrics) { + c.mu.Lock() + defer c.mu.Unlock() + c.portraitMetrics = &CacheEntry[*PortraitMetrics]{ + Data: data, + Timestamp: time.Now(), + } +} + +func (c *MetricsCache) setSystemTimeseries(timeRange TimeRange, data *TimeSeriesResponse) { + c.mu.Lock() + defer c.mu.Unlock() + c.systemTimeseries[timeRange] = 
&CacheEntry[*TimeSeriesResponse]{ + Data: data, + Timestamp: time.Now(), + } +} + +func (c *MetricsCache) setPortraitTimeseries(timeRange TimeRange, data *TimeSeriesResponse) { + c.mu.Lock() + defer c.mu.Unlock() + c.portraitTimeseries[timeRange] = &CacheEntry[*TimeSeriesResponse]{ + Data: data, + Timestamp: time.Now(), + } +} + +// GetCacheInfo returns information about cache status +func (c *MetricsCache) GetCacheInfo() map[string]interface{} { + c.mu.RLock() + defer c.mu.RUnlock() + + info := make(map[string]interface{}) + + if c.systemMetrics != nil { + info["system_metrics_cached_at"] = c.systemMetrics.Timestamp + } + if c.portraitMetrics != nil { + info["portrait_metrics_cached_at"] = c.portraitMetrics.Timestamp + } + + systemTS := make(map[string]interface{}) + for tr, entry := range c.systemTimeseries { + if entry != nil { + systemTS[string(tr)] = entry.Timestamp + } + } + info["system_timeseries"] = systemTS + + portraitTS := make(map[string]interface{}) + for tr, entry := range c.portraitTimeseries { + if entry != nil { + portraitTS[string(tr)] = entry.Timestamp + } + } + info["portrait_timeseries"] = portraitTS + + return info +} \ No newline at end of file diff --git a/go/prom_proxy/cache_test.go b/go/prom_proxy/cache_test.go new file mode 100644 index 00000000..e72f4153 --- /dev/null +++ b/go/prom_proxy/cache_test.go @@ -0,0 +1,329 @@ +package prom_proxy + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +// Mock Prometheus client for cache testing +type mockCachePromClient struct { + mu sync.Mutex + systemMetricsCallCount int + portraitMetricsCallCount int + systemTimeSeriesCallCount map[TimeRange]int + portraitTimeSeriesCallCount map[TimeRange]int + queryResponse *QueryResponse + queryRangeResponse *QueryResponse + queryError error + queryRangeError error +} + +func newMockCachePromClient() *mockCachePromClient { + return &mockCachePromClient{ + systemTimeSeriesCallCount: make(map[TimeRange]int), + 
portraitTimeSeriesCallCount: make(map[TimeRange]int), + queryResponse: &QueryResponse{ + Status: "success", + Data: struct { + ResultType string `json:"resultType"` + Result []Result `json:"result"` + }{ + ResultType: "vector", + Result: []Result{ + { + Metric: map[string]string{"__name__": "test_metric"}, + Value: []interface{}{float64(time.Now().Unix()), "42.5"}, + }, + }, + }, + }, + queryRangeResponse: &QueryResponse{ + Status: "success", + Data: struct { + ResultType string `json:"resultType"` + Result []Result `json:"result"` + }{ + ResultType: "matrix", + Result: []Result{ + { + Metric: map[string]string{"__name__": "test_metric"}, + Values: [][]interface{}{ + {float64(time.Now().Unix()), "25.5"}, + {float64(time.Now().Unix() + 30), "26.1"}, + }, + }, + }, + }, + }, + } +} + +func (m *mockCachePromClient) Query(ctx context.Context, query string) (*QueryResponse, error) { + m.mu.Lock() + defer m.mu.Unlock() + + // Count calls for different metric types based on query patterns + if query == `100-avg(rate(system_cpu_time_seconds_total{state="idle"}[5m]))*100` { + m.systemMetricsCallCount++ + } else if query == `trace_requests_total` { + m.portraitMetricsCallCount++ + } + + if m.queryError != nil { + return nil, m.queryError + } + return m.queryResponse, nil +} + +func (m *mockCachePromClient) QueryRange(ctx context.Context, query string, start, end time.Time, step string) (*QueryResponse, error) { + m.mu.Lock() + defer m.mu.Unlock() + + // Determine the step to identify the time range + var timeRange TimeRange + switch step { + case "30s": + timeRange = Last30Minutes + case "5m": + timeRange = LastDay + case "1h": + timeRange = LastWeek + } + + // Count calls for different metric types and ranges + if query == `100-avg(rate(system_cpu_time_seconds_total{state="idle"}[5m]))*100` { + m.systemTimeSeriesCallCount[timeRange]++ + } else if query == `rate(trace_requests_total[5m])` { + m.portraitTimeSeriesCallCount[timeRange]++ + } + + if m.queryRangeError != nil { + 
return nil, m.queryRangeError + } + return m.queryRangeResponse, nil +} + +func (m *mockCachePromClient) getCallCounts() (int, int, map[TimeRange]int, map[TimeRange]int) { + m.mu.Lock() + defer m.mu.Unlock() + + return m.systemMetricsCallCount, m.portraitMetricsCallCount, + m.systemTimeSeriesCallCount, m.portraitTimeSeriesCallCount +} + +func TestNewMetricsCache(t *testing.T) { + mockClient := newMockCachePromClient() + cache := NewMetricsCache(mockClient, 100*time.Millisecond) + defer cache.Close() + + assert.NotNil(t, cache) + assert.Equal(t, mockClient, cache.promClient) + assert.Equal(t, 100*time.Millisecond, cache.refreshInterval) + + // Verify cache maps are initialized + assert.NotNil(t, cache.systemTimeseries) + assert.NotNil(t, cache.portraitTimeseries) + + // Verify all time ranges are initialized + for _, tr := range []TimeRange{Last30Minutes, LastDay, LastWeek} { + _, exists := cache.systemTimeseries[tr] + assert.True(t, exists, "System timeseries should have entry for %s", tr) + _, exists = cache.portraitTimeseries[tr] + assert.True(t, exists, "Portrait timeseries should have entry for %s", tr) + } +} + +func TestMetricsCache_InitialPopulation(t *testing.T) { + mockClient := newMockCachePromClient() + cache := NewMetricsCache(mockClient, 100*time.Millisecond) + defer cache.Close() + + // Wait for initial population + time.Sleep(200 * time.Millisecond) + + // Verify cache is populated + systemMetrics := cache.GetSystemMetrics() + assert.NotNil(t, systemMetrics, "System metrics should be cached") + + portraitMetrics := cache.GetPortraitMetrics() + assert.NotNil(t, portraitMetrics, "Portrait metrics should be cached") + + // Verify timeseries are populated for all ranges + for _, tr := range []TimeRange{Last30Minutes, LastDay, LastWeek} { + systemTS := cache.GetSystemTimeseries(tr) + assert.NotNil(t, systemTS, "System timeseries for %s should be cached", tr) + + portraitTS := cache.GetPortraitTimeseries(tr) + assert.NotNil(t, portraitTS, "Portrait 
timeseries for %s should be cached", tr) + } +} + +func TestMetricsCache_PeriodicRefresh(t *testing.T) { + mockClient := newMockCachePromClient() + refreshInterval := 50 * time.Millisecond + cache := NewMetricsCache(mockClient, refreshInterval) + defer cache.Close() + + // Wait for multiple refresh cycles + time.Sleep(150 * time.Millisecond) + + // Check that refresh was called multiple times + systemCalls, portraitCalls, _, _ := mockClient.getCallCounts() + + // Should have at least 2 calls (initial + 1 refresh) + assert.GreaterOrEqual(t, systemCalls, 2, "System metrics should be refreshed multiple times") + assert.GreaterOrEqual(t, portraitCalls, 2, "Portrait metrics should be refreshed multiple times") +} + +func TestMetricsCache_ConcurrentAccess(t *testing.T) { + mockClient := newMockCachePromClient() + cache := NewMetricsCache(mockClient, 100*time.Millisecond) + defer cache.Close() + + // Wait for initial population + time.Sleep(50 * time.Millisecond) + + // Perform concurrent reads and writes + var wg sync.WaitGroup + numReaders := 10 + numOps := 50 + + // Start concurrent readers + for i := 0; i < numReaders; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for j := 0; j < numOps; j++ { + _ = cache.GetSystemMetrics() + _ = cache.GetPortraitMetrics() + for _, tr := range []TimeRange{Last30Minutes, LastDay, LastWeek} { + _ = cache.GetSystemTimeseries(tr) + _ = cache.GetPortraitTimeseries(tr) + } + time.Sleep(time.Millisecond) + } + }() + } + + wg.Wait() + + // Ensure no panics occurred and cache is still functional + assert.NotNil(t, cache.GetSystemMetrics()) + assert.NotNil(t, cache.GetPortraitMetrics()) +} + +func TestMetricsCache_GetCacheInfo(t *testing.T) { + mockClient := newMockCachePromClient() + cache := NewMetricsCache(mockClient, 100*time.Millisecond) + defer cache.Close() + + // Wait for initial population + time.Sleep(50 * time.Millisecond) + + info := cache.GetCacheInfo() + + assert.Contains(t, info, "system_metrics_cached_at") + 
assert.Contains(t, info, "portrait_metrics_cached_at") + assert.Contains(t, info, "system_timeseries") + assert.Contains(t, info, "portrait_timeseries") + + // Verify timestamps are recent + if systemTime, ok := info["system_metrics_cached_at"].(time.Time); ok { + assert.WithinDuration(t, time.Now(), systemTime, 5*time.Second) + } + + if portraitTime, ok := info["portrait_metrics_cached_at"].(time.Time); ok { + assert.WithinDuration(t, time.Now(), portraitTime, 5*time.Second) + } + + // Verify timeseries info structure + if systemTS, ok := info["system_timeseries"].(map[string]interface{}); ok { + for _, tr := range []TimeRange{Last30Minutes, LastDay, LastWeek} { + assert.Contains(t, systemTS, string(tr)) + } + } else { + t.Error("system_timeseries should be a map[string]interface{}") + } +} + +func TestMetricsCache_EmptyCache(t *testing.T) { + // Test cache behavior when no data has been populated yet + mockClient := newMockCachePromClient() + + // Create cache but don't wait for population + cache := &MetricsCache{ + systemTimeseries: make(map[TimeRange]*CacheEntry[*TimeSeriesResponse]), + portraitTimeseries: make(map[TimeRange]*CacheEntry[*TimeSeriesResponse]), + promClient: mockClient, + } + + // All getters should return nil for empty cache + assert.Nil(t, cache.GetSystemMetrics()) + assert.Nil(t, cache.GetPortraitMetrics()) + + for _, tr := range []TimeRange{Last30Minutes, LastDay, LastWeek} { + assert.Nil(t, cache.GetSystemTimeseries(tr)) + assert.Nil(t, cache.GetPortraitTimeseries(tr)) + } +} + +func TestMetricsCache_RefreshFailure(t *testing.T) { + mockClient := newMockCachePromClient() + mockClient.queryError = assert.AnError + mockClient.queryRangeError = assert.AnError + + cache := NewMetricsCache(mockClient, 50*time.Millisecond) + defer cache.Close() + + // Wait for refresh attempts + time.Sleep(100 * time.Millisecond) + + // Cache should handle errors gracefully and not crash + // When queries fail, empty structs are created rather than nil + 
systemMetrics := cache.GetSystemMetrics() + assert.NotNil(t, systemMetrics, "System metrics should not be nil even with errors") + assert.Equal(t, 0.0, systemMetrics.CPU.Utilization, "CPU utilization should be 0 on error") + + portraitMetrics := cache.GetPortraitMetrics() + assert.NotNil(t, portraitMetrics, "Portrait metrics should not be nil even with errors") + assert.Equal(t, 0.0, portraitMetrics.Requests.Total, "Request total should be 0 on error") + + // Timeseries should be available but empty + for _, tr := range []TimeRange{Last30Minutes, LastDay, LastWeek} { + systemTS := cache.GetSystemTimeseries(tr) + assert.NotNil(t, systemTS, "System timeseries for %s should not be nil", tr) + assert.Empty(t, systemTS.Series, "System timeseries should be empty on error") + + portraitTS := cache.GetPortraitTimeseries(tr) + assert.NotNil(t, portraitTS, "Portrait timeseries for %s should not be nil", tr) + assert.Empty(t, portraitTS.Series, "Portrait timeseries should be empty on error") + } +} + +func TestMetricsCache_Close(t *testing.T) { + mockClient := newMockCachePromClient() + cache := NewMetricsCache(mockClient, 20*time.Millisecond) + + // Let it run for a bit to ensure it's running + time.Sleep(100 * time.Millisecond) + + // Close the cache + cache.Close() + + // Get call count just after close + initialSystemCalls, initialPortraitCalls, _, _ := mockClient.getCallCounts() + + // Wait some more time - calls should not increase after close + time.Sleep(100 * time.Millisecond) + + finalSystemCalls, finalPortraitCalls, _, _ := mockClient.getCallCounts() + + // Call counts should not increase significantly after close + // (allow for at most 1 more call that might have been in progress) + assert.LessOrEqual(t, finalSystemCalls-initialSystemCalls, 1, "No significant new system calls after close") + assert.LessOrEqual(t, finalPortraitCalls-initialPortraitCalls, 1, "No significant new portrait calls after close") +} \ No newline at end of file diff --git 
a/go/prom_proxy/handlers.go b/go/prom_proxy/handlers.go index bcde39d3..8d7a230e 100644 --- a/go/prom_proxy/handlers.go +++ b/go/prom_proxy/handlers.go @@ -93,11 +93,13 @@ type PrometheusQuerier interface { type MetricsHandler struct { promClient PrometheusQuerier + cache *MetricsCache } -func NewMetricsHandler(promClient PrometheusQuerier) *MetricsHandler { +func NewMetricsHandler(promClient PrometheusQuerier, cache *MetricsCache) *MetricsHandler { return &MetricsHandler{ promClient: promClient, + cache: cache, } } @@ -110,14 +112,20 @@ func (h *MetricsHandler) HealthHandler(w http.ResponseWriter, r *http.Request) { mucks.JsonOk(w, response) } -func (h *MetricsHandler) GetSystemMetrics(w http.ResponseWriter, r *http.Request) { - ctx, cancel := context.WithTimeout(r.Context(), 30*time.Second) - defer cancel() +func (h *MetricsHandler) CacheStatusHandler(w http.ResponseWriter, r *http.Request) { + cacheInfo := h.cache.GetCacheInfo() + cacheInfo["status"] = "cache-info" + cacheInfo["service"] = "prometheus-proxy-cache" + cacheInfo["timestamp"] = time.Now().UTC().Format(time.RFC3339) - metrics, err := h.fetchSystemMetrics(ctx) - if err != nil { - problem := mucks.NewServerError(500) - problem.Detail = "Failed to fetch system metrics: " + err.Error() + mucks.JsonOk(w, cacheInfo) +} + +func (h *MetricsHandler) GetSystemMetrics(w http.ResponseWriter, r *http.Request) { + metrics := h.cache.GetSystemMetrics() + if metrics == nil { + problem := mucks.NewServerError(503) + problem.Detail = "System metrics not available in cache" mucks.JsonError(w, problem) return } @@ -126,13 +134,10 @@ func (h *MetricsHandler) GetSystemMetrics(w http.ResponseWriter, r *http.Request } func (h *MetricsHandler) GetPortraitMetrics(w http.ResponseWriter, r *http.Request) { - ctx, cancel := context.WithTimeout(r.Context(), 30*time.Second) - defer cancel() - - metrics, err := h.fetchPortraitMetrics(ctx) - if err != nil { - problem := mucks.NewServerError(500) - problem.Detail = "Failed to fetch 
portrait metrics: " + err.Error() + metrics := h.cache.GetPortraitMetrics() + if metrics == nil { + problem := mucks.NewServerError(503) + problem.Detail = "Portrait metrics not available in cache" mucks.JsonError(w, problem) return } @@ -141,21 +146,12 @@ func (h *MetricsHandler) GetPortraitMetrics(w http.ResponseWriter, r *http.Reque } func (h *MetricsHandler) GetSummaryMetrics(w http.ResponseWriter, r *http.Request) { - ctx, cancel := context.WithTimeout(r.Context(), 30*time.Second) - defer cancel() + systemMetrics := h.cache.GetSystemMetrics() + portraitMetrics := h.cache.GetPortraitMetrics() - systemMetrics, err := h.fetchSystemMetrics(ctx) - if err != nil { - problem := mucks.NewServerError(500) - problem.Detail = "Failed to fetch system metrics: " + err.Error() - mucks.JsonError(w, problem) - return - } - - portraitMetrics, err := h.fetchPortraitMetrics(ctx) - if err != nil { - problem := mucks.NewServerError(500) - problem.Detail = "Failed to fetch portrait metrics: " + err.Error() + if systemMetrics == nil || portraitMetrics == nil { + problem := mucks.NewServerError(503) + problem.Detail = "Metrics not available in cache" mucks.JsonError(w, problem) return } @@ -178,13 +174,10 @@ func (h *MetricsHandler) GetSystemMetricsTimeSeries(w http.ResponseWriter, r *ht return } - ctx, cancel := context.WithTimeout(r.Context(), 30*time.Second) - defer cancel() - - response, err := h.fetchSystemMetricsTimeSeries(ctx, TimeRange(timeRange)) - if err != nil { - problem := mucks.NewServerError(500) - problem.Detail = "Failed to fetch system metrics timeseries: " + err.Error() + response := h.cache.GetSystemTimeseries(TimeRange(timeRange)) + if response == nil { + problem := mucks.NewServerError(503) + problem.Detail = "System timeseries not available in cache" mucks.JsonError(w, problem) return } @@ -201,13 +194,10 @@ func (h *MetricsHandler) GetPortraitMetricsTimeSeries(w http.ResponseWriter, r * return } - ctx, cancel := context.WithTimeout(r.Context(), 30*time.Second) 
- defer cancel() - - response, err := h.fetchPortraitMetricsTimeSeries(ctx, TimeRange(timeRange)) - if err != nil { - problem := mucks.NewServerError(500) - problem.Detail = "Failed to fetch portrait metrics timeseries: " + err.Error() + response := h.cache.GetPortraitTimeseries(TimeRange(timeRange)) + if response == nil { + problem := mucks.NewServerError(503) + problem.Detail = "Portrait timeseries not available in cache" mucks.JsonError(w, problem) return } diff --git a/go/prom_proxy/handlers_test.go b/go/prom_proxy/handlers_test.go index 71b9f90c..99408170 100644 --- a/go/prom_proxy/handlers_test.go +++ b/go/prom_proxy/handlers_test.go @@ -222,7 +222,9 @@ func TestExtractTimeSeries(t *testing.T) { } func TestMetricsHandler_HealthHandler(t *testing.T) { - handler := &MetricsHandler{} + mockClient := &mockPrometheusClient{} + cache := &MetricsCache{} + handler := &MetricsHandler{promClient: mockClient, cache: cache} req := httptest.NewRequest("GET", "/health", nil) w := httptest.NewRecorder() @@ -246,7 +248,9 @@ func TestMetricsHandler_HealthHandler(t *testing.T) { } func TestMetricsHandler_GetSystemMetricsTimeSeries_InvalidRange(t *testing.T) { - handler := &MetricsHandler{} + mockClient := &mockPrometheusClient{} + cache := &MetricsCache{} + handler := &MetricsHandler{promClient: mockClient, cache: cache} // Create a request with invalid range req := httptest.NewRequest("GET", "/api/v1/timeseries/system/invalid", nil) @@ -267,7 +271,9 @@ func TestMetricsHandler_GetSystemMetricsTimeSeries_InvalidRange(t *testing.T) { } func TestMetricsHandler_GetPortraitMetricsTimeSeries_InvalidRange(t *testing.T) { - handler := &MetricsHandler{} + mockClient := &mockPrometheusClient{} + cache := &MetricsCache{} + handler := &MetricsHandler{promClient: mockClient, cache: cache} // Create a request with invalid range req := httptest.NewRequest("GET", "/api/v1/timeseries/portrait/2h", nil) @@ -304,26 +310,29 @@ func (m *mockPrometheusClient) QueryRange(ctx context.Context, query 
string, sta } func TestMetricsHandler_GetSystemMetrics_Success(t *testing.T) { - mockClient := &mockPrometheusClient{ - queryResponse: &QueryResponse{ - Status: "success", - Data: struct { - ResultType string `json:"resultType"` - Result []Result `json:"result"` - }{ - ResultType: "vector", - Result: []Result{ - { - Metric: map[string]string{"__name__": "test_metric"}, - Value: []interface{}{1609459200.0, "42.5"}, - }, + mockClient := &mockPrometheusClient{} + + // Create cache with test data + cache := &MetricsCache{ + systemMetrics: &CacheEntry[*SystemMetrics]{ + Data: &SystemMetrics{ + Timestamp: time.Now().UTC(), + CPU: CPUMetrics{ + Utilization: 42.5, + ByCore: map[string]float64{"0": 40.0, "1": 45.0}, + }, + Memory: MemoryMetrics{ + Total: 8589934592, // 8GB + Used: 4294967296, // 4GB + Free: 4294967296, // 4GB + Utilization: 50.0, }, }, + Timestamp: time.Now(), }, - queryError: nil, } - handler := &MetricsHandler{promClient: mockClient} + handler := &MetricsHandler{promClient: mockClient, cache: cache} req := httptest.NewRequest("GET", "/api/v1/metrics/system", nil) w := httptest.NewRecorder() @@ -337,61 +346,87 @@ func TestMetricsHandler_GetSystemMetrics_Success(t *testing.T) { err := json.Unmarshal(w.Body.Bytes(), &response) require.NoError(t, err) - // Verify timestamp is recent + // Verify data from cache + assert.Equal(t, 42.5, response.CPU.Utilization) + assert.Equal(t, 50.0, response.Memory.Utilization) assert.WithinDuration(t, time.Now(), response.Timestamp, 5*time.Second) } +func TestMetricsHandler_GetSystemMetrics_CacheEmpty(t *testing.T) { + mockClient := &mockPrometheusClient{} + cache := &MetricsCache{} // Empty cache + + handler := &MetricsHandler{promClient: mockClient, cache: cache} + + req := httptest.NewRequest("GET", "/api/v1/metrics/system", nil) + w := httptest.NewRecorder() + + handler.GetSystemMetrics(w, req) + + assert.Equal(t, http.StatusServiceUnavailable, w.Code) + assert.Equal(t, "application/json; charset=utf-8", 
w.Header().Get("Content-Type")) + + var response map[string]interface{} + err := json.Unmarshal(w.Body.Bytes(), &response) + require.NoError(t, err) + + assert.Equal(t, float64(503), response["status"]) + assert.Contains(t, response["detail"], "not available in cache") +} + func TestMetricsHandler_GetSystemMetrics_PrometheusError(t *testing.T) { mockClient := &mockPrometheusClient{ queryError: assert.AnError, } + cache := &MetricsCache{} // Empty cache to simulate unavailable data - handler := &MetricsHandler{promClient: mockClient} + handler := &MetricsHandler{promClient: mockClient, cache: cache} req := httptest.NewRequest("GET", "/api/v1/metrics/system", nil) w := httptest.NewRecorder() handler.GetSystemMetrics(w, req) - // The handler doesn't fail when individual queries fail, it returns empty metrics - // This is by design to be resilient to partial Prometheus failures - assert.Equal(t, http.StatusOK, w.Code) + // Should return 503 when cache is empty + assert.Equal(t, http.StatusServiceUnavailable, w.Code) assert.Equal(t, "application/json; charset=utf-8", w.Header().Get("Content-Type")) - var response SystemMetrics + var response map[string]interface{} err := json.Unmarshal(w.Body.Bytes(), &response) require.NoError(t, err) - // All metric values should be zero due to query failures - assert.Equal(t, 0.0, response.CPU.Utilization) - assert.Empty(t, response.CPU.ByCore) - assert.Equal(t, 0.0, response.Memory.Total) + assert.Equal(t, float64(503), response["status"]) + assert.Contains(t, response["detail"], "not available in cache") } func TestMetricsHandler_GetSystemMetricsTimeSeries_Success(t *testing.T) { - mockClient := &mockPrometheusClient{ - queryRangeResponse: &QueryResponse{ - Status: "success", - Data: struct { - ResultType string `json:"resultType"` - Result []Result `json:"result"` - }{ - ResultType: "matrix", - Result: []Result{ - { - Metric: map[string]string{"__name__": "cpu_utilization"}, - Values: [][]interface{}{ - {1609459200.0, "25.5"}, - 
{1609459230.0, "26.1"}, - }, + mockClient := &mockPrometheusClient{} + + // Create cache with test timeseries data + cache := &MetricsCache{ + systemTimeseries: make(map[TimeRange]*CacheEntry[*TimeSeriesResponse]), + } + cache.systemTimeseries[Last30Minutes] = &CacheEntry[*TimeSeriesResponse]{ + Data: &TimeSeriesResponse{ + TimeRange: "30m", + Step: "30s", + StartTime: time.Now().Add(-30 * time.Minute), + EndTime: time.Now(), + Series: []TimeSeries{ + { + MetricName: "cpu_utilization", + Labels: map[string]string{"instance": "localhost"}, + Values: []DataPoint{ + {Timestamp: time.Now().Add(-20 * time.Minute), Value: 25.5}, + {Timestamp: time.Now().Add(-10 * time.Minute), Value: 26.1}, }, }, }, }, - queryRangeError: nil, + Timestamp: time.Now(), } - handler := &MetricsHandler{promClient: mockClient} + handler := &MetricsHandler{promClient: mockClient, cache: cache} req := httptest.NewRequest("GET", "/api/v1/timeseries/system/30m", nil) req.SetPathValue("range", "30m") @@ -409,24 +444,34 @@ func TestMetricsHandler_GetSystemMetricsTimeSeries_Success(t *testing.T) { assert.Equal(t, "30m", response.TimeRange) assert.Equal(t, "30s", response.Step) assert.NotEmpty(t, response.Series) + assert.Equal(t, "cpu_utilization", response.Series[0].MetricName) + assert.Len(t, response.Series[0].Values, 2) } func TestMetricsHandler_GetSummaryMetrics(t *testing.T) { - mockClient := &mockPrometheusClient{ - queryResponse: &QueryResponse{ - Status: "success", - Data: struct { - ResultType string `json:"resultType"` - Result []Result `json:"result"` - }{ - ResultType: "vector", - Result: []Result{}, + mockClient := &mockPrometheusClient{} + + // Create cache with both system and portrait metrics + cache := &MetricsCache{ + systemMetrics: &CacheEntry[*SystemMetrics]{ + Data: &SystemMetrics{ + Timestamp: time.Now().UTC(), + CPU: CPUMetrics{Utilization: 42.5}, + Memory: MemoryMetrics{Utilization: 50.0}, }, + Timestamp: time.Now(), + }, + portraitMetrics: &CacheEntry[*PortraitMetrics]{ + 
Data: &PortraitMetrics{ + Timestamp: time.Now().UTC(), + Requests: RequestMetrics{Total: 1000, Rate: 10.5}, + Cache: CacheMetrics{HitRate: 95.2}, + }, + Timestamp: time.Now(), }, - queryError: nil, } - handler := &MetricsHandler{promClient: mockClient} + handler := &MetricsHandler{promClient: mockClient, cache: cache} req := httptest.NewRequest("GET", "/api/v1/metrics/summary", nil) w := httptest.NewRecorder() @@ -443,15 +488,72 @@ func TestMetricsHandler_GetSummaryMetrics(t *testing.T) { assert.Contains(t, response, "timestamp") assert.Contains(t, response, "system") assert.Contains(t, response, "portrait") + + // Verify the nested structure + system := response["system"].(map[string]interface{}) + cpu := system["cpu"].(map[string]interface{}) + assert.Equal(t, 42.5, cpu["utilization_percent"]) + + portrait := response["portrait"].(map[string]interface{}) + requests := portrait["requests"].(map[string]interface{}) + assert.Equal(t, float64(1000), requests["total"]) +} + +func TestMetricsHandler_CacheStatusHandler(t *testing.T) { + mockClient := &mockPrometheusClient{} + + // Create cache with test data + cache := &MetricsCache{ + systemMetrics: &CacheEntry[*SystemMetrics]{ + Data: &SystemMetrics{}, + Timestamp: time.Now().Add(-5 * time.Minute), + }, + portraitMetrics: &CacheEntry[*PortraitMetrics]{ + Data: &PortraitMetrics{}, + Timestamp: time.Now().Add(-3 * time.Minute), + }, + systemTimeseries: make(map[TimeRange]*CacheEntry[*TimeSeriesResponse]), + portraitTimeseries: make(map[TimeRange]*CacheEntry[*TimeSeriesResponse]), + } + + // Add some timeseries data + cache.systemTimeseries[Last30Minutes] = &CacheEntry[*TimeSeriesResponse]{ + Data: &TimeSeriesResponse{}, + Timestamp: time.Now().Add(-2 * time.Minute), + } + + handler := &MetricsHandler{promClient: mockClient, cache: cache} + + req := httptest.NewRequest("GET", "/cache/status", nil) + w := httptest.NewRecorder() + + handler.CacheStatusHandler(w, req) + + assert.Equal(t, http.StatusOK, w.Code) + 
assert.Equal(t, "application/json; charset=utf-8", w.Header().Get("Content-Type")) + + var response map[string]interface{} + err := json.Unmarshal(w.Body.Bytes(), &response) + require.NoError(t, err) + + assert.Equal(t, "cache-info", response["status"]) + assert.Equal(t, "prometheus-proxy-cache", response["service"]) + assert.Contains(t, response, "timestamp") + assert.Contains(t, response, "system_metrics_cached_at") + assert.Contains(t, response, "portrait_metrics_cached_at") + assert.Contains(t, response, "system_timeseries") + assert.Contains(t, response, "portrait_timeseries") } func TestNewMetricsHandler(t *testing.T) { mockClient := &mockPrometheusClient{} + cache := &MetricsCache{} - handler := NewMetricsHandler(mockClient) + handler := NewMetricsHandler(mockClient, cache) assert.NotNil(t, handler) assert.Equal(t, mockClient, handler.promClient) + assert.Equal(t, cache, handler.cache) } // Interface check to ensure our mock implements the right interface diff --git a/go/prom_proxy/main.go b/go/prom_proxy/main.go index 0d6d3847..ccf0ad5a 100644 --- a/go/prom_proxy/main.go +++ b/go/prom_proxy/main.go @@ -4,6 +4,7 @@ import ( "log" "net/http" "os" + "time" "github.com/muchq/moonbase/go/mucks" prom_proxy "github.com/muchq/moonbase/go/prom_proxy_lib" @@ -20,6 +21,13 @@ func main() { // Configuration from environment variables port := getEnvWithDefault("PORT", "8080") prometheusURL := getEnvWithDefault("PROMETHEUS_URL", "http://localhost:9090") + refreshInterval := getEnvWithDefault("CACHE_REFRESH_INTERVAL", "1m") + + // Parse refresh interval + interval, err := time.ParseDuration(refreshInterval) + if err != nil { + log.Fatalf("Invalid cache refresh interval: %v", err) + } // Initialize Prometheus client promClient, err := prom_proxy.NewPrometheusClient(prometheusURL) @@ -27,14 +35,21 @@ func main() { log.Fatalf("Failed to create Prometheus client: %v", err) } - // Initialize handlers - metricsHandler := prom_proxy.NewMetricsHandler(promClient) + // Initialize 
cache with configured refresh interval + cache := prom_proxy.NewMetricsCache(promClient, interval) + defer cache.Close() + + // Initialize handlers with cache + metricsHandler := prom_proxy.NewMetricsHandler(promClient, cache) // Setup router with JSON middleware router := mucks.NewJsonMucks() // Health endpoint router.HandleFunc("GET /health", metricsHandler.HealthHandler) + + // Cache status endpoint + router.HandleFunc("GET /cache/status", metricsHandler.CacheStatusHandler) // Metrics endpoints - current point-in-time data router.HandleFunc("GET /v1/metrics/system", metricsHandler.GetSystemMetrics) @@ -47,5 +62,6 @@ func main() { log.Printf("Starting Prometheus proxy server on port %s", port) log.Printf("Prometheus backend: %s", prometheusURL) + log.Printf("Cache refresh interval: %s", interval) log.Fatal(http.ListenAndServe(":"+port, router)) }