From b39dc44e5aaf588ca82d55c051b7c1326838e1bd Mon Sep 17 00:00:00 2001 From: gioelecerati Date: Mon, 4 Dec 2023 15:19:24 +0100 Subject: [PATCH 1/2] task: added delete task handler --- task/delete.go | 93 ++++++++++++++++++++++++++++++++++++++++++++++++++ task/runner.go | 2 ++ 2 files changed, 95 insertions(+) create mode 100644 task/delete.go diff --git a/task/delete.go b/task/delete.go new file mode 100644 index 00000000..98ddac8e --- /dev/null +++ b/task/delete.go @@ -0,0 +1,93 @@ +package task + +import ( + "fmt" + "time" + + "github.com/golang/glog" + "github.com/livepeer/go-api-client" + "github.com/livepeer/livepeer-data/pkg/data" + "golang.org/x/sync/errgroup" +) + +func TaskDelete(tctx *TaskContext) (*TaskHandlerOutput, error) { + + var ( + ctx = tctx.Context + asset = tctx.InputAsset + osSess = tctx.outputOS + ) + + directory := asset.PlaybackID + + files, err := osSess.ListFiles(ctx, directory, "/") + + if err != nil { + glog.Errorf("Error listing files in directory %v", directory) + } + + totalFiles := files.Files() + + accumulator := NewAccumulator() + tctx.Progress.TrackCount(accumulator.Size, uint64(len(totalFiles)), 0) + + const maxRetries = 3 + const retryInterval = time.Second + + eg := errgroup.Group{} + + for _, file := range totalFiles { + filename := file.Name + eg.Go(func() error { + var err error + for i := 0; i < maxRetries; i++ { + err = osSess.DeleteFile(ctx, filename) + if err == nil { + accumulator.Accumulate(1) + return nil + } + glog.Errorf("Error deleting file %v: %v (retrying...)", filename, err) + time.Sleep(retryInterval) + } + return fmt.Errorf("failed to delete file %v after %d retries", filename, maxRetries) + }) + } + + if err := eg.Wait(); err != nil { + glog.Errorf("Error deleting files: %v", err) + } + + if ipfs := asset.AssetSpec.Storage.IPFS; ipfs != nil { + err = UnpinFromIpfs(*tctx, ipfs.CID, "cid") + if err != nil { + glog.Errorf("Error unpinning from IPFS %v", ipfs.CID) + } + err = UnpinFromIpfs(*tctx, ipfs.NFTMetadata.CID, "nftMetadataCid") + if err != nil { + glog.Errorf("Error unpinning metadata from IPFS %v", ipfs.NFTMetadata.CID) + } + } + + return &TaskHandlerOutput{ + TaskOutput: &data.TaskOutput{}, + }, nil +} + +func UnpinFromIpfs(tctx TaskContext, cid string, filter string) error { + assets, _, err := tctx.lapi.ListAssets(api.ListOptions{ + Filters: map[string]interface{}{ + filter: cid, + }, + AllUsers: true, + }) + + if err != nil { + return err + } + + if len(assets) == 1 { + return tctx.ipfs.Unpin(tctx.Context, cid) + } + + return nil +} diff --git a/task/runner.go b/task/runner.go index 5b3227f6..6249e212 100644 --- a/task/runner.go +++ b/task/runner.go @@ -46,6 +46,8 @@ var ( "upload": TaskUpload, "export": TaskExport, "export-data": TaskExportData, + "delete": TaskDelete, + "transcode": TaskTranscode, "transcode-file": TaskTranscodeFile, "clip": TaskClip, } From 56b20732d8c23539676f0f73587383c8fe22fb39 Mon Sep 17 00:00:00 2001 From: gioelecerati Date: Mon, 4 Dec 2023 15:20:14 +0100 Subject: [PATCH 2/2] rebase fix --- task/runner.go | 1 - 1 file changed, 1 deletion(-) diff --git a/task/runner.go b/task/runner.go index 6249e212..c924ef99 100644 --- a/task/runner.go +++ b/task/runner.go @@ -47,7 +47,6 @@ var ( "export": TaskExport, "export-data": TaskExportData, "delete": TaskDelete, - "transcode": TaskTranscode, "transcode-file": TaskTranscodeFile, "clip": TaskClip, }