Skip to content

Commit 2e0dc27

Browse files
Merge branch 'main' into fix/ins-191
2 parents 815ac8b + 8c1219a commit 2e0dc27

File tree

6 files changed

+269
-10
lines changed

6 files changed

+269
-10
lines changed

go.mod

Lines changed: 7 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -106,11 +106,11 @@ require (
106106
go.uber.org/automaxprocs v1.6.0
107107
go.uber.org/mock v0.5.2
108108
go.uber.org/zap v1.27.0
109-
golang.org/x/crypto v0.43.0
110-
golang.org/x/net v0.45.0
109+
golang.org/x/crypto v0.45.0
110+
golang.org/x/net v0.47.0
111111
golang.org/x/oauth2 v0.30.0
112-
golang.org/x/sync v0.17.0
113-
golang.org/x/text v0.30.0
112+
golang.org/x/sync v0.18.0
113+
golang.org/x/text v0.31.0
114114
golang.org/x/time v0.12.0
115115
google.golang.org/api v0.247.0
116116
google.golang.org/protobuf v1.36.9
@@ -316,9 +316,9 @@ require (
316316
go.uber.org/multierr v1.11.0 // indirect
317317
go4.org v0.0.0-20230225012048-214862532bf5 // indirect
318318
golang.org/x/exp v0.0.0-20241217172543-b2144cdd0a67 // indirect
319-
golang.org/x/mod v0.28.0 // indirect
320-
golang.org/x/sys v0.37.0 // indirect
321-
golang.org/x/term v0.36.0 // indirect
319+
golang.org/x/mod v0.29.0 // indirect
320+
golang.org/x/sys v0.38.0 // indirect
321+
golang.org/x/term v0.37.0 // indirect
322322
google.golang.org/genproto v0.0.0-20250603155806-513f23925822 // indirect
323323
google.golang.org/genproto/googleapis/api v0.0.0-20250818200422-3122310a409c // indirect
324324
google.golang.org/genproto/googleapis/rpc v0.0.0-20250818200422-3122310a409c // indirect

go.sum

Lines changed: 14 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -854,6 +854,8 @@ golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5y
854854
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
855855
golang.org/x/crypto v0.43.0 h1:dduJYIi3A3KOfdGOHX8AVZ/jGiyPa3IbBozJ5kNuE04=
856856
golang.org/x/crypto v0.43.0/go.mod h1:BFbav4mRNlXJL4wNeejLpWxB7wMbc79PdRGhWKncxR0=
857+
golang.org/x/crypto v0.45.0 h1:jMBrvKuj23MTlT0bQEOBcAE0mjg8mK9RXFhRH6nyF3Q=
858+
golang.org/x/crypto v0.45.0/go.mod h1:XTGrrkGJve7CYK7J8PEww4aY7gM3qMCElcJQ8n8JdX4=
857859
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
858860
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
859861
golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
@@ -885,6 +887,8 @@ golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
885887
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
886888
golang.org/x/mod v0.28.0 h1:gQBtGhjxykdjY9YhZpSlZIsbnaE2+PgjfLWUQTnoZ1U=
887889
golang.org/x/mod v0.28.0/go.mod h1:yfB/L0NOf/kmEbXjzCPOx1iK1fRutOydrCMsqRhEBxI=
890+
golang.org/x/mod v0.29.0 h1:HV8lRxZC4l2cr3Zq1LvtOsi/ThTgWnUk/y64QSs8GwA=
891+
golang.org/x/mod v0.29.0/go.mod h1:NyhrlYXJ2H4eJiRy/WDBO6HMqZQ6q9nk4JzS3NuCK+w=
888892
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
889893
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
890894
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
@@ -908,6 +912,8 @@ golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug
908912
golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
909913
golang.org/x/net v0.45.0 h1:RLBg5JKixCy82FtLJpeNlVM0nrSqpCRYzVU1n8kj0tM=
910914
golang.org/x/net v0.45.0/go.mod h1:ECOoLqd5U3Lhyeyo/QDCEVQ4sNgYsqvCZ722XogGieY=
915+
golang.org/x/net v0.47.0 h1:Mx+4dIFzqraBXUugkia1OOvlD6LemFo1ALMHjrXDOhY=
916+
golang.org/x/net v0.47.0/go.mod h1:/jNxtkgq5yWUGYkaZGqo27cfGZ1c5Nen03aYrrKpVRU=
911917
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
912918
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
913919
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
@@ -925,6 +931,8 @@ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJ
925931
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
926932
golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug=
927933
golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
934+
golang.org/x/sync v0.18.0 h1:kr88TuHDroi+UVf+0hZnirlk8o8T+4MrK6mr60WkH/I=
935+
golang.org/x/sync v0.18.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
928936
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
929937
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
930938
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
@@ -968,11 +976,15 @@ golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
968976
golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
969977
golang.org/x/sys v0.37.0 h1:fdNQudmxPjkdUTPnLn5mdQv7Zwvbvpaxqs831goi9kQ=
970978
golang.org/x/sys v0.37.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
979+
golang.org/x/sys v0.38.0 h1:3yZWxaJjBmCWXqhN1qh02AkOnCQ1poK6oF+a7xWL6Gc=
980+
golang.org/x/sys v0.38.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
971981
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
972982
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
973983
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
974984
golang.org/x/term v0.36.0 h1:zMPR+aF8gfksFprF/Nc/rd1wRS1EI6nDBGyWAvDzx2Q=
975985
golang.org/x/term v0.36.0/go.mod h1:Qu394IJq6V6dCBRgwqshf3mPF85AqzYEzofzRdZkWss=
986+
golang.org/x/term v0.37.0 h1:8EGAD0qCmHYZg6J17DvsMy9/wJ7/D/4pV/wfnld5lTU=
987+
golang.org/x/term v0.37.0/go.mod h1:5pB4lxRNYYVZuTLmy8oR2BH8dflOR+IbTYFD8fi3254=
976988
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
977989
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
978990
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
@@ -984,6 +996,8 @@ golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ=
984996
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
985997
golang.org/x/text v0.30.0 h1:yznKA/E9zq54KzlzBEAWn1NXSQ8DIp/NYMy88xJjl4k=
986998
golang.org/x/text v0.30.0/go.mod h1:yDdHFIX9t+tORqspjENWgzaCVXgk0yYnYuSZ8UzzBVM=
999+
golang.org/x/text v0.31.0 h1:aC8ghyu4JhP8VojJ2lEHBnochRno1sgL6nEi9WGFGMM=
1000+
golang.org/x/text v0.31.0/go.mod h1:tKRAlv61yKIjGGHX/4tP1LTbc13YSec1pxVEWXzfoeM=
9871001
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
9881002
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
9891003
golang.org/x/time v0.12.0 h1:ScB/8o8olJvc+CQPWrK3fPZNfh7qgwCrY0zJmoEQLSE=

pkg/sources/s3/checkpointer.go

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -98,6 +98,7 @@ func (p *Checkpointer) Reset() {
9898
// ResumeInfo is the checkpoint payload: it is JSON-encoded when a checkpoint
// is written and handed back to callers that ask for the resume point.
type ResumeInfo struct {
	// CurrentBucket is the bucket that was being scanned.
	CurrentBucket string `json:"current_bucket"`
	// StartAfter is the key of the last processed object.
	StartAfter string `json:"start_after"`
	// Role is the role that was used for scanning.
	Role string `json:"role"`
}
102103

103104
// ResumePoint retrieves the last saved checkpoint state if one exists.
@@ -121,7 +122,7 @@ func (p *Checkpointer) ResumePoint(ctx context.Context) (ResumeInfo, error) {
121122
return resume, nil
122123
}
123124

124-
return ResumeInfo{CurrentBucket: resumeInfo.CurrentBucket, StartAfter: resumeInfo.StartAfter}, nil
125+
return ResumeInfo{CurrentBucket: resumeInfo.CurrentBucket, StartAfter: resumeInfo.StartAfter, Role: resumeInfo.Role}, nil
125126
}
126127

127128
// Complete marks the entire scanning operation as finished and clears the resume state.
@@ -215,7 +216,7 @@ func (p *Checkpointer) updateCheckpoint(bucket string, role string, lastKey stri
215216
return nil
216217
}
217218

218-
encoded, err := json.Marshal(&ResumeInfo{CurrentBucket: bucket, StartAfter: lastKey})
219+
encoded, err := json.Marshal(&ResumeInfo{CurrentBucket: bucket, StartAfter: lastKey, Role: role})
219220
if err != nil {
220221
return fmt.Errorf("failed to encode resume info: %w", err)
221222
}

pkg/sources/s3/checkpointer_test.go

Lines changed: 180 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -61,6 +61,56 @@ func TestCheckpointerResumption(t *testing.T) {
6161
assert.Equal(t, "key-11", finalResumeInfo.StartAfter)
6262
}
6363

64+
func TestCheckpointerResumptionWithRole(t *testing.T) {
65+
ctx := context.Background()
66+
67+
// First scan - process 6 objects then interrupt.
68+
initialProgress := &sources.Progress{}
69+
tracker := NewCheckpointer(ctx, initialProgress)
70+
role := "test-role"
71+
72+
firstPage := &s3.ListObjectsV2Output{
73+
Contents: make([]s3types.Object, 12), // Total of 12 objects
74+
}
75+
for i := range 12 {
76+
key := fmt.Sprintf("key-%d", i)
77+
firstPage.Contents[i] = s3types.Object{Key: &key}
78+
}
79+
80+
// Process first 6 objects.
81+
for i := range 6 {
82+
err := tracker.UpdateObjectCompletion(ctx, i, "test-bucket", role, firstPage.Contents)
83+
assert.NoError(t, err)
84+
}
85+
86+
// Verify resume info is set correctly.
87+
resumeInfo, err := tracker.ResumePoint(ctx)
88+
require.NoError(t, err)
89+
assert.Equal(t, "test-bucket", resumeInfo.CurrentBucket)
90+
assert.Equal(t, "key-5", resumeInfo.StartAfter)
91+
assert.Equal(t, role, resumeInfo.Role)
92+
93+
// Resume scan with existing progress.
94+
resumeTracker := NewCheckpointer(ctx, initialProgress)
95+
96+
resumePage := &s3.ListObjectsV2Output{
97+
Contents: firstPage.Contents[6:], // Remaining 6 objects
98+
}
99+
100+
// Process remaining objects.
101+
for i := range len(resumePage.Contents) {
102+
err := resumeTracker.UpdateObjectCompletion(ctx, i, "test-bucket", role, resumePage.Contents)
103+
assert.NoError(t, err)
104+
}
105+
106+
// Verify final resume info.
107+
finalResumeInfo, err := resumeTracker.ResumePoint(ctx)
108+
require.NoError(t, err)
109+
assert.Equal(t, "test-bucket", finalResumeInfo.CurrentBucket)
110+
assert.Equal(t, "key-11", finalResumeInfo.StartAfter)
111+
assert.Equal(t, role, finalResumeInfo.Role)
112+
}
113+
64114
func TestCheckpointerReset(t *testing.T) {
65115
tests := []struct {
66116
name string
@@ -111,6 +161,13 @@ func TestGetResumePoint(t *testing.T) {
111161
},
112162
expectedResumeInfo: ResumeInfo{CurrentBucket: "test-bucket", StartAfter: "test-key"},
113163
},
164+
{
165+
name: "valid resume info with role",
166+
progress: &sources.Progress{
167+
EncodedResumeInfo: `{"current_bucket":"test-bucket","start_after":"test-key","role":"test-role"}`,
168+
},
169+
expectedResumeInfo: ResumeInfo{CurrentBucket: "test-bucket", StartAfter: "test-key", Role: "test-role"},
170+
},
114171
{
115172
name: "empty encoded resume info",
116173
progress: &sources.Progress{EncodedResumeInfo: ""},
@@ -121,6 +178,13 @@ func TestGetResumePoint(t *testing.T) {
121178
EncodedResumeInfo: `{"current_bucket":"","start_after":"test-key"}`,
122179
},
123180
},
181+
{
182+
name: "no role in resume info",
183+
progress: &sources.Progress{
184+
EncodedResumeInfo: `{"current_bucket":"test-bucket","start_after":"test-key"}`,
185+
},
186+
expectedResumeInfo: ResumeInfo{CurrentBucket: "test-bucket", StartAfter: "test-key", Role: ""},
187+
},
124188
{
125189
name: "unmarshal error",
126190
progress: &sources.Progress{
@@ -257,6 +321,122 @@ func TestCheckpointerUpdate(t *testing.T) {
257321
})
258322
}
259323
}
324+
func TestCheckpointerUpdateWithRole(t *testing.T) {
325+
role := "test-role"
326+
tests := []struct {
327+
name string
328+
description string
329+
completedIdx int
330+
pageSize int
331+
preCompleted []int
332+
expectedKey string
333+
expectedRole string
334+
expectedLowestIncomplete int
335+
}{
336+
{
337+
name: "first object completed",
338+
description: "Basic case - completing first object",
339+
completedIdx: 0,
340+
pageSize: 3,
341+
expectedKey: "key-0",
342+
expectedRole: role,
343+
expectedLowestIncomplete: 1,
344+
},
345+
{
346+
name: "completing missing middle",
347+
description: "Completing object when previous is done",
348+
completedIdx: 1,
349+
pageSize: 3,
350+
preCompleted: []int{0},
351+
expectedKey: "key-1",
352+
expectedRole: role,
353+
expectedLowestIncomplete: 2,
354+
},
355+
{
356+
name: "all objects completed in order",
357+
description: "Completing final object in sequence",
358+
completedIdx: 2,
359+
pageSize: 3,
360+
preCompleted: []int{0, 1},
361+
expectedKey: "key-2",
362+
expectedRole: role,
363+
expectedLowestIncomplete: 3,
364+
},
365+
{
366+
name: "out of order completion before lowest",
367+
description: "Completing object before current lowest incomplete - should not affect checkpoint",
368+
completedIdx: 1,
369+
pageSize: 4,
370+
preCompleted: []int{0, 2, 3},
371+
expectedKey: "key-3",
372+
expectedRole: role,
373+
expectedLowestIncomplete: 4,
374+
},
375+
{
376+
name: "last index in max page",
377+
description: "Edge case - maximum page size boundary",
378+
completedIdx: 999,
379+
pageSize: 1000,
380+
preCompleted: func() []int {
381+
indices := make([]int, 999)
382+
for i := range indices {
383+
indices[i] = i
384+
}
385+
return indices
386+
}(),
387+
expectedKey: "key-999",
388+
expectedRole: role,
389+
expectedLowestIncomplete: 1000,
390+
},
391+
}
392+
393+
for _, tt := range tests {
394+
t.Run(tt.name, func(t *testing.T) {
395+
t.Parallel()
396+
397+
ctx := context.Background()
398+
progress := new(sources.Progress)
399+
tracker := &Checkpointer{
400+
progress: progress,
401+
completedObjects: make([]bool, tt.pageSize),
402+
completionOrder: make([]int, 0, tt.pageSize),
403+
lowestIncompleteIdx: 0,
404+
}
405+
406+
page := &s3.ListObjectsV2Output{Contents: make([]s3types.Object, tt.pageSize)}
407+
for i := range tt.pageSize {
408+
key := fmt.Sprintf("key-%d", i)
409+
page.Contents[i] = s3types.Object{Key: &key}
410+
}
411+
412+
// Setup pre-completed objects.
413+
for _, idx := range tt.preCompleted {
414+
tracker.completedObjects[idx] = true
415+
tracker.completionOrder = append(tracker.completionOrder, idx)
416+
}
417+
418+
// Find the correct lowest incomplete index after pre-completion.
419+
for i := range tt.pageSize {
420+
if !tracker.completedObjects[i] {
421+
tracker.lowestIncompleteIdx = i
422+
break
423+
}
424+
}
425+
426+
err := tracker.UpdateObjectCompletion(ctx, tt.completedIdx, "test-bucket", role, page.Contents)
427+
assert.NoError(t, err, "Unexpected error updating progress")
428+
429+
var info ResumeInfo
430+
err = json.Unmarshal([]byte(progress.EncodedResumeInfo), &info)
431+
assert.NoError(t, err, "Failed to decode resume info")
432+
assert.Equal(t, tt.expectedKey, info.StartAfter, "Incorrect resume point")
433+
assert.Equal(t, tt.expectedRole, info.Role, "Incorrect role")
434+
435+
assert.Equal(t, tt.expectedLowestIncomplete, tracker.lowestIncompleteIdx,
436+
"Incorrect lowest incomplete index")
437+
})
438+
}
439+
}
260440

261441
func TestCheckpointerUpdateUnitScan(t *testing.T) {
262442
ctx := context.Background()

pkg/sources/s3/s3.go

Lines changed: 5 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -245,6 +245,7 @@ type resumePosition struct {
245245
startAfter string // The last processed object key within the bucket
246246
isNewScan bool // True if we're starting a fresh scan
247247
exactMatch bool // True if we found the exact bucket we were previously processing
248+
role string // The role used during the previous scan
248249
}
249250

250251
// determineResumePosition calculates where to resume scanning from based on the last saved checkpoint
@@ -282,6 +283,7 @@ func determineResumePosition(ctx context.Context, tracker *Checkpointer, buckets
282283
startAfter: resumePoint.StartAfter,
283284
index: startIdx,
284285
exactMatch: found,
286+
role: resumePoint.Role,
285287
}
286288
}
287289

@@ -306,12 +308,14 @@ func (s *Source) scanBuckets(
306308
"Resume bucket no longer available, starting from closest position",
307309
"original_bucket", pos.bucket,
308310
"position", pos.index,
311+
"role", pos.role,
309312
)
310313
default:
311314
ctx.Logger().Info(
312315
"Resuming scan from previous scan's bucket",
313316
"bucket", pos.bucket,
314317
"position", pos.index,
318+
"role", pos.role,
315319
)
316320
}
317321

@@ -327,7 +331,7 @@ func (s *Source) scanBuckets(
327331
)
328332

329333
var startAfter *string
330-
if bucket == pos.bucket && pos.startAfter != "" {
334+
if bucket == pos.bucket && pos.startAfter != "" && role == pos.role {
331335
startAfter = &pos.startAfter
332336
ctx.Logger().V(3).Info(
333337
"Resuming bucket scan",

0 commit comments

Comments
 (0)