From 7417f1a0d94e0153a784df526ae16bed28400c0e Mon Sep 17 00:00:00 2001 From: Rex P Date: Wed, 20 May 2026 14:44:11 +1000 Subject: [PATCH 1/3] Remove unnecessary test --- .../stages/processing/processing_test.go | 61 ------------------- gcp/indexer/storage/storage_test.go | 1 + 2 files changed, 1 insertion(+), 61 deletions(-) delete mode 100644 gcp/indexer/stages/processing/processing_test.go diff --git a/gcp/indexer/stages/processing/processing_test.go b/gcp/indexer/stages/processing/processing_test.go deleted file mode 100644 index 54f20df3c83..00000000000 --- a/gcp/indexer/stages/processing/processing_test.go +++ /dev/null @@ -1,61 +0,0 @@ -package processing - -import ( - "reflect" - "testing" -) - -func Test_processBuckets(t *testing.T) { - type args struct { - fileResults []*FileResult - } - tests := []struct { - name string - args args - want map[int]*BucketNode - }{ - { - name: "Test bucket", - args: args{ - fileResults: []*FileResult{ - { - Path: "abc", - Hash: []byte{0, 1, 2, 3, 4, 5, 6}, - }, - { - Path: "efg", - Hash: []byte{7, 4, 1, 3, 4, 5, 6}, - }, - { - Path: "hji", - Hash: []byte{1, 9, 1, 3, 4, 5, 6}, - }, - }, - }, - want: map[int]*BucketNode{ - 1: { - NodeHash: []byte{154, 164, 97, 225, 236, 164, 8, 111, 146, 48, 170, 73, 201, 11, 12, 97}, - FilesContained: 1, - }, - 260: { - NodeHash: []byte{216, 219, 93, 48, 21, 44, 152, 195, 127, 147, 177, 201, 84, 210, 171, 150}, - FilesContained: 1, - }, - 265: { - NodeHash: []byte{8, 158, 190, 9, 14, 126, 134, 10, 210, 118, 69, 57, 158, 64, 170, 161}, - FilesContained: 1, - }, - }, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got, _ := processBuckets(tt.args.fileResults) - for key, value := range tt.want { - if !reflect.DeepEqual(got[key], value) { - t.Errorf("processBuckets() got = %v: %v, want %v", key, got, value) - } - } - }) - } -} diff --git a/gcp/indexer/storage/storage_test.go b/gcp/indexer/storage/storage_test.go index 3e73962ae09..61a9df616c3 100644 --- a/gcp/indexer/storage/storage_test.go +++ b/gcp/indexer/storage/storage_test.go @@ -35,6 +35,7 @@ func getDoc(t *testing.T, pages int) *document { Name: "abc", Commit: []byte{0x41, 0x41, 0x41, 0x41, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}, FileHashType: "MD5", + DocumentVersion: 2, } } From 4d69df300fff3660517475cf7d9453e6a7d3ea21 Mon Sep 17 00:00:00 2001 From: Rex P Date: Wed, 20 May 2026 14:44:21 +1000 Subject: [PATCH 2/3] Update pubsub version --- gcp/indexer/go.mod | 4 ++-- gcp/indexer/go.sum | 4 ---- gcp/indexer/indexer.go | 9 +++++---- gcp/indexer/stages/preparation/preparation.go | 4 ++-- gcp/indexer/stages/processing/processing.go | 4 ++-- 5 files changed, 11 insertions(+), 14 deletions(-) diff --git a/gcp/indexer/go.mod b/gcp/indexer/go.mod index 2430d8dcf35..a19cf66afff 100644 --- a/gcp/indexer/go.mod +++ b/gcp/indexer/go.mod @@ -4,7 +4,7 @@ go 1.26.3 require ( cloud.google.com/go/datastore v1.23.0 - cloud.google.com/go/pubsub v1.50.2 + cloud.google.com/go/pubsub/v2 v2.4.0 cloud.google.com/go/storage v1.62.1 github.com/go-git/go-git/v5 v5.19.0 github.com/golang/glog v1.2.5 @@ -22,7 +22,6 @@ require ( cloud.google.com/go/compute/metadata v0.9.0 // indirect cloud.google.com/go/iam v1.7.0 // indirect cloud.google.com/go/monitoring v1.24.3 // indirect - cloud.google.com/go/pubsub/v2 v2.4.0 // indirect dario.cat/mergo v1.0.0 // indirect github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.31.0 // indirect github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric v0.55.0 // indirect @@ -56,6 +55,7 @@ require ( github.com/skeema/knownhosts v1.3.1 // indirect github.com/spiffe/go-spiffe/v2 v2.6.0 // indirect github.com/xanzy/ssh-agent v0.3.3 // indirect + go.einride.tech/aip v0.83.0 // indirect go.opencensus.io v0.24.0 // indirect go.opentelemetry.io/auto/sdk v1.2.1 // indirect go.opentelemetry.io/contrib/detectors/gcp v1.39.0 // indirect diff --git a/gcp/indexer/go.sum b/gcp/indexer/go.sum index f0718e065c2..b2bcc93fedd 100644 --- a/gcp/indexer/go.sum +++ b/gcp/indexer/go.sum @@ -13,16 +13,12 @@ cloud.google.com/go/datastore v1.23.0 h1:mAlWN3tnQe1OqVM3UtYBIbWTz9aU83RgW4hXOrf cloud.google.com/go/datastore v1.23.0/go.mod h1:bOvQQekv4VACRJmH/MBy12MT6M3udfTuCyxw+tzY+8s= cloud.google.com/go/iam v1.7.0 h1:JD3zh0C6LHl16aCn5Akff0+GELdp1+4hmh6ndoFLl8U= cloud.google.com/go/iam v1.7.0/go.mod h1:tetWZW1PD/m6vcuY2Zj/aU0eCHNPuxedbnbRTyKXvdY= -cloud.google.com/go/kms v1.26.0 h1:cK9mN2cf+9V63D3H1f6koxTatWy39aTI/hCjz1I+adU= -cloud.google.com/go/kms v1.26.0/go.mod h1:pHKOdFJm63hxBsiPkYtowZPltu9dW0MWvBa6IA4HM58= cloud.google.com/go/logging v1.13.2 h1:qqlHCBvieJT9Cdq4QqYx1KPadCQ2noD4FK02eNqHAjA= cloud.google.com/go/logging v1.13.2/go.mod h1:zaybliM3yun1J8mU2dVQ1/qDzjbOqEijZCn6hSBtKak= cloud.google.com/go/longrunning v0.9.0 h1:0EzbDEGsAvOZNbqXopgniY0w0a1phvu5IdUFq8grmqY= cloud.google.com/go/longrunning v0.9.0/go.mod h1:pkTz846W7bF4o2SzdWJ40Hu0Re+UoNT6Q5t+igIcb8E= cloud.google.com/go/monitoring v1.24.3 h1:dde+gMNc0UhPZD1Azu6at2e79bfdztVDS5lvhOdsgaE= cloud.google.com/go/monitoring v1.24.3/go.mod h1:nYP6W0tm3N9H/bOw8am7t62YTzZY+zUeQ+Bi6+2eonI= -cloud.google.com/go/pubsub v1.50.2 h1:54Up97HnThdP4H8jjWJSSQ/mnYG2EKon7ZSNETRq0tM= -cloud.google.com/go/pubsub v1.50.2/go.mod h1:jyCWeZdGFqd4mitSsBERnJcpqaHBsxQoPkNvjj4sp0w= cloud.google.com/go/pubsub/v2 v2.4.0 h1:oMKNiBQpXImRWnHYla9uSU66ZzByZwBSCJOEs/pTKVg= cloud.google.com/go/pubsub/v2 v2.4.0/go.mod h1:2lS/XQKq5qtOMs6kHBK+WX1ytUC36kLl2ig3zqsGUx8= cloud.google.com/go/storage v1.62.1 h1:Os0G3XbUbjZumkpDUf2Y0rLoXJTCF1kU2kWUujKYXD8= diff --git a/gcp/indexer/indexer.go b/gcp/indexer/indexer.go index 88af1c2239d..1bc71857364 100644 --- a/gcp/indexer/indexer.go +++ b/gcp/indexer/indexer.go @@ -20,7 +20,7 @@ import ( "flag" "fmt" - "cloud.google.com/go/pubsub" + "cloud.google.com/go/pubsub/v2" "cloud.google.com/go/storage" "github.com/google/osv.dev/gcp/indexer/config" "github.com/google/osv.dev/gcp/indexer/stages/preparation" @@ -66,7 +66,8 @@ func main() { defer storer.Close() if *worker { - if err := runWorker(ctx, storer, repoBucketHdl, psCl.Subscription(*subName), *subMessages); err != nil { + sub := psCl.Subscriber(fmt.Sprintf("projects/%s/subscriptions/%s", *projectID, *subName)) + if err := runWorker(ctx, storer, repoBucketHdl, sub, *subMessages); err != nil { log.Exitf("failed to run worker: %v", err) } return @@ -77,7 +78,7 @@ func main() { } } -func runWorker(ctx context.Context, storer *idxStorage.Store, repoBucketHdl *storage.BucketHandle, sub *pubsub.Subscription, outstanding int) error { +func runWorker(ctx context.Context, storer *idxStorage.Store, repoBucketHdl *storage.BucketHandle, sub *pubsub.Subscriber, outstanding int) error { procStage := processing.Stage{ Storer: storer, RepoHdl: repoBucketHdl, @@ -98,7 +99,7 @@ func runController(ctx context.Context, storer *idxStorage.Store, repoBucketHdl, return fmt.Errorf("failed to load configurations: %v", err) } - topic := psCl.Topic(*pubsubTopic) + topic := psCl.Publisher(fmt.Sprintf("projects/%s/topics/%s", *projectID, *pubsubTopic)) defer topic.Stop() prepStage := &preparation.Stage{ diff --git a/gcp/indexer/stages/preparation/preparation.go b/gcp/indexer/stages/preparation/preparation.go index 591459d2330..0e67afbfb1c 100644 --- a/gcp/indexer/stages/preparation/preparation.go +++ b/gcp/indexer/stages/preparation/preparation.go @@ -27,7 +27,7 @@ import ( "strings" "time" - "cloud.google.com/go/pubsub" + "cloud.google.com/go/pubsub/v2" "cloud.google.com/go/storage" "github.com/go-git/go-git/v5" "github.com/go-git/go-git/v5/plumbing" @@ -66,7 +66,7 @@ type Checker interface { type Stage struct { Checker Checker RepoHdl *storage.BucketHandle - Output *pubsub.Topic + Output *pubsub.Publisher } // Run runs the stage and outputs Result data types to the results channel. diff --git a/gcp/indexer/stages/processing/processing.go b/gcp/indexer/stages/processing/processing.go index 2d0339537a6..b739891df21 100644 --- a/gcp/indexer/stages/processing/processing.go +++ b/gcp/indexer/stages/processing/processing.go @@ -30,7 +30,7 @@ import ( "sort" "strings" - "cloud.google.com/go/pubsub" + "cloud.google.com/go/pubsub/v2" "cloud.google.com/go/storage" "github.com/go-git/go-git/v5" "github.com/google/osv.dev/gcp/indexer/shared" @@ -64,7 +64,7 @@ type BucketNode struct { type Stage struct { Storer Storer RepoHdl *storage.BucketHandle - Input *pubsub.Subscription + Input *pubsub.Subscriber PubSubOutstandingMessages int } From 2017ad674fd9196bd656180a9a2fbbf2c103ad6f Mon Sep 17 00:00:00 2001 From: Rex P Date: Wed, 20 May 2026 15:39:03 +1000 Subject: [PATCH 3/3] Fix lint issues --- gcp/indexer/storage/storage_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/gcp/indexer/storage/storage_test.go b/gcp/indexer/storage/storage_test.go index 61a9df616c3..cc84c20c812 100644 --- a/gcp/indexer/storage/storage_test.go +++ b/gcp/indexer/storage/storage_test.go @@ -32,9 +32,9 @@ func getRepoInfo(t *testing.T) *preparation.Result { func getDoc(t *testing.T, pages int) *document { return &document{ - Name: "abc", - Commit: []byte{0x41, 0x41, 0x41, 0x41, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}, - FileHashType: "MD5", + Name: "abc", + Commit: []byte{0x41, 0x41, 0x41, 0x41, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}, + FileHashType: "MD5", DocumentVersion: 2, } }