From 34c4d7a20c94a181634023c8830c62b2fb092bcc Mon Sep 17 00:00:00 2001 From: Hitenjain14 Date: Sat, 21 Dec 2024 16:36:01 +0530 Subject: [PATCH 1/7] cancle repair and batch key for pause cancel upload --- wasmsdk/blobber.go | 106 ++++++++++++++++++------- wasmsdk/proxy.go | 2 + zboxcore/sdk/allocation.go | 30 +++++-- zboxcore/sdk/allocation_test.go | 14 ++-- zboxcore/sdk/multi_operation_worker.go | 13 ++- zboxcore/sdk/repairworker.go | 94 +++++++++++----------- zboxcore/sdk/upload_worker.go | 5 +- 7 files changed, 173 insertions(+), 91 deletions(-) diff --git a/wasmsdk/blobber.go b/wasmsdk/blobber.go index 737b0bbc1..028b72bb7 100644 --- a/wasmsdk/blobber.go +++ b/wasmsdk/blobber.go @@ -34,8 +34,9 @@ import ( const FileOperationInsert = "insert" var ( - downloadDirContextMap = make(map[string]context.CancelCauseFunc) - downloadDirLock = sync.Mutex{} + opCancelContextMap = make(map[string]context.CancelCauseFunc) + opCancelLock = sync.Mutex{} + ErrUnderRepair = errors.New("allocation is under repair") ) // listObjects list allocation objects from its blobbers @@ -72,27 +73,45 @@ func listObjectsFromAuthTicket(allocationID, authTicket, lookupHash string, offs } // cancelUpload cancel the upload operation of the file -// - allocationID is the allocation id -// - remotePath is the remote path of the file -func cancelUpload(allocationID, remotePath string) error { - allocationObj, err := getAllocation(allocationID) +// - batchKey is the batch key of the operation +func cancelUpload(batchKey string) error { + opCancelLock.Lock() + defer opCancelLock.Unlock() + if cancel, ok := opCancelContextMap[batchKey]; ok { + cancel(sdk.ErrCancelUpload) + } else { + return errors.New("invalid batch key") + } + return nil +} + +// pauseUpload pause the upload operation of the file +// - batchKey is the batch key of the operation +func pauseUpload(batchKey string) error { + opCancelLock.Lock() + defer opCancelLock.Unlock() + if cancel, ok := opCancelContextMap[batchKey]; ok { + cancel(sdk.ErrPauseUpload) + } else { + return errors.New("invalid batch key") + } + return nil +} + +func cancelRepair(allocationID string) error { + alloc, err := getAllocation(allocationID) if err != nil { - PrintError("Error fetching the allocation", err) return err } - return allocationObj.CancelUpload(remotePath) + return alloc.CancelRepair() } -// pauseUpload pause the upload operation of the file -// - allocationID is the allocation id -// - remotePath is the remote path of the file -func pauseUpload(allocationID, remotePath string) error { - allocationObj, err := getAllocation(allocationID) +func cancelDownload(allocationID, remotePath string) error { + alloc, err := getAllocation(allocationID) if err != nil { - PrintError("Error fetching the allocation", err) return err } - return allocationObj.PauseUpload(remotePath) + return alloc.CancelDownload(remotePath) } // createDir create a directory on blobbers @@ -599,10 +618,11 @@ type MultiDownloadOption struct { // ## Inputs // - allocationID // - jsonMultiUploadOptions: Json Array of MultiOperationOption. eg: "[{"operationType":"move","remotePath":"/README.md","destPath":"/folder1/"},{"operationType":"delete","remotePath":"/t3.txt"}]" +// - batchKey: batch key for the operation // // ## Outputs // - error -func MultiOperation(allocationID string, jsonMultiUploadOptions string) error { +func MultiOperation(allocationID, jsonMultiUploadOptions string) error { if allocationID == "" { return errors.New("AllocationID is required") } @@ -631,6 +651,9 @@ func MultiOperation(allocationID string, jsonMultiUploadOptions string) error { if err != nil { return err } + if allocationObj.IsUnderRepair() { + return ErrUnderRepair + } return allocationObj.DoMultiOperation(operations) } @@ -696,7 +719,7 @@ func setUploadMode(mode int) { // multiUpload upload multiple files in parallel // - jsonBulkUploadOptions is the json array of BulkUploadOption. Follows the BulkUploadOption struct -func multiUpload(jsonBulkUploadOptions string) (MultiUploadResult, error) { +func multiUpload(jsonBulkUploadOptions, batchKey string) (MultiUploadResult, error) { defer func() { if r := recover(); r != nil { PrintError("Recovered in multiupload Error", r) @@ -729,6 +752,11 @@ func multiUpload(jsonBulkUploadOptions string) (MultiUploadResult, error) { result.Success = false return result, err } + if allocationObj.IsUnderRepair() { + result.Error = ErrUnderRepair.Error() + result.Success = false + return result, ErrUnderRepair + } operationRequests := make([]sdk.OperationRequest, n) for idx, option := range options { @@ -811,14 +839,34 @@ func multiUpload(jsonBulkUploadOptions string) (MultiUploadResult, error) { } } - err = allocationObj.DoMultiOperation(operationRequests) - if err != nil { - result.Error = err.Error() + ctx, cancel := context.WithCancelCause(context.Background()) + defer cancel(nil) + opCancelLock.Lock() + opCancelContextMap[batchKey] = cancel + opCancelLock.Unlock() + defer func() { + opCancelLock.Lock() + delete(opCancelContextMap, batchKey) + opCancelLock.Unlock() + }() + errChan := make(chan error, 1) + go func() { + errChan <- allocationObj.DoMultiOperation(operationRequests, sdk.WithContext(ctx)) + }() + select { + case <-ctx.Done(): + result.Error = ctx.Err().Error() result.Success = false - return result, err + return result, ctx.Err() + case err := <-errChan: + if err != nil { + result.Error = err.Error() + result.Success = false + return result, err + } + result.Success = true + return result, nil } - result.Success = true - return result, nil } func uploadWithJsFuncs(allocationID, remotePath string, readChunkFuncName string, fileSize int64, thumbnailBytes []byte, webStreaming, encrypt, isUpdate, isRepair bool, numBlocks int, callbackFuncName string) (bool, error) { @@ -1223,9 +1271,9 @@ func downloadDirectory(allocationID, remotePath, authticket, callbackFuncName st go func() { errChan <- alloc.DownloadDirectory(ctx, remotePath, "", authticket, statusBar) }() - downloadDirLock.Lock() - downloadDirContextMap[remotePath] = cancel - downloadDirLock.Unlock() + opCancelLock.Lock() + opCancelContextMap[remotePath] = cancel + opCancelLock.Unlock() select { case err = <-errChan: if err != nil { @@ -1240,12 +1288,12 @@ func downloadDirectory(allocationID, remotePath, authticket, callbackFuncName st // cancelDownloadDirectory cancel the download directory operation // - remotePath : remote path of the directory func cancelDownloadDirectory(remotePath string) { - downloadDirLock.Lock() - cancel, ok := downloadDirContextMap[remotePath] + opCancelLock.Lock() + cancel, ok := opCancelContextMap[remotePath] if ok { cancel(errors.New("download directory canceled by user")) } - downloadDirLock.Unlock() + opCancelLock.Unlock() } func cancelDownloadBlocks(allocationID, remotePath string, start, end int64) error { diff --git a/wasmsdk/proxy.go b/wasmsdk/proxy.go index 9826ac461..46931f618 100644 --- a/wasmsdk/proxy.go +++ b/wasmsdk/proxy.go @@ -250,6 +250,8 @@ func main() { "downloadDirectory": downloadDirectory, "cancelDownloadDirectory": cancelDownloadDirectory, "cancelDownloadBlocks": cancelDownloadBlocks, + "cancelRepair": cancelRepair, + "cancelDownload": cancelDownload, // player "play": play, diff --git a/zboxcore/sdk/allocation.go b/zboxcore/sdk/allocation.go index 779930ebb..f07ef5339 100644 --- a/zboxcore/sdk/allocation.go +++ b/zboxcore/sdk/allocation.go @@ -2919,18 +2919,21 @@ func (a *Allocation) StartRepair(localRootPath, pathToRepair string, statusCB St return err } } - + repairCtx, repairCtxCancel := context.WithCancel(a.ctx) repairReq := &RepairRequest{ - listDir: listDir, - localRootPath: localRootPath, - statusCB: statusCB, - repairPath: pathToRepair, + listDir: listDir, + localRootPath: localRootPath, + statusCB: statusCB, + repairPath: pathToRepair, + repairCtx: repairCtx, + repairCtxCancel: repairCtxCancel, } repairReq.completedCallback = func() { a.mutex.Lock() defer a.mutex.Unlock() a.repairRequestInProgress = nil + repairCtxCancel() } go func() { @@ -2970,9 +2973,11 @@ func (a *Allocation) RepairSize(remotePath string) (RepairSize, error) { if err != nil { return RepairSize{}, err } - + repairCtx, repairCtxCancel := context.WithCancel(a.ctx) repairReq := RepairRequest{ - allocation: a, + allocation: a, + repairCtx: repairCtx, + repairCtxCancel: repairCtxCancel, } return repairReq.Size(context.Background(), dir) } @@ -2987,7 +2992,7 @@ func (a *Allocation) CancelUpload(remotePath string) error { if !ok { return errors.New("remote_path_not_found", "Invalid path. No upload in progress for the path "+remotePath) } else { - cancelFunc(fmt.Errorf("upload canceled by user")) + cancelFunc(ErrCancelUpload) } return nil } @@ -3012,13 +3017,22 @@ func (a *Allocation) PauseUpload(remotePath string) error { // CancelRepair cancels the repair operation for the allocation. // It cancels the repair operation and returns an error if no repair is in progress for the allocation. func (a *Allocation) CancelRepair() error { + a.mutex.Lock() + defer a.mutex.Unlock() if a.repairRequestInProgress != nil { a.repairRequestInProgress.isRepairCanceled = true + a.repairRequestInProgress.repairCtxCancel() return nil } return errors.New("invalid_cancel_repair_request", "No repair in progress for the allocation") } +func (a *Allocation) IsUnderRepair() bool { + a.mutex.Lock() + defer a.mutex.Unlock() + return a.repairRequestInProgress != nil +} + func (a *Allocation) GetMaxWriteReadFromBlobbers(blobbers []*BlobberAllocation) (maxW float64, maxR float64, err error) { if !a.isInitialized() { return 0, 0, notInitialized diff --git a/zboxcore/sdk/allocation_test.go b/zboxcore/sdk/allocation_test.go index a2cf5b8c5..19b04f090 100644 --- a/zboxcore/sdk/allocation_test.go +++ b/zboxcore/sdk/allocation_test.go @@ -5,7 +5,6 @@ import ( "context" "encoding/hex" "encoding/json" - "fmt" "io" "io/fs" "log" @@ -19,7 +18,6 @@ import ( "github.com/0chain/gosdk/zboxcore/mocks" - encrypt "github.com/0chain/gosdk/core/encryption" "github.com/0chain/gosdk/dev/blobber" "github.com/0chain/gosdk/dev/blobber/model" "github.com/0chain/gosdk/zboxcore/encryption" @@ -388,7 +386,8 @@ func TestAllocation_dispatchWork(t *testing.T) { }) t.Run("Test_Cover_Repair_Request", func(t *testing.T) { go a.dispatchWork(context.Background()) - a.repairChan <- &RepairRequest{listDir: &ListResult{}} + repairCtx, repairCtxCancel := context.WithCancel(context.Background()) + a.repairChan <- &RepairRequest{listDir: &ListResult{}, repairCtx: repairCtx, repairCtxCancel: repairCtxCancel} }) } @@ -1454,8 +1453,7 @@ func TestAllocation_CancelDownload(t *testing.T) { setup: func(t *testing.T, a *Allocation) (teardown func(t *testing.T)) { req := &DownloadRequest{} req.ctx, req.ctxCncl = context.WithCancel(context.TODO()) - hash := encrypt.Hash(fmt.Sprintf("%s:%d:%d", remotePath, 1, 0)) - a.downloadProgressMap[hash] = req + a.downloadProgressMap[remotePath] = req return nil }, }, @@ -2300,7 +2298,11 @@ func TestAllocation_CancelRepair(t *testing.T) { { name: "Test_Success", setup: func(t *testing.T, a *Allocation) (teardown func(t *testing.T)) { - a.repairRequestInProgress = &RepairRequest{} + ctx, cancel := context.WithCancel(context.Background()) + a.repairRequestInProgress = &RepairRequest{ + repairCtx: ctx, + repairCtxCancel: cancel, + } return nil }, }, diff --git a/zboxcore/sdk/multi_operation_worker.go b/zboxcore/sdk/multi_operation_worker.go index de797d145..4d9f67340 100644 --- a/zboxcore/sdk/multi_operation_worker.go +++ b/zboxcore/sdk/multi_operation_worker.go @@ -36,11 +36,17 @@ type MultiOperationOption func(mo *MultiOperation) func WithRepair() MultiOperationOption { return func(mo *MultiOperation) { - mo.Consensus.consensusThresh = 0 + mo.Consensus.consensusThresh = 1 mo.isRepair = true } } +func WithContext(ctx context.Context) MultiOperationOption { + return func(mo *MultiOperation) { + mo.ctx, mo.ctxCncl = context.WithCancelCause(ctx) + } +} + type Operationer interface { Process(allocObj *Allocation, connectionID string) ([]fileref.RefEntity, zboxutil.Uint128, error) buildChange(refs []fileref.RefEntity, uid uuid.UUID) []allocationchange.AllocationChange @@ -321,6 +327,11 @@ func (mo *MultiOperation) Process() error { l.Logger.Error("consensus not met", activeBlobbers, mo.consensusThresh) return errors.New("consensus_not_met", fmt.Sprintf("Active blobbers %d is less than consensus threshold %d", activeBlobbers, mo.consensusThresh)) } + select { + case <-ctx.Done(): + return ctx.Err() + default: + } if mo.allocationObj.StorageVersion == StorageV2 { return mo.commitV2() } diff --git a/zboxcore/sdk/repairworker.go b/zboxcore/sdk/repairworker.go index ad1195bcb..7df6337c0 100644 --- a/zboxcore/sdk/repairworker.go +++ b/zboxcore/sdk/repairworker.go @@ -25,6 +25,8 @@ type RepairRequest struct { wg *sync.WaitGroup allocation *Allocation repairPath string + repairCtx context.Context + repairCtxCancel context.CancelFunc } type RepairStatusCB struct { @@ -66,7 +68,7 @@ func (r *RepairRequest) processRepair(ctx context.Context, a *Allocation) { defer r.completedCallback() } - if r.checkForCancel(a) { + if r.checkForCancel() { return } ctx, cancel := context.WithCancel(ctx) @@ -184,7 +186,7 @@ func (r *RepairRequest) iterateDir(a *Allocation, dir *ListResult) []OperationRe } } for _, childDir := range dir.Children { - if r.checkForCancel(a) { + if r.checkForCancel() { return nil } ops = append(ops, r.iterateDir(a, childDir)...) @@ -212,7 +214,7 @@ func (r *RepairRequest) iterateDir(a *Allocation, dir *ListResult) []OperationRe func (r *RepairRequest) repairFile(a *Allocation, file *ListResult) []OperationRequest { ops := make([]OperationRequest, 0) - if r.checkForCancel(a) { + if r.checkForCancel() { return nil } l.Logger.Info("Checking file for the path :", zap.Any("path", file.Path)) @@ -244,7 +246,7 @@ func (r *RepairRequest) repairFile(a *Allocation, file *ListResult) []OperationR localPath := r.getLocalPath(file) var op *OperationRequest if !checkFileExists(localPath) { - if r.checkForCancel(a) { + if r.checkForCancel() { return nil } pipeFile := sys.NewPipeFile() @@ -261,7 +263,7 @@ func (r *RepairRequest) repairFile(a *Allocation, file *ListResult) []OperationR op = a.RepairFile(f, file.Path, statusCB, found, ref) } ops = append(ops, *op) - if r.checkForCancel(a) { + if r.checkForCancel() { return nil } } else { @@ -286,28 +288,42 @@ func (r *RepairRequest) repairFile(a *Allocation, file *ListResult) []OperationR } func (r *RepairRequest) repairOperation(a *Allocation, ops []OperationRequest) { - err := a.DoMultiOperation(ops, WithRepair()) - if err != nil { - l.Logger.Error("repair_file_failed", zap.Error(err)) - status := r.statusCB != nil - for _, op := range ops { - if op.DownloadFile { - _ = a.CancelDownload(op.RemotePath) - } - if status { - r.statusCB.Error(a.ID, op.RemotePath, OpRepair, err) + if r.checkForCancel() { + return + } + errChan := make(chan error, 1) + go func() { + err := a.DoMultiOperation(ops, WithRepair(), WithContext(r.repairCtx)) + if err != nil { + l.Logger.Error("repair_file_failed", zap.Error(err)) + status := r.statusCB != nil + for _, op := range ops { + if op.DownloadFile { + _ = a.CancelDownload(op.RemotePath) + } + if status { + r.statusCB.Error(a.ID, op.RemotePath, OpRepair, err) + } } + } else { + r.filesRepaired += len(ops) } - } else { - r.filesRepaired += len(ops) - } - for _, op := range ops { - if op.FileReader != nil && !op.DownloadFile { - if f, ok := op.FileReader.(io.Closer); ok { - f.Close() + for _, op := range ops { + if op.FileReader != nil && !op.DownloadFile { + if f, ok := op.FileReader.(io.Closer); ok { + f.Close() + } } } + errChan <- err + }() + select { + case <-r.repairCtx.Done(): + return + case <-errChan: + return } + } func (r *RepairRequest) getLocalPath(file *ListResult) string { @@ -325,7 +341,8 @@ func checkFileExists(localPath string) bool { return !info.IsDir() } -func (r *RepairRequest) checkForCancel(a *Allocation) bool { +func (r *RepairRequest) checkForCancel() bool { + return r.isRepairCanceled } @@ -338,21 +355,7 @@ type diffRef struct { func (r *RepairRequest) iterateDirV2(ctx context.Context) { versionMap := make(map[string]*diffRef) - status, _, err := r.allocation.CheckAllocStatus() - if err != nil { - l.Logger.Error("Failed to get allocation status ", err.Error()) - if r.statusCB != nil { - r.statusCB.Error(r.allocation.ID, r.repairPath, OpRepair, err) - } - return - } - if status == Broken { - l.Logger.Error("Allocation is broken ", r.allocation.ID) - if r.statusCB != nil { - r.statusCB.Error(r.allocation.ID, r.repairPath, OpRepair, errors.New("allocation is broken")) - } - return - } + r.allocation.CheckAllocStatus() //nolint:errcheck latestRoot := r.allocation.allocationRoot for idx, blobber := range r.allocation.Blobbers { if versionMap[blobber.AllocationRoot] == nil { @@ -360,13 +363,6 @@ func (r *RepairRequest) iterateDirV2(ctx context.Context) { } versionMap[blobber.AllocationRoot].mask = versionMap[blobber.AllocationRoot].mask.Or(zboxutil.NewUint128(1).Lsh(uint64(idx))) } - if versionMap[latestRoot] == nil { - l.Logger.Error("Failed to get latest allocation root ", latestRoot) - if r.statusCB != nil { - r.statusCB.Error(r.allocation.ID, r.repairPath, OpRepair, errors.New("failed to get latest allocation root")) - } - return - } if versionMap[latestRoot].mask.CountOnes() < r.allocation.DataShards { l.Logger.Error("No consensus on latest allocation root: ", latestRoot) if r.statusCB != nil { @@ -394,7 +390,7 @@ func (r *RepairRequest) iterateDirV2(ctx context.Context) { ops []OperationRequest ) for { - if r.checkForCancel(r.allocation) { + if r.checkForCancel() { return } if toNextRef { @@ -476,11 +472,17 @@ func (r *RepairRequest) iterateDirV2(ctx context.Context) { op := r.uploadFileOp(srcRef, uploadMask) ops = append(ops, op) } + if r.checkForCancel() { + return + } if len(ops) >= RepairBatchSize { r.repairOperation(r.allocation, ops) ops = nil } } + if r.checkForCancel() { + return + } if len(ops) > 0 { r.repairOperation(r.allocation, ops) } diff --git a/zboxcore/sdk/upload_worker.go b/zboxcore/sdk/upload_worker.go index 285443c1e..8393a22e6 100644 --- a/zboxcore/sdk/upload_worker.go +++ b/zboxcore/sdk/upload_worker.go @@ -28,7 +28,10 @@ type UploadOperation struct { lookupHash string } -var ErrPauseUpload = errors.New("upload paused by user") +var ( + ErrPauseUpload = errors.New("upload paused by user") + ErrCancelUpload = errors.New("upload canceled by user") +) func (uo *UploadOperation) Process(allocObj *Allocation, connectionID string) ([]fileref.RefEntity, zboxutil.Uint128, error) { if uo.isDownload { From 00f6496102fc42a2f20c8fe52bc768c948711565 Mon Sep 17 00:00:00 2001 From: Hitenjain14 Date: Sat, 21 Dec 2024 17:04:30 +0530 Subject: [PATCH 2/7] fix ut --- zboxcore/sdk/allocation_test.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/zboxcore/sdk/allocation_test.go b/zboxcore/sdk/allocation_test.go index 19b04f090..9a301a39b 100644 --- a/zboxcore/sdk/allocation_test.go +++ b/zboxcore/sdk/allocation_test.go @@ -5,6 +5,7 @@ import ( "context" "encoding/hex" "encoding/json" + "fmt" "io" "io/fs" "log" @@ -18,6 +19,7 @@ import ( "github.com/0chain/gosdk/zboxcore/mocks" + encrypt "github.com/0chain/gosdk/core/encryption" "github.com/0chain/gosdk/dev/blobber" "github.com/0chain/gosdk/dev/blobber/model" "github.com/0chain/gosdk/zboxcore/encryption" @@ -1453,7 +1455,8 @@ func TestAllocation_CancelDownload(t *testing.T) { setup: func(t *testing.T, a *Allocation) (teardown func(t *testing.T)) { req := &DownloadRequest{} req.ctx, req.ctxCncl = context.WithCancel(context.TODO()) - a.downloadProgressMap[remotePath] = req + hash := encrypt.Hash(fmt.Sprintf("%s:%d:%d", remotePath, 1, 0)) + a.downloadProgressMap[hash] = req return nil }, }, From 3ad86b38520242a1d073d595befac6c78b6f52bc Mon Sep 17 00:00:00 2001 From: Hitenjain14 Date: Tue, 24 Dec 2024 22:34:04 +0530 Subject: [PATCH 3/7] add debug logs --- zboxcore/sdk/listworker.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/zboxcore/sdk/listworker.go b/zboxcore/sdk/listworker.go index d8944557d..3fde92b6a 100644 --- a/zboxcore/sdk/listworker.go +++ b/zboxcore/sdk/listworker.go @@ -178,6 +178,10 @@ func (req *ListRequest) getlistFromBlobbers() ([]*listResponse, error) { consensusMap := make(map[string][]*blockchain.StorageNode) var consensusHash string errCnt := 0 + if numList == 0 { + return nil, errors.New("no blobbers", "getlistFromBlobbers") + } + l.Logger.Debug("getListFromBlobbers: ", numList) for i := 0; i < numList; i++ { listInfos[i] = <-rspCh if !req.forRepair { @@ -212,6 +216,7 @@ func (req *ListRequest) getlistFromBlobbers() ([]*listResponse, error) { return listInfos, listInfos[0].err } req.listOnly = true + l.Logger.Debug("listInfos: ", len(listInfos)) listInfos = listInfos[:1] listOnlyRespCh := make(chan *listResponse, 1) for i := 0; i < listLen; i++ { From 653f714085e86ff04af8b5710eea16efcaef1966 Mon Sep 17 00:00:00 2001 From: Hitenjain14 Date: Thu, 16 Jan 2025 17:20:37 +0530 Subject: [PATCH 4/7] return ctx cause --- wasmsdk/blobber.go | 4 ++-- zboxcore/sdk/chunked_upload.go | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/wasmsdk/blobber.go b/wasmsdk/blobber.go index 84fac2415..e5c14a827 100644 --- a/wasmsdk/blobber.go +++ b/wasmsdk/blobber.go @@ -856,9 +856,9 @@ func multiUpload(jsonBulkUploadOptions, batchKey string) (MultiUploadResult, err }() select { case <-ctx.Done(): - result.Error = ctx.Err().Error() + result.Error = context.Cause(ctx).Error() result.Success = false - return result, ctx.Err() + return result, context.Cause(ctx) case err := <-errChan: if err != nil { result.Error = err.Error() diff --git a/zboxcore/sdk/chunked_upload.go b/zboxcore/sdk/chunked_upload.go index 1408fa539..f3c050bcf 100644 --- a/zboxcore/sdk/chunked_upload.go +++ b/zboxcore/sdk/chunked_upload.go @@ -474,6 +474,7 @@ func (su *ChunkedUpload) process() error { ) if err != nil { if su.statusCallback != nil { + logger.Logger.Error("Error during processUpload: ", su.fileMeta.RemotePath, " error: ", err) su.statusCallback.Error(su.allocationObj.ID, su.fileMeta.RemotePath, su.opCode, err) } return err From 74ecab33af67e816cbf2827e4f4e3ba0c8b8070b Mon Sep 17 00:00:00 2001 From: Hitenjain14 Date: Sat, 8 Mar 2025 20:05:48 +0530 Subject: [PATCH 5/7] merge repair changes --- zboxcore/sdk/repairworker.go | 23 ++++++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/zboxcore/sdk/repairworker.go b/zboxcore/sdk/repairworker.go index f9891d649..f773c40ce 100644 --- a/zboxcore/sdk/repairworker.go +++ b/zboxcore/sdk/repairworker.go @@ -355,7 +355,21 @@ type diffRef struct { func (r *RepairRequest) iterateDirV2(ctx context.Context) { versionMap := make(map[string]*diffRef) - r.allocation.CheckAllocStatus() //nolint:errcheck + status, _, err := r.allocation.CheckAllocStatus() + if err != nil { + l.Logger.Error("Failed to get allocation status ", err.Error()) + if r.statusCB != nil { + r.statusCB.Error(r.allocation.ID, r.repairPath, OpRepair, err) + } + return + } + if status == Broken { + l.Logger.Error("Allocation is broken ", r.allocation.ID) + if r.statusCB != nil { + r.statusCB.Error(r.allocation.ID, r.repairPath, OpRepair, errors.New("allocation is broken")) + } + return + } latestRoot := r.allocation.allocationRoot for idx, blobber := range r.allocation.Blobbers { if versionMap[blobber.AllocationRoot] == nil { @@ -363,6 +377,13 @@ func (r *RepairRequest) iterateDirV2(ctx context.Context) { } versionMap[blobber.AllocationRoot].mask = versionMap[blobber.AllocationRoot].mask.Or(zboxutil.NewUint128(1).Lsh(uint64(idx))) } + if versionMap[latestRoot] == nil { + l.Logger.Error("Failed to get latest allocation root ", latestRoot) + if r.statusCB != nil { + r.statusCB.Error(r.allocation.ID, r.repairPath, OpRepair, errors.New("failed to get latest allocation root")) + } + return + } if versionMap[latestRoot].mask.CountOnes() < r.allocation.DataShards { l.Logger.Error("No consensus on latest allocation root: ", latestRoot) if r.statusCB != nil { From b5c0767b10b3cee6e21f4d6a41773b301ea5cfce Mon Sep 17 00:00:00 2001 From: Hitenjain14 Date: Sat, 8 Mar 2025 20:07:14 +0530 Subject: [PATCH 6/7] cleanup --- zboxcore/sdk/listworker.go | 1 - 1 file changed, 1 deletion(-) diff --git a/zboxcore/sdk/listworker.go b/zboxcore/sdk/listworker.go index 128b0e133..ab572e709 100644 --- a/zboxcore/sdk/listworker.go +++ b/zboxcore/sdk/listworker.go @@ -182,7 +182,6 @@ func (req *ListRequest) getlistFromBlobbers() ([]*listResponse, error) { if numList == 0 { return nil, errors.New("no blobbers", "getlistFromBlobbers") } - l.Logger.Debug("getListFromBlobbers: ", numList) for i := 0; i < numList; i++ { listInfos[i] = <-rspCh if !req.forRepair { From dc9da7844903f17676efaad8649bcbcaa305f830 Mon Sep 17 00:00:00 2001 From: Hitenjain14 Date: Sat, 8 Mar 2025 20:36:59 +0530 Subject: [PATCH 7/7] cleanup debug log --- zboxcore/sdk/listworker.go | 1 - 1 file changed, 1 deletion(-) diff --git a/zboxcore/sdk/listworker.go b/zboxcore/sdk/listworker.go index ab572e709..2913adcf3 100644 --- a/zboxcore/sdk/listworker.go +++ b/zboxcore/sdk/listworker.go @@ -216,7 +216,6 @@ func (req *ListRequest) getlistFromBlobbers() ([]*listResponse, error) { return listInfos, listInfos[0].err } req.listOnly = true - l.Logger.Debug("listInfos: ", len(listInfos)) listInfos = listInfos[:1] listOnlyRespCh := make(chan *listResponse, 1) for i := 0; i < listLen; i++ {