diff --git a/backend/internxt/internxt.go b/backend/internxt/internxt.go index e2cfaaa55db08..1083348276693 100644 --- a/backend/internxt/internxt.go +++ b/backend/internxt/internxt.go @@ -7,6 +7,7 @@ import ( "errors" "fmt" "io" + "net" "path" "path/filepath" "strings" @@ -42,6 +43,9 @@ func shouldRetry(ctx context.Context, err error) (bool, error) { if fserrors.ContextError(ctx, &err) { return false, err } + if err != nil && (strings.Contains(err.Error(), "401") || strings.Contains(err.Error(), "Unauthorized")) { + return true, err + } return fserrors.ShouldRetry(err), err } @@ -72,7 +76,8 @@ func init() { encoder.EncodeSlash | encoder.EncodeBackSlash | encoder.EncodeRightPeriod | - encoder.EncodeDot, + encoder.EncodeDot | + encoder.EncodeCrLf, }, }}, ) @@ -414,12 +419,20 @@ func (f *Fs) CreateDir(ctx context.Context, pathID, leaf string) (string, error) } var resp *folders.Folder - err := f.pacer.Call(func() (bool, error) { + err := f.pacer.CallNoRetry(func() (bool, error) { var err error resp, err = folders.CreateFolder(ctx, f.cfg, request) return shouldRetry(ctx, err) }) if err != nil { + // If folder already exists (409 conflict), try to find it + if strings.Contains(err.Error(), "409") || strings.Contains(err.Error(), "Conflict") { + existingID, found, findErr := f.FindLeaf(ctx, pathID, leaf) + if findErr == nil && found { + fs.Debugf(f, "Folder %q already exists in %q, using existing UUID: %s", leaf, pathID, existingID) + return existingID, nil + } + } return "", fmt.Errorf("can't create folder, %w", err) } @@ -447,7 +460,7 @@ func (f *Fs) preUploadCheck(ctx context.Context, leaf, directoryID string) (*fol return nil, nil } - if len(checkResult.Files) > 0 && checkResult.Files[0].Exists { + if len(checkResult.Files) > 0 && checkResult.Files[0].FileExists() { existingUUID := checkResult.Files[0].UUID if existingUUID != "" { fileMeta, err := files.GetFileMeta(ctx, f.cfg, existingUUID) @@ -460,7 +473,6 @@ func (f *Fs) preUploadCheck(ctx context.Context, leaf, directoryID string) (*fol } } } - return nil, nil } @@ -530,6 +542,11 @@ func (f *Fs) List(ctx context.Context, dir string) (fs.DirEntries, error) { // Put uploads a file func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { remote := src.Remote() + + if src.Size() == 0 && !f.opt.SimulateEmptyFiles { + return nil, fs.ErrorCantUploadEmptyFiles + } + leaf, directoryID, err := f.dirCache.FindPath(ctx, remote, false) if err != nil { if err == fs.ErrorDirNotFound { @@ -624,7 +641,11 @@ func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) { if len(e.Type) > 0 { name += "." + e.Type } - if f.opt.Encoding.ToStandardName(name) == filepath.Base(remote) { + decodedName := f.opt.Encoding.ToStandardName(name) + targetName := filepath.Base(remote) + match := decodedName == targetName + + if match { return newObjectWithFile(f, remote, &e), nil } // If we are simulating empty files, check for a file with the special suffix and if found return it as if empty. @@ -824,7 +845,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op } var meta *buckets.CreateMetaResponse - err = o.f.pacer.Call(func() (bool, error) { + err = o.f.pacer.CallNoRetry(func() (bool, error) { var err error meta, err = buckets.UploadFileStreamAuto(ctx, o.f.cfg, @@ -838,35 +859,118 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op }) if err != nil { - // Upload failed - restore backup if it exists - if backupUUID != "" { - fs.Debugf(o.f, "Upload failed, attempting to restore backup %s.%s to %s", backupName, backupType, remote) - - restoreErr := o.f.pacer.Call(func() (bool, error) { - err := files.RenameFile(ctx, o.f.cfg, backupUUID, - o.f.opt.Encoding.FromStandardName(origName), origType) - return shouldRetry(ctx, err) - }) - if restoreErr != nil { - fs.Errorf(o.f, "CRITICAL: Upload failed AND backup restore failed: %v. Backup file remains as %s.%s (UUID: %s)", - restoreErr, backupName, backupType, backupUUID) - return fmt.Errorf("upload failed: %w (backup restore also failed: %v)", err, restoreErr) - } - fs.Debugf(o.f, "Upload failed, successfully restored backup file to original name") - } + meta, err = o.recoverFromTimeoutConflict(ctx, err, remote, dirID) + } + + if err != nil { + o.restoreBackupFile(ctx, backupUUID, origName, origType) + return err } // Update object metadata o.uuid = meta.UUID + o.id = meta.FileID o.size = src.Size() o.remote = remote if isEmptyFile { o.size = 0 } + // Step 3: Upload succeeded - delete the backup file + if backupUUID != "" { + fs.Debugf(o.f, "Upload succeeded, deleting backup file %s.%s (UUID: %s)", backupName, backupType, backupUUID) + err := o.f.pacer.Call(func() (bool, error) { + err := files.DeleteFile(ctx, o.f.cfg, backupUUID) + return shouldRetry(ctx, err) + }) + if err != nil { + fs.Errorf(o.f, "Failed to delete backup file %s.%s (UUID: %s): %v. This may leave an orphaned backup file.", + backupName, backupType, backupUUID, err) + // Don't fail the upload just because backup deletion failed + } else { + fs.Debugf(o.f, "Successfully deleted backup file") + } + } + return nil } +// isTimeoutError checks if an error is a timeout using proper error type checking +func isTimeoutError(err error) bool { + if errors.Is(err, context.DeadlineExceeded) { + return true + } + var netErr net.Error + if errors.As(err, &netErr) && netErr.Timeout() { + return true + } + return false +} + +// isConflictError checks if an error indicates a file conflict (409) +func isConflictError(err error) bool { + errMsg := err.Error() + return strings.Contains(errMsg, "409") || + strings.Contains(errMsg, "Conflict") || + strings.Contains(errMsg, "already exists") +} + +// recoverFromTimeoutConflict attempts to recover from a timeout or conflict error +func (o *Object) recoverFromTimeoutConflict(ctx context.Context, uploadErr error, remote, dirID string) (*buckets.CreateMetaResponse, error) { + if !isTimeoutError(uploadErr) && !isConflictError(uploadErr) { + return nil, uploadErr + } + + baseName := filepath.Base(remote) + encodedName := o.f.opt.Encoding.FromStandardName(baseName) + + var meta *buckets.CreateMetaResponse + checkErr := o.f.pacer.Call(func() (bool, error) { + existingFile, err := o.f.preUploadCheck(ctx, encodedName, dirID) + if err != nil { + return shouldRetry(ctx, err) + } + if existingFile != nil { + name := strings.TrimSuffix(baseName, filepath.Ext(baseName)) + ext := strings.TrimPrefix(filepath.Ext(baseName), ".") + + meta = &buckets.CreateMetaResponse{ + UUID: existingFile.UUID, + FileID: existingFile.FileID, + Name: name, + PlainName: name, + Type: ext, + Size: existingFile.Size, + } + o.id = existingFile.FileID + } + return false, nil + }) + + if checkErr != nil { + return nil, uploadErr + } + + if meta != nil { + return meta, nil + } + + return nil, uploadErr +} + +// restoreBackupFile restores a backup file after upload failure +func (o *Object) restoreBackupFile(ctx context.Context, backupUUID, origName, origType string) { + if backupUUID == "" { + return + } + + o.f.pacer.Call(func() (bool, error) { + err := files.RenameFile(ctx, o.f.cfg, backupUUID, + o.f.opt.Encoding.FromStandardName(origName), origType) + return shouldRetry(ctx, err) + }) +} + // Remove deletes a file func (o *Object) Remove(ctx context.Context) error { return o.f.pacer.Call(func() (bool, error) {