Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 126 additions & 22 deletions backend/internxt/internxt.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"errors"
"fmt"
"io"
"net"
"path"
"path/filepath"
"strings"
Expand Down Expand Up @@ -42,6 +43,9 @@ func shouldRetry(ctx context.Context, err error) (bool, error) {
if fserrors.ContextError(ctx, &err) {
return false, err
}
if err != nil && (strings.Contains(err.Error(), "401") || strings.Contains(err.Error(), "Unauthorized")) {
return true, err
}
return fserrors.ShouldRetry(err), err
}

Expand Down Expand Up @@ -72,7 +76,8 @@ func init() {
encoder.EncodeSlash |
encoder.EncodeBackSlash |
encoder.EncodeRightPeriod |
encoder.EncodeDot,
encoder.EncodeDot |
encoder.EncodeCrLf,
},
}},
)
Expand Down Expand Up @@ -414,12 +419,20 @@ func (f *Fs) CreateDir(ctx context.Context, pathID, leaf string) (string, error)
}

var resp *folders.Folder
err := f.pacer.Call(func() (bool, error) {
err := f.pacer.CallNoRetry(func() (bool, error) {
var err error
resp, err = folders.CreateFolder(ctx, f.cfg, request)
return shouldRetry(ctx, err)
})
if err != nil {
// If folder already exists (409 conflict), try to find it
if strings.Contains(err.Error(), "409") || strings.Contains(err.Error(), "Conflict") {
existingID, found, findErr := f.FindLeaf(ctx, pathID, leaf)
if findErr == nil && found {
fs.Debugf(f, "Folder %q already exists in %q, using existing UUID: %s", leaf, pathID, existingID)
return existingID, nil
}
}
return "", fmt.Errorf("can't create folder, %w", err)
}

Expand Down Expand Up @@ -447,7 +460,7 @@ func (f *Fs) preUploadCheck(ctx context.Context, leaf, directoryID string) (*fol
return nil, nil
}

if len(checkResult.Files) > 0 && checkResult.Files[0].Exists {
if len(checkResult.Files) > 0 && checkResult.Files[0].FileExists() {
existingUUID := checkResult.Files[0].UUID
if existingUUID != "" {
fileMeta, err := files.GetFileMeta(ctx, f.cfg, existingUUID)
Expand All @@ -460,7 +473,6 @@ func (f *Fs) preUploadCheck(ctx context.Context, leaf, directoryID string) (*fol
}
}
}

return nil, nil
}

Expand Down Expand Up @@ -530,6 +542,11 @@ func (f *Fs) List(ctx context.Context, dir string) (fs.DirEntries, error) {
// Put uploads a file
func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
remote := src.Remote()

if src.Size() == 0 && !f.opt.SimulateEmptyFiles {
return nil, fs.ErrorCantUploadEmptyFiles
}

leaf, directoryID, err := f.dirCache.FindPath(ctx, remote, false)
if err != nil {
if err == fs.ErrorDirNotFound {
Expand Down Expand Up @@ -624,7 +641,11 @@ func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) {
if len(e.Type) > 0 {
name += "." + e.Type
}
if f.opt.Encoding.ToStandardName(name) == filepath.Base(remote) {
decodedName := f.opt.Encoding.ToStandardName(name)
targetName := filepath.Base(remote)
match := decodedName == targetName

if match {
return newObjectWithFile(f, remote, &e), nil
}
// If we are simulating empty files, check for a file with the special suffix and if found return it as if empty.
Expand Down Expand Up @@ -824,7 +845,7 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
}

var meta *buckets.CreateMetaResponse
err = o.f.pacer.Call(func() (bool, error) {
err = o.f.pacer.CallNoRetry(func() (bool, error) {
var err error
meta, err = buckets.UploadFileStreamAuto(ctx,
o.f.cfg,
Expand All @@ -838,35 +859,118 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
})

if err != nil {
// Upload failed - restore backup if it exists
if backupUUID != "" {
fs.Debugf(o.f, "Upload failed, attempting to restore backup %s.%s to %s", backupName, backupType, remote)

restoreErr := o.f.pacer.Call(func() (bool, error) {
err := files.RenameFile(ctx, o.f.cfg, backupUUID,
o.f.opt.Encoding.FromStandardName(origName), origType)
return shouldRetry(ctx, err)
})
if restoreErr != nil {
fs.Errorf(o.f, "CRITICAL: Upload failed AND backup restore failed: %v. Backup file remains as %s.%s (UUID: %s)",
restoreErr, backupName, backupType, backupUUID)
return fmt.Errorf("upload failed: %w (backup restore also failed: %v)", err, restoreErr)
}
fs.Debugf(o.f, "Upload failed, successfully restored backup file to original name")
}
meta, err = o.recoverFromTimeoutConflict(ctx, err, remote, dirID)
}

if err != nil {
o.restoreBackupFile(ctx, backupUUID, origName, origType)
return err
}

// Update object metadata
o.uuid = meta.UUID
o.id = meta.FileID
o.size = src.Size()
o.remote = remote
if isEmptyFile {
o.size = 0
}

// Step 3: Upload succeeded - delete the backup file
if backupUUID != "" {
fs.Debugf(o.f, "Upload succeeded, deleting backup file %s.%s (UUID: %s)", backupName, backupType, backupUUID)
err := o.f.pacer.Call(func() (bool, error) {
err := files.DeleteFile(ctx, o.f.cfg, backupUUID)
return shouldRetry(ctx, err)
})
if err != nil {
fs.Errorf(o.f, "Failed to delete backup file %s.%s (UUID: %s): %v. This may leave an orphaned backup file.",
backupName, backupType, backupUUID, err)
// Don't fail the upload just because backup deletion failed
} else {
fs.Debugf(o.f, "Successfully deleted backup file")
}
}

return nil
}

// isTimeoutError checks if an error is a timeout using proper error type checking
func isTimeoutError(err error) bool {
if errors.Is(err, context.DeadlineExceeded) {
return true
}
var netErr net.Error
if errors.As(err, &netErr) && netErr.Timeout() {
return true
}
return false
}

// isConflictError checks if an error indicates a file conflict (409)
func isConflictError(err error) bool {
errMsg := err.Error()
return strings.Contains(errMsg, "409") ||
strings.Contains(errMsg, "Conflict") ||
strings.Contains(errMsg, "already exists")
}

// recoverFromTimeoutConflict attempts to recover from an upload that failed
// with a timeout or a 409 conflict. The server may have completed the upload
// before the error surfaced, so we look the file up in the target directory;
// if it is present we treat the upload as a success and synthesize the
// metadata response from the existing entry. Otherwise the original
// uploadErr is returned unchanged.
func (o *Object) recoverFromTimeoutConflict(ctx context.Context, uploadErr error, remote, dirID string) (*buckets.CreateMetaResponse, error) {
	if !isTimeoutError(uploadErr) && !isConflictError(uploadErr) {
		// Not a recoverable class of failure - pass the error through.
		return nil, uploadErr
	}

	leaf := filepath.Base(remote)
	encoded := o.f.opt.Encoding.FromStandardName(leaf)

	var recovered *buckets.CreateMetaResponse
	lookupErr := o.f.pacer.Call(func() (bool, error) {
		found, err := o.f.preUploadCheck(ctx, encoded, dirID)
		if err != nil {
			return shouldRetry(ctx, err)
		}
		if found != nil {
			// Rebuild the response the upload would have produced,
			// splitting the leaf into stem and extension.
			ext := filepath.Ext(leaf)
			stem := strings.TrimSuffix(leaf, ext)
			recovered = &buckets.CreateMetaResponse{
				UUID:      found.UUID,
				FileID:    found.FileID,
				Name:      stem,
				PlainName: stem,
				Type:      strings.TrimPrefix(ext, "."),
				Size:      found.Size,
			}
			o.id = found.FileID
		}
		return false, nil
	})

	if lookupErr != nil || recovered == nil {
		// Could not confirm the file made it - report the original failure.
		return nil, uploadErr
	}
	return recovered, nil
}

// restoreBackupFile renames the backup file created before an upload back
// to its original name and type after the upload has failed. It is
// best-effort: a failed restore is logged rather than returned, since the
// caller is already propagating the upload error. No-op when no backup was
// made (backupUUID == "").
//
// Fix: the pacer.Call error was previously discarded, so a failed restore
// left an orphaned backup with no trace in the logs.
func (o *Object) restoreBackupFile(ctx context.Context, backupUUID, origName, origType string) {
	if backupUUID == "" {
		return
	}

	err := o.f.pacer.Call(func() (bool, error) {
		err := files.RenameFile(ctx, o.f.cfg, backupUUID,
			o.f.opt.Encoding.FromStandardName(origName), origType)
		return shouldRetry(ctx, err)
	})
	if err != nil {
		// Surface the failure: the backup remains under its backup name
		// and would otherwise be silently orphaned.
		fs.Errorf(o.f, "Failed to restore backup file (UUID: %s) to %s.%s: %v",
			backupUUID, origName, origType, err)
	}
}

// Remove deletes a file
func (o *Object) Remove(ctx context.Context) error {
return o.f.pacer.Call(func() (bool, error) {
Expand Down