From 079fff0090a123ef6c8d969814d4523cd8337d3d Mon Sep 17 00:00:00 2001 From: Scott Blum Date: Fri, 9 Feb 2024 23:40:06 -0500 Subject: [PATCH 1/2] file transfer problem --- transfer/go.mod | 20 +++ transfer/go.sum | 54 +++++++++ transfer/main/transfer/aws.go | 77 ++++++++++++ transfer/main/transfer/dropbox.go | 195 ++++++++++++++++++++++++++++++ transfer/main/transfer/main.go | 142 ++++++++++++++++++++++ 5 files changed, 488 insertions(+) create mode 100644 transfer/go.mod create mode 100644 transfer/go.sum create mode 100644 transfer/main/transfer/aws.go create mode 100644 transfer/main/transfer/dropbox.go create mode 100644 transfer/main/transfer/main.go diff --git a/transfer/go.mod b/transfer/go.mod new file mode 100644 index 0000000..b7ee785 --- /dev/null +++ b/transfer/go.mod @@ -0,0 +1,20 @@ +module github.com/dragonsinth/learn/transfer + +go 1.21 + +require ( + github.com/aws/aws-sdk-go v1.50.15 + github.com/hashicorp/go-retryablehttp v0.7.5 + go.uber.org/atomic v1.11.0 + golang.org/x/oauth2 v0.17.0 + golang.org/x/sync v0.6.0 +) + +require ( + github.com/golang/protobuf v1.5.3 // indirect + github.com/hashicorp/go-cleanhttp v0.5.2 // indirect + github.com/jmespath/go-jmespath v0.4.0 // indirect + golang.org/x/net v0.21.0 // indirect + google.golang.org/appengine v1.6.7 // indirect + google.golang.org/protobuf v1.31.0 // indirect +) diff --git a/transfer/go.sum b/transfer/go.sum new file mode 100644 index 0000000..c3ce7c6 --- /dev/null +++ b/transfer/go.sum @@ -0,0 +1,54 @@ +github.com/aws/aws-sdk-go v1.50.15 h1:wEMnPfEQQFaoIJwuO18zq/vtG4Ft7NxQ3r9xlEi/8zg= +github.com/aws/aws-sdk-go v1.50.15/go.mod h1:LF8svs817+Nz+DmiMQKTO3ubZ/6IaTpq3TjupRn3Eqk= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= +github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= +github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ= +github.com/hashicorp/go-cleanhttp v0.5.2/go.mod h1:kO/YDlP8L1346E6Sodw+PrpBSV4/SoxCXGY6BqNFT48= +github.com/hashicorp/go-hclog v0.9.2 h1:CG6TE5H9/JXsFWJCfoIVpKFIkFe6ysEuHirp4DxCsHI= +github.com/hashicorp/go-hclog v0.9.2/go.mod h1:5CU+agLiy3J7N7QjHK5d05KxGsuXiQLrjA0H7acj2lQ= +github.com/hashicorp/go-retryablehttp v0.7.5 h1:bJj+Pj19UZMIweq/iie+1u5YCdGrnxCT9yvm0e+Nd5M= +github.com/hashicorp/go-retryablehttp v0.7.5/go.mod h1:Jy/gPYAdjqffZ/yFGCFV2doI5wjtH1ewM9u8iYVjtX8= +github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= +github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= +github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8= +github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= +go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks= +golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4= +golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= +golang.org/x/oauth2 v0.17.0 h1:6m3ZPmLEFdVxKKWnKq4VqZ60gutO35zm+zrAHVmHyDQ= +golang.org/x/oauth2 v0.17.0/go.mod h1:OzPDGQiuQMguemayvdylqddI7qcD9lnSDb+1FiwQ5HA= +golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ= +golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= +golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/appengine v1.6.7 h1:FZR1q0exgwxzPzp/aF+VccGrSfxfPpkBqjIIEq3ru6c= +google.golang.org/appengine v1.6.7/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= +google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10= +gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/transfer/main/transfer/aws.go b/transfer/main/transfer/aws.go new file mode 100644 index 0000000..4f15254 --- /dev/null +++ b/transfer/main/transfer/aws.go @@ -0,0 +1,77 @@ +package main + +import ( + "bytes" + "context" + "fmt" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/s3" + "github.com/aws/aws-sdk-go/service/s3/s3manager" + "log" +) + +const ( + awsBucket = "atolio-external-lab" + awsPrefix = "20240209-sblum/" + awsRegion = "us-west-2" +) + +func NewAwsClient() (*AwsClient, error) { + // Create a session using your credentials and region. + awsSess, err := session.NewSession(&aws.Config{ + Region: aws.String(awsRegion), // replace with your preferred region + MaxRetries: aws.Int(5), + }) + if err != nil { + return nil, fmt.Errorf("failed to create session: %w", err) + } + return &AwsClient{session: awsSess, uploader: s3manager.NewUploader(awsSess)}, nil +} + +type AwsClient struct { + session *session.Session + uploader *s3manager.Uploader +} + +func (ac *AwsClient) Upload(ctx context.Context, path string, data []byte) error { + _, err := ac.uploader.UploadWithContext(ctx, &s3manager.UploadInput{ + Bucket: aws.String(awsBucket), + Key: aws.String(awsPrefix + path), + Body: bytes.NewReader(data), + }) + + if err != nil { + return fmt.Errorf("failed to upload file: %w", err) + } + + return nil +} + +func (ac *AwsClient) CleanReset(ctx context.Context) error { + s3Svc := s3.New(ac.session) + + resp, err := s3Svc.ListObjectsV2(&s3.ListObjectsV2Input{ + Bucket: aws.String(awsBucket), + Prefix: aws.String(awsPrefix), + }) + + if err != nil { + return fmt.Errorf("failed to list objects: %w", err) + } + + // Loop through the objects and delete them one by one. + for _, item := range resp.Contents { + _, err = s3Svc.DeleteObjectWithContext(ctx, &s3.DeleteObjectInput{ + Bucket: aws.String(awsBucket), + Key: item.Key, + }) + + if err != nil { + return fmt.Errorf("failed to delete object: %w", err) + } + + log.Printf("Successfully deleted object: %s\n", *item.Key) + } + return nil +} diff --git a/transfer/main/transfer/dropbox.go b/transfer/main/transfer/dropbox.go new file mode 100644 index 0000000..81117ad --- /dev/null +++ b/transfer/main/transfer/dropbox.go @@ -0,0 +1,195 @@ +package main + +import ( + "context" + "encoding/json" + "fmt" + "github.com/hashicorp/go-retryablehttp" + "golang.org/x/oauth2" + "io" + "net/http" + "os" +) + +const ( + maxDbBatch = 100 // ensure we test cursoring +) + +func NewDropBoxClient(ctx context.Context) (*DropBoxClient, error) { + conf := &oauth2.Config{ + ClientID: os.Getenv("DROPBOX_CLIENT_ID"), + ClientSecret: os.Getenv("DROPBOX_CLIENT_SECRET"), + Endpoint: oauth2.Endpoint{ + TokenURL: "https://api.dropboxapi.com/oauth2/token", + }, + } + + tok := &oauth2.Token{ + RefreshToken: os.Getenv("DROPBOX_REFRESH_TOKEN"), + } + + // make sure it works + ts := conf.TokenSource(ctx, tok) + if _, err := ts.Token(); err != nil { + return nil, fmt.Errorf("failed to create token: %w", err) + } + + oauthCl := oauth2.NewClient(nil, ts) + retryCl := retryablehttp.NewClient() + retryCl.HTTPClient = oauthCl + retryCl.Logger = nil + return &DropBoxClient{cl: retryCl}, nil +} + +type DropBoxClient struct { + cl *retryablehttp.Client +} + +type File struct { + Tag string `json:".tag"` + Name string `json:"name"` + PathLower string `json:"path_lower"` + PathDisplay string `json:"path_display"` + ID string `json:"id"` + ClientModified string `json:"client_modified"` + ServerModified string `json:"server_modified"` + Rev string `json:"rev"` + Size int `json:"size"` + IsDownloadable bool `json:"is_downloadable"` + ContentHash string `json:"content_hash"` +} + +func (db *DropBoxClient) IterateFiles(ctx context.Context, limit int, cb func(context.Context, File) error) error { + type ListRequest struct { + Path string `json:"path"` + Recursive bool `json:"recursive"` + Limit int `json:"limit"` + IncludeNonDownloadableFiles bool `json:"include_non_downloadable_files"` + } + + type ListResponse struct { + Entries []File `json:"entries"` + Cursor string `json:"cursor"` + HasMore bool `json:"has_more"` + } + + type ContinueRequest struct { + Cursor string `json:"cursor"` + } + + req := &ListRequest{ + Path: "", + Recursive: true, + Limit: min(limit, maxDbBatch), + IncludeNonDownloadableFiles: false, + } + var rsp ListResponse + err := db.post(ctx, "/files/list_folder", req, &rsp) + if err != nil { + return fmt.Errorf("dropbox list files: %w", err) + } + + for { + for _, e := range rsp.Entries { + if err := cb(ctx, e); err != nil { + return fmt.Errorf("callback error: %w", err) + } + limit-- + if limit == 0 { + return nil + } + } + + if !rsp.HasMore { + return nil + } + + req := &ContinueRequest{Cursor: rsp.Cursor} + rsp = ListResponse{} + err := db.post(ctx, "/files/list_folder/continue", req, &rsp) + if err != nil { + return fmt.Errorf("dropbox iterate files: %w", err) + } + } +} + +func (db *DropBoxClient) post(ctx context.Context, path string, req any, rsp any) error { + reqBytes, err := json.Marshal(req) + if err != nil { + panic(err) // should never happen + } + httpReq, err := retryablehttp.NewRequestWithContext(ctx, http.MethodPost, "https://api.dropboxapi.com/2"+path, reqBytes) + if err != nil { + panic(err) // should never happen + } + httpReq.Header.Set("Content-Type", "application/json") + + httpRsp, err := db.cl.Do(httpReq) + if err != nil { + return fmt.Errorf("dropbox http request error: %w", err) + } + body, err := io.ReadAll(httpRsp.Body) + if err != nil { + return fmt.Errorf("dropbox http body read error: %w", err) + } + if httpRsp.StatusCode != http.StatusOK { + return fmt.Errorf("dropbox http bad status code: %d\n%s", httpRsp.StatusCode, string(body)) + } + + if rsp != nil { + if err := json.Unmarshal(body, rsp); err != nil { + return fmt.Errorf("dropbox could not unmarshal: %w\n%s", err, string(body)) + } + } + + return nil +} + +type DownloadRequest struct { + Path string `json:"path"` +} + +func (db *DropBoxClient) Download(ctx context.Context, path string) ([]byte, error) { + req := &DownloadRequest{ + Path: path, + } + buf, err := db.content(ctx, "/files/download", req, nil) + if err != nil { + return nil, fmt.Errorf("dropbox list files: %w", err) + } + return buf, nil +} + +func (db *DropBoxClient) content(ctx context.Context, path string, req any, rsp any) ([]byte, error) { + reqBytes, err := json.Marshal(req) + if err != nil { + panic(err) // should never happen + } + httpReq, err := retryablehttp.NewRequestWithContext(ctx, http.MethodPost, "https://content.dropboxapi.com/2"+path, nil) + if err != nil { + panic(err) // should never happen + } + httpReq.Header.Set("Dropbox-API-Arg", string(reqBytes)) + + httpRsp, err := db.cl.Do(httpReq) + if err != nil { + return nil, fmt.Errorf("dropbox http request error: %w", err) + } + body, err := io.ReadAll(httpRsp.Body) + if err != nil { + return nil, fmt.Errorf("dropbox http body read error: %w", err) + } + + if httpRsp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("dropbox http bad status code: %d\n%s", httpRsp.StatusCode, string(body)) + } + + if rsp != nil { + rspJson := httpRsp.Header.Get("Dropbox-API-Result") + if err := json.Unmarshal([]byte(rspJson), rsp); err != nil { + return nil, fmt.Errorf("dropbox could not unmarshal: %w\n%s", err, rspJson) + } + } + + return body, nil +} diff --git a/transfer/main/transfer/main.go b/transfer/main/transfer/main.go new file mode 100644 index 0000000..6e44b9c --- /dev/null +++ b/transfer/main/transfer/main.go @@ -0,0 +1,142 @@ +package main + +import ( + "context" + "fmt" + "go.uber.org/atomic" + "golang.org/x/sync/errgroup" + "log" + "math" + "path/filepath" + "strings" +) + +const ( + nDropBoxWorkers = 5 + nAwsWorkers = 5 +) + +func main() { + if err := run(context.Background()); err != nil { + log.Fatal(err) + } +} + +func run(ctx context.Context) error { + dropboxClient, err := NewDropBoxClient(ctx) + if err != nil { + return fmt.Errorf("failed to create DropBox client: %w", err) + } + + awsClient, err := NewAwsClient() + if err != nil { + return fmt.Errorf("failed to create AWS client: %w", err) + } + + if false { + if err := awsClient.CleanReset(ctx); err != nil { + return fmt.Errorf("failed to reset bucket: %w", err) + } + } + + g, ctx := errgroup.WithContext(ctx) + downloadCh := make(chan File, nDropBoxWorkers) + type Upload struct { + Path string + Data []byte + } + uploadCh := make(chan Upload, nAwsWorkers) + + // producers + g.Go(func() error { + defer close(downloadCh) + err := dropboxClient.IterateFiles(ctx, math.MaxUint32, func(ctx context.Context, file File) error { + if file.Tag == "file" { + select { + case <-ctx.Done(): + return ctx.Err() + case downloadCh <- file: + log.Println("iterate:", file.ID, file.Size, file.PathDisplay) + } + } + return nil + }) + if err != nil { + return fmt.Errorf("failed to iterate: %w", err) + } + return nil + }) + + // download from dropbox + dbWorkers := atomic.NewInt32(nDropBoxWorkers) + for i := 0; i < nDropBoxWorkers; i++ { + g.Go(func() error { + // last one out turns off the lights + defer func() { + if dbWorkers.Dec() == 0 { + close(uploadCh) + } + }() + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case work, ok := <-downloadCh: + if !ok { + return nil // all done + } + + path := work.PathDisplay + basePath := filepath.Base(path) + + data, err := dropboxClient.Download(ctx, path) + if err != nil { + return fmt.Errorf("failed to download: %w", err) + } + text := string(data) + count := len(strings.Fields(text)) + if count >= 2000 && count <= 5000 { + select { + case <-ctx.Done(): + return ctx.Err() + case uploadCh <- Upload{Path: basePath, Data: data}: + log.Println("downloaded:", count, basePath) + } + } else { + log.Println("skip:", count, basePath) + } + } + } + + }) + } + + // upload to AWS + uploadCount := atomic.NewInt32(0) + for i := 0; i < nAwsWorkers; i++ { + g.Go(func() error { + for { + select { + case <-ctx.Done(): + return ctx.Err() + case work, ok := <-uploadCh: + if !ok { + return nil // all done + } + if err := awsClient.Upload(ctx, work.Path, work.Data); err != nil { + return fmt.Errorf("failed to upload: %w", err) + } + log.Println("uploaded:", work.Path) + uploadCount.Inc() + } + } + }) + } + + if err := g.Wait(); err != nil { + return err + } + log.Println(uploadCount.Load(), "uploaded") // 263 + return nil +} From c098d18ef2a75cb27d523944398affd2f4767b1f Mon Sep 17 00:00:00 2001 From: Scott Blum Date: Wed, 22 May 2024 11:22:00 -0400 Subject: [PATCH 2/2] bytes --- transfer/main/transfer/main.go | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/transfer/main/transfer/main.go b/transfer/main/transfer/main.go index 6e44b9c..bae406c 100644 --- a/transfer/main/transfer/main.go +++ b/transfer/main/transfer/main.go @@ -1,14 +1,16 @@ package main import ( + "bytes" "context" "fmt" "go.uber.org/atomic" "golang.org/x/sync/errgroup" "log" "math" + "os" + "os/signal" "path/filepath" - "strings" ) const ( @@ -17,7 +19,9 @@ const ( ) func main() { - if err := run(context.Background()); err != nil { + ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt) + defer cancel() + if err := run(ctx); err != nil { log.Fatal(err) } } @@ -94,8 +98,7 @@ func run(ctx context.Context) error { if err != nil { return fmt.Errorf("failed to download: %w", err) } - text := string(data) - count := len(strings.Fields(text)) + count := len(bytes.Fields(data)) if count >= 2000 && count <= 5000 { select { case <-ctx.Done():