Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
* [ENHANCEMENT] Distributor: Added `cortex_distributor_received_histogram_buckets` metric to track number of buckets in received native histogram samples before validation, per user. #7569
* [ENHANCEMENT] Distributor: Add `WrappedHistogram` with configurable size limit (`-validation.max-native-histogram-size-bytes`) to cap native histogram protobuf size before unmarshalling. #7570
* [ENHANCEMENT] Ingester: Add lazy regex evaluation on head postings cache miss. Defers expensive regex matchers on high-cardinality labels to per-series filtering when a selective equality matcher already narrows the result set. Configured via `-blocks-storage.expanded_postings_cache.head.lazy-matcher-max-cardinality` (disabled by default). #7553
* [ENHANCEMENT] Store Gateway: Resolve the parquet shard count from the bucket index instead of reading the converter mark for each block, reducing object storage calls when the bucket index is enabled. #7648
* [ENHANCEMENT] Query Frontend: Improve the slow query log with `source`, `user_agent`, `engine_type`, `block_store_type`, and query stats fields to aid slow query diagnosis. #7601
* [ENHANCEMENT] Ring: Add ring metric to count number of duplicate tokens. #7626
* [ENHANCEMENT] Metrics: Add native histogram support to all remaining production histograms, enabling dual-format (classic + native) exposition across all Cortex components. #7636
Expand Down
67 changes: 52 additions & 15 deletions pkg/storegateway/parquet_bucket_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,20 @@ import (
"google.golang.org/grpc/status"

cortex_parquet "github.com/cortexproject/cortex/pkg/storage/parquet"
"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
"github.com/cortexproject/cortex/pkg/util/parquetutil"
"github.com/cortexproject/cortex/pkg/util/spanlogger"
"github.com/cortexproject/cortex/pkg/util/validation"
)

type parquetBucketStore struct {
logger log.Logger
bucket objstore.InstrumentedBucket
limits *validation.Overrides
concurrency int
logger log.Logger
bucket objstore.InstrumentedBucket
indexBucket objstore.Bucket
limits *validation.Overrides
userID string
bucketIndexEnabled bool
concurrency int

chunksDecoder *schema.PrometheusParquetChunksDecoder

Expand Down Expand Up @@ -67,20 +71,15 @@ func (p *parquetBucketStore) findParquetBlocks(ctx context.Context, blockMatcher
bucketOpener := parquet_storage.NewParquetBucketOpener(p.bucket)
noopQuota := search.NewQuota(search.NoopQuotaLimitFunc(ctx))

// Read converter marks and expand to per-shard (blockID, shardID) lists.
// TODO(Sungjin1212): Read the shard count from the bucket index instead of reading the converter mark for each block.
shardCounts, err := p.resolveShardCounts(ctx, blockIDs)
if err != nil {
return nil, err
}

var shardBlockIDs []string
var shardIDs []int
for _, blockID := range blockIDs {
uid, err := ulid.Parse(blockID)
if err != nil {
return nil, errors.Wrapf(err, "failed to parse block ID %s", blockID)
}
marker, err := cortex_parquet.ReadConverterMark(ctx, uid, p.bucket, p.logger)
if err != nil {
return nil, errors.Wrapf(err, "failed to read converter mark for block %s", blockID)
}
numShards := marker.Shards
numShards := shardCounts[blockID]
if numShards <= 0 {
// backward compatibility: blocks without a shard count have one shard
numShards = 1
Expand Down Expand Up @@ -112,6 +111,44 @@ func (p *parquetBucketStore) findParquetBlocks(ctx context.Context, blockMatcher
return parquetBlocks, nil
}

// resolveShardCounts returns the number of parquet shards for each requested block ID.
//
// When the bucket index is enabled, the shard count is read from the bucket index.
// When the bucket index is disabled, it falls back to reading the converter mark
// for each block.
func (p *parquetBucketStore) resolveShardCounts(ctx context.Context, blockIDs []string) (map[string]int, error) {
shardCounts := make(map[string]int, len(blockIDs))

if p.bucketIndexEnabled {
idx, err := bucketindex.ReadIndex(ctx, p.indexBucket, p.userID, p.limits, p.logger)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be cached with some TTL instead? Or we would rather have a separate goroutine to sync bucket index periodically rather than resolving it at query time.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we use bucketindex.Loader it has built-in caching

if err != nil {
return nil, errors.Wrap(err, "failed to read bucket index")
}
for _, b := range idx.Blocks {
numShards := 1
if b.Parquet != nil && b.Parquet.Shards > 0 {
numShards = b.Parquet.Shards
}
shardCounts[b.ID.String()] = numShards
}
return shardCounts, nil
}

// Fallback: read the converter mark for each block.
for _, blockID := range blockIDs {
uid, err := ulid.Parse(blockID)
if err != nil {
return nil, errors.Wrapf(err, "failed to parse block ID %s", blockID)
}
marker, err := cortex_parquet.ReadConverterMark(ctx, uid, p.bucket, p.logger)
if err != nil {
return nil, errors.Wrapf(err, "failed to read converter mark for block %s", blockID)
}
shardCounts[blockID] = marker.Shards
}
return shardCounts, nil
}

// Series implements the store interface for a single parquet bucket store
func (p *parquetBucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store_SeriesServer) (err error) {
spanLog, ctx := spanlogger.New(seriesSrv.Context(), "ParquetBucketStore.Series")
Expand Down
19 changes: 11 additions & 8 deletions pkg/storegateway/parquet_bucket_stores.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,14 +270,17 @@ func (u *ParquetBucketStores) createParquetBucketStore(userID string, userLogger
userBucket := bucket.NewUserBucketClient(userID, u.bucket, u.limits)

store := &parquetBucketStore{
logger: userLogger,
bucket: userBucket,
limits: u.limits,
concurrency: u.cfg.BucketStore.ParquetQueryConcurrency,
chunksDecoder: u.chunksDecoder,
matcherCache: u.matcherCache,
parquetShardCache: u.parquetShardCache,
rowRangesCache: u.rowRangesCache,
logger: userLogger,
bucket: userBucket,
indexBucket: u.bucket,
limits: u.limits,
userID: userID,
bucketIndexEnabled: u.cfg.BucketStore.BucketIndex.Enabled,
concurrency: u.cfg.BucketStore.ParquetQueryConcurrency,
chunksDecoder: u.chunksDecoder,
matcherCache: u.matcherCache,
parquetShardCache: u.parquetShardCache,
rowRangesCache: u.rowRangesCache,
}

return store, nil
Expand Down
66 changes: 66 additions & 0 deletions pkg/storegateway/parquet_bucket_stores_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package storegateway

import (
"bytes"
"context"
"errors"
"fmt"
Expand Down Expand Up @@ -30,6 +31,7 @@ import (
"github.com/cortexproject/cortex/pkg/storage/bucket/filesystem"
cortex_parquet "github.com/cortexproject/cortex/pkg/storage/parquet"
cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
"github.com/cortexproject/cortex/pkg/util/users"
"github.com/cortexproject/cortex/pkg/util/validation"
)
Expand Down Expand Up @@ -485,3 +487,67 @@ func TestParquetBucketStores_Series_MultiShard(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, numSeries, len(series), "all series from all shards must be returned")
}

// TestParquetBucketStores_Series_MultiShard_BucketIndex checks that a Store Gateway
// running with the bucket index enabled takes the parquet shard count from the bucket
// index rather than from the per-block converter mark.
//
// The test converts one block into three parquet shards, records that in the bucket
// index, then rewrites the converter mark with a shard count of 1. The query can only
// return every series if the shard count is resolved from the bucket index.
func TestParquetBucketStores_Series_MultiShard_BucketIndex(t *testing.T) {
	const (
		userID     = "user-1"
		metricName = "test_metric"
		numSeries  = 6 // 6 unique series
		// numRowGroups=1, maxRowsPerRowGroup=2 → ceil(6 / (1*2)) = 3 shards
		numRowGroups       = 1
		maxRowsPerRowGroup = 2
		expectedShards     = 3
	)

	ctx := context.Background()

	cfg := prepareStorageConfig(t)
	cfg.BucketStore.BucketStoreType = string(cortex_tsdb.ParquetBucketStore)
	cfg.BucketStore.BucketIndex.Enabled = true

	storageDir := t.TempDir()

	// Write a single TSDB block holding 6 unique series.
	generateStorageBlockWithMultipleSeries(t, storageDir, userID, metricName, numSeries, 0, 100, 15)

	rootBkt, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir})
	require.NoError(t, err)

	tenantBkt := bucket.NewUserBucketClient(userID, rootBkt, validation.NewOverrides(validation.Limits{}, nil))

	// Convert the block to parquet (3 shards); this also writes the correct converter mark.
	convertedIDs, err := convertToParquetBlocksWithShardsForTesting(filepath.Join(storageDir, userID), tenantBkt, numRowGroups, maxRowsPerRowGroup)
	require.NoError(t, err)
	require.Len(t, convertedIDs, 1)

	blockULID, err := ulidv2.Parse(convertedIDs[0])
	require.NoError(t, err)

	// The bucket index discovers parquet blocks via the global markers location
	// (parquet-markers/<blockID>-parquet-converter-mark.json), so a marker is uploaded there too.
	globalMarkPath := bucketindex.ConverterMarkFilePath(blockULID)
	require.NoError(t, tenantBkt.Upload(ctx, globalMarkPath, bytes.NewReader([]byte("{}"))))

	// Build and persist the bucket index with parquet info so the shard count is recorded in it.
	updater := bucketindex.NewUpdater(rootBkt, userID, nil, log.NewNopLogger()).EnableParquet()
	builtIdx, _, _, err := updater.UpdateIndex(ctx, nil)
	require.NoError(t, err)
	require.NoError(t, bucketindex.WriteIndex(ctx, rootBkt, userID, nil, builtIdx))

	// The index must carry the real shard count (3) for the single block.
	require.Len(t, builtIdx.Blocks, 1)
	require.NotNil(t, builtIdx.Blocks[0].Parquet)
	require.Equal(t, expectedShards, builtIdx.Blocks[0].Parquet.Shards, "bucket index should record 3 shards")

	// Replace the converter mark with a wrong shard count (1). A Store Gateway that still
	// consulted the converter mark would read a single shard and miss series.
	require.NoError(t, cortex_parquet.WriteConverterMark(ctx, blockULID, tenantBkt, 1))

	stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), objstore.WithNoopInstr(rootBkt), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), prometheus.NewRegistry())
	require.NoError(t, err)

	got, _, err := querySeries(stores, userID, metricName, 0, 100, convertedIDs...)
	require.NoError(t, err)
	assert.Equal(t, numSeries, len(got), "all series must be returned using the bucket index shard count")
}
Loading