diff --git a/api/authorization.go b/api/authorization.go index 6cc94b50..d8317cd6 100644 --- a/api/authorization.go +++ b/api/authorization.go @@ -38,7 +38,7 @@ var ( } ) -func authorization(authUrl string) middleware { +func authorization(authUrl string, isStream bool) middleware { return inlineMiddleware(func(rw http.ResponseWriter, r *http.Request, next http.Handler) { ctx, cancel := context.WithTimeout(r.Context(), authTimeout) defer cancel() @@ -49,11 +49,13 @@ func authorization(authUrl string) middleware { return } authReq.Header.Set("X-Original-Uri", originalReqUri(r)) - if streamID := apiParam(r, streamIDParam); streamID != "" { - authReq.Header.Set("X-Livepeer-Stream-Id", streamID) - } else if assetID := apiParam(r, assetIDParam); assetID != "" { - authReq.Header.Set("X-Livepeer-Asset-Id", assetID) + + if isStream { + authReq.Header.Set("X-Livepeer-Stream-Id", apiParam(r, contentIDParam)) + } else { + authReq.Header.Set("X-Livepeer-Asset-Id", apiParam(r, contentIDParam)) } + copyHeaders(authorizationHeaders, r.Header, authReq.Header) authRes, err := httpClient.Do(authReq) if err != nil { diff --git a/api/handler.go b/api/handler.go index d02bc383..36b34e9e 100644 --- a/api/handler.go +++ b/api/handler.go @@ -22,8 +22,7 @@ const ( ssePingDelay = 20 * time.Second sseBufferSize = 128 - streamIDParam = "streamId" - assetIDParam = "assetId" + contentIDParam = "contentId" ) type APIHandlerOptions struct { @@ -61,10 +60,10 @@ func addStreamHealthHandlers(router *httprouter.Router, handler *apiHandler) { regionProxy(opts.RegionalHostFormat, opts.OwnRegion), } if opts.AuthURL != "" { - middlewares = append(middlewares, authorization(opts.AuthURL)) + middlewares = append(middlewares, authorization(opts.AuthURL, true)) } addApiHandler := func(apiPath, name string, handler http.HandlerFunc) { - fullPath := path.Join(opts.APIRoot, "/stream/:"+streamIDParam, apiPath) + fullPath := path.Join(opts.APIRoot, "/stream/:"+contentIDParam, apiPath) fullHandler := prepareHandlerFunc(name, opts.Prometheus, handler, middlewares...) router.Handler("GET", fullPath, fullHandler) } @@ -74,16 +73,17 @@ func addStreamHealthHandlers(router *httprouter.Router, handler *apiHandler) { func addViewershipHandlers(router *httprouter.Router, handler *apiHandler) { opts := handler.opts - middlewares := []middleware{} - if opts.AuthURL != "" { - middlewares = append(middlewares, authorization(opts.AuthURL)) - } - addApiHandler := func(apiPath, name string, handler http.HandlerFunc) { - fullPath := path.Join(opts.APIRoot, "/views/:"+assetIDParam, apiPath) + addApiHandler := func(apiPath, name string, isStream bool, handler http.HandlerFunc) { + middlewares := []middleware{} + if opts.AuthURL != "" { + middlewares = append(middlewares, authorization(opts.AuthURL, isStream)) + } + fullPath := path.Join(opts.APIRoot, "/views/:"+contentIDParam, apiPath) fullHandler := prepareHandlerFunc(name, opts.Prometheus, handler, middlewares...) router.Handler("GET", fullPath, fullHandler) } - addApiHandler("/total", "get_total_views", handler.getTotalViews) + addApiHandler("/total", "get_total_views", false, handler.getTotalViews) + addApiHandler("/concurrent", "get_realtime_concurrent_views", true, handler.getRealTimeViews) } func (h *apiHandler) cors() middleware { @@ -110,7 +110,16 @@ func (h *apiHandler) healthcheck(rw http.ResponseWriter, r *http.Request) { } func (h *apiHandler) getTotalViews(rw http.ResponseWriter, r *http.Request) { - views, err := h.views.GetTotalViews(r.Context(), apiParam(r, assetIDParam)) + views, err := h.views.GetTotalViews(r.Context(), apiParam(r, contentIDParam)) + if err != nil { + respondError(rw, http.StatusInternalServerError, err) + return + } + respondJson(rw, http.StatusOK, views) +} + +func (h *apiHandler) getRealTimeViews(rw http.ResponseWriter, r *http.Request) { + views, err := h.views.GetRealTimeViews(r.Context(), apiParam(r, contentIDParam)) if err != nil { respondError(rw, http.StatusInternalServerError, err) return diff --git a/api/streamStatus.go b/api/streamStatus.go index 2b7dc402..c0364e2a 100644 --- a/api/streamStatus.go +++ b/api/streamStatus.go @@ -16,7 +16,7 @@ const ( func streamStatus(healthcore *health.Core) middleware { return inlineMiddleware(func(rw http.ResponseWriter, r *http.Request, next http.Handler) { - streamID := apiParam(r, streamIDParam) + streamID := apiParam(r, contentIDParam) if streamID == "" { next.ServeHTTP(rw, r) return diff --git a/views/client.go b/views/client.go index 20dccb00..6c07e9e3 100644 --- a/views/client.go +++ b/views/client.go @@ -14,12 +14,18 @@ import ( ) var ErrAssetNotFound = errors.New("asset not found") +var ErrStreamNotFound = errors.New("stream not found") type TotalViews struct { ID string `json:"id"` StartViews int64 `json:"startViews"` } +type RealTimeViews struct { + ID string `json:"id"` + Views int64 `json:"views"` +} + type ClientOptions struct { Prometheus promClient.Config Livepeer livepeer.ClientOptions @@ -59,6 +65,25 @@ func (c *Client) GetTotalViews(ctx context.Context, id string) ([]TotalViews, er }}, nil } +func (c *Client) GetRealTimeViews(ctx context.Context, id string) ([]RealTimeViews, error) { + stream, err := c.lp.GetStream(id, false) + if errors.Is(err, livepeer.ErrNotExists) { + return nil, ErrStreamNotFound + } else if err != nil { + return nil, fmt.Errorf("error getting stream: %w", err) + } + + realTimeViews, err := c.doQueryRealTimeViews(ctx, stream) + if err != nil { + return nil, fmt.Errorf("error querying real time views: %w", err) + } + + return []RealTimeViews{{ + ID: stream.PlaybackID, + Views: realTimeViews, + }}, nil +} + func (c *Client) doQueryStartViews(ctx context.Context, asset *livepeer.Asset) (int64, error) { query := startViewsQuery(asset.PlaybackID, asset.PlaybackRecordingID) value, warn, err := c.prom.Query(ctx, query, time.Time{}) @@ -80,6 +105,27 @@ func (c *Client) doQueryStartViews(ctx context.Context, asset *livepeer.Asset) ( return int64(vec[0].Value), nil } +func (c *Client) doQueryRealTimeViews(ctx context.Context, stream *livepeer.Stream) (int64, error) { + query := realTimeViewsQuery(stream.PlaybackID) + value, warn, err := c.prom.Query(ctx, query, time.Time{}) + if len(warn) > 0 { + glog.Warningf("Prometheus query warnings: %q", warn) + } + if err != nil { + return -1, fmt.Errorf("query error: %w", err) + } + if value.Type() != model.ValVector { + return -1, fmt.Errorf("unexpected value type: %s", value.Type()) + } + vec := value.(model.Vector) + if len(vec) > 1 { + return -1, fmt.Errorf("unexpected result count: %d", len(vec)) + } else if len(vec) == 0 { + return 0, nil + } + return int64(vec[0].Value), nil +} + func startViewsQuery(playbackID, playbackRecordingID string) string { queryID := playbackID if playbackRecordingID != "" { @@ -90,3 +136,10 @@ func startViewsQuery(playbackID, playbackRecordingID string) string { queryID, ) } + +func realTimeViewsQuery(playbackID string) string { + return fmt.Sprintf( + `sum(mist_sessions{catalyst="true", sessType="viewers", stream="video+%s"})`, + playbackID, + ) +}