diff --git a/clients/mediaconvert.go b/clients/mediaconvert.go index dbee1ede9..454b3b93d 100644 --- a/clients/mediaconvert.go +++ b/clients/mediaconvert.go @@ -15,12 +15,15 @@ import ( "github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/mediaconvert" + "github.com/livepeer/catalyst-api/config" xerrors "github.com/livepeer/catalyst-api/errors" "github.com/livepeer/catalyst-api/log" "github.com/livepeer/go-tools/drivers" "golang.org/x/sync/errgroup" ) +const IPFS_PREFIX = "ipfs://" + const pollDelay = 10 * time.Second const rateLimitedPollDelay = 15 * time.Second @@ -266,10 +269,29 @@ func getFile(ctx context.Context, url string) (io.ReadCloser, error) { if err == nil { return DownloadOSURL(url) } else { + if strings.HasPrefix(url, IPFS_PREFIX) { + cid := strings.TrimPrefix(url, IPFS_PREFIX) + return getFileIPFS(ctx, config.ImportIPFSGatewayURLs, cid) + } return getFileHTTP(ctx, url) } } +// TODO(yondonfu): Refactor this logic out of this file so it can be used elsewhere +func getFileIPFS(ctx context.Context, gateways []*url.URL, cid string) (io.ReadCloser, error) { + var content io.ReadCloser + var err error + for _, gateway := range gateways { + url := gateway.JoinPath(cid).String() + content, err = getFileHTTP(ctx, url) + if err == nil { + return content, nil + } + } + + return nil, fmt.Errorf("failed to get file from IPFS import gateways with last error: %w", err) +} + func getFileHTTP(ctx context.Context, url string) (io.ReadCloser, error) { req, err := http.NewRequestWithContext(ctx, "GET", url, nil) if err != nil { diff --git a/config/config.go b/config/config.go index c714a831b..3523653cc 100644 --- a/config/config.go +++ b/config/config.go @@ -2,6 +2,7 @@ package config import ( "fmt" + "net/url" "time" ) @@ -27,3 +28,5 @@ var TranscodingParallelJobs int = 2 var TranscodingParallelSleep time.Duration = 713 * time.Millisecond var DownloadOSURLRetries uint64 = 10 + +var ImportIPFSGatewayURLs []*url.URL = []*url.URL{} diff --git a/main.go b/main.go index 919533b92..8facb7c2a 100644 --- a/main.go +++ b/main.go @@ -5,6 +5,7 @@ import ( "fmt" "log" "net/url" + "strings" "time" "github.com/livepeer/catalyst-api/api" @@ -25,6 +26,7 @@ func main() { externalTranscoderUrl := flag.String("external-transcoder", "", "URL for the external transcoder to be used by the pipeline coordinator. Only 1 implementation today for AWS MediaConvert which should be in the format: mediaconvert://key-id:key-secret@endpoint-host?region=aws-region&role=iam-role&s3_aux_bucket=s3://bucket") vodPipelineStrategy := flag.String("vod-pipeline-strategy", string(pipeline.StrategyCatalystDominance), "Which strategy to use for the VOD pipeline") flag.StringVar(&config.RecordingCallback, "recording", "http://recording.livepeer.com/recording/status", "Callback URL for recording start&stop events") + URLSliceVarFlag(&config.ImportIPFSGatewayURLs, "import-ipfs-gateway-urls", "https://w3s.link/ipfs/,https://ipfs.io/ipfs/,https://cloudflare-ipfs.com/ipfs/", "Comma delimited ordered list of gateways (includes /ipfs/ suffix) to import IPFS files from") flag.Parse() if *mistJson { @@ -65,6 +67,18 @@ func parseURL(s string, dest **url.URL) error { return nil } +func parseURLs(s string, dest *[]*url.URL) error { + strs := strings.Split(s, ",") + urls := make([]*url.URL, len(strs)) + for i, str := range strs { + if err := parseURL(str, &urls[i]); err != nil { + return err + } + } + *dest = urls + return nil +} + func URLVarFlag(fs *flag.FlagSet, dest **url.URL, name, value, usage string) { if err := parseURL(value, dest); err != nil { panic(err) @@ -73,3 +87,12 @@ func URLVarFlag(fs *flag.FlagSet, dest **url.URL, name, value, usage string) { return parseURL(s, dest) }) } + +func URLSliceVarFlag(dest *[]*url.URL, name, value, usage string) { + if err := parseURLs(value, dest); err != nil { + panic(err) + } + flag.Func(name, usage, func(s string) error { + return parseURLs(s, dest) + }) +}