From 7e81482c2703a44248821ac11cf11155dc6383f0 Mon Sep 17 00:00:00 2001 From: jzunigax2 <125698953+jzunigax2@users.noreply.github.com> Date: Wed, 18 Feb 2026 17:44:06 -0600 Subject: [PATCH 1/3] internxt: implement multipart upload support with configurable chunk size and concurrency options --- backend/internxt/internxt.go | 145 ++++++++++++++++------ backend/internxt/internxt_test.go | 5 + backend/internxt/upload.go | 199 ++++++++++++++++++++++++++++++ docs/content/internxt.md | 30 +++++ go.mod | 2 +- go.sum | 4 + 6 files changed, 346 insertions(+), 39 deletions(-) create mode 100644 backend/internxt/upload.go diff --git a/backend/internxt/internxt.go b/backend/internxt/internxt.go index a221abe323012..c2ce206db438f 100644 --- a/backend/internxt/internxt.go +++ b/backend/internxt/internxt.go @@ -21,6 +21,7 @@ import ( "github.com/internxt/rclone-adapter/folders" "github.com/internxt/rclone-adapter/users" "github.com/rclone/rclone/fs" + "github.com/rclone/rclone/fs/chunksize" rclone_config "github.com/rclone/rclone/fs/config" "github.com/rclone/rclone/fs/config/configmap" "github.com/rclone/rclone/fs/config/configstruct" @@ -30,15 +31,19 @@ import ( "github.com/rclone/rclone/fs/hash" "github.com/rclone/rclone/lib/dircache" "github.com/rclone/rclone/lib/encoder" + "github.com/rclone/rclone/lib/multipart" "github.com/rclone/rclone/lib/oauthutil" "github.com/rclone/rclone/lib/pacer" "github.com/rclone/rclone/lib/random" ) const ( - minSleep = 10 * time.Millisecond - maxSleep = 2 * time.Second - decayConstant = 2 // bigger for slower decay, exponential + minSleep = 10 * time.Millisecond + maxSleep = 2 * time.Second + decayConstant = 2 + maxUploadParts = 10000 + minMultipartSize = 100 * 1024 * 1024 + minChunkSize = fs.SizeSuffix(5 * 1024 * 1024) ) // shouldRetry determines if an error should be retried. @@ -101,6 +106,16 @@ func init() { Default: true, Advanced: true, Help: "Skip hash validation when downloading files.\n\nBy default, hash validation is disabled. Set this to false to enable validation.", + }, { + Name: "chunk_size", + Help: "Chunk size for multipart uploads.\n\nLarge files will be uploaded in chunks of this size.\n\nMemory usage is approximately chunk_size * upload_concurrency.", + Default: fs.SizeSuffix(30 * 1024 * 1024), + Advanced: true, + }, { + Name: "upload_concurrency", + Help: "Concurrency for multipart uploads.\n\nThis is the number of chunks of the same file that are uploaded concurrently.\n\nNote that each chunk is buffered in memory.", + Default: 4, + Advanced: true, }, { Name: rclone_config.ConfigEncoding, Help: rclone_config.ConfigEncodingHelp, @@ -194,24 +209,27 @@ type Options struct { TwoFA string `config:"2fa"` Mnemonic string `config:"mnemonic"` SkipHashValidation bool `config:"skip_hash_validation"` + ChunkSize fs.SizeSuffix `config:"chunk_size"` + UploadConcurrency int `config:"upload_concurrency"` Encoding encoder.MultiEncoder `config:"encoding"` } // Fs represents an Internxt remote type Fs struct { - name string - root string - opt Options - m configmap.Mapper - dirCache *dircache.DirCache - cfg *config.Config - features *fs.Features - pacer *fs.Pacer - tokenRenewer *oauthutil.Renew - bridgeUser string - userID string - authMu sync.Mutex - authFailed bool + name string + root string + opt Options + m configmap.Mapper + dirCache *dircache.DirCache + cfg *config.Config + features *fs.Features + pacer *fs.Pacer + tokenRenewer *oauthutil.Renew + bridgeUser string + userID string + authMu sync.Mutex + authFailed bool + pendingSession *buckets.ChunkUploadSession } // Object holds the data for a remote file object @@ -334,6 +352,7 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e f.features = (&fs.Features{ CanHaveEmptyDirectories: true, }).Fill(ctx, f) + f.features.OpenChunkWriter = nil if ts != nil { f.tokenRenewer = oauthutil.NewRenew(f.String(), ts, func() error { @@ -884,32 +903,82 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op fs.Debugf(o.f, "Renamed existing file %s to backup %s.%s (UUID: %s)", remote, backupName, backupType, backupUUID) } + size := src.Size() + var meta *buckets.CreateMetaResponse - err = o.f.pacer.CallNoRetry(func() (bool, error) { - var err error - meta, err = buckets.UploadFileStreamAuto(ctx, - o.f.cfg, - dirID, - o.f.opt.Encoding.FromStandardName(path.Base(remote)), - in, - src.Size(), - src.ModTime(ctx), - ) - return o.f.shouldRetry(ctx, err) - }) + if size >= minMultipartSize { + ci := fs.GetConfig(ctx) + chunkSize := chunksize.Calculator(src, size, maxUploadParts, o.f.opt.ChunkSize) + if ci.MaxBufferMemory > 0 { + perTransfer := int64(ci.BufferSize) + int64(chunkSize)*int64(o.f.opt.UploadConcurrency) + needed := perTransfer * int64(ci.Transfers) + if int64(ci.MaxBufferMemory) < needed { + return fmt.Errorf("--max-buffer-memory %v is too small for multipart upload: need at least %v (%d transfers * (--buffer-size %v + chunk_size %v * upload_concurrency %d)); increase --max-buffer-memory or reduce transfers/chunk_size/upload_concurrency/buffer-size", + ci.MaxBufferMemory, fs.SizeSuffix(needed), ci.Transfers, ci.BufferSize, chunkSize, o.f.opt.UploadConcurrency) + } + } + var session *buckets.ChunkUploadSession + err = o.f.pacer.Call(func() (bool, error) { + var err error + session, err = buckets.NewChunkUploadSession(ctx, o.f.cfg, size, int64(chunkSize)) + return o.f.shouldRetry(ctx, err) + }) + if err != nil { + o.restoreBackupFile(ctx, backupUUID, origName, origType) + return fmt.Errorf("failed to create upload session: %w", err) + } - if err != nil && isEmptyFileLimitError(err) { - o.restoreBackupFile(ctx, backupUUID, origName, origType) - return fs.ErrorCantUploadEmptyFiles - } + // Wrap reader with SDK's encrypting reader + encReader := session.EncryptingReader(in) - if err != nil { - meta, err = o.recoverFromTimeoutConflict(ctx, err, remote, dirID) - } + // Store session for OpenChunkWriter to pick up + o.f.pendingSession = session - if err != nil { - o.restoreBackupFile(ctx, backupUUID, origName, origType) - return err + chunkWriter, uploadErr := multipart.UploadMultipart(ctx, src, encReader, multipart.UploadMultipartOptions{ + Open: o.f, + OpenOptions: options, + }) + + o.f.pendingSession = nil + + if uploadErr != nil { + if isEmptyFileLimitError(uploadErr) { + o.restoreBackupFile(ctx, backupUUID, origName, origType) + return fs.ErrorCantUploadEmptyFiles + } + o.restoreBackupFile(ctx, backupUUID, origName, origType) + return uploadErr + } + w := chunkWriter.(*internxtChunkWriter) + meta = w.meta + } else { + // Use single-part upload for small files + err = o.f.pacer.CallNoRetry(func() (bool, error) { + var err error + meta, err = buckets.UploadFileStreamAuto(ctx, + o.f.cfg, + dirID, + o.f.opt.Encoding.FromStandardName(path.Base(remote)), + in, + size, + src.ModTime(ctx), + ) + return o.f.shouldRetry(ctx, err) + }) + + if err != nil && isEmptyFileLimitError(err) { + o.restoreBackupFile(ctx, backupUUID, origName, origType) + return fs.ErrorCantUploadEmptyFiles + } + + if err != nil { + meta, err = o.recoverFromTimeoutConflict(ctx, err, remote, dirID) + } + + if err != nil { + o.restoreBackupFile(ctx, backupUUID, origName, origType) + return err + } } // Update object metadata diff --git a/backend/internxt/internxt_test.go b/backend/internxt/internxt_test.go index 01e23b3b20606..154314ac64582 100644 --- a/backend/internxt/internxt_test.go +++ b/backend/internxt/internxt_test.go @@ -3,6 +3,7 @@ package internxt_test import ( "testing" + "github.com/rclone/rclone/fs" "github.com/rclone/rclone/fstest/fstests" ) @@ -10,5 +11,9 @@ import ( func TestIntegration(t *testing.T) { fstests.Run(t, &fstests.Opt{ RemoteName: "TestInternxt:", + ChunkedUpload: fstests.ChunkedUploadConfig{ + MinChunkSize: 100 * fs.Mebi, + NeedMultipleChunks: true, + }, }) } diff --git a/backend/internxt/upload.go b/backend/internxt/upload.go new file mode 100644 index 0000000000000..0989db1422c0b --- /dev/null +++ b/backend/internxt/upload.go @@ -0,0 +1,199 @@ +package internxt + +import ( + "context" + "fmt" + "io" + "path" + "sort" + "strings" + "sync" + + "github.com/internxt/rclone-adapter/buckets" + "github.com/rclone/rclone/fs" + "github.com/rclone/rclone/fs/chunksize" +) + +var warnStreamUpload sync.Once + +func checkUploadChunkSize(cs fs.SizeSuffix) error { + if cs < minChunkSize { + return fmt.Errorf("%s is less than %s", cs, minChunkSize) + } + return nil +} + +// SetUploadChunkSize sets the chunk size used for multipart uploads +func (f *Fs) SetUploadChunkSize(cs fs.SizeSuffix) (fs.SizeSuffix, error) { + err := checkUploadChunkSize(cs) + if err == nil { + old := f.opt.ChunkSize + f.opt.ChunkSize = cs + return old, nil + } + return f.opt.ChunkSize, err +} + +// internxtChunkWriter implements fs.ChunkWriter for Internxt multipart uploads. +// All encryption is handled by the SDK's ChunkUploadSession. +type internxtChunkWriter struct { + f *Fs + remote string + src fs.ObjectInfo + session *buckets.ChunkUploadSession + completedParts []buckets.CompletedPart + partsMu sync.Mutex + size int64 + dirID string + meta *buckets.CreateMetaResponse +} + +// OpenChunkWriter returns the chunk size and a ChunkWriter for multipart uploads. +// +// When called from Update (via multipart.UploadMultipart), the session is +// pre-created and stored in f.pendingSession so that the encrypting reader +// can be applied to the input before UploadMultipart reads from it. +func (f *Fs) OpenChunkWriter(ctx context.Context, remote string, src fs.ObjectInfo, options ...fs.OpenOption) (info fs.ChunkWriterInfo, writer fs.ChunkWriter, err error) { + size := src.Size() + + chunkSize := f.opt.ChunkSize + if size < 0 { + warnStreamUpload.Do(func() { + fs.Logf(f, "Streaming uploads using chunk size %v will have maximum file size of %v", + chunkSize, fs.SizeSuffix(int64(chunkSize)*int64(maxUploadParts))) + }) + } else { + chunkSize = chunksize.Calculator(src, size, maxUploadParts, chunkSize) + } + + // Ensure parent directory exists + _, dirID, err := f.dirCache.FindPath(ctx, remote, true) + if err != nil { + return info, nil, fmt.Errorf("failed to find parent directory: %w", err) + } + + // Use pre-created session from Update() if available, otherwise create one + session := f.pendingSession + if session == nil { + err = f.pacer.Call(func() (bool, error) { + var err error + session, err = buckets.NewChunkUploadSession(ctx, f.cfg, size, int64(chunkSize)) + return f.shouldRetry(ctx, err) + }) + if err != nil { + return info, nil, fmt.Errorf("failed to create upload session: %w", err) + } + } + + w := &internxtChunkWriter{ + f: f, + remote: remote, + src: src, + session: session, + size: size, + dirID: dirID, + } + + info = fs.ChunkWriterInfo{ + ChunkSize: int64(chunkSize), + Concurrency: f.opt.UploadConcurrency, + LeavePartsOnError: false, + } + + return info, w, nil +} + +// WriteChunk uploads chunk number with reader bytes. +// The data has already been encrypted by the EncryptingReader applied +// to the input stream before UploadMultipart started reading. +func (w *internxtChunkWriter) WriteChunk(ctx context.Context, chunkNumber int, reader io.ReadSeeker) (int64, error) { + // Determine chunk size from the reader + currentPos, err := reader.Seek(0, io.SeekCurrent) + if err != nil { + return 0, fmt.Errorf("failed to get current position: %w", err) + } + end, err := reader.Seek(0, io.SeekEnd) + if err != nil { + return 0, fmt.Errorf("failed to seek to end: %w", err) + } + size := end - currentPos + if _, err := reader.Seek(currentPos, io.SeekStart); err != nil { + return 0, fmt.Errorf("failed to seek back: %w", err) + } + + if size == 0 { + return 0, nil + } + + var etag string + err = w.f.pacer.Call(func() (bool, error) { + // Seek back to start for retries + if _, err := reader.Seek(currentPos, io.SeekStart); err != nil { + return false, err + } + var uploadErr error + etag, uploadErr = w.session.UploadChunk(ctx, chunkNumber, reader, size) + return w.f.shouldRetry(ctx, uploadErr) + }) + if err != nil { + return 0, err + } + + w.partsMu.Lock() + w.completedParts = append(w.completedParts, buckets.CompletedPart{ + PartNumber: chunkNumber + 1, + ETag: etag, + }) + w.partsMu.Unlock() + + return size, nil +} + +// Close completes the multipart upload and registers the file in Internxt Drive. +func (w *internxtChunkWriter) Close(ctx context.Context) error { + // Sort parts by part number + w.partsMu.Lock() + sort.Slice(w.completedParts, func(i, j int) bool { + return w.completedParts[i].PartNumber < w.completedParts[j].PartNumber + }) + parts := make([]buckets.CompletedPart, len(w.completedParts)) + copy(parts, w.completedParts) + w.partsMu.Unlock() + + // Finish multipart upload (SDK computes hash + calls FinishMultipartUpload) + var finishResp *buckets.FinishUploadResp + err := w.f.pacer.Call(func() (bool, error) { + var err error + finishResp, err = w.session.Finish(ctx, parts) + return w.f.shouldRetry(ctx, err) + }) + if err != nil { + return fmt.Errorf("failed to finish multipart upload: %w", err) + } + + // Create file metadata in Internxt Drive + baseName := w.f.opt.Encoding.FromStandardName(path.Base(w.remote)) + name := strings.TrimSuffix(baseName, path.Ext(baseName)) + ext := strings.TrimPrefix(path.Ext(baseName), ".") + + var meta *buckets.CreateMetaResponse + err = w.f.pacer.Call(func() (bool, error) { + var err error + meta, err = buckets.CreateMetaFile(ctx, w.f.cfg, + name, w.f.cfg.Bucket, &finishResp.ID, "03-aes", + w.dirID, name, ext, w.size, w.src.ModTime(ctx)) + return w.f.shouldRetry(ctx, err) + }) + if err != nil { + return fmt.Errorf("failed to create file metadata: %w", err) + } + w.meta = meta + + return nil +} + +// Abort cleans up after a failed upload. +func (w *internxtChunkWriter) Abort(ctx context.Context) error { + fs.Logf(w.f, "Multipart upload aborted for %s", w.remote) + return nil +} diff --git a/docs/content/internxt.md b/docs/content/internxt.md index 57a1c1014b19e..0bd25dfd15f99 100644 --- a/docs/content/internxt.md +++ b/docs/content/internxt.md @@ -161,6 +161,36 @@ Properties: - Type: bool - Default: true +#### --internxt-chunk-size + +Chunk size for multipart uploads. + +Large files will be uploaded in chunks of this size. + +Memory usage is approximately chunk_size * upload_concurrency. + +Properties: + +- Config: chunk_size +- Env Var: RCLONE_INTERNXT_CHUNK_SIZE +- Type: SizeSuffix +- Default: 30Mi + +#### --internxt-upload-concurrency + +Concurrency for multipart uploads. + +This is the number of chunks of the same file that are uploaded concurrently. + +Note that each chunk is buffered in memory. + +Properties: + +- Config: upload_concurrency +- Env Var: RCLONE_INTERNXT_UPLOAD_CONCURRENCY +- Type: int +- Default: 4 + #### --internxt-encoding The encoding for the backend. diff --git a/go.mod b/go.mod index 61b5edd6be6e3..fb6d6c5703d21 100644 --- a/go.mod +++ b/go.mod @@ -45,7 +45,7 @@ require ( github.com/golang-jwt/jwt/v5 v5.3.1 github.com/google/uuid v1.6.0 github.com/hanwen/go-fuse/v2 v2.9.0 - github.com/internxt/rclone-adapter v0.0.0-20260220172730-613f4cc8b8fd + github.com/internxt/rclone-adapter v0.0.0-20260303161852-4b337f11972d github.com/jcmturner/gokrb5/v8 v8.4.4 github.com/jlaffaye/ftp v0.2.1-0.20240918233326-1b970516f5d3 github.com/josephspurrier/goversioninfo v1.5.0 diff --git a/go.sum b/go.sum index 0515fa7e454ce..6351ca7515d8e 100644 --- a/go.sum +++ b/go.sum @@ -425,6 +425,10 @@ github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2 github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/internxt/rclone-adapter v0.0.0-20260220172730-613f4cc8b8fd h1:dSIuz2mpJAPQfhHYtG57D0qwSkgC/vQ69gHfeyQ4kxA= github.com/internxt/rclone-adapter v0.0.0-20260220172730-613f4cc8b8fd/go.mod h1:vdPya4AIcDjvng4ViaAzqjegJf0VHYpYHQguFx5xBp0= +github.com/internxt/rclone-adapter v0.0.0-20260226152911-72b83123f8e9 h1:lxVac1uYH2qmAFlCeBZjlueL72PlrOEOoc5K/tbCRr8= +github.com/internxt/rclone-adapter v0.0.0-20260226152911-72b83123f8e9/go.mod h1:vdPya4AIcDjvng4ViaAzqjegJf0VHYpYHQguFx5xBp0= +github.com/internxt/rclone-adapter v0.0.0-20260303161852-4b337f11972d h1:dMd7BPNXvz73BBwYSvqInndDkW+q8OAUu8+zSClYtEs= +github.com/internxt/rclone-adapter v0.0.0-20260303161852-4b337f11972d/go.mod h1:vdPya4AIcDjvng4ViaAzqjegJf0VHYpYHQguFx5xBp0= github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8= github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs= github.com/jcmturner/dnsutils/v2 v2.0.0 h1:lltnkeZGL0wILNvrNiVCR6Ro5PGU/SeBvVO/8c/iPbo= From f15de9b8a482b0149604d030460c4ee6be7bd16a Mon Sep 17 00:00:00 2001 From: jzunigax2 <125698953+jzunigax2@users.noreply.github.com> Date: Wed, 11 Mar 2026 22:04:04 -0600 Subject: [PATCH 2/3] internxt: enable OpenChunkWriter with per-chunk encryption --- backend/internxt/internxt.go | 53 ++++-------- backend/internxt/upload.go | 136 +++++++++++++++++++----------- docs/content/internxt.md | 30 ------- fs/features.go | 1 + fs/operations/multithread_test.go | 19 +++-- fstest/fstests/fstests.go | 16 ++-- go.mod | 2 +- go.sum | 2 + 8 files changed, 134 insertions(+), 125 deletions(-) diff --git a/backend/internxt/internxt.go b/backend/internxt/internxt.go index c2ce206db438f..835604c86aee4 100644 --- a/backend/internxt/internxt.go +++ b/backend/internxt/internxt.go @@ -216,20 +216,19 @@ type Options struct { // Fs represents an Internxt remote type Fs struct { - name string - root string - opt Options - m configmap.Mapper - dirCache *dircache.DirCache - cfg *config.Config - features *fs.Features - pacer *fs.Pacer - tokenRenewer *oauthutil.Renew - bridgeUser string - userID string - authMu sync.Mutex - authFailed bool - pendingSession *buckets.ChunkUploadSession + name string + root string + opt Options + m configmap.Mapper + dirCache *dircache.DirCache + cfg *config.Config + features *fs.Features + pacer *fs.Pacer + tokenRenewer *oauthutil.Renew + bridgeUser string + userID string + authMu sync.Mutex + authFailed bool } // Object holds the data for a remote file object @@ -273,6 +272,10 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e return nil, err } + if err := checkUploadChunkSize(opt.ChunkSize); err != nil { + return nil, fmt.Errorf("internxt: chunk size: %w", err) + } + if opt.Mnemonic == "" { return nil, errors.New("mnemonic is required - please run: rclone config reconnect " + name + ":") } @@ -352,7 +355,6 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e f.features = (&fs.Features{ CanHaveEmptyDirectories: true, }).Fill(ctx, f) - f.features.OpenChunkWriter = nil if ts != nil { f.tokenRenewer = oauthutil.NewRenew(f.String(), ts, func() error { @@ -917,30 +919,11 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op ci.MaxBufferMemory, fs.SizeSuffix(needed), ci.Transfers, ci.BufferSize, chunkSize, o.f.opt.UploadConcurrency) } } - var session *buckets.ChunkUploadSession - err = o.f.pacer.Call(func() (bool, error) { - var err error - session, err = buckets.NewChunkUploadSession(ctx, o.f.cfg, size, int64(chunkSize)) - return o.f.shouldRetry(ctx, err) - }) - if err != nil { - o.restoreBackupFile(ctx, backupUUID, origName, origType) - return fmt.Errorf("failed to create upload session: %w", err) - } - - // Wrap reader with SDK's encrypting reader - encReader := session.EncryptingReader(in) - - // Store session for OpenChunkWriter to pick up - o.f.pendingSession = session - - chunkWriter, uploadErr := multipart.UploadMultipart(ctx, src, encReader, multipart.UploadMultipartOptions{ + chunkWriter, uploadErr := multipart.UploadMultipart(ctx, src, in, multipart.UploadMultipartOptions{ Open: o.f, OpenOptions: options, }) - o.f.pendingSession = nil - if uploadErr != nil { if isEmptyFileLimitError(uploadErr) { o.restoreBackupFile(ctx, backupUUID, origName, origType) diff --git a/backend/internxt/upload.go b/backend/internxt/upload.go index 0989db1422c0b..407304574cbb7 100644 --- a/backend/internxt/upload.go +++ b/backend/internxt/upload.go @@ -1,6 +1,7 @@ package internxt import ( + "bytes" "context" "fmt" "io" @@ -35,7 +36,6 @@ func (f *Fs) SetUploadChunkSize(cs fs.SizeSuffix) (fs.SizeSuffix, error) { } // internxtChunkWriter implements fs.ChunkWriter for Internxt multipart uploads. -// All encryption is handled by the SDK's ChunkUploadSession. type internxtChunkWriter struct { f *Fs remote string @@ -46,16 +46,28 @@ type internxtChunkWriter struct { size int64 dirID string meta *buckets.CreateMetaResponse + chunkSize int64 + hashMu sync.Mutex + nextHashChunk int + pendingChunks map[int][]byte } // OpenChunkWriter returns the chunk size and a ChunkWriter for multipart uploads. -// -// When called from Update (via multipart.UploadMultipart), the session is -// pre-created and stored in f.pendingSession so that the encrypting reader -// can be applied to the input before UploadMultipart reads from it. func (f *Fs) OpenChunkWriter(ctx context.Context, remote string, src fs.ObjectInfo, options ...fs.OpenOption) (info fs.ChunkWriterInfo, writer fs.ChunkWriter, err error) { size := src.Size() + info = fs.ChunkWriterInfo{ + ChunkSize: int64(f.opt.ChunkSize), + Concurrency: f.opt.UploadConcurrency, + LeavePartsOnError: false, + MinFileSize: minMultipartSize, + } + + // Reject files below the multipart minimum + if size >= 0 && size < minMultipartSize { + return info, nil, fmt.Errorf("file size %d is below minimum %d for multipart upload", size, minMultipartSize) + } + chunkSize := f.opt.ChunkSize if size < 0 { warnStreamUpload.Do(func() { @@ -64,6 +76,7 @@ func (f *Fs) OpenChunkWriter(ctx context.Context, remote string, src fs.ObjectIn }) } else { chunkSize = chunksize.Calculator(src, size, maxUploadParts, chunkSize) + info.ChunkSize = int64(chunkSize) } // Ensure parent directory exists @@ -72,85 +85,114 @@ func (f *Fs) OpenChunkWriter(ctx context.Context, remote string, src fs.ObjectIn return info, nil, fmt.Errorf("failed to find parent directory: %w", err) } - // Use pre-created session from Update() if available, otherwise create one - session := f.pendingSession - if session == nil { - err = f.pacer.Call(func() (bool, error) { - var err error - session, err = buckets.NewChunkUploadSession(ctx, f.cfg, size, int64(chunkSize)) - return f.shouldRetry(ctx, err) - }) - if err != nil { - return info, nil, fmt.Errorf("failed to create upload session: %w", err) - } + var session *buckets.ChunkUploadSession + err = f.pacer.Call(func() (bool, error) { + var err error + session, err = buckets.NewChunkUploadSession(ctx, f.cfg, size, int64(chunkSize)) + return f.shouldRetry(ctx, err) + }) + if err != nil { + return info, nil, fmt.Errorf("failed to create upload session: %w", err) } w := &internxtChunkWriter{ - f: f, - remote: remote, - src: src, - session: session, - size: size, - dirID: dirID, - } - - info = fs.ChunkWriterInfo{ - ChunkSize: int64(chunkSize), - Concurrency: f.opt.UploadConcurrency, - LeavePartsOnError: false, + f: f, + remote: remote, + src: src, + session: session, + size: size, + dirID: dirID, + chunkSize: int64(chunkSize), + pendingChunks: make(map[int][]byte), } return info, w, nil } -// WriteChunk uploads chunk number with reader bytes. -// The data has already been encrypted by the EncryptingReader applied -// to the input stream before UploadMultipart started reading. +// WriteChunk encrypts plaintext per-chunk using AES-256-CTR at the correct +// byte offset, feeds encrypted data into the ordered hash accumulator, and +// uploads to the presigned URL. func (w *internxtChunkWriter) WriteChunk(ctx context.Context, chunkNumber int, reader io.ReadSeeker) (int64, error) { - // Determine chunk size from the reader - currentPos, err := reader.Seek(0, io.SeekCurrent) + plaintext, err := io.ReadAll(reader) if err != nil { - return 0, fmt.Errorf("failed to get current position: %w", err) - } - end, err := reader.Seek(0, io.SeekEnd) - if err != nil { - return 0, fmt.Errorf("failed to seek to end: %w", err) + return 0, err } - size := end - currentPos - if _, err := reader.Seek(currentPos, io.SeekStart); err != nil { - return 0, fmt.Errorf("failed to seek back: %w", err) + if len(plaintext) == 0 { + return 0, nil } + size := int64(len(plaintext)) - if size == 0 { - return 0, nil + byteOffset := int64(chunkNumber) * w.chunkSize + cipherStream, err := w.session.NewCipherAtOffset(byteOffset) + if err != nil { + return 0, err } + cipherStream.XORKeyStream(plaintext, plaintext) + encrypted := plaintext + + w.submitForHashing(chunkNumber, encrypted) + encReader := bytes.NewReader(encrypted) var etag string err = w.f.pacer.Call(func() (bool, error) { - // Seek back to start for retries - if _, err := reader.Seek(currentPos, io.SeekStart); err != nil { + if _, err := encReader.Seek(0, io.SeekStart); err != nil { return false, err } var uploadErr error - etag, uploadErr = w.session.UploadChunk(ctx, chunkNumber, reader, size) + etag, uploadErr = w.session.UploadChunk(ctx, chunkNumber, encReader, size) return w.f.shouldRetry(ctx, uploadErr) }) if err != nil { return 0, err } + w.recordCompletedPart(chunkNumber, etag) + return size, nil +} + +// recordCompletedPart appends a completed part to the list (thread-safe). +func (w *internxtChunkWriter) recordCompletedPart(chunkNumber int, etag string) { w.partsMu.Lock() w.completedParts = append(w.completedParts, buckets.CompletedPart{ PartNumber: chunkNumber + 1, ETag: etag, }) w.partsMu.Unlock() +} - return size, nil +// submitForHashing feeds encrypted chunk data into the session's hash in order. +func (w *internxtChunkWriter) submitForHashing(chunkNumber int, encrypted []byte) { + w.hashMu.Lock() + defer w.hashMu.Unlock() + + if chunkNumber == w.nextHashChunk { + w.session.HashEncryptedData(encrypted) + w.nextHashChunk++ + for { + next, ok := w.pendingChunks[w.nextHashChunk] + if !ok { + break + } + w.session.HashEncryptedData(next) + delete(w.pendingChunks, w.nextHashChunk) + w.nextHashChunk++ + } + } else { + buf := make([]byte, len(encrypted)) + copy(buf, encrypted) + w.pendingChunks[chunkNumber] = buf + } } // Close completes the multipart upload and registers the file in Internxt Drive. func (w *internxtChunkWriter) Close(ctx context.Context) error { + w.hashMu.Lock() + pending := len(w.pendingChunks) + w.hashMu.Unlock() + if pending != 0 { + return fmt.Errorf("internal error: %d chunks still pending hash", pending) + } + // Sort parts by part number w.partsMu.Lock() sort.Slice(w.completedParts, func(i, j int) bool { diff --git a/docs/content/internxt.md b/docs/content/internxt.md index 0bd25dfd15f99..57a1c1014b19e 100644 --- a/docs/content/internxt.md +++ b/docs/content/internxt.md @@ -161,36 +161,6 @@ Properties: - Type: bool - Default: true -#### --internxt-chunk-size - -Chunk size for multipart uploads. - -Large files will be uploaded in chunks of this size. - -Memory usage is approximately chunk_size * upload_concurrency. - -Properties: - -- Config: chunk_size -- Env Var: RCLONE_INTERNXT_CHUNK_SIZE -- Type: SizeSuffix -- Default: 30Mi - -#### --internxt-upload-concurrency - -Concurrency for multipart uploads. - -This is the number of chunks of the same file that are uploaded concurrently. - -Note that each chunk is buffered in memory. - -Properties: - -- Config: upload_concurrency -- Env Var: RCLONE_INTERNXT_UPLOAD_CONCURRENCY -- Type: int -- Default: 4 - #### --internxt-encoding The encoding for the backend. diff --git a/fs/features.go b/fs/features.go index 1563dc459cd7a..a5df778cb6f17 100644 --- a/fs/features.go +++ b/fs/features.go @@ -739,6 +739,7 @@ type ChunkWriterInfo struct { ChunkSize int64 // preferred chunk size Concurrency int // how many chunks to write at once LeavePartsOnError bool // if set don't delete parts uploaded so far on error + MinFileSize int64 // minimum file size for multipart uploads, 0 means no minimum } // OpenChunkWriter is an option interface for Fs to implement chunked writing diff --git a/fs/operations/multithread_test.go b/fs/operations/multithread_test.go index d3a07ae7153ca..492d80cd7629c 100644 --- a/fs/operations/multithread_test.go +++ b/fs/operations/multithread_test.go @@ -112,8 +112,8 @@ func TestMultithreadCalculateNumChunks(t *testing.T) { } } -// Skip if not multithread, returning the chunkSize otherwise -func skipIfNotMultithread(ctx context.Context, t *testing.T, r *fstest.Run) int { +// Skip if not multithread, returning the chunkSize and minFileSize otherwise +func skipIfNotMultithread(ctx context.Context, t *testing.T, r *fstest.Run) (chunkSize int, minFileSize int64) { features := r.Fremote.Features() if features.OpenChunkWriter == nil && features.OpenWriterAt == nil { t.Skip("multithread writing not supported") @@ -128,7 +128,7 @@ func skipIfNotMultithread(ctx context.Context, t *testing.T, r *fstest.Run) int } ci := fs.GetConfig(ctx) - chunkSize := int(ci.MultiThreadChunkSize) + chunkSize = int(ci.MultiThreadChunkSize) if features.OpenChunkWriter != nil { //OpenChunkWriter func(ctx context.Context, remote string, src ObjectInfo, options ...OpenOption) (info ChunkWriterInfo, writer ChunkWriter, err error) const fileName = "chunksize-probe" @@ -136,16 +136,17 @@ func skipIfNotMultithread(ctx context.Context, t *testing.T, r *fstest.Run) int info, writer, err := features.OpenChunkWriter(ctx, fileName, src) require.NoError(t, err) chunkSize = int(info.ChunkSize) + minFileSize = info.MinFileSize err = writer.Abort(ctx) require.NoError(t, err) } - return chunkSize + return chunkSize, minFileSize } func TestMultithreadCopy(t *testing.T) { r := fstest.NewRun(t) ctx := context.Background() - chunkSize := skipIfNotMultithread(ctx, t, r) + chunkSize, minFileSize := skipIfNotMultithread(ctx, t, r) // Check every other transfer for metadata checkMetadata := false ctx, ci := fs.AddConfig(ctx) @@ -163,6 +164,9 @@ func TestMultithreadCopy(t *testing.T) { ci.Metadata = checkMetadata fileName := fmt.Sprintf("test-multithread-copy-%v-%d-%d", upload, test.size, test.streams) t.Run(fmt.Sprintf("upload=%v,size=%v,streams=%v", upload, test.size, test.streams), func(t *testing.T) { + if minFileSize > 0 && int64(test.size) < minFileSize { + t.Skipf("file size %d is below backend minimum %d for multipart uploads", test.size, minFileSize) + } if *fstest.SizeLimit > 0 && int64(test.size) > *fstest.SizeLimit { t.Skipf("exceeded file size limit %d > %d", test.size, *fstest.SizeLimit) } @@ -290,9 +294,12 @@ func (rc wgReadCloser) Close() (err error) { func TestMultithreadCopyAbort(t *testing.T) { r := fstest.NewRun(t) ctx := context.Background() - chunkSize := skipIfNotMultithread(ctx, t, r) + chunkSize, minFileSize := skipIfNotMultithread(ctx, t, r) size := 2*chunkSize + 1 + if minFileSize > 0 && int64(size) < minFileSize { + t.Skipf("file size %d is below backend minimum %d for multipart uploads", size, minFileSize) + } if *fstest.SizeLimit > 0 && int64(size) > *fstest.SizeLimit { t.Skipf("exceeded file size limit %d > %d", size, *fstest.SizeLimit) } diff --git a/fstest/fstests/fstests.go b/fstest/fstests/fstests.go index 4db6f29070bba..32bbc6d1472a5 100644 --- a/fstest/fstests/fstests.go +++ b/fstest/fstests/fstests.go @@ -820,19 +820,23 @@ func Run(t *testing.T, opt *Opt) { t.Skip("FS has no OpenChunkWriter interface") } size5MBs := 5 * 1024 * 1024 - contents1 := random.String(size5MBs) - contents2 := random.String(size5MBs) - size1MB := 1 * 1024 * 1024 - contents3 := random.String(size1MB) + totalSize := int64(size5MBs*2 + size1MB) path := "writer-at-subdir/writer-at-file" - objSrc := object.NewStaticObjectInfo(path+"-WRONG-REMOTE", file1.ModTime, -1, true, nil, nil) - _, out, err := openChunkWriter(ctx, path, objSrc, &fs.ChunkOption{ + objSrc := object.NewStaticObjectInfo(path+"-WRONG-REMOTE", file1.ModTime, totalSize, true, nil, nil) + info, out, err := openChunkWriter(ctx, path, objSrc, &fs.ChunkOption{ ChunkSize: int64(size5MBs), }) + if info.MinFileSize > 0 && totalSize < info.MinFileSize { + t.Skipf("file size %d is below backend minimum %d for multipart uploads", totalSize, info.MinFileSize) + } require.NoError(t, err) + contents1 := random.String(size5MBs) + contents2 := random.String(size5MBs) + contents3 := random.String(size1MB) + var n int64 n, err = out.WriteChunk(ctx, 1, strings.NewReader(contents2)) assert.NoError(t, err) diff --git a/go.mod b/go.mod index fb6d6c5703d21..5f96dc777ffea 100644 --- a/go.mod +++ b/go.mod @@ -45,7 +45,7 @@ require ( github.com/golang-jwt/jwt/v5 v5.3.1 github.com/google/uuid v1.6.0 github.com/hanwen/go-fuse/v2 v2.9.0 - github.com/internxt/rclone-adapter v0.0.0-20260303161852-4b337f11972d + github.com/internxt/rclone-adapter v0.0.0-20260316170255-0cc0b8f65dee github.com/jcmturner/gokrb5/v8 v8.4.4 github.com/jlaffaye/ftp v0.2.1-0.20240918233326-1b970516f5d3 github.com/josephspurrier/goversioninfo v1.5.0 diff --git a/go.sum b/go.sum index 6351ca7515d8e..8c6db92dfbdb0 100644 --- a/go.sum +++ b/go.sum @@ -429,6 +429,8 @@ github.com/internxt/rclone-adapter v0.0.0-20260226152911-72b83123f8e9 h1:lxVac1u github.com/internxt/rclone-adapter v0.0.0-20260226152911-72b83123f8e9/go.mod h1:vdPya4AIcDjvng4ViaAzqjegJf0VHYpYHQguFx5xBp0= github.com/internxt/rclone-adapter v0.0.0-20260303161852-4b337f11972d h1:dMd7BPNXvz73BBwYSvqInndDkW+q8OAUu8+zSClYtEs= github.com/internxt/rclone-adapter v0.0.0-20260303161852-4b337f11972d/go.mod h1:vdPya4AIcDjvng4ViaAzqjegJf0VHYpYHQguFx5xBp0= +github.com/internxt/rclone-adapter v0.0.0-20260316170255-0cc0b8f65dee h1:Crt8J2oP3i6x5z6wkdS8jKTZ7lLhE3nhLNnq676rBvg= +github.com/internxt/rclone-adapter v0.0.0-20260316170255-0cc0b8f65dee/go.mod h1:vdPya4AIcDjvng4ViaAzqjegJf0VHYpYHQguFx5xBp0= github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8= github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs= github.com/jcmturner/dnsutils/v2 v2.0.0 h1:lltnkeZGL0wILNvrNiVCR6Ro5PGU/SeBvVO/8c/iPbo= From 3de35a425872862db1f76c4b4a8b6bf8c3757519 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Thu, 19 Mar 2026 02:34:49 +0000 Subject: [PATCH 3/3] build(deps): bump google.golang.org/grpc from 1.79.1 to 1.79.3 Bumps [google.golang.org/grpc](https://github.com/grpc/grpc-go) from 1.79.1 to 1.79.3. - [Release notes](https://github.com/grpc/grpc-go/releases) - [Commits](https://github.com/grpc/grpc-go/compare/v1.79.1...v1.79.3) --- updated-dependencies: - dependency-name: google.golang.org/grpc dependency-version: 1.79.3 dependency-type: indirect ... Signed-off-by: dependabot[bot] --- go.mod | 2 +- go.sum | 12 ++---------- 2 files changed, 3 insertions(+), 11 deletions(-) diff --git a/go.mod b/go.mod index 5f96dc777ffea..0985292f6d95b 100644 --- a/go.mod +++ b/go.mod @@ -263,7 +263,7 @@ require ( golang.org/x/image v0.36.0 // indirect golang.org/x/tools v0.42.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20260217215200-42d3e9bedb6d // indirect - google.golang.org/grpc v1.79.1 // indirect + google.golang.org/grpc v1.79.3 // indirect google.golang.org/protobuf v1.36.11 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect moul.io/http2curl/v2 v2.3.0 // indirect diff --git a/go.sum b/go.sum index 8c6db92dfbdb0..84581429c3435 100644 --- a/go.sum +++ b/go.sum @@ -423,12 +423,6 @@ github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyf github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= -github.com/internxt/rclone-adapter v0.0.0-20260220172730-613f4cc8b8fd h1:dSIuz2mpJAPQfhHYtG57D0qwSkgC/vQ69gHfeyQ4kxA= -github.com/internxt/rclone-adapter v0.0.0-20260220172730-613f4cc8b8fd/go.mod h1:vdPya4AIcDjvng4ViaAzqjegJf0VHYpYHQguFx5xBp0= -github.com/internxt/rclone-adapter v0.0.0-20260226152911-72b83123f8e9 h1:lxVac1uYH2qmAFlCeBZjlueL72PlrOEOoc5K/tbCRr8= -github.com/internxt/rclone-adapter v0.0.0-20260226152911-72b83123f8e9/go.mod h1:vdPya4AIcDjvng4ViaAzqjegJf0VHYpYHQguFx5xBp0= -github.com/internxt/rclone-adapter v0.0.0-20260303161852-4b337f11972d h1:dMd7BPNXvz73BBwYSvqInndDkW+q8OAUu8+zSClYtEs= -github.com/internxt/rclone-adapter v0.0.0-20260303161852-4b337f11972d/go.mod h1:vdPya4AIcDjvng4ViaAzqjegJf0VHYpYHQguFx5xBp0= github.com/internxt/rclone-adapter v0.0.0-20260316170255-0cc0b8f65dee h1:Crt8J2oP3i6x5z6wkdS8jKTZ7lLhE3nhLNnq676rBvg= github.com/internxt/rclone-adapter v0.0.0-20260316170255-0cc0b8f65dee/go.mod h1:vdPya4AIcDjvng4ViaAzqjegJf0VHYpYHQguFx5xBp0= github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8= @@ -861,8 +855,6 @@ golang.org/x/net v0.15.0/go.mod h1:idbUs1IY1+zTqbi8yxTbhexhEEk5ur9LInksu6HrEpk= golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM= golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4= -golang.org/x/net v0.50.0 h1:ucWh9eiCGyDR3vtzso0WMQinm2Dnt8cFMuQa9K33J60= -golang.org/x/net v0.50.0/go.mod h1:UgoSli3F/pBgdJBHCTc+tp3gmrU4XswgGRgtnwWTfyM= golang.org/x/net v0.51.0 h1:94R/GTO7mt3/4wIKpcR5gkGmRLOuE/2hNGeWq/GBIFo= golang.org/x/net v0.51.0/go.mod h1:aamm+2QF5ogm02fjy5Bb7CQ0WMt1/WVM7FtyaTLlA9Y= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -1103,8 +1095,8 @@ google.golang.org/grpc v1.28.0/go.mod h1:rpkK4SK4GF4Ach/+MFLZUBavHOvF2JJB5uozKKa google.golang.org/grpc v1.29.1/go.mod h1:itym6AZVZYACWQqET3MqgPpjcuV5QH3BxFS3IjizoKk= google.golang.org/grpc v1.30.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak= google.golang.org/grpc v1.31.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak= -google.golang.org/grpc v1.79.1 h1:zGhSi45ODB9/p3VAawt9a+O/MULLl9dpizzNNpq7flY= -google.golang.org/grpc v1.79.1/go.mod h1:KmT0Kjez+0dde/v2j9vzwoAScgEPx/Bw1CYChhHLrHQ= +google.golang.org/grpc v1.79.3 h1:sybAEdRIEtvcD68Gx7dmnwjZKlyfuc61Dyo9pGXXkKE= +google.golang.org/grpc v1.79.3/go.mod h1:KmT0Kjez+0dde/v2j9vzwoAScgEPx/Bw1CYChhHLrHQ= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=