diff --git a/ai/clickhouse.go b/ai/clickhouse.go new file mode 100644 index 0000000..87be645 --- /dev/null +++ b/ai/clickhouse.go @@ -0,0 +1,117 @@ +package ai + +import ( + "context" + "crypto/tls" + "fmt" + "math" + "strings" + + "github.com/ClickHouse/clickhouse-go/v2" + "github.com/ClickHouse/clickhouse-go/v2/lib/driver" + "github.com/Masterminds/squirrel" +) + +const maxClickhouseResultRows = 1000 + +type AIStreamStatusEventRow struct { + StreamID string `ch:"stream_id"` + AvgInputFPS float64 `ch:"avg_input_fps"` + AvgOutputFPS float64 `ch:"avg_output_fps"` + ErrorCount uint64 `ch:"error_count"` + Errors []string `ch:"errors"` + TotalRestarts uint64 `ch:"total_restarts"` + RestartLogs []string `ch:"restart_logs"` +} + +type Clickhouse interface { + QueryAIStreamStatusEvents(ctx context.Context, spec QuerySpec) ([]AIStreamStatusEventRow, error) +} + +type ClickhouseOptions struct { + Addr string + User string + Password string + Database string +} + +type ClickhouseClient struct { + conn driver.Conn +} + +func NewClickhouseConn(opts ClickhouseOptions) (*ClickhouseClient, error) { + conn, err := clickhouse.Open(&clickhouse.Options{ + Addr: strings.Split(opts.Addr, ","), + Auth: clickhouse.Auth{ + Database: opts.Database, + Username: opts.User, + Password: opts.Password, + }, + TLS: &tls.Config{}, + }) + if err != nil { + return nil, err + } + return &ClickhouseClient{conn: conn}, nil +} + +func (c *ClickhouseClient) QueryAIStreamStatusEvents(ctx context.Context, spec QuerySpec) ([]AIStreamStatusEventRow, error) { + sql, args, err := buildAIStreamStatusEventsQuery(spec) + if err != nil { + return nil, fmt.Errorf("error building AI stream status events query: %w", err) + } + var res []AIStreamStatusEventRow + err = c.conn.Select(ctx, &res, sql, args...) + if err != nil { + return nil, err + } + res = replaceNaN(res) + return res, nil +} + +func buildAIStreamStatusEventsQuery(spec QuerySpec) (string, []interface{}, error) { + query := squirrel.Select( + "stream_id", + "avg(input_fps) as avg_input_fps", + "avg(output_fps) as avg_output_fps", + "countIf(last_error != '') as error_count", + "arrayFilter(x -> x != '', groupUniqArray(last_error)) as errors", + "sum(restart_count) as total_restarts", + "arrayFilter(x -> x != '', groupUniqArray(last_restart_logs)) as restart_logs"). + From("stream_status"). + GroupBy("stream_id"). + Limit(maxClickhouseResultRows + 1) + + if spec.Filter.StreamID != "" { + query = query.Where("stream_id = ?", spec.Filter.StreamID) + } + + if spec.From != nil { + query = query.Where("timestamp_ts > ?", spec.From) + } + + if spec.To != nil { + query = query.Where("timestamp_ts < ?", spec.To) + } + + sql, args, err := query.ToSql() + if err != nil { + return "", nil, err + } + + return sql, args, nil +} + +func replaceNaN(rows []AIStreamStatusEventRow) []AIStreamStatusEventRow { + var res []AIStreamStatusEventRow + for _, r := range rows { + if math.IsNaN(r.AvgInputFPS) { + r.AvgInputFPS = 0.0 + } + if math.IsNaN(r.AvgOutputFPS) { + r.AvgOutputFPS = 0.0 + } + res = append(res, r) + } + return res +} diff --git a/ai/client.go b/ai/client.go new file mode 100644 index 0000000..e89e60b --- /dev/null +++ b/ai/client.go @@ -0,0 +1,79 @@ +package ai + +import ( + "context" + "errors" + "fmt" + + livepeer "github.com/livepeer/go-api-client" + "github.com/livepeer/livepeer-data/pkg/data" + promClient "github.com/prometheus/client_golang/api" +) + +var ErrAssetNotFound = errors.New("asset not found") + +type StreamStatus struct { + StreamID string `json:"streamId"` + AvgInputFPS data.Nullable[float64] `json:"avgInputFps"` + AvgOutputFPS data.Nullable[float64] `json:"avgOutputFps"` + ErrorCount uint64 `json:"errorCount"` + Errors []string `json:"errors"` + TotalRestarts uint64 `json:"totalRestarts"` + RestartLogs []string `json:"restartLogs"` +} + +type ClientOptions struct { + Prometheus promClient.Config + Livepeer livepeer.ClientOptions + ClickhouseOptions +} + +type Client struct { + opts ClientOptions + lp *livepeer.Client + clickhouse Clickhouse +} + +func NewClient(opts ClientOptions) (*Client, error) { + lp := livepeer.NewAPIClient(opts.Livepeer) + + clickhouse, err := NewClickhouseConn(opts.ClickhouseOptions) + if err != nil { + return nil, fmt.Errorf("error creating clickhouse client: %w", err) + } + + return &Client{opts, lp, clickhouse}, nil +} + +func (c *Client) QueryAIStreamStatusEvents(ctx context.Context, spec QuerySpec) ([]StreamStatus, error) { + rows, err := c.clickhouse.QueryAIStreamStatusEvents(ctx, spec) + if err != nil { + return nil, err + } + metrics := aiStreamStatusEventsToStreamStatuses(rows, spec) + return metrics, nil +} + +func aiStreamStatusEventsToStreamStatuses(rows []AIStreamStatusEventRow, spec QuerySpec) []StreamStatus { + streamStatuses := make([]StreamStatus, len(rows)) + for i, row := range rows { + streamStatuses[i] = StreamStatus{ + StreamID: row.StreamID, + AvgInputFPS: data.WrapNullable(row.AvgInputFPS), + AvgOutputFPS: data.WrapNullable(row.AvgOutputFPS), + ErrorCount: row.ErrorCount, + Errors: row.Errors, + TotalRestarts: row.TotalRestarts, + RestartLogs: row.RestartLogs, + } + } + return streamStatuses +} + +func toFloat64Ptr(f float64, asked bool) data.Nullable[float64] { + return data.ToNullable(f, true, asked) +} + +func toStringPtr(s string, asked bool) data.Nullable[string] { + return data.ToNullable(s, true, asked) +} diff --git a/ai/client_test.go b/ai/client_test.go new file mode 100644 index 0000000..9f1d2b3 --- /dev/null +++ b/ai/client_test.go @@ -0,0 +1,100 @@ +package ai + +import ( + "context" + "encoding/json" + "testing" + + "github.com/stretchr/testify/require" +) + +type MockClickhouseClient struct { + rows []AIStreamStatusEventRow +} + +func (m MockClickhouseClient) QueryAIStreamStatusEvents(ctx context.Context, spec QuerySpec) ([]AIStreamStatusEventRow, error) { + return m.rows, nil +} + +func TestQueryAIStreamStatusEvents(t *testing.T) { + require := require.New(t) + + tests := []struct { + name string + spec QuerySpec + rows []AIStreamStatusEventRow + expJson string + }{ + { + name: "basic query with no errors", + rows: []AIStreamStatusEventRow{ + { + StreamID: "stream-1", + AvgInputFPS: 30.0, + AvgOutputFPS: 25.0, + ErrorCount: 0, + Errors: []string{}, + TotalRestarts: 1, + RestartLogs: []string{"restart-log-1"}, + }, + }, + expJson: ` + [ + { + "streamId": "stream-1", + "avgInputFps": 30.0, + "avgOutputFps": 25.0, + "errorCount": 0, + "errors": [], + "totalRestarts": 1, + "restartLogs": ["restart-log-1"] + } + ] + `, + }, + { + name: "query with errors", + rows: []AIStreamStatusEventRow{ + { + StreamID: "stream-2", + AvgInputFPS: 20.0, + AvgOutputFPS: 15.0, + ErrorCount: 2, + Errors: []string{"error-1", "error-2"}, + TotalRestarts: 3, + RestartLogs: []string{"restart-log-2", "restart-log-3"}, + }, + }, + expJson: ` + [ + { + "streamId": "stream-2", + "avgInputFps": 20.0, + "avgOutputFps": 15.0, + "errorCount": 2, + "errors": ["error-1", "error-2"], + "totalRestarts": 3, + "restartLogs": ["restart-log-2", "restart-log-3"] + } + ] + `, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // given + mockClickhouse := MockClickhouseClient{rows: tt.rows} + client := Client{clickhouse: &mockClickhouse} + + // when + res, err := client.QueryAIStreamStatusEvents(context.Background(), tt.spec) + + // then + require.NoError(err) + jsonRes, err := json.Marshal(res) + require.NoError(err) + require.JSONEq(tt.expJson, string(jsonRes)) + }) + } +} diff --git a/ai/query_spec.go b/ai/query_spec.go new file mode 100644 index 0000000..09c18cc --- /dev/null +++ b/ai/query_spec.go @@ -0,0 +1,24 @@ +package ai + +import ( + "time" +) + +type QueryFilter struct { + StreamID string +} + +type QuerySpec struct { + From, To *time.Time + Filter QueryFilter +} + +func NewQuerySpec(streamID string, from, to *time.Time) QuerySpec { + return QuerySpec{ + From: from, + To: to, + Filter: QueryFilter{ + StreamID: streamID, + }, + } +} diff --git a/api/handler.go b/api/handler.go index d5d5d07..30390f7 100644 --- a/api/handler.go +++ b/api/handler.go @@ -11,6 +11,7 @@ import ( "github.com/go-chi/chi/v5" chimiddleware "github.com/go-chi/chi/v5/middleware" "github.com/golang/glog" + "github.com/livepeer/livepeer-data/ai" "github.com/livepeer/livepeer-data/health" "github.com/livepeer/livepeer-data/metrics" "github.com/livepeer/livepeer-data/pkg/data" @@ -64,10 +65,18 @@ type apiHandler struct { core *health.Core views *views.Client usage *usage.Client + ai *ai.Client } -func NewHandler(serverCtx context.Context, opts APIHandlerOptions, healthcore *health.Core, views *views.Client, usage *usage.Client) http.Handler { - handler := &apiHandler{opts, serverCtx, healthcore, views, usage} +func NewHandler(serverCtx context.Context, opts APIHandlerOptions, healthcore *health.Core, views *views.Client, usage *usage.Client, ai *ai.Client) http.Handler { + handler := &apiHandler{ + opts: opts, + serverCtx: serverCtx, + core: healthcore, + views: views, + usage: usage, + ai: ai, + } router := chi.NewRouter() @@ -85,6 +94,7 @@ func NewHandler(serverCtx context.Context, opts APIHandlerOptions, healthcore *h router.Mount(`/stream/{`+streamIDParam+`}`, handler.streamHealthHandler()) router.Mount("/views", handler.viewershipHandler()) router.Mount("/usage", handler.usageHandler()) + router.Mount("/ai-stream-status/", handler.aiStreamStatusHandler()) }) return router @@ -186,6 +196,15 @@ func (h *apiHandler) usageHandler() chi.Router { return router } +func (h *apiHandler) aiStreamStatusHandler() chi.Router { + router := chi.NewRouter() + + h.withMetrics(router, "query_ai_stream_status"). + MethodFunc("GET", "/", h.queryAIStreamStatusEvents()) + + return router +} + func (h *apiHandler) withMetrics(router chi.Router, name string) chi.Router { if !h.opts.Prometheus { return router @@ -424,6 +443,23 @@ func (h *apiHandler) queryTimeSeriesRealtimeViewership() http.HandlerFunc { } } +func (h *apiHandler) queryAIStreamStatusEvents() http.HandlerFunc { + return func(rw http.ResponseWriter, r *http.Request) { + + qs := r.URL.Query() + querySpec := ai.NewQuerySpec( + qs.Get("streamId"), nil, nil, + ) + + streamStatus, err := h.ai.QueryAIStreamStatusEvents(r.Context(), querySpec) + if err != nil { + respondError(rw, http.StatusInternalServerError, err) + return + } + respondJson(rw, http.StatusOK, streamStatus) + } +} + func (h *apiHandler) resolveViewershipQuerySpec(r *http.Request) (views.QuerySpec, int, []error) { var ( from, err1 = parseInputTimestamp(r.URL.Query().Get("from")) diff --git a/api/server.go b/api/server.go index 81cb054..89123d3 100644 --- a/api/server.go +++ b/api/server.go @@ -8,6 +8,7 @@ import ( "time" "github.com/golang/glog" + "github.com/livepeer/livepeer-data/ai" "github.com/livepeer/livepeer-data/health" "github.com/livepeer/livepeer-data/usage" "github.com/livepeer/livepeer-data/views" @@ -21,10 +22,10 @@ type ServerOptions struct { APIHandlerOptions } -func ListenAndServe(ctx context.Context, opts ServerOptions, healthcore *health.Core, views *views.Client, usage *usage.Client) error { +func ListenAndServe(ctx context.Context, opts ServerOptions, healthcore *health.Core, views *views.Client, usage *usage.Client, ai *ai.Client) error { srv := &http.Server{ Addr: fmt.Sprintf("%s:%d", opts.Host, opts.Port), - Handler: NewHandler(ctx, opts.APIHandlerOptions, healthcore, views, usage), + Handler: NewHandler(ctx, opts.APIHandlerOptions, healthcore, views, usage, ai), } eg, ctx := errgroup.WithContext(ctx) eg.Go(func() error { diff --git a/cmd/analyzer/analyzer.go b/cmd/analyzer/analyzer.go index 188ec20..12b1a0b 100644 --- a/cmd/analyzer/analyzer.go +++ b/cmd/analyzer/analyzer.go @@ -11,6 +11,7 @@ import ( "time" "github.com/golang/glog" + "github.com/livepeer/livepeer-data/ai" "github.com/livepeer/livepeer-data/api" "github.com/livepeer/livepeer-data/health" "github.com/livepeer/livepeer-data/health/reducers" @@ -49,6 +50,7 @@ type cliFlags struct { viewsOpts views.ClientOptions usageOpts usage.ClientOptions + aiOpts ai.ClientOptions } func parseFlags(version string) cliFlags { @@ -152,10 +154,10 @@ func Run(build BuildFlags) { healthcore := provisionStreamHealthcore(ctx, cli) defer healthcore.Close() - views, usage := provisionDataAnalytics(cli) + views, usage, ai := provisionDataAnalytics(cli) glog.Info("Starting server...") - err := api.ListenAndServe(ctx, cli.serverOpts, healthcore, views, usage) + err := api.ListenAndServe(ctx, cli.serverOpts, healthcore, views, usage, ai) if err != nil { glog.Fatalf("Error starting api server. err=%q", err) } @@ -190,9 +192,9 @@ func provisionStreamHealthcore(ctx context.Context, cli cliFlags) *health.Core { return healthcore } -func provisionDataAnalytics(cli cliFlags) (*views.Client, *usage.Client) { +func provisionDataAnalytics(cli cliFlags) (*views.Client, *usage.Client, *ai.Client) { if cli.disableBigQuery { - return nil, nil + return nil, nil, nil } views, err := views.NewClient(cli.viewsOpts) if err != nil { @@ -204,7 +206,12 @@ func provisionDataAnalytics(cli cliFlags) (*views.Client, *usage.Client) { glog.Fatalf("Error creating usage client. err=%q", err) } - return views, usage + ai, err := ai.NewClient(cli.aiOpts) + if err != nil { + glog.Fatalf("Error creating ai client. err=%q", err) + } + + return views, usage, ai } func contextUntilSignal(parent context.Context, sigs ...os.Signal) context.Context {