diff --git a/docs/ReleaseNotes.md b/docs/ReleaseNotes.md index 5a6d3baec..da5231060 100644 --- a/docs/ReleaseNotes.md +++ b/docs/ReleaseNotes.md @@ -11,6 +11,7 @@ Current package versions: - Add experimental Redis 8.8 array support, including array APIs on `IDatabase`/`IDatabaseAsync`, array helper types, `RedisType.Array`, and array delete keyspace notification event types. ([#3076 by @mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/3076)) - Enable TCP keep-alives ([#3078 by @mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/3078)) +- Fix incorrect routing of some sorted-set and stream commands ([#3080 by @mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/3080)) - `ConfigurationOptions` : don't persist `Protocol` when it comes from the defaults-provider. ([#3082 by @mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/3082)) ## 2.13.1 diff --git a/src/StackExchange.Redis/Enums/SetOperation.cs b/src/StackExchange.Redis/Enums/SetOperation.cs index a529d348e..9e3449fa7 100644 --- a/src/StackExchange.Redis/Enums/SetOperation.cs +++ b/src/StackExchange.Redis/Enums/SetOperation.cs @@ -25,15 +25,39 @@ public enum SetOperation internal static class SetOperationExtensions { - internal static RedisCommand ToCommand(this SetOperation operation, bool store) => operation switch + internal static RedisCommand ToSetCommand(this SetOperation operation) => operation switch + { + SetOperation.Union => RedisCommand.SUNION, + SetOperation.Intersect => RedisCommand.SINTER, + SetOperation.Difference => RedisCommand.SDIFF, + _ => OutOfRange(operation), + }; + + internal static RedisCommand ToSetStoreCommand(this SetOperation operation) => operation switch + { + SetOperation.Union => RedisCommand.SUNIONSTORE, + SetOperation.Intersect => RedisCommand.SINTERSTORE, + SetOperation.Difference => RedisCommand.SDIFFSTORE, + _ => OutOfRange(operation), + }; + + internal static RedisCommand ToSortedSetCommand(this SetOperation operation) => operation switch { - SetOperation.Intersect when store => RedisCommand.ZINTERSTORE, - SetOperation.Intersect => RedisCommand.ZINTER, - SetOperation.Union when store => RedisCommand.ZUNIONSTORE, SetOperation.Union => RedisCommand.ZUNION, - SetOperation.Difference when store => RedisCommand.ZDIFFSTORE, + SetOperation.Intersect => RedisCommand.ZINTER, SetOperation.Difference => RedisCommand.ZDIFF, - _ => throw new ArgumentOutOfRangeException(nameof(operation)), + _ => OutOfRange(operation), + }; + + internal static RedisCommand ToSortedSetStoreCommand(this SetOperation operation) => operation switch + { + SetOperation.Union => RedisCommand.ZUNIONSTORE, + SetOperation.Intersect => RedisCommand.ZINTERSTORE, + SetOperation.Difference => RedisCommand.ZDIFFSTORE, + _ => OutOfRange(operation), }; + + private static RedisCommand OutOfRange(SetOperation operation) => + throw new ArgumentOutOfRangeException(nameof(operation), operation, null); } } diff --git a/src/StackExchange.Redis/ExtensionMethods.Internal.cs b/src/StackExchange.Redis/ExtensionMethods.Internal.cs index 446f6ff88..34a83b33b 100644 --- a/src/StackExchange.Redis/ExtensionMethods.Internal.cs +++ b/src/StackExchange.Redis/ExtensionMethods.Internal.cs @@ -1,4 +1,5 @@ -using System.Collections.Generic; +using System; +using System.Collections.Generic; using System.Diagnostics.CodeAnalysis; namespace StackExchange.Redis @@ -11,6 +12,26 @@ internal static bool IsNullOrEmpty([NotNullWhen(false)] this string? s) => internal static bool IsNullOrWhiteSpace([NotNullWhen(false)] this string? s) => string.IsNullOrWhiteSpace(s); + internal static RedisKey[] AssertAllNonNull(this RedisKey[] keys) + { + if (keys is null) throw new ArgumentNullException(nameof(keys)); + for (var i = 0; i < keys.Length; i++) + { + keys[i].AssertNotNull(); + } + return keys; + } + + internal static RedisValue[] AssertAllNonNull(this RedisValue[] values) + { + if (values is null) throw new ArgumentNullException(nameof(values)); + for (var i = 0; i < values.Length; i++) + { + values[i].AssertNotNull(); + } + return values; + } + #if !NET internal static bool TryDequeue(this Queue queue, [NotNullWhen(true)] out T? result) { diff --git a/src/StackExchange.Redis/Message.cs b/src/StackExchange.Redis/Message.cs index 05dfc07c3..c012d8478 100644 --- a/src/StackExchange.Redis/Message.cs +++ b/src/StackExchange.Redis/Message.cs @@ -393,6 +393,11 @@ public static Message Create( public static Message CreateInSlot(int db, int slot, CommandFlags flags, RedisCommand command, RedisValue[] values) => new CommandSlotValuesMessage(db, slot, flags, command, values); + // The key here is used only to route the message in cluster mode; it is not written as an argument. + // Use this for command shapes where the key appears in a non-standard position in the values payload. + public static Message CreateInKeySlot(int db, in RedisKey key, CommandFlags flags, RedisCommand command, RedisValue[] values) => + new CommandKeySlotValuesMessage(db, flags, command, key, values); + public static Message Create(int db, CommandFlags flags, RedisCommand command, KeyValuePair[] values, Expiration expiry, When when) => new MultiSetMessage(db, flags, command, values, expiry, when); @@ -984,21 +989,13 @@ private sealed class CommandKeyKeysMessage : CommandKeyBase private readonly RedisKey[] keys; public CommandKeyKeysMessage(int db, CommandFlags flags, RedisCommand command, in RedisKey key, RedisKey[] keys) : base(db, flags, command, key) { - for (int i = 0; i < keys.Length; i++) - { - keys[i].AssertNotNull(); - } - this.keys = keys; + this.keys = keys.AssertAllNonNull(); } public override int GetHashSlot(ServerSelectionStrategy serverSelectionStrategy) { var slot = serverSelectionStrategy.HashSlot(Key); - for (int i = 0; i < keys.Length; i++) - { - slot = serverSelectionStrategy.CombineSlot(slot, keys[i]); - } - return slot; + return serverSelectionStrategy.CombineSlot(slot, keys); } protected override void WriteImpl(PhysicalConnection physical) @@ -1050,11 +1047,7 @@ private sealed class CommandValuesMessage : Message private readonly RedisValue[] values; public CommandValuesMessage(int db, CommandFlags flags, RedisCommand command, RedisValue[] values) : base(db, flags, command) { - for (int i = 0; i < values.Length; i++) - { - values[i].AssertNotNull(); - } - this.values = values; + this.values = values.AssertAllNonNull(); } protected override void WriteImpl(PhysicalConnection physical) @@ -1073,22 +1066,10 @@ private sealed class CommandKeysMessage : Message private readonly RedisKey[] keys; public CommandKeysMessage(int db, CommandFlags flags, RedisCommand command, RedisKey[] keys) : base(db, flags, command) { - for (int i = 0; i < keys.Length; i++) - { - keys[i].AssertNotNull(); - } - this.keys = keys; + this.keys = keys.AssertAllNonNull(); } - public override int GetHashSlot(ServerSelectionStrategy serverSelectionStrategy) - { - int slot = ServerSelectionStrategy.NoSlot; - for (int i = 0; i < keys.Length; i++) - { - slot = serverSelectionStrategy.CombineSlot(slot, keys[i]); - } - return slot; - } + public override int GetHashSlot(ServerSelectionStrategy serverSelectionStrategy) => serverSelectionStrategy.HashSlot(keys); protected override void WriteImpl(PhysicalConnection physical) { @@ -1125,11 +1106,7 @@ private sealed class CommandKeyValuesKeyMessage : CommandKeyBase private readonly RedisValue[] values; public CommandKeyValuesKeyMessage(int db, CommandFlags flags, RedisCommand command, in RedisKey key0, RedisValue[] values, in RedisKey key1) : base(db, flags, command, key0) { - for (int i = 0; i < values.Length; i++) - { - values[i].AssertNotNull(); - } - this.values = values; + this.values = values.AssertAllNonNull(); key1.AssertNotNull(); this.key1 = key1; } @@ -1155,11 +1132,7 @@ private sealed class CommandKeyValuesMessage : CommandKeyBase private readonly RedisValue[] values; public CommandKeyValuesMessage(int db, CommandFlags flags, RedisCommand command, in RedisKey key, RedisValue[] values) : base(db, flags, command, key) { - for (int i = 0; i < values.Length; i++) - { - values[i].AssertNotNull(); - } - this.values = values; + this.values = values.AssertAllNonNull(); } protected override void WriteImpl(PhysicalConnection physical) @@ -1177,14 +1150,9 @@ private sealed class CommandKeyKeyValuesMessage : CommandKeyBase private readonly RedisValue[] values; public CommandKeyKeyValuesMessage(int db, CommandFlags flags, RedisCommand command, in RedisKey key, in RedisKey key1, RedisValue[] values) : base(db, flags, command, key) { - for (int i = 0; i < values.Length; i++) - { - values[i].AssertNotNull(); - } - key1.AssertNotNull(); this.key1 = key1; - this.values = values; + this.values = values.AssertAllNonNull(); } protected override void WriteImpl(PhysicalConnection physical) @@ -1204,16 +1172,11 @@ private sealed class CommandKeyValueValueValuesMessage : CommandKeyBase private readonly RedisValue[] values; public CommandKeyValueValueValuesMessage(int db, CommandFlags flags, RedisCommand command, in RedisKey key, in RedisValue value0, in RedisValue value1, RedisValue[] values) : base(db, flags, command, key) { - for (int i = 0; i < values.Length; i++) - { - values[i].AssertNotNull(); - } - value0.AssertNotNull(); value1.AssertNotNull(); this.value0 = value0; this.value1 = value1; - this.values = values; + this.values = values.AssertAllNonNull(); } protected override void WriteImpl(PhysicalConnection physical) @@ -1683,14 +1646,32 @@ public CommandSlotValuesMessage(int db, int slot, CommandFlags flags, RedisComma : base(db, flags, command) { this.slot = slot; + this.values = values.AssertAllNonNull(); + } + + public override int GetHashSlot(ServerSelectionStrategy serverSelectionStrategy) => slot; + + protected override void WriteImpl(PhysicalConnection physical) + { + physical.WriteHeader(command, values.Length); for (int i = 0; i < values.Length; i++) { - values[i].AssertNotNull(); + physical.WriteBulkString(values[i]); } - this.values = values; } + public override int ArgCount => values.Length; + } - public override int GetHashSlot(ServerSelectionStrategy serverSelectionStrategy) => slot; + private sealed class CommandKeySlotValuesMessage : CommandKeyBase + { + private readonly RedisValue[] values; + + public CommandKeySlotValuesMessage(int db, CommandFlags flags, RedisCommand command, in RedisKey key, RedisValue[] values) + : base(db, flags, command, key) + { + // Key is captured by CommandKeyBase for routing only; values are the complete serialized arguments. + this.values = values.AssertAllNonNull(); + } protected override void WriteImpl(PhysicalConnection physical) { diff --git a/src/StackExchange.Redis/RedisDatabase.cs b/src/StackExchange.Redis/RedisDatabase.cs index e33fd69b4..75a04a158 100644 --- a/src/StackExchange.Redis/RedisDatabase.cs +++ b/src/StackExchange.Redis/RedisDatabase.cs @@ -2056,49 +2056,49 @@ public Task SetAddAsync(RedisKey key, RedisValue[] values, CommandFlags fl public RedisValue[] SetCombine(SetOperation operation, RedisKey first, RedisKey second, CommandFlags flags = CommandFlags.None) { - var msg = Message.Create(Database, flags, SetOperationCommand(operation, false), first, second); + var msg = Message.Create(Database, flags, operation.ToSetCommand(), first, second); return ExecuteSync(msg, ResultProcessor.RedisValueArray, defaultValue: Array.Empty()); } public RedisValue[] SetCombine(SetOperation operation, RedisKey[] keys, CommandFlags flags = CommandFlags.None) { - var msg = Message.Create(Database, flags, SetOperationCommand(operation, false), keys); + var msg = Message.Create(Database, flags, operation.ToSetCommand(), keys); return ExecuteSync(msg, ResultProcessor.RedisValueArray, defaultValue: Array.Empty()); } public long SetCombineAndStore(SetOperation operation, RedisKey destination, RedisKey first, RedisKey second, CommandFlags flags = CommandFlags.None) { - var msg = Message.Create(Database, flags, SetOperationCommand(operation, true), destination, first, second); + var msg = Message.Create(Database, flags, operation.ToSetStoreCommand(), destination, first, second); return ExecuteSync(msg, ResultProcessor.Int64); } public long SetCombineAndStore(SetOperation operation, RedisKey destination, RedisKey[] keys, CommandFlags flags = CommandFlags.None) { - var msg = Message.Create(Database, flags, SetOperationCommand(operation, true), destination, keys); + var msg = Message.Create(Database, flags, operation.ToSetStoreCommand(), destination, keys); return ExecuteSync(msg, ResultProcessor.Int64); } public Task SetCombineAndStoreAsync(SetOperation operation, RedisKey destination, RedisKey first, RedisKey second, CommandFlags flags = CommandFlags.None) { - var msg = Message.Create(Database, flags, SetOperationCommand(operation, true), destination, first, second); + var msg = Message.Create(Database, flags, operation.ToSetStoreCommand(), destination, first, second); return ExecuteAsync(msg, ResultProcessor.Int64); } public Task SetCombineAndStoreAsync(SetOperation operation, RedisKey destination, RedisKey[] keys, CommandFlags flags = CommandFlags.None) { - var msg = Message.Create(Database, flags, SetOperationCommand(operation, true), destination, keys); + var msg = Message.Create(Database, flags, operation.ToSetStoreCommand(), destination, keys); return ExecuteAsync(msg, ResultProcessor.Int64); } public Task SetCombineAsync(SetOperation operation, RedisKey first, RedisKey second, CommandFlags flags = CommandFlags.None) { - var msg = Message.Create(Database, flags, SetOperationCommand(operation, false), first, second); + var msg = Message.Create(Database, flags, operation.ToSetCommand(), first, second); return ExecuteAsync(msg, ResultProcessor.RedisValueArray, defaultValue: Array.Empty()); } public Task SetCombineAsync(SetOperation operation, RedisKey[] keys, CommandFlags flags = CommandFlags.None) { - var msg = Message.Create(Database, flags, SetOperationCommand(operation, false), keys); + var msg = Message.Create(Database, flags, operation.ToSetCommand(), keys); return ExecuteAsync(msg, ResultProcessor.RedisValueArray, defaultValue: Array.Empty()); } @@ -2128,13 +2128,13 @@ public Task SetContainsAsync(RedisKey key, RedisValue[] values, CommandF public long SetIntersectionLength(RedisKey[] keys, long limit = 0, CommandFlags flags = CommandFlags.None) { - var msg = GetSetIntersectionLengthMessage(keys, limit, flags); + var msg = GetSetCardinalityMessage(RedisCommand.SINTERCARD, keys, limit, flags); return ExecuteSync(msg, ResultProcessor.Int64); } public Task SetIntersectionLengthAsync(RedisKey[] keys, long limit = 0, CommandFlags flags = CommandFlags.None) { - var msg = GetSetIntersectionLengthMessage(keys, limit, flags); + var msg = GetSetCardinalityMessage(RedisCommand.SINTERCARD, keys, limit, flags); return ExecuteAsync(msg, ResultProcessor.Int64); } @@ -3064,8 +3064,9 @@ public Task StreamClaimIdsOnlyAsync(RedisKey key, RedisValue consu public bool StreamConsumerGroupSetPosition(RedisKey key, RedisValue groupName, RedisValue position, CommandFlags flags = CommandFlags.None) { - var msg = Message.Create( + var msg = Message.CreateInKeySlot( Database, + key, flags, RedisCommand.XGROUP, new RedisValue[] @@ -3081,8 +3082,9 @@ public bool StreamConsumerGroupSetPosition(RedisKey key, RedisValue groupName, R public Task StreamConsumerGroupSetPositionAsync(RedisKey key, RedisValue groupName, RedisValue position, CommandFlags flags = CommandFlags.None) { - var msg = Message.Create( + var msg = Message.CreateInKeySlot( Database, + key, flags, RedisCommand.XGROUP, new RedisValue[] @@ -3142,8 +3144,9 @@ public Task StreamCreateConsumerGroupAsync(RedisKey key, RedisValue groupN public StreamConsumerInfo[] StreamConsumerInfo(RedisKey key, RedisValue groupName, CommandFlags flags = CommandFlags.None) { - var msg = Message.Create( + var msg = Message.CreateInKeySlot( Database, + key, flags, RedisCommand.XINFO, new RedisValue[] @@ -3158,8 +3161,9 @@ public StreamConsumerInfo[] StreamConsumerInfo(RedisKey key, RedisValue groupNam public Task StreamConsumerInfoAsync(RedisKey key, RedisValue groupName, CommandFlags flags = CommandFlags.None) { - var msg = Message.Create( + var msg = Message.CreateInKeySlot( Database, + key, flags, RedisCommand.XINFO, new RedisValue[] @@ -3256,8 +3260,9 @@ public Task StreamDeleteAsync(RedisKey key, RedisValue[] mes public long StreamDeleteConsumer(RedisKey key, RedisValue groupName, RedisValue consumerName, CommandFlags flags = CommandFlags.None) { - var msg = Message.Create( + var msg = Message.CreateInKeySlot( Database, + key, flags, RedisCommand.XGROUP, new RedisValue[] @@ -3273,8 +3278,9 @@ public long StreamDeleteConsumer(RedisKey key, RedisValue groupName, RedisValue public Task StreamDeleteConsumerAsync(RedisKey key, RedisValue groupName, RedisValue consumerName, CommandFlags flags = CommandFlags.None) { - var msg = Message.Create( + var msg = Message.CreateInKeySlot( Database, + key, flags, RedisCommand.XGROUP, new RedisValue[] @@ -3290,8 +3296,9 @@ public Task StreamDeleteConsumerAsync(RedisKey key, RedisValue groupName, public bool StreamDeleteConsumerGroup(RedisKey key, RedisValue groupName, CommandFlags flags = CommandFlags.None) { - var msg = Message.Create( + var msg = Message.CreateInKeySlot( Database, + key, flags, RedisCommand.XGROUP, new RedisValue[] @@ -3306,8 +3313,9 @@ public bool StreamDeleteConsumerGroup(RedisKey key, RedisValue groupName, Comman public Task StreamDeleteConsumerGroupAsync(RedisKey key, RedisValue groupName, CommandFlags flags = CommandFlags.None) { - var msg = Message.Create( + var msg = Message.CreateInKeySlot( Database, + key, flags, RedisCommand.XGROUP, new RedisValue[] @@ -4390,25 +4398,8 @@ private Message GetRestoreMessage(RedisKey key, byte[] value, TimeSpan? expiry, return Message.Create(Database, flags, RedisCommand.RESTORE, key, pttl, value); } - private Message GetSetIntersectionLengthMessage(RedisKey[] keys, long limit = 0, CommandFlags flags = CommandFlags.None) - { - if (keys == null) throw new ArgumentNullException(nameof(keys)); - - var values = new RedisValue[1 + keys.Length + (limit > 0 ? 2 : 0)]; - int i = 0; - values[i++] = keys.Length; - for (var j = 0; j < keys.Length; j++) - { - values[i++] = keys[j].AsRedisValue(); - } - if (limit > 0) - { - values[i++] = RedisLiterals.LIMIT; - values[i] = limit; - } - - return Message.Create(Database, flags, RedisCommand.SINTERCARD, values); - } + private Message GetSetCardinalityMessage(RedisCommand command, RedisKey[] keys, long limit, CommandFlags flags) + => new SetOperationCardinalityMessage(Database, flags, command, keys, limit); private Message GetSortedSetAddMessage(RedisKey key, RedisValue member, double score, SortedSetWhen when, bool change, CommandFlags flags) => new SingleSortedSetAddMessage(Database, flags, key, member, score, when, change, increment: false); @@ -4525,7 +4516,7 @@ private Message GetSortMessage(RedisKey destination, RedisKey key, long skip, lo private Message GetSortedSetCombineAndStoreCommandMessage(SetOperation operation, RedisKey destination, RedisKey[] keys, double[]? weights, Aggregate aggregate, CommandFlags flags) { - var command = operation.ToCommand(store: true); + var command = operation.ToSortedSetStoreCommand(); if (keys == null) { throw new ArgumentNullException(nameof(keys)); @@ -4552,35 +4543,10 @@ private Message GetSortedSetCombineAndStoreCommandMessage(SetOperation operation private Message GetSortedSetCombineCommandMessage(SetOperation operation, RedisKey[] keys, double[]? weights, Aggregate aggregate, bool withScores, CommandFlags flags) { - var command = operation.ToCommand(store: false); - if (keys == null) - { - throw new ArgumentNullException(nameof(keys)); - } - if (command == RedisCommand.ZDIFF && (weights != null || aggregate != Aggregate.Sum)) - { - throw new ArgumentException("ZDIFF cannot be used with weights or aggregation."); - } - if (weights != null && keys.Length != weights.Length) - { - throw new ArgumentException("Keys and weights should have the same number of elements.", nameof(weights)); - } - - var i = 0; - var values = new RedisValue[1 + keys.Length + - (weights?.Length > 0 ? 1 + weights.Length : 0) + - (aggregate != Aggregate.Sum ? 2 : 0) + - (withScores ? 1 : 0)]; - values[i++] = keys.Length; - foreach (var key in keys) - { - values[i++] = key.AsRedisValue(); - } - AddWeightsAggregationAndScore(values.AsSpan(i), weights, aggregate, withScores: withScores); - return Message.Create(Database, flags, command, values ?? RedisValue.EmptyArray); + return new SetOperationMessage(Database, flags, operation, keys, weights, aggregate, withScores); } - private void AddWeightsAggregationAndScore(Span values, double[]? weights, Aggregate aggregate, bool withScores = false) + private void AddWeightsAggregationAndScore(Span values, double[]? weights, Aggregate aggregate) { int i = 0; if (weights?.Length > 0) @@ -4610,10 +4576,6 @@ private void AddWeightsAggregationAndScore(Span values, double[]? we default: throw new ArgumentOutOfRangeException(nameof(aggregate)); } - if (withScores) - { - values[i++] = RedisLiterals.WITHSCORES; - } } private Message GetSortedSetLengthMessage(RedisKey key, double min, double max, Exclude exclude, CommandFlags flags) @@ -4627,23 +4589,7 @@ private Message GetSortedSetLengthMessage(RedisKey key, double min, double max, } private Message GetSortedSetIntersectionLengthMessage(RedisKey[] keys, long limit, CommandFlags flags) - { - if (keys == null) throw new ArgumentNullException(nameof(keys)); - - var i = 0; - var values = new RedisValue[1 + keys.Length + (limit > 0 ? 2 : 0)]; - values[i++] = keys.Length; - foreach (var key in keys) - { - values[i++] = key.AsRedisValue(); - } - if (limit > 0) - { - values[i++] = RedisLiterals.LIMIT; - values[i++] = limit; - } - return Message.Create(Database, flags, RedisCommand.ZINTERCARD, values); - } + => new SetOperationCardinalityMessage(Database, flags, RedisCommand.ZINTERCARD, keys, limit); private Message GetSortedSetRangeByScoreMessage(RedisKey key, double start, double stop, Exclude exclude, Order order, long skip, long take, CommandFlags flags, bool withScores) { @@ -4909,11 +4855,7 @@ private Message GetStreamCreateConsumerGroupMessage(RedisKey key, RedisValue gro values[4] = StreamConstants.MkStream; } - return Message.Create( - Database, - flags, - RedisCommand.XGROUP, - values); + return Message.CreateInKeySlot(Database, key, flags, RedisCommand.XGROUP, values); } /// @@ -5171,8 +5113,8 @@ private Message GetStreamTrimMessage(bool maxLen, RedisKey key, RedisValue thres for (int i = 0; i < keys.Length; i++) { values[i + 2] = keys[i].AsRedisValue(); - slot = serverSelectionStrategy.CombineSlot(slot, keys[i]); } + slot = serverSelectionStrategy.CombineSlot(slot, keys); return Message.CreateInSlot(Database, slot, flags, RedisCommand.BITOP, values); } @@ -5357,14 +5299,6 @@ private Message GetStringSetAndGetMessage( _ => Message.Create(Database, flags, RedisCommand.DECRBY, key, -value), }; - private static RedisCommand SetOperationCommand(SetOperation operation, bool store) => operation switch - { - SetOperation.Difference => store ? RedisCommand.SDIFFSTORE : RedisCommand.SDIFF, - SetOperation.Intersect => store ? RedisCommand.SINTERSTORE : RedisCommand.SINTER, - SetOperation.Union => store ? RedisCommand.SUNIONSTORE : RedisCommand.SUNION, - _ => throw new ArgumentOutOfRangeException(nameof(operation)), - }; - private CursorEnumerable? TryScan(RedisKey key, RedisValue pattern, int pageSize, long cursor, int pageOffset, CommandFlags flags, RedisCommand command, ResultProcessor.ScanResult> processor, out ServerEndPoint? server, bool noValues = false) { server = null; @@ -5666,21 +5600,11 @@ private ScriptEvalMessage(int db, CommandFlags flags, RedisCommand command, stri if (keys == null) keys = Array.Empty(); if (values == null) values = Array.Empty(); - for (int i = 0; i < keys.Length; i++) - keys[i].AssertNotNull(); - this.keys = keys; - for (int i = 0; i < values.Length; i++) - values[i].AssertNotNull(); - this.values = values; + this.keys = keys.AssertAllNonNull(); + this.values = values.AssertAllNonNull(); } - public override int GetHashSlot(ServerSelectionStrategy serverSelectionStrategy) - { - int slot = ServerSelectionStrategy.NoSlot; - for (int i = 0; i < keys.Length; i++) - slot = serverSelectionStrategy.CombineSlot(slot, keys[i]); - return slot; - } + public override int GetHashSlot(ServerSelectionStrategy serverSelectionStrategy) => serverSelectionStrategy.HashSlot(keys); public IEnumerable GetMessages(PhysicalConnection connection) { @@ -5820,20 +5744,14 @@ private sealed class SortedSetCombineAndStoreCommandMessage : Message.CommandKey public SortedSetCombineAndStoreCommandMessage(int db, CommandFlags flags, RedisCommand command, RedisKey destination, RedisKey[] keys, RedisValue[] values) : base(db, flags, command, destination) { - for (int i = 0; i < keys.Length; i++) - keys[i].AssertNotNull(); - this.keys = keys; - for (int i = 0; i < values.Length; i++) - values[i].AssertNotNull(); - this.values = values; + this.keys = keys.AssertAllNonNull(); + this.values = values.AssertAllNonNull(); } public override int GetHashSlot(ServerSelectionStrategy serverSelectionStrategy) { int slot = base.GetHashSlot(serverSelectionStrategy); - for (int i = 0; i < keys.Length; i++) - slot = serverSelectionStrategy.CombineSlot(slot, keys[i]); - return slot; + return serverSelectionStrategy.CombineSlot(slot, keys); } protected override void WriteImpl(PhysicalConnection physical) diff --git a/src/StackExchange.Redis/ServerSelectionStrategy.cs b/src/StackExchange.Redis/ServerSelectionStrategy.cs index 48ba32a77..e7917310e 100644 --- a/src/StackExchange.Redis/ServerSelectionStrategy.cs +++ b/src/StackExchange.Redis/ServerSelectionStrategy.cs @@ -280,6 +280,17 @@ internal int CombineSlot(int oldSlot, in RedisKey key) return oldSlot == newSlot ? oldSlot : MultipleSlots; } + internal int HashSlot(RedisKey[] keys) => CombineSlot(NoSlot, keys); + + internal int CombineSlot(int oldSlot, RedisKey[] keys) + { + for (int i = 0; i < keys.Length; i++) + { + oldSlot = CombineSlot(oldSlot, keys[i]); + } + return oldSlot; + } + internal int CountCoveredSlots() { var arr = map; diff --git a/src/StackExchange.Redis/SetOperationCardinalityMessage.cs b/src/StackExchange.Redis/SetOperationCardinalityMessage.cs new file mode 100644 index 000000000..2fbd153ea --- /dev/null +++ b/src/StackExchange.Redis/SetOperationCardinalityMessage.cs @@ -0,0 +1,31 @@ +namespace StackExchange.Redis; + +internal sealed class SetOperationCardinalityMessage( + int db, + CommandFlags flags, + RedisCommand command, + RedisKey[] keys, + long limit) : Message(db, flags, command) +{ + private readonly RedisKey[] _keys = keys.AssertAllNonNull(); + + public override int ArgCount => 1 + _keys.Length + (limit > 0 ? 2 : 0); + + public override int GetHashSlot(ServerSelectionStrategy serverSelectionStrategy) => serverSelectionStrategy.HashSlot(_keys); + + protected override void WriteImpl(PhysicalConnection physical) + { + physical.WriteHeader(Command, ArgCount); + physical.WriteBulkString(_keys.Length); + for (var i = 0; i < _keys.Length; i++) + { + physical.Write(_keys[i]); + } + + if (limit > 0) + { + physical.WriteRaw("$5\r\nLIMIT\r\n"u8); + physical.WriteBulkString(limit); + } + } +} diff --git a/src/StackExchange.Redis/SetOperationMessage.cs b/src/StackExchange.Redis/SetOperationMessage.cs new file mode 100644 index 000000000..b335cb5a8 --- /dev/null +++ b/src/StackExchange.Redis/SetOperationMessage.cs @@ -0,0 +1,92 @@ +using System; + +namespace StackExchange.Redis; + +internal sealed class SetOperationMessage : Message +{ + private readonly RedisKey[] _keys; + private readonly double[]? _weights; + private readonly Aggregate _aggregate; + private readonly bool _withScores; + + public SetOperationMessage( + int db, + CommandFlags flags, + SetOperation operation, + RedisKey[] keys, + double[]? weights, + Aggregate aggregate, + bool withScores) : base(db, flags, operation.ToSortedSetCommand()) + { + _keys = keys.AssertAllNonNull(); + if (operation == SetOperation.Difference && (weights != null || aggregate != Aggregate.Sum)) + { + throw new ArgumentException("ZDIFF cannot be used with weights or aggregation."); + } + if (weights != null && _keys.Length != weights.Length) + { + throw new ArgumentException("Keys and weights should have the same number of elements.", nameof(weights)); + } + + ValidateAggregate(aggregate); + _weights = weights; + _aggregate = aggregate; + _withScores = withScores; + } + + public override int ArgCount => + 1 + _keys.Length + + (_weights?.Length > 0 ? 1 + _weights.Length : 0) + + GetAggregateArgCount(_aggregate) + + (_withScores ? 1 : 0); + + public override int GetHashSlot(ServerSelectionStrategy serverSelectionStrategy) => serverSelectionStrategy.HashSlot(_keys); + + protected override void WriteImpl(PhysicalConnection physical) + { + physical.WriteHeader(Command, ArgCount); + physical.WriteBulkString(_keys.Length); + for (var i = 0; i < _keys.Length; i++) + { + physical.Write(_keys[i]); + } + + if (_weights?.Length > 0) + { + physical.WriteRaw("$7\r\nWEIGHTS\r\n"u8); + for (var i = 0; i < _weights.Length; i++) + { + physical.WriteBulkString(_weights[i]); + } + } + + switch (_aggregate) + { + case Aggregate.Sum: + break; + case Aggregate.Min: + physical.WriteRaw("$9\r\nAGGREGATE\r\n$3\r\nMIN\r\n"u8); + break; + case Aggregate.Max: + physical.WriteRaw("$9\r\nAGGREGATE\r\n$3\r\nMAX\r\n"u8); + break; + case Aggregate.Count: + physical.WriteRaw("$9\r\nAGGREGATE\r\n$5\r\nCOUNT\r\n"u8); + break; + } + + if (_withScores) + { + physical.WriteRaw("$10\r\nWITHSCORES\r\n"u8); + } + } + + private static void ValidateAggregate(Aggregate aggregate) => _ = GetAggregateArgCount(aggregate); + + private static int GetAggregateArgCount(Aggregate aggregate) => aggregate switch + { + Aggregate.Sum => 0, + Aggregate.Min or Aggregate.Max or Aggregate.Count => 2, + _ => throw new ArgumentOutOfRangeException(nameof(aggregate)), + }; +} diff --git a/src/StackExchange.Redis/StreamNackMessage.cs b/src/StackExchange.Redis/StreamNackMessage.cs index 991d7e76d..c562e150a 100644 --- a/src/StackExchange.Redis/StreamNackMessage.cs +++ b/src/StackExchange.Redis/StreamNackMessage.cs @@ -104,13 +104,7 @@ public StreamNackMessageMulti(int db, CommandFlags flags, in RedisKey key, in Re if (messageIds == null) throw new ArgumentNullException(nameof(messageIds)); #endif if (messageIds.Length == 0) throw new ArgumentOutOfRangeException(nameof(messageIds), "messageIds must contain at least one item."); - - for (int i = 0; i < messageIds.Length; i++) - { - messageIds[i].AssertNotNull(); - } - - this.messageIds = messageIds; + this.messageIds = messageIds.AssertAllNonNull(); } protected override int Count => messageIds.Length; diff --git a/tests/StackExchange.Redis.Tests/ClusterTests.cs b/tests/StackExchange.Redis.Tests/ClusterTests.cs index 781b65fef..80ff8a830 100644 --- a/tests/StackExchange.Redis.Tests/ClusterTests.cs +++ b/tests/StackExchange.Redis.Tests/ClusterTests.cs @@ -13,6 +13,16 @@ namespace StackExchange.Redis.Tests; [RunPerProtocol] public class ClusterTests(ITestOutputHelper output, SharedConnectionFixture fixture) : TestBase(output, fixture) { + private const int NoRedirectRoutingProbeCount = 10; + + public enum StreamConsumerGroupRoutingOperation + { + SetPosition, + ConsumerInfo, + DeleteConsumer, + DeleteConsumerGroup, + } + protected override string GetConfiguration() => TestConfig.Current.ClusterServersAndPorts + ",connectTimeout=10000"; [Fact] @@ -193,6 +203,155 @@ public async Task IntentionalWrongServer() } } + [Fact] + public async Task ClusterNoRedirectRoutesStreamCreateConsumerGroupByKey() + { + await using var conn = Create(require: RedisFeatures.v7_0_0_rc1); + + var db = conn.GetDatabase(); + for (var i = 0; i < NoRedirectRoutingProbeCount; i++) + { + var tag = Guid.NewGuid().ToString("N"); + RedisKey key = $"{{{tag}}}:stream:create-group"; + RedisValue group = $"group-{i}"; + Log("Probe {0}: key={1}, slot={2}", i, key, conn.HashSlot(key)); + + await db.KeyDeleteAsync(key, CommandFlags.FireAndForget); + + Assert.True(await db.StreamCreateConsumerGroupAsync( + key, + group, + StreamPosition.NewMessages, + createStream: true, + flags: CommandFlags.NoRedirect)); + } + } + + [Theory] + [InlineData(StreamConsumerGroupRoutingOperation.SetPosition)] + [InlineData(StreamConsumerGroupRoutingOperation.ConsumerInfo)] + [InlineData(StreamConsumerGroupRoutingOperation.DeleteConsumer)] + [InlineData(StreamConsumerGroupRoutingOperation.DeleteConsumerGroup)] + public async Task ClusterNoRedirectRoutesStreamConsumerGroupMetadataByKey(StreamConsumerGroupRoutingOperation operation) + { + await using var conn = Create(require: RedisFeatures.v7_0_0_rc1); + + var db = conn.GetDatabase(); + for (var i = 0; i < NoRedirectRoutingProbeCount; i++) + { + var tag = Guid.NewGuid().ToString("N"); + RedisKey key = $"{{{tag}}}:stream:consumer-group-metadata"; + RedisValue group = $"group-{i}"; + RedisValue consumer = $"consumer-{i}"; + Log("Probe {0}: key={1}, slot={2}", i, key, conn.HashSlot(key)); + + await db.KeyDeleteAsync(key, CommandFlags.FireAndForget); + await db.StreamAddAsync(key, "field", "value", flags: CommandFlags.FireAndForget); + await db.StreamCreateConsumerGroupAsync(key, group, StreamPosition.Beginning, flags: CommandFlags.FireAndForget); + await db.StreamReadGroupAsync(key, group, consumer, StreamPosition.NewMessages, flags: CommandFlags.FireAndForget); + + switch (operation) + { + case StreamConsumerGroupRoutingOperation.SetPosition: + Assert.True(await db.StreamConsumerGroupSetPositionAsync(key, group, StreamPosition.Beginning, CommandFlags.NoRedirect)); + break; + case StreamConsumerGroupRoutingOperation.ConsumerInfo: + var consumers = await db.StreamConsumerInfoAsync(key, group, CommandFlags.NoRedirect); + Assert.Contains(consumers, consumerInfo => consumerInfo.Name == consumer); + break; + case StreamConsumerGroupRoutingOperation.DeleteConsumer: + Assert.Equal(1, await db.StreamDeleteConsumerAsync(key, group, consumer, CommandFlags.NoRedirect)); + break; + case StreamConsumerGroupRoutingOperation.DeleteConsumerGroup: + Assert.True(await db.StreamDeleteConsumerGroupAsync(key, group, CommandFlags.NoRedirect)); + break; + } + } + } + + [Fact] + public async Task ClusterNoRedirectRoutesSetIntersectionLengthByKeys() + { + await using var conn = Create(require: RedisFeatures.v7_0_0_rc1); + + var db = conn.GetDatabase(); + for (var i = 0; i < NoRedirectRoutingProbeCount; i++) + { + var tag = Guid.NewGuid().ToString("N"); + RedisKey key1 = $"{{{tag}}}:set:1"; + RedisKey key2 = $"{{{tag}}}:set:2"; + Log("Probe {0}: key={1}, slot={2}", i, key1, conn.HashSlot(key1)); + Assert.Equal(conn.HashSlot(key1), conn.HashSlot(key2)); + + await db.KeyDeleteAsync([key1, key2], CommandFlags.FireAndForget); + await db.SetAddAsync(key1, ["shared", "key1-only"], CommandFlags.FireAndForget); + await db.SetAddAsync(key2, ["shared", "key2-only"], CommandFlags.FireAndForget); + + Assert.Equal(1, await db.SetIntersectionLengthAsync([key1, key2], flags: CommandFlags.NoRedirect)); + } + } + + [Theory] + [InlineData(SetOperation.Difference)] + [InlineData(SetOperation.Intersect)] + [InlineData(SetOperation.Union)] + public async Task ClusterNoRedirectRoutesSortedSetCombineByKeys(SetOperation operation) + { + await using var conn = Create(require: RedisFeatures.v7_0_0_rc1); + + var db = conn.GetDatabase(); + for (var i = 0; i < NoRedirectRoutingProbeCount; i++) + { + var tag = Guid.NewGuid().ToString("N"); + RedisKey key1 = $"{{{tag}}}:zset:1"; + RedisKey key2 = $"{{{tag}}}:zset:2"; + Log("Probe {0}: key={1}, slot={2}", i, key1, conn.HashSlot(key1)); + Assert.Equal(conn.HashSlot(key1), conn.HashSlot(key2)); + + await db.KeyDeleteAsync([key1, key2], CommandFlags.FireAndForget); + await db.SortedSetAddAsync(key1, [new("shared", 1), new("key1-only", 2)], CommandFlags.FireAndForget); + await db.SortedSetAddAsync(key2, [new("shared", 1), new("key2-only", 3)], CommandFlags.FireAndForget); + + var result = await db.SortedSetCombineAsync(operation, [key1, key2], flags: CommandFlags.NoRedirect); + switch (operation) + { + case SetOperation.Difference: + Assert.Equal(["key1-only"], result); + break; + case SetOperation.Intersect: + Assert.Equal(["shared"], result); + break; + case SetOperation.Union: + Assert.Equal(3, result.Length); + Assert.Contains((RedisValue)"shared", result); + Assert.Contains((RedisValue)"key1-only", result); + Assert.Contains((RedisValue)"key2-only", result); + break; + } + } + } + + [Fact] + public async Task ClusterNoRedirectRoutesSortedSetIntersectionLengthByKeys() + { + await using var conn = Create(require: RedisFeatures.v7_0_0_rc1); + + var db = conn.GetDatabase(); + for (var i = 0; i < NoRedirectRoutingProbeCount; i++) + { + var tag = Guid.NewGuid().ToString("N"); + RedisKey key1 = $"{{{tag}}}:zset:1"; + RedisKey key2 = $"{{{tag}}}:zset:2"; + Log("Probe {0}: key={1}, slot={2}", i, key1, conn.HashSlot(key1)); + Assert.Equal(conn.HashSlot(key1), conn.HashSlot(key2)); + + await db.KeyDeleteAsync([key1, key2], CommandFlags.FireAndForget); + await db.SortedSetAddAsync(key1, [new("shared", 1), new("key1-only", 2)], CommandFlags.FireAndForget); + await db.SortedSetAddAsync(key2, [new("shared", 1), new("key2-only", 3)], CommandFlags.FireAndForget); + Assert.Equal(1, await db.SortedSetIntersectionLengthAsync([key1, key2], flags: CommandFlags.NoRedirect)); + } + } + [Fact] public async Task TransactionWithMultiServerKeys() {