Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
3576c51
Add redis-proxy: dual-write Redis proxy with pub/sub forwarding
bootjp Mar 17, 2026
06736e9
Potential fix for pull request finding
bootjp Mar 17, 2026
2a3bd9f
Merge branch 'feature/redis' into feature/redis-proxy
bootjp Mar 17, 2026
f35c41d
Address PR #351 review feedback
bootjp Mar 17, 2026
4bc099d
Fix binary data handling, type safety, and Sentry flood protection
bootjp Mar 17, 2026
411cf06
Support normal command mode after pub/sub unsubscribe
bootjp Mar 17, 2026
7bc0300
Replace interface{} with any in codebase
bootjp Mar 17, 2026
fca3334
Merge branch 'feature/redis' into feature/redis-proxy
bootjp Mar 17, 2026
74e0f44
Potential fix for pull request finding
bootjp Mar 17, 2026
be85bca
Merge branch 'main' into feature/redis-proxy
bootjp Mar 17, 2026
534a39b
Fix race condition, error normalization, and security concerns
bootjp Mar 17, 2026
c005cf0
Add DB/password config for backends and ignore SELECT/AUTH commands
bootjp Mar 17, 2026
4308912
Enhance pubsub with idempotent subscription handling
bootjp Mar 17, 2026
18e1aaa
Refactor error handling in pubsub session
bootjp Mar 17, 2026
1a70ea0
Refactor pubsub test writes with tagged types
bootjp Mar 17, 2026
f065da7
Fix execTxn to prioritize pipeline-level errors over results
bootjp Mar 17, 2026
9346cee
Address review: writeMu separation, graceful shutdown, log truncation…
bootjp Mar 17, 2026
721087b
Truncate migration-gap log key and optimize truncateValue for large v…
bootjp Mar 17, 2026
1f9334f
Fix truncateValue []byte marker, bounded cleanup wait, mock Close race
bootjp Mar 17, 2026
69a4439
Bound exitPubSubMode fwdDone wait and use WriteInt64 for sub counts
bootjp Mar 17, 2026
11609b0
Potential fix for pull request finding
bootjp Mar 17, 2026
da95a8c
Potential fix for pull request finding
bootjp Mar 17, 2026
50343d9
Fix CmdPubSub comment, align SUBSCRIBE-in-txn test with new queuing b…
bootjp Mar 17, 2026
f1b0350
Bound metrics shutdown, return error on upstream unsub failure, fix m…
bootjp Mar 17, 2026
3d38cd3
Potential fix for pull request finding
bootjp Mar 17, 2026
001fbd3
Initial plan
Copilot Mar 17, 2026
2f2f0ce
proxy: fix exhaustive lint error in truncateValue by using if/else in…
Copilot Mar 17, 2026
1f2f00d
proxy/sentry: fix exhaustive lint error in truncateValue
Copilot Mar 17, 2026
20c798f
Merge pull request #352 from bootjp/copilot/sub-pr-351
bootjp Mar 17, 2026
7c634c1
Close dconn on fwd timeout to unblock writeMu, fix exhaustive lint
bootjp Mar 17, 2026
77ee8a1
Use switch for reflect.Kind to satisfy staticcheck with exhaustive no…
bootjp Mar 17, 2026
f389fc6
Fix Pipeline redis.Error wrapping and separate state from writeMu
bootjp Mar 18, 2026
70b14f4
Add shadow subscribe for pub/sub divergence detection
bootjp Mar 18, 2026
493f80c
Potential fix for pull request finding
bootjp Mar 18, 2026
a998315
Potential fix for pull request finding
bootjp Mar 18, 2026
ec3d20a
Fix shadow Close deadlock, shadow data race, remove unused method
bootjp Mar 18, 2026
ca0bea1
Potential fix for pull request finding
bootjp Mar 18, 2026
d7b675c
Potential fix for pull request finding
bootjp Mar 18, 2026
75a5209
Fix shadow pubsub review issues: msgKey Pattern, lock scope, sweepAll…
bootjp Mar 18, 2026
6b8da88
Potential fix for pull request finding
bootjp Mar 18, 2026
2746640
Potential fix for pull request finding
bootjp Mar 18, 2026
cd9ffed
Potential fix for pull request finding
bootjp Mar 18, 2026
3cce409
Potential fix for pull request finding
bootjp Mar 18, 2026
1568458
Review fixes: correctness, concurrency, performance, and test coverage
bootjp Mar 19, 2026
3785deb
Fix ExtraOnSecondary test for buffered matchSecondary
bootjp Mar 19, 2026
4d23559
Address review feedback: command table, shadow pubsub, mu comment
bootjp Mar 19, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 126 additions & 0 deletions cmd/redis-proxy/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package main

import (
"context"
"flag"
"fmt"
"log/slog"
"net"
"net/http"
"os"
"os/signal"
"syscall"
"time"

"github.com/bootjp/elastickv/proxy"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)

const (
sentryFlushTimeout = 2 * time.Second
metricsShutdownTimeout = 5 * time.Second
)

func main() {
if err := run(); err != nil {
fmt.Fprintf(os.Stderr, "error: %v\n", err)
os.Exit(1)
}
}

func run() error {
cfg := proxy.DefaultConfig()
var modeStr string

flag.StringVar(&cfg.ListenAddr, "listen", cfg.ListenAddr, "Proxy listen address")
flag.StringVar(&cfg.PrimaryAddr, "primary", cfg.PrimaryAddr, "Primary (Redis) address")
flag.IntVar(&cfg.PrimaryDB, "primary-db", cfg.PrimaryDB, "Primary Redis DB number")
flag.StringVar(&cfg.PrimaryPassword, "primary-password", cfg.PrimaryPassword, "Primary Redis password")
flag.StringVar(&cfg.SecondaryAddr, "secondary", cfg.SecondaryAddr, "Secondary (ElasticKV) address")
flag.IntVar(&cfg.SecondaryDB, "secondary-db", cfg.SecondaryDB, "Secondary Redis DB number")
flag.StringVar(&cfg.SecondaryPassword, "secondary-password", cfg.SecondaryPassword, "Secondary Redis password")
flag.StringVar(&modeStr, "mode", "dual-write", "Proxy mode: redis-only, dual-write, dual-write-shadow, elastickv-primary, elastickv-only")
flag.DurationVar(&cfg.SecondaryTimeout, "secondary-timeout", cfg.SecondaryTimeout, "Secondary write timeout")
flag.DurationVar(&cfg.ShadowTimeout, "shadow-timeout", cfg.ShadowTimeout, "Shadow read timeout")
flag.StringVar(&cfg.SentryDSN, "sentry-dsn", cfg.SentryDSN, "Sentry DSN (empty = disabled)")
flag.StringVar(&cfg.SentryEnv, "sentry-env", cfg.SentryEnv, "Sentry environment")
flag.Float64Var(&cfg.SentrySampleRate, "sentry-sample", cfg.SentrySampleRate, "Sentry sample rate")
flag.StringVar(&cfg.MetricsAddr, "metrics", cfg.MetricsAddr, "Prometheus metrics address")
flag.Parse()

mode, ok := proxy.ParseProxyMode(modeStr)
if !ok {
return fmt.Errorf("unknown mode: %s", modeStr)
}
cfg.Mode = mode

logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug}))

// Sentry
sentryReporter := proxy.NewSentryReporter(cfg.SentryDSN, cfg.SentryEnv, cfg.SentrySampleRate, logger)
defer sentryReporter.Flush(sentryFlushTimeout)

// Prometheus
reg := prometheus.NewRegistry()
metrics := proxy.NewProxyMetrics(reg)

// Backends
primaryOpts := proxy.DefaultBackendOptions()
primaryOpts.DB = cfg.PrimaryDB
primaryOpts.Password = cfg.PrimaryPassword
secondaryOpts := proxy.DefaultBackendOptions()
secondaryOpts.DB = cfg.SecondaryDB
secondaryOpts.Password = cfg.SecondaryPassword

var primary, secondary proxy.Backend
switch cfg.Mode {
case proxy.ModeElasticKVPrimary, proxy.ModeElasticKVOnly:
primary = proxy.NewRedisBackendWithOptions(cfg.SecondaryAddr, "elastickv", secondaryOpts)
secondary = proxy.NewRedisBackendWithOptions(cfg.PrimaryAddr, "redis", primaryOpts)
case proxy.ModeRedisOnly, proxy.ModeDualWrite, proxy.ModeDualWriteShadow:
primary = proxy.NewRedisBackendWithOptions(cfg.PrimaryAddr, "redis", primaryOpts)
secondary = proxy.NewRedisBackendWithOptions(cfg.SecondaryAddr, "elastickv", secondaryOpts)
}
defer primary.Close()
defer secondary.Close()

dual := proxy.NewDualWriter(primary, secondary, cfg, metrics, sentryReporter, logger)
defer dual.Close() // wait for in-flight async goroutines
srv := proxy.NewProxyServer(cfg, dual, metrics, sentryReporter, logger)

// Context for graceful shutdown
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer cancel()

// Start metrics server
go func() {
mux := http.NewServeMux()
mux.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
var lc net.ListenConfig
ln, err := lc.Listen(ctx, "tcp", cfg.MetricsAddr)
if err != nil {
logger.Error("metrics listen failed", "addr", cfg.MetricsAddr, "err", err)
return
}
metricsSrv := &http.Server{Handler: mux, ReadHeaderTimeout: time.Second}
go func() {
<-ctx.Done()
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), metricsShutdownTimeout)
defer shutdownCancel()
if err := metricsSrv.Shutdown(shutdownCtx); err != nil {
logger.Warn("metrics server shutdown error", "err", err)
}
}()
logger.Info("metrics server starting", "addr", cfg.MetricsAddr)
if err := metricsSrv.Serve(ln); err != nil && err != http.ErrServerClosed {
logger.Error("metrics server error", "err", err)
}
}()

// Start proxy
if err := srv.ListenAndServe(ctx); err != nil {
return fmt.Errorf("proxy server: %w", err)
}
return nil
}
5 changes: 1 addition & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ require (
github.com/cockroachdb/errors v1.12.0
github.com/cockroachdb/pebble v1.1.5
github.com/emirpasic/gods v1.18.1
github.com/getsentry/sentry-go v0.27.0
github.com/hashicorp/go-hclog v1.6.3
github.com/hashicorp/raft v1.7.3
github.com/hashicorp/raft-boltdb/v2 v2.3.1
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.23.2
github.com/redis/go-redis/v9 v9.18.0
Expand Down Expand Up @@ -50,7 +50,6 @@ require (
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.17 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.41.9 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/boltdb/bolt v1.3.1 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/cockroachdb/datadriven v1.0.3-0.20250407164829-2945557346d5 // indirect
github.com/cockroachdb/fifo v0.0.0-20240606204812-0bbfbd93a7ce // indirect
Expand All @@ -60,7 +59,6 @@ require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/fatih/color v1.15.0 // indirect
github.com/getsentry/sentry-go v0.27.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/golang/snappy v0.0.5-0.20231225225746-43d5d4cd4e0e // indirect
Expand All @@ -85,7 +83,6 @@ require (
github.com/tidwall/btree v1.1.0 // indirect
github.com/tidwall/match v1.1.1 // indirect
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
go.etcd.io/bbolt v1.4.3 // indirect
go.uber.org/atomic v1.11.0 // indirect
go.yaml.in/yaml/v2 v2.4.2 // indirect
golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df // indirect
Expand Down
8 changes: 0 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/boltdb/bolt v1.3.1 h1:JQmyP4ZBrce+ZQu0dY660FMfatumYDLun9hBCUVIkF4=
github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps=
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
Expand Down Expand Up @@ -181,7 +180,6 @@ github.com/hashicorp/go-immutable-radix v1.3.1 h1:DKHmCUm2hRBK510BaiZlwvpD40f8bJ
github.com/hashicorp/go-immutable-radix v1.3.1/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60=
github.com/hashicorp/go-metrics v0.5.4 h1:8mmPiIJkTPPEbAiV97IxdAGNdRdaWwVap1BU6elejKY=
github.com/hashicorp/go-metrics v0.5.4/go.mod h1:CG5yz4NZ/AI/aQt9Ucm/vdBnbh7fvmv4lxZ350i+QQI=
github.com/hashicorp/go-msgpack v0.5.5 h1:i9R9JSrqIz0QVLz3sz+i3YJdT7TTSLcfLLzJi9aZTuI=
github.com/hashicorp/go-msgpack v0.5.5/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM=
github.com/hashicorp/go-msgpack/v2 v2.1.1/go.mod h1:upybraOAblm4S7rx0+jeNy+CWWhzywQsSRV5033mMu4=
github.com/hashicorp/go-msgpack/v2 v2.1.2 h1:4Ee8FTp834e+ewB71RDrQ0VKpyFdrKOjvYtnQ/ltVj0=
Expand All @@ -200,10 +198,6 @@ github.com/hashicorp/raft v1.7.0/go.mod h1:N1sKh6Vn47mrWvEArQgILTyng8GoDRNYlgKyK
github.com/hashicorp/raft v1.7.3 h1:DxpEqZJysHN0wK+fviai5mFcSYsCkNpFUl1xpAW8Rbo=
github.com/hashicorp/raft v1.7.3/go.mod h1:DfvCGFxpAUPE0L4Uc8JLlTPtc3GzSbdH0MTJCLgnmJQ=
github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea/go.mod h1:pNv7Wc3ycL6F5oOWn+tPGo2gWD4a5X+yp/ntwdKLjRk=
github.com/hashicorp/raft-boltdb v0.0.0-20230125174641-2a8082862702 h1:RLKEcCuKcZ+qp2VlaaZsYZfLOmIiuJNpEi48Rl8u9cQ=
github.com/hashicorp/raft-boltdb v0.0.0-20230125174641-2a8082862702/go.mod h1:nTakvJ4XYq45UXtn0DbwR4aU9ZdjlnIenpbs6Cd+FM0=
github.com/hashicorp/raft-boltdb/v2 v2.3.1 h1:ackhdCNPKblmOhjEU9+4lHSJYFkJd6Jqyvj6eW9pwkc=
github.com/hashicorp/raft-boltdb/v2 v2.3.1/go.mod h1:n4S+g43dXF1tqDT+yzcXHhXM6y7MrlUd3TTwGRcUvQE=
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
Expand Down Expand Up @@ -342,8 +336,6 @@ github.com/yuin/gopher-lua v1.1.1 h1:kYKnWBjvbNP4XLT3+bPEwAXJx262OhaHDWDVOPjL46M
github.com/yuin/gopher-lua v1.1.1/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw=
github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0=
github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA=
go.etcd.io/bbolt v1.4.3 h1:dEadXpI6G79deX5prL3QRNP6JB8UxVkqo4UPnHaNXJo=
go.etcd.io/bbolt v1.4.3/go.mod h1:tKQlpPaYCVFctUIgFKFnAlvbmB3tpy1vkTnDWohtc0E=
go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64=
go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y=
go.opentelemetry.io/otel v1.39.0 h1:8yPrr/S0ND9QEfTfdP9V+SiwT4E0G7Y5MO7p85nis48=
Expand Down
122 changes: 122 additions & 0 deletions proxy/backend.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package proxy

import (
"context"
"errors"
"fmt"
"time"

"github.com/redis/go-redis/v9"
)

const (
defaultPoolSize = 128
defaultDialTimeout = 5 * time.Second
defaultReadTimeout = 3 * time.Second
defaultWriteTimeout = 3 * time.Second
)

// Backend abstracts a Redis-protocol endpoint (real Redis or ElasticKV).
type Backend interface {
// Do sends a single command and returns its result.
Do(ctx context.Context, args ...any) *redis.Cmd
// Pipeline sends multiple commands in a pipeline.
Pipeline(ctx context.Context, cmds [][]any) ([]*redis.Cmd, error)
// Close releases the underlying connection.
Close() error
// Name identifies this backend for logging and metrics.
Name() string
}

// BackendOptions configures the underlying go-redis connection pool.
type BackendOptions struct {
DB int
Password string
PoolSize int
DialTimeout time.Duration
ReadTimeout time.Duration
WriteTimeout time.Duration
}

// DefaultBackendOptions returns reasonable defaults for a proxy backend.
func DefaultBackendOptions() BackendOptions {
return BackendOptions{
PoolSize: defaultPoolSize,
DialTimeout: defaultDialTimeout,
ReadTimeout: defaultReadTimeout,
WriteTimeout: defaultWriteTimeout,
}
}

// PubSubBackend is an optional interface for backends that support
// creating dedicated PubSub connections.
type PubSubBackend interface {
NewPubSub(ctx context.Context) *redis.PubSub
}

// RedisBackend connects to an upstream Redis instance via go-redis.
type RedisBackend struct {
client *redis.Client
name string
}

// NewRedisBackend creates a Backend targeting a Redis server with default pool options.
func NewRedisBackend(addr string, name string) *RedisBackend {
return NewRedisBackendWithOptions(addr, name, DefaultBackendOptions())
}

// NewRedisBackendWithOptions creates a Backend with explicit pool configuration.
func NewRedisBackendWithOptions(addr string, name string, opts BackendOptions) *RedisBackend {
return &RedisBackend{
client: redis.NewClient(&redis.Options{
Addr: addr,
DB: opts.DB,
Password: opts.Password,
PoolSize: opts.PoolSize,
DialTimeout: opts.DialTimeout,
ReadTimeout: opts.ReadTimeout,
WriteTimeout: opts.WriteTimeout,
}),
name: name,
}
}

func (b *RedisBackend) Do(ctx context.Context, args ...any) *redis.Cmd {
return b.client.Do(ctx, args...)
}

func (b *RedisBackend) Pipeline(ctx context.Context, cmds [][]any) ([]*redis.Cmd, error) {
pipe := b.client.Pipeline()
results := make([]*redis.Cmd, len(cmds))
for i, args := range cmds {
results[i] = pipe.Do(ctx, args...)
}
_, err := pipe.Exec(ctx)
if err != nil {
// go-redis pipelines return redis.Error for Redis reply errors (e.g., EXECABORT).
// Return results with nil error so callers can read per-command results (especially EXEC).
// Only propagate true transport/context errors.
var redisErr redis.Error
if errors.As(err, &redisErr) || errors.Is(err, redis.Nil) {
return results, nil
}
return results, fmt.Errorf("pipeline exec: %w", err)
}
return results, nil
}

func (b *RedisBackend) Close() error {
if err := b.client.Close(); err != nil {
return fmt.Errorf("close %s backend: %w", b.name, err)
}
return nil
}

func (b *RedisBackend) Name() string {
return b.name
}

// NewPubSub creates a dedicated PubSub connection (not from the pool).
func (b *RedisBackend) NewPubSub(ctx context.Context) *redis.PubSub {
return b.client.Subscribe(ctx)
}
Loading
Loading