Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions gcp/indexer/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.26.3

require (
cloud.google.com/go/datastore v1.23.0
cloud.google.com/go/pubsub v1.50.2
cloud.google.com/go/pubsub/v2 v2.4.0
cloud.google.com/go/storage v1.62.1
github.com/go-git/go-git/v5 v5.19.0
github.com/golang/glog v1.2.5
Expand All @@ -22,7 +22,6 @@ require (
cloud.google.com/go/compute/metadata v0.9.0 // indirect
cloud.google.com/go/iam v1.7.0 // indirect
cloud.google.com/go/monitoring v1.24.3 // indirect
cloud.google.com/go/pubsub/v2 v2.4.0 // indirect
dario.cat/mergo v1.0.0 // indirect
github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.31.0 // indirect
github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric v0.55.0 // indirect
Expand Down Expand Up @@ -56,6 +55,7 @@ require (
github.com/skeema/knownhosts v1.3.1 // indirect
github.com/spiffe/go-spiffe/v2 v2.6.0 // indirect
github.com/xanzy/ssh-agent v0.3.3 // indirect
go.einride.tech/aip v0.83.0 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/auto/sdk v1.2.1 // indirect
go.opentelemetry.io/contrib/detectors/gcp v1.39.0 // indirect
Expand Down
4 changes: 0 additions & 4 deletions gcp/indexer/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,12 @@ cloud.google.com/go/datastore v1.23.0 h1:mAlWN3tnQe1OqVM3UtYBIbWTz9aU83RgW4hXOrf
cloud.google.com/go/datastore v1.23.0/go.mod h1:bOvQQekv4VACRJmH/MBy12MT6M3udfTuCyxw+tzY+8s=
cloud.google.com/go/iam v1.7.0 h1:JD3zh0C6LHl16aCn5Akff0+GELdp1+4hmh6ndoFLl8U=
cloud.google.com/go/iam v1.7.0/go.mod h1:tetWZW1PD/m6vcuY2Zj/aU0eCHNPuxedbnbRTyKXvdY=
cloud.google.com/go/kms v1.26.0 h1:cK9mN2cf+9V63D3H1f6koxTatWy39aTI/hCjz1I+adU=
cloud.google.com/go/kms v1.26.0/go.mod h1:pHKOdFJm63hxBsiPkYtowZPltu9dW0MWvBa6IA4HM58=
cloud.google.com/go/logging v1.13.2 h1:qqlHCBvieJT9Cdq4QqYx1KPadCQ2noD4FK02eNqHAjA=
cloud.google.com/go/logging v1.13.2/go.mod h1:zaybliM3yun1J8mU2dVQ1/qDzjbOqEijZCn6hSBtKak=
cloud.google.com/go/longrunning v0.9.0 h1:0EzbDEGsAvOZNbqXopgniY0w0a1phvu5IdUFq8grmqY=
cloud.google.com/go/longrunning v0.9.0/go.mod h1:pkTz846W7bF4o2SzdWJ40Hu0Re+UoNT6Q5t+igIcb8E=
cloud.google.com/go/monitoring v1.24.3 h1:dde+gMNc0UhPZD1Azu6at2e79bfdztVDS5lvhOdsgaE=
cloud.google.com/go/monitoring v1.24.3/go.mod h1:nYP6W0tm3N9H/bOw8am7t62YTzZY+zUeQ+Bi6+2eonI=
cloud.google.com/go/pubsub v1.50.2 h1:54Up97HnThdP4H8jjWJSSQ/mnYG2EKon7ZSNETRq0tM=
cloud.google.com/go/pubsub v1.50.2/go.mod h1:jyCWeZdGFqd4mitSsBERnJcpqaHBsxQoPkNvjj4sp0w=
cloud.google.com/go/pubsub/v2 v2.4.0 h1:oMKNiBQpXImRWnHYla9uSU66ZzByZwBSCJOEs/pTKVg=
cloud.google.com/go/pubsub/v2 v2.4.0/go.mod h1:2lS/XQKq5qtOMs6kHBK+WX1ytUC36kLl2ig3zqsGUx8=
cloud.google.com/go/storage v1.62.1 h1:Os0G3XbUbjZumkpDUf2Y0rLoXJTCF1kU2kWUujKYXD8=
Expand Down
9 changes: 5 additions & 4 deletions gcp/indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"flag"
"fmt"

"cloud.google.com/go/pubsub"
"cloud.google.com/go/pubsub/v2"
"cloud.google.com/go/storage"
"github.com/google/osv.dev/gcp/indexer/config"
"github.com/google/osv.dev/gcp/indexer/stages/preparation"
Expand Down Expand Up @@ -66,7 +66,8 @@ func main() {
defer storer.Close()

if *worker {
if err := runWorker(ctx, storer, repoBucketHdl, psCl.Subscription(*subName), *subMessages); err != nil {
sub := psCl.Subscriber(fmt.Sprintf("projects/%s/subscriptions/%s", *projectID, *subName))
if err := runWorker(ctx, storer, repoBucketHdl, sub, *subMessages); err != nil {
log.Exitf("failed to run worker: %v", err)
}
return
Expand All @@ -77,7 +78,7 @@ func main() {
}
}

func runWorker(ctx context.Context, storer *idxStorage.Store, repoBucketHdl *storage.BucketHandle, sub *pubsub.Subscription, outstanding int) error {
func runWorker(ctx context.Context, storer *idxStorage.Store, repoBucketHdl *storage.BucketHandle, sub *pubsub.Subscriber, outstanding int) error {
procStage := processing.Stage{
Storer: storer,
RepoHdl: repoBucketHdl,
Expand All @@ -98,7 +99,7 @@ func runController(ctx context.Context, storer *idxStorage.Store, repoBucketHdl,
return fmt.Errorf("failed to load configurations: %v", err)
}

topic := psCl.Topic(*pubsubTopic)
topic := psCl.Publisher(fmt.Sprintf("projects/%s/topics/%s", *projectID, *pubsubTopic))
defer topic.Stop()

prepStage := &preparation.Stage{
Expand Down
4 changes: 2 additions & 2 deletions gcp/indexer/stages/preparation/preparation.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"strings"
"time"

"cloud.google.com/go/pubsub"
"cloud.google.com/go/pubsub/v2"
"cloud.google.com/go/storage"
"github.com/go-git/go-git/v5"
"github.com/go-git/go-git/v5/plumbing"
Expand Down Expand Up @@ -66,7 +66,7 @@ type Checker interface {
type Stage struct {
Checker Checker
RepoHdl *storage.BucketHandle
Output *pubsub.Topic
Output *pubsub.Publisher
}

// Run runs the stage and outputs Result data types to the results channel.
Expand Down
4 changes: 2 additions & 2 deletions gcp/indexer/stages/processing/processing.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
"sort"
"strings"

"cloud.google.com/go/pubsub"
"cloud.google.com/go/pubsub/v2"
"cloud.google.com/go/storage"
"github.com/go-git/go-git/v5"
"github.com/google/osv.dev/gcp/indexer/shared"
Expand Down Expand Up @@ -64,7 +64,7 @@ type BucketNode struct {
type Stage struct {
Storer Storer
RepoHdl *storage.BucketHandle
Input *pubsub.Subscription
Input *pubsub.Subscriber
PubSubOutstandingMessages int
}

Expand Down
61 changes: 0 additions & 61 deletions gcp/indexer/stages/processing/processing_test.go

This file was deleted.

7 changes: 4 additions & 3 deletions gcp/indexer/storage/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,10 @@ func getRepoInfo(t *testing.T) *preparation.Result {

func getDoc(t *testing.T, pages int) *document {
return &document{
Name: "abc",
Commit: []byte{0x41, 0x41, 0x41, 0x41, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
FileHashType: "MD5",
Name: "abc",
Commit: []byte{0x41, 0x41, 0x41, 0x41, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00},
FileHashType: "MD5",
DocumentVersion: 2,
}
}

Expand Down
Loading