Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 77 additions & 25 deletions backend/internxt/internxt.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/internxt/rclone-adapter/folders"
"github.com/internxt/rclone-adapter/users"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/chunksize"
rclone_config "github.com/rclone/rclone/fs/config"
"github.com/rclone/rclone/fs/config/configmap"
"github.com/rclone/rclone/fs/config/configstruct"
Expand All @@ -30,15 +31,19 @@ import (
"github.com/rclone/rclone/fs/hash"
"github.com/rclone/rclone/lib/dircache"
"github.com/rclone/rclone/lib/encoder"
"github.com/rclone/rclone/lib/multipart"
"github.com/rclone/rclone/lib/oauthutil"
"github.com/rclone/rclone/lib/pacer"
"github.com/rclone/rclone/lib/random"
)

const (
minSleep = 10 * time.Millisecond
maxSleep = 2 * time.Second
decayConstant = 2 // bigger for slower decay, exponential
minSleep = 10 * time.Millisecond
maxSleep = 2 * time.Second
decayConstant = 2
maxUploadParts = 10000
minMultipartSize = 100 * 1024 * 1024
minChunkSize = fs.SizeSuffix(5 * 1024 * 1024)
)

// shouldRetry determines if an error should be retried.
Expand Down Expand Up @@ -101,6 +106,16 @@ func init() {
Default: true,
Advanced: true,
Help: "Skip hash validation when downloading files.\n\nBy default, hash validation is disabled. Set this to false to enable validation.",
}, {
Name: "chunk_size",
Help: "Chunk size for multipart uploads.\n\nLarge files will be uploaded in chunks of this size.\n\nMemory usage is approximately chunk_size * upload_concurrency.",
Default: fs.SizeSuffix(30 * 1024 * 1024),
Advanced: true,
}, {
Name: "upload_concurrency",
Help: "Concurrency for multipart uploads.\n\nThis is the number of chunks of the same file that are uploaded concurrently.\n\nNote that each chunk is buffered in memory.",
Default: 4,
Advanced: true,
}, {
Name: rclone_config.ConfigEncoding,
Help: rclone_config.ConfigEncodingHelp,
Expand Down Expand Up @@ -194,6 +209,8 @@ type Options struct {
TwoFA string `config:"2fa"`
Mnemonic string `config:"mnemonic"`
SkipHashValidation bool `config:"skip_hash_validation"`
ChunkSize fs.SizeSuffix `config:"chunk_size"`
UploadConcurrency int `config:"upload_concurrency"`
Encoding encoder.MultiEncoder `config:"encoding"`
}

Expand Down Expand Up @@ -255,6 +272,10 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
return nil, err
}

if err := checkUploadChunkSize(opt.ChunkSize); err != nil {
return nil, fmt.Errorf("internxt: chunk size: %w", err)
}

if opt.Mnemonic == "" {
return nil, errors.New("mnemonic is required - please run: rclone config reconnect " + name + ":")
}
Expand Down Expand Up @@ -884,32 +905,63 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
fs.Debugf(o.f, "Renamed existing file %s to backup %s.%s (UUID: %s)", remote, backupName, backupType, backupUUID)
}

size := src.Size()

var meta *buckets.CreateMetaResponse
err = o.f.pacer.CallNoRetry(func() (bool, error) {
var err error
meta, err = buckets.UploadFileStreamAuto(ctx,
o.f.cfg,
dirID,
o.f.opt.Encoding.FromStandardName(path.Base(remote)),
in,
src.Size(),
src.ModTime(ctx),
)
return o.f.shouldRetry(ctx, err)
})
if size >= minMultipartSize {
ci := fs.GetConfig(ctx)
chunkSize := chunksize.Calculator(src, size, maxUploadParts, o.f.opt.ChunkSize)
if ci.MaxBufferMemory > 0 {
perTransfer := int64(ci.BufferSize) + int64(chunkSize)*int64(o.f.opt.UploadConcurrency)
needed := perTransfer * int64(ci.Transfers)
if int64(ci.MaxBufferMemory) < needed {
return fmt.Errorf("--max-buffer-memory %v is too small for multipart upload: need at least %v (%d transfers * (--buffer-size %v + chunk_size %v * upload_concurrency %d)); increase --max-buffer-memory or reduce transfers/chunk_size/upload_concurrency/buffer-size",
ci.MaxBufferMemory, fs.SizeSuffix(needed), ci.Transfers, ci.BufferSize, chunkSize, o.f.opt.UploadConcurrency)
}
}
chunkWriter, uploadErr := multipart.UploadMultipart(ctx, src, in, multipart.UploadMultipartOptions{
Open: o.f,
OpenOptions: options,
})

if err != nil && isEmptyFileLimitError(err) {
o.restoreBackupFile(ctx, backupUUID, origName, origType)
return fs.ErrorCantUploadEmptyFiles
}
if uploadErr != nil {
if isEmptyFileLimitError(uploadErr) {
o.restoreBackupFile(ctx, backupUUID, origName, origType)
return fs.ErrorCantUploadEmptyFiles
}
o.restoreBackupFile(ctx, backupUUID, origName, origType)
return uploadErr
}
w := chunkWriter.(*internxtChunkWriter)
meta = w.meta
} else {
// Use single-part upload for small files
err = o.f.pacer.CallNoRetry(func() (bool, error) {
var err error
meta, err = buckets.UploadFileStreamAuto(ctx,
o.f.cfg,
dirID,
o.f.opt.Encoding.FromStandardName(path.Base(remote)),
in,
size,
src.ModTime(ctx),
)
return o.f.shouldRetry(ctx, err)
})

if err != nil {
meta, err = o.recoverFromTimeoutConflict(ctx, err, remote, dirID)
}
if err != nil && isEmptyFileLimitError(err) {
o.restoreBackupFile(ctx, backupUUID, origName, origType)
return fs.ErrorCantUploadEmptyFiles
}

if err != nil {
o.restoreBackupFile(ctx, backupUUID, origName, origType)
return err
if err != nil {
meta, err = o.recoverFromTimeoutConflict(ctx, err, remote, dirID)
}

if err != nil {
o.restoreBackupFile(ctx, backupUUID, origName, origType)
return err
}
}

// Update object metadata
Expand Down
5 changes: 5 additions & 0 deletions backend/internxt/internxt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,17 @@ package internxt_test
import (
"testing"

"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fstest/fstests"
)

// TestIntegration drives rclone's standard integration test suite
// against the TestInternxt remote, with chunked-upload coverage enabled
// so the multipart path is exercised (chunks start at 100 MiB and the
// suite is told to force uploads spanning multiple chunks).
func TestIntegration(t *testing.T) {
	opt := fstests.Opt{
		RemoteName: "TestInternxt:",
		ChunkedUpload: fstests.ChunkedUploadConfig{
			MinChunkSize:       100 * fs.Mebi,
			NeedMultipleChunks: true,
		},
	}
	fstests.Run(t, &opt)
}
Loading