From 0fdab66fbe840c6fb56a47634183f2e01878a58e Mon Sep 17 00:00:00 2001 From: SungJin1212 Date: Fri, 26 Jun 2026 16:53:30 +0900 Subject: [PATCH] resolve parquet shard count from bucket index to reduce object storage calls Signed-off-by: SungJin1212 --- CHANGELOG.md | 1 + pkg/storegateway/parquet_bucket_store.go | 67 ++++++++++++++----- pkg/storegateway/parquet_bucket_stores.go | 19 +++--- .../parquet_bucket_stores_test.go | 66 ++++++++++++++++++ 4 files changed, 130 insertions(+), 23 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 974a16d07a..e8f6f67bd0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -38,6 +38,7 @@ * [ENHANCEMENT] Distributor: Added `cortex_distributor_received_histogram_buckets` metric to track number of buckets in received native histogram samples before validation, per user. #7569 * [ENHANCEMENT] Distributor: Add `WrappedHistogram` with configurable size limit (`-validation.max-native-histogram-size-bytes`) to cap native histogram protobuf size before unmarshalling. #7570 * [ENHANCEMENT] Ingester: Add lazy regex evaluation on head postings cache miss. Defers expensive regex matchers on high-cardinality labels to per-series filtering when a selective equality matcher already narrows the result set. Configured via `-blocks-storage.expanded_postings_cache.head.lazy-matcher-max-cardinality` (disabled by default). #7553 +* [ENHANCEMENT] Store Gateway: Resolve the parquet shard count from the bucket index instead of reading the converter mark for each block, reducing object storage calls when the bucket index is enabled. #7648 * [ENHANCEMENT] Query Frontend: Improve the slow query log with `source`, `user_agent`, `engine_type`, `block_store_type`, and query stats fields to aid slow query diagnosis. #7601 * [ENHANCEMENT] Ring: Add ring metric to count number of duplicate tokens. 
#7626 * [ENHANCEMENT] Metrics: Add native histogram support to all remaining production histograms, enabling dual-format (classic + native) exposition across all Cortex components. #7636 diff --git a/pkg/storegateway/parquet_bucket_store.go b/pkg/storegateway/parquet_bucket_store.go index 5a5d740bc4..d24124f700 100644 --- a/pkg/storegateway/parquet_bucket_store.go +++ b/pkg/storegateway/parquet_bucket_store.go @@ -27,16 +27,20 @@ import ( "google.golang.org/grpc/status" cortex_parquet "github.com/cortexproject/cortex/pkg/storage/parquet" + "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" "github.com/cortexproject/cortex/pkg/util/parquetutil" "github.com/cortexproject/cortex/pkg/util/spanlogger" "github.com/cortexproject/cortex/pkg/util/validation" ) type parquetBucketStore struct { - logger log.Logger - bucket objstore.InstrumentedBucket - limits *validation.Overrides - concurrency int + logger log.Logger + bucket objstore.InstrumentedBucket + indexBucket objstore.Bucket + limits *validation.Overrides + userID string + bucketIndexEnabled bool + concurrency int chunksDecoder *schema.PrometheusParquetChunksDecoder @@ -67,20 +71,15 @@ func (p *parquetBucketStore) findParquetBlocks(ctx context.Context, blockMatcher bucketOpener := parquet_storage.NewParquetBucketOpener(p.bucket) noopQuota := search.NewQuota(search.NoopQuotaLimitFunc(ctx)) - // Read converter marks and expand to per-shard (blockID, shardID) lists. - // TODO(Sungjin1212): Read the shard count from the bucket index instead of reading the converter mark for each block. 
+ shardCounts, err := p.resolveShardCounts(ctx, blockIDs) + if err != nil { + return nil, err + } + var shardBlockIDs []string var shardIDs []int for _, blockID := range blockIDs { - uid, err := ulid.Parse(blockID) - if err != nil { - return nil, errors.Wrapf(err, "failed to parse block ID %s", blockID) - } - marker, err := cortex_parquet.ReadConverterMark(ctx, uid, p.bucket, p.logger) - if err != nil { - return nil, errors.Wrapf(err, "failed to read converter mark for block %s", blockID) - } - numShards := marker.Shards + numShards := shardCounts[blockID] if numShards <= 0 { // backward compatibility: blocks without a shard count have one shard numShards = 1 @@ -112,6 +111,44 @@ func (p *parquetBucketStore) findParquetBlocks(ctx context.Context, blockMatcher return parquetBlocks, nil } +// resolveShardCounts returns the number of parquet shards for each requested block ID. +// +// When the bucket index is enabled, the shard count is read from the bucket index. +// When the bucket index is disabled, it falls back to reading the converter mark +// for each block. +func (p *parquetBucketStore) resolveShardCounts(ctx context.Context, blockIDs []string) (map[string]int, error) { + shardCounts := make(map[string]int, len(blockIDs)) + + if p.bucketIndexEnabled { + idx, err := bucketindex.ReadIndex(ctx, p.indexBucket, p.userID, p.limits, p.logger) + if err != nil { + return nil, errors.Wrap(err, "failed to read bucket index") + } + for _, b := range idx.Blocks { + numShards := 1 + if b.Parquet != nil && b.Parquet.Shards > 0 { + numShards = b.Parquet.Shards + } + shardCounts[b.ID.String()] = numShards + } + return shardCounts, nil + } + + // Fallback: read the converter mark for each block. 
+ for _, blockID := range blockIDs { + uid, err := ulid.Parse(blockID) + if err != nil { + return nil, errors.Wrapf(err, "failed to parse block ID %s", blockID) + } + marker, err := cortex_parquet.ReadConverterMark(ctx, uid, p.bucket, p.logger) + if err != nil { + return nil, errors.Wrapf(err, "failed to read converter mark for block %s", blockID) + } + shardCounts[blockID] = marker.Shards + } + return shardCounts, nil +} + // Series implements the store interface for a single parquet bucket store func (p *parquetBucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store_SeriesServer) (err error) { spanLog, ctx := spanlogger.New(seriesSrv.Context(), "ParquetBucketStore.Series") diff --git a/pkg/storegateway/parquet_bucket_stores.go b/pkg/storegateway/parquet_bucket_stores.go index f0c488ba68..6310f22337 100644 --- a/pkg/storegateway/parquet_bucket_stores.go +++ b/pkg/storegateway/parquet_bucket_stores.go @@ -270,14 +270,17 @@ func (u *ParquetBucketStores) createParquetBucketStore(userID string, userLogger userBucket := bucket.NewUserBucketClient(userID, u.bucket, u.limits) store := &parquetBucketStore{ - logger: userLogger, - bucket: userBucket, - limits: u.limits, - concurrency: u.cfg.BucketStore.ParquetQueryConcurrency, - chunksDecoder: u.chunksDecoder, - matcherCache: u.matcherCache, - parquetShardCache: u.parquetShardCache, - rowRangesCache: u.rowRangesCache, + logger: userLogger, + bucket: userBucket, + indexBucket: u.bucket, + limits: u.limits, + userID: userID, + bucketIndexEnabled: u.cfg.BucketStore.BucketIndex.Enabled, + concurrency: u.cfg.BucketStore.ParquetQueryConcurrency, + chunksDecoder: u.chunksDecoder, + matcherCache: u.matcherCache, + parquetShardCache: u.parquetShardCache, + rowRangesCache: u.rowRangesCache, } return store, nil diff --git a/pkg/storegateway/parquet_bucket_stores_test.go b/pkg/storegateway/parquet_bucket_stores_test.go index d577de0655..797bd4c87c 100644 --- a/pkg/storegateway/parquet_bucket_stores_test.go +++ 
b/pkg/storegateway/parquet_bucket_stores_test.go @@ -1,6 +1,7 @@ package storegateway import ( + "bytes" "context" "errors" "fmt" @@ -30,6 +31,7 @@ import ( "github.com/cortexproject/cortex/pkg/storage/bucket/filesystem" cortex_parquet "github.com/cortexproject/cortex/pkg/storage/parquet" cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" + "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" "github.com/cortexproject/cortex/pkg/util/users" "github.com/cortexproject/cortex/pkg/util/validation" ) @@ -485,3 +487,67 @@ func TestParquetBucketStores_Series_MultiShard(t *testing.T) { require.NoError(t, err) assert.Equal(t, numSeries, len(series), "all series from all shards must be returned") } + +// TestParquetBucketStores_Series_MultiShard_BucketIndex verifies that, when the bucket +// index is enabled, the Store Gateway resolves the parquet shard count from the bucket +// index instead of reading the per-block converter mark. +func TestParquetBucketStores_Series_MultiShard_BucketIndex(t *testing.T) { + const ( + userID = "user-1" + metricName = "test_metric" + numSeries = 6 // 6 unique series + // numRowGroups=1, maxRowsPerRowGroup=2 → ceil(6/(1*2)) = 3 shards + numRowGroups = 1 + maxRowsPerRowGroup = 2 + expectedShards = 3 + ) + + cfg := prepareStorageConfig(t) + cfg.BucketStore.BucketStoreType = string(cortex_tsdb.ParquetBucketStore) + cfg.BucketStore.BucketIndex.Enabled = true + + storageDir := t.TempDir() + + // Create a block with 6 unique series. + generateStorageBlockWithMultipleSeries(t, storageDir, userID, metricName, numSeries, 0, 100, 15) + + bkt, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir}) + require.NoError(t, err) + + overrides := validation.NewOverrides(validation.Limits{}, nil) + uBucket := bucket.NewUserBucketClient(userID, bkt, overrides) + + // Convert to parquet with 3 shards and write the (correct) converter mark. 
+ userPath := filepath.Join(storageDir, userID) + blockIDs, err := convertToParquetBlocksWithShardsForTesting(userPath, uBucket, numRowGroups, maxRowsPerRowGroup) + require.NoError(t, err) + require.Len(t, blockIDs, 1) + + uidV2, err := ulidv2.Parse(blockIDs[0]) + require.NoError(t, err) + + // The bucket index discovers parquet blocks via the global markers location + // (parquet-markers/{block-id}-parquet-converter-mark.json), so upload it there too. + require.NoError(t, uBucket.Upload(context.Background(), bucketindex.ConverterMarkFilePath(uidV2), bytes.NewReader([]byte("{}")))) + + // Build the bucket index (with parquet info) so the shard count is recorded there. + idx, _, _, err := bucketindex.NewUpdater(bkt, userID, nil, log.NewNopLogger()).EnableParquet().UpdateIndex(context.Background(), nil) + require.NoError(t, err) + require.NoError(t, bucketindex.WriteIndex(context.Background(), bkt, userID, nil, idx)) + + // The bucket index must record the actual number of shards (3). + require.Len(t, idx.Blocks, 1) + require.NotNil(t, idx.Blocks[0].Parquet) + require.Equal(t, expectedShards, idx.Blocks[0].Parquet.Shards, "bucket index should record 3 shards") + + // Overwrite the converter mark with a wrong shard count (1). If the Store Gateway + // used the converter mark instead of the bucket index, it would only read 1 shard. + require.NoError(t, cortex_parquet.WriteConverterMark(context.Background(), uidV2, uBucket, 1)) + + stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), objstore.WithNoopInstr(bkt), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), prometheus.NewRegistry()) + require.NoError(t, err) + + series, _, err := querySeries(stores, userID, metricName, 0, 100, blockIDs...) + require.NoError(t, err) + assert.Equal(t, numSeries, len(series), "all series must be returned using the bucket index shard count") +}