-
Notifications
You must be signed in to change notification settings - Fork 18
Download video to sync to local cache #113
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
pseudoscalar
wants to merge
7
commits into
lbryio:local
Choose a base branch
from
pseudoscalar:issue/112/v1
base: local
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
7 commits
Select commit
Hold shift + click to select a range
b9bf2f6
Download video to sync to local cache
pseudoscalar 8ea15af
Basic stream publishing. Still needs some work.
pseudoscalar 2ba960a
Refactor to support future direction of development
pseudoscalar eb30fa4
Determine release time via YouTube API
pseudoscalar e564dc8
Add dry-run option
pseudoscalar 9f6b15e
Clean up local cache after publishing stream
pseudoscalar ce901f6
Add option to not wait for reflection. Add instructions for getting a…
pseudoscalar File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,22 +1,253 @@ | ||
| package local | ||
|
|
||
| import ( | ||
| "fmt" | ||
| "errors" | ||
| "os" | ||
| "regexp" | ||
| "strings" | ||
| "time" | ||
|
|
||
| log "github.com/sirupsen/logrus" | ||
| "github.com/spf13/cobra" | ||
| "github.com/abadojack/whatlanggo" | ||
|
|
||
| "github.com/lbryio/lbry.go/v2/extras/util" | ||
| "github.com/lbryio/ytsync/v5/namer" | ||
| "github.com/lbryio/ytsync/v5/tags_manager" | ||
| ) | ||
|
|
||
| type SyncContext struct { | ||
| DryRun bool | ||
| KeepCache bool | ||
| ReflectStreams bool | ||
| TempDir string | ||
| LbrynetAddr string | ||
| ChannelID string | ||
| PublishBid float64 | ||
| YouTubeSourceConfig *YouTubeSourceConfig | ||
| } | ||
|
|
||
| func (c *SyncContext) Validate() error { | ||
| if c.TempDir == "" { | ||
| return errors.New("No TempDir provided") | ||
| } | ||
| if c.LbrynetAddr == "" { | ||
| return errors.New("No Lbrynet address provided") | ||
| } | ||
| if c.ChannelID == "" { | ||
| return errors.New("No channel ID provided") | ||
| } | ||
| if c.PublishBid <= 0.0 { | ||
| return errors.New("Publish bid is not greater than zero") | ||
| } | ||
| return nil | ||
| } | ||
|
|
||
| type YouTubeSourceConfig struct { | ||
| YouTubeAPIKey string | ||
| } | ||
|
|
||
| var syncContext SyncContext | ||
|
|
||
| func AddCommand(rootCmd *cobra.Command) { | ||
| cmd := &cobra.Command{ | ||
| Use: "local", | ||
| Short: "run a personal ytsync", | ||
| Run: localCmd, | ||
| Args: cobra.ExactArgs(1), | ||
| } | ||
| //cmd.Flags().StringVar(&cache, "cache", "", "path to cache") | ||
| cmd.Flags().BoolVar(&syncContext.DryRun, "dry-run", false, "Display information about the stream publishing, but do not publish the stream") | ||
| cmd.Flags().BoolVar(&syncContext.KeepCache, "keep-cache", false, "Don't delete local files after publishing.") | ||
| cmd.Flags().BoolVar(&syncContext.ReflectStreams, "reflect-streams", true, "Require published streams to be reflected.") | ||
| cmd.Flags().StringVar(&syncContext.TempDir, "temp-dir", getEnvDefault("TEMP_DIR", ""), "directory to use for temporary files") | ||
| cmd.Flags().Float64Var(&syncContext.PublishBid, "publish-bid", 0.01, "Bid amount for the stream claim") | ||
| cmd.Flags().StringVar(&syncContext.LbrynetAddr, "lbrynet-address", getEnvDefault("LBRYNET_ADDRESS", ""), "JSONRPC address of the local LBRYNet daemon") | ||
| cmd.Flags().StringVar(&syncContext.ChannelID, "channel-id", "", "LBRY channel ID to publish to") | ||
|
|
||
| // For now, assume source is always YouTube | ||
| syncContext.YouTubeSourceConfig = &YouTubeSourceConfig{} | ||
| cmd.Flags().StringVar(&syncContext.YouTubeSourceConfig.YouTubeAPIKey, "youtube-api-key", getEnvDefault("YOUTUBE_API_KEY", ""), "YouTube API Key") | ||
| rootCmd.AddCommand(cmd) | ||
| } | ||
|
|
||
| func getEnvDefault(key, defaultValue string) string { | ||
| if value, ok := os.LookupEnv(key); ok { | ||
| return value | ||
| } | ||
| return defaultValue | ||
| } | ||
|
|
||
| func localCmd(cmd *cobra.Command, args []string) { | ||
| fmt.Println("local") | ||
| err := syncContext.Validate() | ||
| if err != nil { | ||
| log.Error(err) | ||
| return | ||
| } | ||
| videoID := args[0] | ||
|
|
||
| log.Debugf("Running sync for video ID %s", videoID) | ||
|
|
||
| var publisher VideoPublisher | ||
| publisher, err = NewLocalSDKPublisher(syncContext.LbrynetAddr, syncContext.ChannelID, syncContext.PublishBid) | ||
| if err != nil { | ||
| log.Errorf("Error setting up publisher: %v", err) | ||
| return | ||
| } | ||
|
|
||
| var videoSource VideoSource | ||
| if syncContext.YouTubeSourceConfig != nil { | ||
| videoSource, err = NewYtdlVideoSource(syncContext.TempDir, syncContext.YouTubeSourceConfig) | ||
| if err != nil { | ||
| log.Errorf("Error setting up video source: %v", err) | ||
| return | ||
| } | ||
| } | ||
|
|
||
| sourceVideo, err := videoSource.GetVideo(videoID) | ||
| if err != nil { | ||
| log.Errorf("Error getting source video: %v", err) | ||
| return | ||
| } | ||
|
|
||
| processedVideo, err := processVideoForPublishing(*sourceVideo, syncContext.ChannelID) | ||
| if err != nil { | ||
| log.Errorf("Error processing source video for publishing: %v", err) | ||
| return | ||
| } | ||
|
|
||
| if syncContext.DryRun { | ||
| log.Infoln("This is a dry run. Nothing will be published.") | ||
| log.Infof("The local file %s would be published to channel ID %s as %s.", processedVideo.FullLocalPath, syncContext.ChannelID, processedVideo.ClaimName) | ||
| log.Debugf("Object to be published: %v", processedVideo) | ||
|
|
||
| } else { | ||
| doneReflectingCh, err := publisher.Publish(*processedVideo, syncContext.ReflectStreams) | ||
| if err != nil { | ||
| log.Errorf("Error publishing video: %v", err) | ||
| return | ||
| } | ||
|
|
||
| if syncContext.ReflectStreams { | ||
| err = <-doneReflectingCh | ||
| if err != nil { | ||
| log.Errorf("Error while wating for stream to reflect: %v", err) | ||
| } | ||
| } else { | ||
| log.Debugln("Not waiting for stream to reflect.") | ||
| } | ||
| } | ||
|
|
||
| if !syncContext.KeepCache { | ||
| log.Infof("Deleting local files.") | ||
| err = videoSource.DeleteLocalCache(videoID) | ||
| if err != nil { | ||
| log.Errorf("Error deleting local files for video %s: %v", videoID, err) | ||
| } | ||
| } | ||
| log.Info("Done") | ||
| } | ||
|
|
||
| type SourceVideo struct { | ||
| ID string | ||
| Title *string | ||
| Description *string | ||
| SourceURL string | ||
| Languages []string | ||
| Tags []string | ||
| ReleaseTime *int64 | ||
| ThumbnailURL *string | ||
| FullLocalPath string | ||
| } | ||
|
|
||
| type PublishableVideo struct { | ||
| ID string | ||
| ClaimName string | ||
| Title string | ||
| Description string | ||
| SourceURL string | ||
| Languages []string | ||
| Tags []string | ||
| ReleaseTime int64 | ||
| ThumbnailURL string | ||
| FullLocalPath string | ||
| } | ||
|
|
||
| func processVideoForPublishing(source SourceVideo, channelID string) (*PublishableVideo, error) { | ||
| tags, err := tags_manager.SanitizeTags(source.Tags, channelID) | ||
| if err != nil { | ||
| log.Errorf("Error sanitizing tags: %v", err) | ||
| return nil, err | ||
| } | ||
|
|
||
| descriptionSample := "" | ||
| if source.Description != nil { | ||
| urlsRegex := regexp.MustCompile(`(?m) ?(f|ht)(tp)(s?)(://)(.*)[.|/](.*)`) | ||
| descriptionSample = urlsRegex.ReplaceAllString(*source.Description, "") | ||
| } | ||
| info := whatlanggo.Detect(descriptionSample) | ||
|
|
||
| title := "" | ||
| if source.Title != nil { | ||
| title = *source.Title | ||
| } | ||
| info2 := whatlanggo.Detect(title) | ||
| var languages []string = nil | ||
| if info.IsReliable() && info.Lang.Iso6391() != "" { | ||
| language := info.Lang.Iso6391() | ||
| languages = []string{language} | ||
| } else if info2.IsReliable() && info2.Lang.Iso6391() != "" { | ||
| language := info2.Lang.Iso6391() | ||
| languages = []string{language} | ||
| } | ||
|
|
||
| claimName := namer.NewNamer().GetNextName(title) | ||
|
|
||
| thumbnailURL := source.ThumbnailURL | ||
| if thumbnailURL == nil { | ||
| thumbnailURL = util.PtrToString("") | ||
| } | ||
|
|
||
| releaseTime := source.ReleaseTime | ||
| if releaseTime == nil { | ||
| releaseTime = util.PtrToInt64(time.Now().Unix()) | ||
| } | ||
|
|
||
| processed := PublishableVideo { | ||
| ClaimName: claimName, | ||
| Title: title, | ||
| Description: getAbbrevDescription(source), | ||
| Languages: languages, | ||
| Tags: tags, | ||
| ReleaseTime: *releaseTime, | ||
| ThumbnailURL: *thumbnailURL, | ||
| FullLocalPath: source.FullLocalPath, | ||
| } | ||
|
|
||
| log.Debugf("Video prepared for publication: %v", processed) | ||
|
|
||
| return &processed, nil | ||
| } | ||
|
|
||
| func getAbbrevDescription(v SourceVideo) string { | ||
| if v.Description == nil { | ||
| return v.SourceURL | ||
| } | ||
|
|
||
| additionalDescription := "\n...\n" + v.SourceURL | ||
| maxLength := 2800 - len(additionalDescription) | ||
|
|
||
| description := strings.TrimSpace(*v.Description) | ||
| if len(description) > maxLength { | ||
| description = description[:maxLength] | ||
| } | ||
| return description + additionalDescription | ||
| } | ||
|
|
||
| type VideoSource interface { | ||
| GetVideo(id string) (*SourceVideo, error) | ||
| DeleteLocalCache(id string) error | ||
| } | ||
|
|
||
| type VideoPublisher interface { | ||
| Publish(video PublishableVideo, reflectStream bool) (chan error, error) | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,125 @@ | ||
| package local | ||
|
|
||
| import ( | ||
| "errors" | ||
| "sort" | ||
| "time" | ||
|
|
||
| log "github.com/sirupsen/logrus" | ||
|
|
||
| "github.com/lbryio/lbry.go/v2/extras/jsonrpc" | ||
| "github.com/lbryio/lbry.go/v2/extras/util" | ||
| ) | ||
|
|
||
| type LocalSDKPublisher struct { | ||
| channelID string | ||
| publishBid float64 | ||
| lbrynet *jsonrpc.Client | ||
| } | ||
|
|
||
| func NewLocalSDKPublisher(sdkAddr, channelID string, publishBid float64) (*LocalSDKPublisher, error) { | ||
| lbrynet := jsonrpc.NewClient(sdkAddr) | ||
| lbrynet.SetRPCTimeout(5 * time.Minute) | ||
|
|
||
| status, err := lbrynet.Status() | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
|
|
||
| if !status.IsRunning { | ||
| return nil, errors.New("SDK is not running") | ||
| } | ||
|
|
||
| // Should check to see if the SDK owns the channel | ||
|
|
||
| // Should check to see if wallet is unlocked | ||
| // but jsonrpc.Client doesn't have WalletStatus method | ||
| // so skip for now | ||
|
|
||
| // Should check to see if streams are configured to be reflected and warn if not | ||
| // but jsonrpc.Client doesn't have SettingsGet method to see if streams are reflected | ||
| // so use File.UploadingToReflector as a proxy for now | ||
|
|
||
| publisher := LocalSDKPublisher { | ||
| channelID: channelID, | ||
| publishBid: publishBid, | ||
| lbrynet: lbrynet, | ||
| } | ||
| return &publisher, nil | ||
| } | ||
|
|
||
| func (p *LocalSDKPublisher) Publish(video PublishableVideo, reflectStream bool) (chan error, error) { | ||
| streamCreateOptions := jsonrpc.StreamCreateOptions { | ||
| ClaimCreateOptions: jsonrpc.ClaimCreateOptions { | ||
| Title: &video.Title, | ||
| Description: &video.Description, | ||
| Languages: video.Languages, | ||
| ThumbnailURL: &video.ThumbnailURL, | ||
| Tags: video.Tags, | ||
| }, | ||
| ReleaseTime: &video.ReleaseTime, | ||
| ChannelID: &p.channelID, | ||
| License: util.PtrToString("Copyrighted (contact publisher)"), | ||
| } | ||
|
|
||
| txSummary, err := p.lbrynet.StreamCreate(video.ClaimName, video.FullLocalPath, p.publishBid, streamCreateOptions) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
|
|
||
| if !reflectStream { | ||
| return nil, nil | ||
| } | ||
|
|
||
| done := make(chan error, 1) | ||
| go func() { | ||
| for { | ||
| fileListResponse, fileIndex, err := findFileByTxid(p.lbrynet, txSummary.Txid) | ||
| if err != nil { | ||
| log.Errorf("Error finding file by txid: %v", err) | ||
| done <- err | ||
| return | ||
| } | ||
| if fileListResponse == nil { | ||
| log.Errorf("Could not find file in list with correct txid") | ||
| done <- err | ||
| return | ||
| } | ||
|
|
||
| fileStatus := fileListResponse.Items[fileIndex] | ||
| if fileStatus.IsFullyReflected { | ||
| log.Info("Stream is fully reflected") | ||
| break | ||
| } | ||
| if !fileStatus.UploadingToReflector { | ||
| log.Error("Stream is not being uploaded to a reflector. Check your lbrynet settings if this is a mistake.") | ||
| done <- errors.New("Stream is not being reflected (check lbrynet settings).") | ||
| return | ||
| } | ||
| log.Infof("Stream reflector progress: %d%%", fileStatus.ReflectorProgress) | ||
| time.Sleep(5 * time.Second) | ||
| } | ||
| done <- nil | ||
| }() | ||
|
|
||
| return done, nil | ||
| } | ||
|
|
||
| // if jsonrpc.Client.FileList is extended to match the actual jsonrpc schema, this can be removed | ||
| func findFileByTxid(client *jsonrpc.Client, txid string) (*jsonrpc.FileListResponse, int, error) { | ||
| response, err := client.FileList(0, 20) | ||
| for { | ||
| if err != nil { | ||
| log.Errorf("Error getting file list page: %v", err) | ||
| return nil, 0, err | ||
| } | ||
| index := sort.Search(len(response.Items), func (i int) bool { return response.Items[i].Txid == txid }) | ||
| if index < len(response.Items) { | ||
| return response, index, nil | ||
| } | ||
| if response.Page >= response.TotalPages { | ||
| return nil, 0, nil | ||
| } | ||
| response, err = client.FileList(response.Page + 1, 20) | ||
| } | ||
| } |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.