Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions api/authorization.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ var (
}
)

func authorization(authUrl string) middleware {
func authorization(authUrl string, isStream bool) middleware {
return inlineMiddleware(func(rw http.ResponseWriter, r *http.Request, next http.Handler) {
ctx, cancel := context.WithTimeout(r.Context(), authTimeout)
defer cancel()
Expand All @@ -49,11 +49,13 @@ func authorization(authUrl string) middleware {
return
}
authReq.Header.Set("X-Original-Uri", originalReqUri(r))
if streamID := apiParam(r, streamIDParam); streamID != "" {
authReq.Header.Set("X-Livepeer-Stream-Id", streamID)
} else if assetID := apiParam(r, assetIDParam); assetID != "" {
authReq.Header.Set("X-Livepeer-Asset-Id", assetID)

if isStream {
authReq.Header.Set("X-Livepeer-Stream-Id", apiParam(r, contentIDParam))
} else {
authReq.Header.Set("X-Livepeer-Asset-Id", apiParam(r, contentIDParam))
}

copyHeaders(authorizationHeaders, r.Header, authReq.Header)
authRes, err := httpClient.Do(authReq)
if err != nil {
Expand Down
33 changes: 21 additions & 12 deletions api/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ const (
ssePingDelay = 20 * time.Second
sseBufferSize = 128

streamIDParam = "streamId"
assetIDParam = "assetId"
contentIDParam = "contentId"
)

type APIHandlerOptions struct {
Expand Down Expand Up @@ -61,10 +60,10 @@ func addStreamHealthHandlers(router *httprouter.Router, handler *apiHandler) {
regionProxy(opts.RegionalHostFormat, opts.OwnRegion),
}
if opts.AuthURL != "" {
middlewares = append(middlewares, authorization(opts.AuthURL))
middlewares = append(middlewares, authorization(opts.AuthURL, true))
}
addApiHandler := func(apiPath, name string, handler http.HandlerFunc) {
fullPath := path.Join(opts.APIRoot, "/stream/:"+streamIDParam, apiPath)
fullPath := path.Join(opts.APIRoot, "/stream/:"+contentIDParam, apiPath)
fullHandler := prepareHandlerFunc(name, opts.Prometheus, handler, middlewares...)
router.Handler("GET", fullPath, fullHandler)
}
Expand All @@ -74,16 +73,17 @@ func addStreamHealthHandlers(router *httprouter.Router, handler *apiHandler) {

func addViewershipHandlers(router *httprouter.Router, handler *apiHandler) {
opts := handler.opts
middlewares := []middleware{}
if opts.AuthURL != "" {
middlewares = append(middlewares, authorization(opts.AuthURL))
}
addApiHandler := func(apiPath, name string, handler http.HandlerFunc) {
fullPath := path.Join(opts.APIRoot, "/views/:"+assetIDParam, apiPath)
addApiHandler := func(apiPath, name string, isStream bool, handler http.HandlerFunc) {
middlewares := []middleware{}
if opts.AuthURL != "" {
middlewares = append(middlewares, authorization(opts.AuthURL, isStream))
}
fullPath := path.Join(opts.APIRoot, "/views/:"+contentIDParam, apiPath)
fullHandler := prepareHandlerFunc(name, opts.Prometheus, handler, middlewares...)
router.Handler("GET", fullPath, fullHandler)
}
addApiHandler("/total", "get_total_views", handler.getTotalViews)
addApiHandler("/total", "get_total_views", false, handler.getTotalViews)
addApiHandler("/concurrent", "get_realtime_concurrent_views", true, handler.getRealTimeViews)
}

func (h *apiHandler) cors() middleware {
Expand All @@ -110,7 +110,16 @@ func (h *apiHandler) healthcheck(rw http.ResponseWriter, r *http.Request) {
}

func (h *apiHandler) getTotalViews(rw http.ResponseWriter, r *http.Request) {
views, err := h.views.GetTotalViews(r.Context(), apiParam(r, assetIDParam))
views, err := h.views.GetTotalViews(r.Context(), apiParam(r, contentIDParam))
if err != nil {
respondError(rw, http.StatusInternalServerError, err)
return
}
respondJson(rw, http.StatusOK, views)
}

func (h *apiHandler) getRealTimeViews(rw http.ResponseWriter, r *http.Request) {
views, err := h.views.GetRealTimeViews(r.Context(), apiParam(r, contentIDParam))
if err != nil {
respondError(rw, http.StatusInternalServerError, err)
return
Expand Down
2 changes: 1 addition & 1 deletion api/streamStatus.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ const (

func streamStatus(healthcore *health.Core) middleware {
return inlineMiddleware(func(rw http.ResponseWriter, r *http.Request, next http.Handler) {
streamID := apiParam(r, streamIDParam)
streamID := apiParam(r, contentIDParam)
if streamID == "" {
next.ServeHTTP(rw, r)
return
Expand Down
53 changes: 53 additions & 0 deletions views/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,18 @@ import (
)

var ErrAssetNotFound = errors.New("asset not found")
var ErrStreamNotFound = errors.New("stream not found")

type TotalViews struct {
ID string `json:"id"`
StartViews int64 `json:"startViews"`
}

type RealTimeViews struct {
ID string `json:"id"`
Views int64 `json:"views"`
}

type ClientOptions struct {
Prometheus promClient.Config
Livepeer livepeer.ClientOptions
Expand Down Expand Up @@ -59,6 +65,25 @@ func (c *Client) GetTotalViews(ctx context.Context, id string) ([]TotalViews, er
}}, nil
}

func (c *Client) GetRealTimeViews(ctx context.Context, id string) ([]RealTimeViews, error) {
stream, err := c.lp.GetStream(id, false)
if errors.Is(err, livepeer.ErrNotExists) {
return nil, ErrStreamNotFound
} else if err != nil {
return nil, fmt.Errorf("error getting stream: %w", err)
}

realTimeViews, err := c.doQueryRealTimeViews(ctx, stream)
if err != nil {
return nil, fmt.Errorf("error querying real time views: %w", err)
}

return []RealTimeViews{{
ID: stream.PlaybackID,
Views: realTimeViews,
}}, nil
}

func (c *Client) doQueryStartViews(ctx context.Context, asset *livepeer.Asset) (int64, error) {
query := startViewsQuery(asset.PlaybackID, asset.PlaybackRecordingID)
value, warn, err := c.prom.Query(ctx, query, time.Time{})
Expand All @@ -80,6 +105,27 @@ func (c *Client) doQueryStartViews(ctx context.Context, asset *livepeer.Asset) (
return int64(vec[0].Value), nil
}

func (c *Client) doQueryRealTimeViews(ctx context.Context, stream *livepeer.Stream) (int64, error) {
query := realTimeViewsQuery(stream.PlaybackID)
value, warn, err := c.prom.Query(ctx, query, time.Time{})
if len(warn) > 0 {
glog.Warningf("Prometheus query warnings: %q", warn)
}
if err != nil {
return -1, fmt.Errorf("query error: %w", err)
}
if value.Type() != model.ValVector {
return -1, fmt.Errorf("unexpected value type: %s", value.Type())
}
vec := value.(model.Vector)
if len(vec) > 1 {
return -1, fmt.Errorf("unexpected result count: %d", len(vec))
} else if len(vec) == 0 {
return 0, nil
}
return int64(vec[0].Value), nil
}

func startViewsQuery(playbackID, playbackRecordingID string) string {
queryID := playbackID
if playbackRecordingID != "" {
Expand All @@ -90,3 +136,10 @@ func startViewsQuery(playbackID, playbackRecordingID string) string {
queryID,
)
}

func realTimeViewsQuery(playbackID string) string {
return fmt.Sprintf(
`sum(mist_sessions{catalyst="true", sessType="viewers", stream="video+%s"})`,
playbackID,
)
}