-
Notifications
You must be signed in to change notification settings - Fork 2
Add redis-proxy: dual-write Redis proxy with pub/sub forwarding #351
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
bootjp
wants to merge
46
commits into
main
Choose a base branch
from
feature/redis-proxy
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+4,994
−12
Open
Changes from all commits
Commits
Show all changes
46 commits
Select commit
Hold shift + click to select a range
3576c51
Add redis-proxy: dual-write Redis proxy with pub/sub forwarding
bootjp 06736e9
Potential fix for pull request finding
bootjp 2a3bd9f
Merge branch 'feature/redis' into feature/redis-proxy
bootjp f35c41d
Address PR #351 review feedback
bootjp 4bc099d
Fix binary data handling, type safety, and Sentry flood protection
bootjp 411cf06
Support normal command mode after pub/sub unsubscribe
bootjp 7bc0300
Replace interface{} with any in codebase
bootjp fca3334
Merge branch 'feature/redis' into feature/redis-proxy
bootjp 74e0f44
Potential fix for pull request finding
bootjp be85bca
Merge branch 'main' into feature/redis-proxy
bootjp 534a39b
Fix race condition, error normalization, and security concerns
bootjp c005cf0
Add DB/password config for backends and ignore SELECT/AUTH commands
bootjp 4308912
Enhance pubsub with idempotent subscription handling
bootjp 18e1aaa
Refactor error handling in pubsub session
bootjp 1a70ea0
Refactor pubsub test writes with tagged types
bootjp f065da7
Fix execTxn to prioritize pipeline-level errors over results
bootjp 9346cee
Address review: writeMu separation, graceful shutdown, log truncation…
bootjp 721087b
Truncate migration-gap log key and optimize truncateValue for large v…
bootjp 1f9334f
Fix truncateValue []byte marker, bounded cleanup wait, mock Close race
bootjp 69a4439
Bound exitPubSubMode fwdDone wait and use WriteInt64 for sub counts
bootjp 11609b0
Potential fix for pull request finding
bootjp da95a8c
Potential fix for pull request finding
bootjp 50343d9
Fix CmdPubSub comment, align SUBSCRIBE-in-txn test with new queuing b…
bootjp f1b0350
Bound metrics shutdown, return error on upstream unsub failure, fix m…
bootjp 3d38cd3
Potential fix for pull request finding
bootjp 001fbd3
Initial plan
Copilot 2f2f0ce
proxy: fix exhaustive lint error in truncateValue by using if/else in…
Copilot 1f2f00d
proxy/sentry: fix exhaustive lint error in truncateValue
Copilot 20c798f
Merge pull request #352 from bootjp/copilot/sub-pr-351
bootjp 7c634c1
Close dconn on fwd timeout to unblock writeMu, fix exhaustive lint
bootjp 77ee8a1
Use switch for reflect.Kind to satisfy staticcheck with exhaustive no…
bootjp f389fc6
Fix Pipeline redis.Error wrapping and separate state from writeMu
bootjp 70b14f4
Add shadow subscribe for pub/sub divergence detection
bootjp 493f80c
Potential fix for pull request finding
bootjp a998315
Potential fix for pull request finding
bootjp ec3d20a
Fix shadow Close deadlock, shadow data race, remove unused method
bootjp ca0bea1
Potential fix for pull request finding
bootjp d7b675c
Potential fix for pull request finding
bootjp 75a5209
Fix shadow pubsub review issues: msgKey Pattern, lock scope, sweepAll…
bootjp 6b8da88
Potential fix for pull request finding
bootjp 2746640
Potential fix for pull request finding
bootjp cd9ffed
Potential fix for pull request finding
bootjp 3cce409
Potential fix for pull request finding
bootjp 1568458
Review fixes: correctness, concurrency, performance, and test coverage
bootjp 3785deb
Fix ExtraOnSecondary test for buffered matchSecondary
bootjp 4d23559
Address review feedback: command table, shadow pubsub, mu comment
bootjp File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,126 @@ | ||
| package main | ||
|
|
||
| import ( | ||
| "context" | ||
| "flag" | ||
| "fmt" | ||
| "log/slog" | ||
| "net" | ||
| "net/http" | ||
| "os" | ||
| "os/signal" | ||
| "syscall" | ||
| "time" | ||
|
|
||
| "github.com/bootjp/elastickv/proxy" | ||
| "github.com/prometheus/client_golang/prometheus" | ||
| "github.com/prometheus/client_golang/prometheus/promhttp" | ||
| ) | ||
|
|
||
| const ( | ||
| sentryFlushTimeout = 2 * time.Second | ||
| metricsShutdownTimeout = 5 * time.Second | ||
| ) | ||
|
|
||
| func main() { | ||
| if err := run(); err != nil { | ||
| fmt.Fprintf(os.Stderr, "error: %v\n", err) | ||
| os.Exit(1) | ||
| } | ||
| } | ||
|
|
||
| func run() error { | ||
| cfg := proxy.DefaultConfig() | ||
| var modeStr string | ||
|
|
||
| flag.StringVar(&cfg.ListenAddr, "listen", cfg.ListenAddr, "Proxy listen address") | ||
| flag.StringVar(&cfg.PrimaryAddr, "primary", cfg.PrimaryAddr, "Primary (Redis) address") | ||
| flag.IntVar(&cfg.PrimaryDB, "primary-db", cfg.PrimaryDB, "Primary Redis DB number") | ||
| flag.StringVar(&cfg.PrimaryPassword, "primary-password", cfg.PrimaryPassword, "Primary Redis password") | ||
| flag.StringVar(&cfg.SecondaryAddr, "secondary", cfg.SecondaryAddr, "Secondary (ElasticKV) address") | ||
| flag.IntVar(&cfg.SecondaryDB, "secondary-db", cfg.SecondaryDB, "Secondary Redis DB number") | ||
| flag.StringVar(&cfg.SecondaryPassword, "secondary-password", cfg.SecondaryPassword, "Secondary Redis password") | ||
| flag.StringVar(&modeStr, "mode", "dual-write", "Proxy mode: redis-only, dual-write, dual-write-shadow, elastickv-primary, elastickv-only") | ||
| flag.DurationVar(&cfg.SecondaryTimeout, "secondary-timeout", cfg.SecondaryTimeout, "Secondary write timeout") | ||
| flag.DurationVar(&cfg.ShadowTimeout, "shadow-timeout", cfg.ShadowTimeout, "Shadow read timeout") | ||
| flag.StringVar(&cfg.SentryDSN, "sentry-dsn", cfg.SentryDSN, "Sentry DSN (empty = disabled)") | ||
| flag.StringVar(&cfg.SentryEnv, "sentry-env", cfg.SentryEnv, "Sentry environment") | ||
| flag.Float64Var(&cfg.SentrySampleRate, "sentry-sample", cfg.SentrySampleRate, "Sentry sample rate") | ||
| flag.StringVar(&cfg.MetricsAddr, "metrics", cfg.MetricsAddr, "Prometheus metrics address") | ||
| flag.Parse() | ||
|
|
||
| mode, ok := proxy.ParseProxyMode(modeStr) | ||
| if !ok { | ||
| return fmt.Errorf("unknown mode: %s", modeStr) | ||
| } | ||
| cfg.Mode = mode | ||
|
|
||
| logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug})) | ||
|
|
||
| // Sentry | ||
| sentryReporter := proxy.NewSentryReporter(cfg.SentryDSN, cfg.SentryEnv, cfg.SentrySampleRate, logger) | ||
| defer sentryReporter.Flush(sentryFlushTimeout) | ||
|
|
||
| // Prometheus | ||
| reg := prometheus.NewRegistry() | ||
| metrics := proxy.NewProxyMetrics(reg) | ||
|
|
||
| // Backends | ||
| primaryOpts := proxy.DefaultBackendOptions() | ||
| primaryOpts.DB = cfg.PrimaryDB | ||
| primaryOpts.Password = cfg.PrimaryPassword | ||
| secondaryOpts := proxy.DefaultBackendOptions() | ||
| secondaryOpts.DB = cfg.SecondaryDB | ||
| secondaryOpts.Password = cfg.SecondaryPassword | ||
|
|
||
| var primary, secondary proxy.Backend | ||
| switch cfg.Mode { | ||
| case proxy.ModeElasticKVPrimary, proxy.ModeElasticKVOnly: | ||
| primary = proxy.NewRedisBackendWithOptions(cfg.SecondaryAddr, "elastickv", secondaryOpts) | ||
| secondary = proxy.NewRedisBackendWithOptions(cfg.PrimaryAddr, "redis", primaryOpts) | ||
| case proxy.ModeRedisOnly, proxy.ModeDualWrite, proxy.ModeDualWriteShadow: | ||
| primary = proxy.NewRedisBackendWithOptions(cfg.PrimaryAddr, "redis", primaryOpts) | ||
| secondary = proxy.NewRedisBackendWithOptions(cfg.SecondaryAddr, "elastickv", secondaryOpts) | ||
| } | ||
| defer primary.Close() | ||
| defer secondary.Close() | ||
|
|
||
| dual := proxy.NewDualWriter(primary, secondary, cfg, metrics, sentryReporter, logger) | ||
| defer dual.Close() // wait for in-flight async goroutines | ||
| srv := proxy.NewProxyServer(cfg, dual, metrics, sentryReporter, logger) | ||
|
|
||
| // Context for graceful shutdown | ||
| ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) | ||
| defer cancel() | ||
|
|
||
| // Start metrics server | ||
| go func() { | ||
| mux := http.NewServeMux() | ||
| mux.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{})) | ||
| var lc net.ListenConfig | ||
| ln, err := lc.Listen(ctx, "tcp", cfg.MetricsAddr) | ||
| if err != nil { | ||
| logger.Error("metrics listen failed", "addr", cfg.MetricsAddr, "err", err) | ||
| return | ||
| } | ||
| metricsSrv := &http.Server{Handler: mux, ReadHeaderTimeout: time.Second} | ||
| go func() { | ||
| <-ctx.Done() | ||
| shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), metricsShutdownTimeout) | ||
| defer shutdownCancel() | ||
| if err := metricsSrv.Shutdown(shutdownCtx); err != nil { | ||
| logger.Warn("metrics server shutdown error", "err", err) | ||
| } | ||
| }() | ||
| logger.Info("metrics server starting", "addr", cfg.MetricsAddr) | ||
| if err := metricsSrv.Serve(ln); err != nil && err != http.ErrServerClosed { | ||
| logger.Error("metrics server error", "err", err) | ||
| } | ||
| }() | ||
|
|
||
| // Start proxy | ||
| if err := srv.ListenAndServe(ctx); err != nil { | ||
| return fmt.Errorf("proxy server: %w", err) | ||
| } | ||
| return nil | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,122 @@ | ||
| package proxy | ||
|
|
||
| import ( | ||
| "context" | ||
| "errors" | ||
| "fmt" | ||
| "time" | ||
|
|
||
| "github.com/redis/go-redis/v9" | ||
| ) | ||
|
|
||
| const ( | ||
| defaultPoolSize = 128 | ||
| defaultDialTimeout = 5 * time.Second | ||
| defaultReadTimeout = 3 * time.Second | ||
| defaultWriteTimeout = 3 * time.Second | ||
| ) | ||
|
|
||
| // Backend abstracts a Redis-protocol endpoint (real Redis or ElasticKV). | ||
| type Backend interface { | ||
| // Do sends a single command and returns its result. | ||
| Do(ctx context.Context, args ...any) *redis.Cmd | ||
| // Pipeline sends multiple commands in a pipeline. | ||
| Pipeline(ctx context.Context, cmds [][]any) ([]*redis.Cmd, error) | ||
| // Close releases the underlying connection. | ||
| Close() error | ||
| // Name identifies this backend for logging and metrics. | ||
| Name() string | ||
| } | ||
|
|
||
| // BackendOptions configures the underlying go-redis connection pool. | ||
| type BackendOptions struct { | ||
| DB int | ||
| Password string | ||
| PoolSize int | ||
| DialTimeout time.Duration | ||
| ReadTimeout time.Duration | ||
| WriteTimeout time.Duration | ||
| } | ||
|
|
||
| // DefaultBackendOptions returns reasonable defaults for a proxy backend. | ||
| func DefaultBackendOptions() BackendOptions { | ||
| return BackendOptions{ | ||
| PoolSize: defaultPoolSize, | ||
| DialTimeout: defaultDialTimeout, | ||
| ReadTimeout: defaultReadTimeout, | ||
| WriteTimeout: defaultWriteTimeout, | ||
| } | ||
| } | ||
|
|
||
| // PubSubBackend is an optional interface for backends that support | ||
| // creating dedicated PubSub connections. | ||
| type PubSubBackend interface { | ||
| NewPubSub(ctx context.Context) *redis.PubSub | ||
| } | ||
|
|
||
| // RedisBackend connects to an upstream Redis instance via go-redis. | ||
| type RedisBackend struct { | ||
| client *redis.Client | ||
| name string | ||
| } | ||
|
|
||
| // NewRedisBackend creates a Backend targeting a Redis server with default pool options. | ||
| func NewRedisBackend(addr string, name string) *RedisBackend { | ||
| return NewRedisBackendWithOptions(addr, name, DefaultBackendOptions()) | ||
| } | ||
|
|
||
| // NewRedisBackendWithOptions creates a Backend with explicit pool configuration. | ||
| func NewRedisBackendWithOptions(addr string, name string, opts BackendOptions) *RedisBackend { | ||
| return &RedisBackend{ | ||
| client: redis.NewClient(&redis.Options{ | ||
| Addr: addr, | ||
| DB: opts.DB, | ||
| Password: opts.Password, | ||
| PoolSize: opts.PoolSize, | ||
| DialTimeout: opts.DialTimeout, | ||
| ReadTimeout: opts.ReadTimeout, | ||
| WriteTimeout: opts.WriteTimeout, | ||
| }), | ||
| name: name, | ||
| } | ||
| } | ||
|
|
||
| func (b *RedisBackend) Do(ctx context.Context, args ...any) *redis.Cmd { | ||
| return b.client.Do(ctx, args...) | ||
| } | ||
|
|
||
| func (b *RedisBackend) Pipeline(ctx context.Context, cmds [][]any) ([]*redis.Cmd, error) { | ||
| pipe := b.client.Pipeline() | ||
| results := make([]*redis.Cmd, len(cmds)) | ||
| for i, args := range cmds { | ||
| results[i] = pipe.Do(ctx, args...) | ||
| } | ||
| _, err := pipe.Exec(ctx) | ||
| if err != nil { | ||
| // go-redis pipelines return redis.Error for Redis reply errors (e.g., EXECABORT). | ||
| // Return results with nil error so callers can read per-command results (especially EXEC). | ||
| // Only propagate true transport/context errors. | ||
| var redisErr redis.Error | ||
| if errors.As(err, &redisErr) || errors.Is(err, redis.Nil) { | ||
| return results, nil | ||
| } | ||
| return results, fmt.Errorf("pipeline exec: %w", err) | ||
| } | ||
| return results, nil | ||
| } | ||
|
|
||
| func (b *RedisBackend) Close() error { | ||
| if err := b.client.Close(); err != nil { | ||
| return fmt.Errorf("close %s backend: %w", b.name, err) | ||
| } | ||
| return nil | ||
| } | ||
|
|
||
| func (b *RedisBackend) Name() string { | ||
| return b.name | ||
| } | ||
|
|
||
| // NewPubSub creates a dedicated PubSub connection (not from the pool). | ||
| func (b *RedisBackend) NewPubSub(ctx context.Context) *redis.PubSub { | ||
| return b.client.Subscribe(ctx) | ||
| } | ||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.