diff --git a/Directory.Build.props b/Directory.Build.props
index a136f3c66..13d503df2 100644
--- a/Directory.Build.props
+++ b/Directory.Build.props
@@ -26,6 +26,7 @@
true
false
true
+ true
00240000048000009400000006020000002400005253413100040000010001007791a689e9d8950b44a9a8886baad2ea180e7a8a854f158c9b98345ca5009cdd2362c84f368f1c3658c132b3c0f74e44ff16aeb2e5b353b6e0fe02f923a050470caeac2bde47a2238a9c7125ed7dab14f486a5a64558df96640933b9f2b6db188fc4a820f96dce963b662fa8864adbff38e5b4542343f162ecdc6dad16912fff
diff --git a/Directory.Packages.props b/Directory.Packages.props
index 4f65bb324..244e3a271 100644
--- a/Directory.Packages.props
+++ b/Directory.Packages.props
@@ -1,43 +1,39 @@
-
-
-
+
+
-
+
-
-
+
+
+
-
-
+
-
-
+
-
-
+
+
-
-
-
-
-
+
+
+
+
+
-
+
-
-
-
+
+
-
-
-
-
-
+
+
+
+
\ No newline at end of file
diff --git a/StackExchange.Redis.sln.DotSettings b/StackExchange.Redis.sln.DotSettings
index 8dd9095d9..339a73c59 100644
--- a/StackExchange.Redis.sln.DotSettings
+++ b/StackExchange.Redis.sln.DotSettings
@@ -26,4 +26,5 @@
True
True
True
- True
\ No newline at end of file
+ True
+ True
\ No newline at end of file
diff --git a/docs/Configuration.md b/docs/Configuration.md
index 96e4b5bae..6062a56db 100644
--- a/docs/Configuration.md
+++ b/docs/Configuration.md
@@ -72,11 +72,11 @@ var conn = ConnectionMultiplexer.Connect("contoso5.redis.cache.windows.net,ssl=t
The `ConfigurationOptions` object has a wide range of properties, all of which are fully documented in intellisense. Some of the more common options to use include:
| Configuration string | `ConfigurationOptions` | Default | Meaning |
-| ---------------------- | ---------------------- | ---------------------------- | --------------------------------------------------------------------------------------------------------- |
+| ---------------------- | ---------------------- |------------------------------| --------------------------------------------------------------------------------------------------------- |
| abortConnect={bool} | `AbortOnConnectFail` | `true` (`false` on Azure) | If true, `Connect` will not create a connection while no servers are available |
| allowAdmin={bool} | `AllowAdmin` | `false` | Enables a range of commands that are considered risky |
| channelPrefix={string} | `ChannelPrefix` | `null` | Optional channel prefix for all pub/sub operations |
-| checkCertificateRevocation={bool} | `CheckCertificateRevocation` | `true` | A Boolean value that specifies whether the certificate revocation list is checked during authentication. |
+| checkCertificateRevocation={bool} | `CheckCertificateRevocation` | `true` | A Boolean value that specifies whether the certificate revocation list is checked during authentication. |
| connectRetry={int} | `ConnectRetry` | `3` | The number of times to repeat connect attempts during initial `Connect` |
| connectTimeout={int} | `ConnectTimeout` | `5000` | Timeout (ms) for connect operations |
| configChannel={string} | `ConfigurationChannel` | `__Booksleeve_MasterChanged` | Broadcast channel name for communicating configuration changes |
@@ -95,7 +95,7 @@ The `ConfigurationOptions` object has a wide range of properties, all of which a
| syncTimeout={int} | `SyncTimeout` | `5000` | Time (ms) to allow for synchronous operations |
| asyncTimeout={int} | `AsyncTimeout` | `SyncTimeout` | Time (ms) to allow for asynchronous operations |
| tiebreaker={string} | `TieBreaker` | `__Booksleeve_TieBreak` | Key to use for selecting a server in an ambiguous primary scenario |
-| version={string} | `DefaultVersion` | (`4.0` in Azure, else `2.0`) | Redis version level (useful when the server does not make this available) |
+| version={string} | `DefaultVersion` | (`7.4` in AMR, else `6.0`) | Redis version level (useful when the server does not make this available) |
| tunnel={string} | `Tunnel` | `null` | Tunnel for connections (use `http:{proxy url}` for "connect"-based proxy server) |
| setlib={bool} | `SetClientLibrary` | `true` | Whether to attempt to use `CLIENT SETINFO` to set the library name/version on the connection |
| protocol={string} | `Protocol` | `null` | Redis protocol to use; see section below |
diff --git a/docs/ReleaseNotes.md b/docs/ReleaseNotes.md
index c7d16f61d..7b3f1671e 100644
--- a/docs/ReleaseNotes.md
+++ b/docs/ReleaseNotes.md
@@ -6,7 +6,14 @@ Current package versions:
| ------------ | ----------------- | ----- |
| [](https://www.nuget.org/packages/StackExchange.Redis/) | [](https://www.nuget.org/packages/StackExchange.Redis/) | [](https://www.myget.org/feed/stackoverflow/package/nuget/StackExchange.Redis) |
-## Unreleased
+## 3.0
+
+From 3.0, [release notes will be maintained in GitHub only](https://github.com/StackExchange/StackExchange.Redis/releases) to avoid duplication.
+
+---
+
+
+## 2.12.14
- (none)
diff --git a/docs/exp/SER004.md b/docs/exp/SER004.md
new file mode 100644
index 000000000..91f5d87c4
--- /dev/null
+++ b/docs/exp/SER004.md
@@ -0,0 +1,15 @@
+# RESPite
+
+RESPite is an experimental library that provides high-performance low-level RESP (Redis, etc) parsing and serialization.
+It is used as the IO core for StackExchange.Redis v3+. You should not (yet) use it directly unless you have a very
+good reason to do so.
+
+```xml
+$(NoWarn);SER004
+```
+
+or more granularly / locally in C#:
+
+``` c#
+#pragma warning disable SER004
+```
diff --git a/docs/exp/SER005.md b/docs/exp/SER005.md
new file mode 100644
index 000000000..03e7b7bb4
--- /dev/null
+++ b/docs/exp/SER005.md
@@ -0,0 +1,21 @@
+# Unit Testing
+
+Unit testing is great! Yay, do more of that!
+
+This type is provided for external unit testing, in particular by people using modules or server features
+not directly implemented by SE.Redis - for example to verify messsage parsing or formatting without
+talking to a RESP server.
+
+These types are considered slightly more... *mercurial*. We encourage you to use them, but *occasionally*
+(not just for fun) you might need to update your test code if we tweak something. This should not impact
+"real" library usage.
+
+```xml
+$(NoWarn);SER005
+```
+
+or more granularly / locally in C#:
+
+``` c#
+#pragma warning disable SER005
+```
diff --git a/src/RESPite/Messages/RespReader.cs b/src/RESPite/Messages/RespReader.cs
index 2c93185ea..49fd09aa7 100644
--- a/src/RESPite/Messages/RespReader.cs
+++ b/src/RESPite/Messages/RespReader.cs
@@ -746,7 +746,6 @@ private readonly unsafe bool TryParseSlow(
/// The parsed value if successful.
/// true if parsing succeeded; otherwise, false.
#pragma warning disable RS0016, RS0027 // public API
- [Experimental(Experiments.Respite, UrlFormat = Experiments.UrlFormat)]
[MethodImpl(MethodImplOptions.AggressiveInlining)]
#if DEBUG
[Obsolete("Please prefer the function-pointer API for library-internal use.")]
@@ -758,6 +757,52 @@ public readonly bool TryParseScalar(ScalarParser parser, out T value
return TryGetSpan(out var span) ? parser(span, out value) : TryParseSlow(parser, out value);
}
+ private readonly ReadOnlySpan BufferChars(Span target, out char[]? lease)
+ {
+ byte[] byteLease = [];
+ var bytes = Buffer(ref byteLease, byteLease);
+
+ int len = RespConstants.UTF8.GetMaxCharCount(bytes.Length);
+ if (len <= target.Length)
+ {
+ lease = null;
+ }
+ else
+ {
+ target = lease = ArrayPool.Shared.Rent(len);
+ }
+ len = RespConstants.UTF8.GetChars(bytes, target);
+ return target.Slice(0, len);
+ }
+
+ ///
+ /// Tries to read the current scalar element using a parser callback.
+ ///
+ /// The type of data being parsed.
+ /// The parser callback.
+ /// The parsed value if successful.
+ /// true if parsing succeeded; otherwise, false.
+ public readonly bool TryParseScalar(ScalarParser parser, out T value)
+ {
+ // note: no benefit in a function-ptr overload, after we've dealt with decoding bytes etc
+ var buffer = BufferChars(stackalloc char[128], out var lease);
+ try
+ {
+ return parser(buffer, out value);
+ }
+ finally
+ {
+ if (lease is not null) ArrayPool.Shared.Return(lease);
+ }
+ }
+
+ ///
+ /// Tries to read the current scalar element using a parser callback.
+ ///
+ /// The type of data being parsed.
+ /// The parser callback.
+ /// The parsed value if successful.
+ /// true if parsing succeeded; otherwise, false.
[MethodImpl(MethodImplOptions.NoInlining)]
private readonly bool TryParseSlow(ScalarParser parser, out T value)
{
diff --git a/src/RESPite/PublicAPI/PublicAPI.Unshipped.txt b/src/RESPite/PublicAPI/PublicAPI.Unshipped.txt
index ab058de62..9acf1fc40 100644
--- a/src/RESPite/PublicAPI/PublicAPI.Unshipped.txt
+++ b/src/RESPite/PublicAPI/PublicAPI.Unshipped.txt
@@ -1 +1,6 @@
#nullable enable
+[SER004]RESPite.Messages.RespReader.TryParseScalar(RESPite.Messages.RespReader.ScalarParser! parser, out T value) -> bool
+[SER004]static RESPite.AsciiHash.EqualsCI(System.ReadOnlySpan first, System.ReadOnlySpan second) -> bool
+[SER004]static RESPite.AsciiHash.EqualsCI(System.ReadOnlySpan first, System.ReadOnlySpan second) -> bool
+[SER004]static RESPite.AsciiHash.SequenceEqualsCI(System.ReadOnlySpan first, System.ReadOnlySpan second) -> bool
+[SER004]static RESPite.AsciiHash.SequenceEqualsCI(System.ReadOnlySpan first, System.ReadOnlySpan second) -> bool
diff --git a/src/RESPite/PublicAPI/net6.0/PublicAPI.Shipped.txt b/src/RESPite/PublicAPI/net6.0/PublicAPI.Shipped.txt
new file mode 100644
index 000000000..ab058de62
--- /dev/null
+++ b/src/RESPite/PublicAPI/net6.0/PublicAPI.Shipped.txt
@@ -0,0 +1 @@
+#nullable enable
diff --git a/src/RESPite/PublicAPI/net6.0/PublicAPI.Unshipped.txt b/src/RESPite/PublicAPI/net6.0/PublicAPI.Unshipped.txt
new file mode 100644
index 000000000..ab058de62
--- /dev/null
+++ b/src/RESPite/PublicAPI/net6.0/PublicAPI.Unshipped.txt
@@ -0,0 +1 @@
+#nullable enable
diff --git a/src/RESPite/RESPite.csproj b/src/RESPite/RESPite.csproj
index fef03625b..6b9798b0e 100644
--- a/src/RESPite/RESPite.csproj
+++ b/src/RESPite/RESPite.csproj
@@ -46,6 +46,4 @@
-
-
diff --git a/src/RESPite/Shared/AsciiHash.Public.cs b/src/RESPite/Shared/AsciiHash.Public.cs
deleted file mode 100644
index dd31cb415..000000000
--- a/src/RESPite/Shared/AsciiHash.Public.cs
+++ /dev/null
@@ -1,10 +0,0 @@
-namespace RESPite;
-
-// in the shared file, these are declared without accessibility modifiers
-public sealed partial class AsciiHashAttribute
-{
-}
-
-public readonly partial struct AsciiHash
-{
-}
diff --git a/src/RESPite/Shared/AsciiHash.cs b/src/RESPite/Shared/AsciiHash.cs
index 37b3c5734..f0f134872 100644
--- a/src/RESPite/Shared/AsciiHash.cs
+++ b/src/RESPite/Shared/AsciiHash.cs
@@ -1,4 +1,3 @@
-using System;
using System.Buffers.Binary;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
@@ -7,8 +6,6 @@
namespace RESPite;
-#pragma warning disable SA1205 // deliberately omit accessibility - see AsciiHash.Public.cs
-
///
/// This type is intended to provide fast hashing functions for small ASCII strings, for example well-known
/// RESP literals that are usually identifiable by their length and initial bytes; it is not intended
@@ -22,7 +19,7 @@ namespace RESPite;
Inherited = false)]
[Conditional("DEBUG")] // evaporate in release
[Experimental(Experiments.Respite, UrlFormat = Experiments.UrlFormat)]
-sealed partial class AsciiHashAttribute(string token = "") : Attribute
+public sealed partial class AsciiHashAttribute(string token = "") : Attribute
{
///
/// The token expected when parsing data, if different from the implied value. The implied
@@ -38,7 +35,7 @@ sealed partial class AsciiHashAttribute(string token = "") : Attribute
// note: instance members are in AsciiHash.Instance.cs.
[Experimental(Experiments.Respite, UrlFormat = Experiments.UrlFormat)]
-readonly partial struct AsciiHash
+public readonly partial struct AsciiHash
{
///
/// In-place ASCII upper-case conversion.
@@ -85,6 +82,9 @@ public static bool EqualsCI(ReadOnlySpan first, ReadOnlySpan second)
return len <= MaxBytesHashed ? HashUC(first) == HashUC(second) : SequenceEqualsCI(first, second);
}
+ public static bool EqualsCI(ReadOnlySpan first, ReadOnlySpan second)
+ => EqualsCI(second, first);
+
public static unsafe bool SequenceEqualsCI(ReadOnlySpan first, ReadOnlySpan second)
{
var len = first.Length;
@@ -120,6 +120,9 @@ public static unsafe bool SequenceEqualsCI(ReadOnlySpan first, ReadOnlySpa
}
}
+ public static bool SequenceEqualsCI(ReadOnlySpan first, ReadOnlySpan second)
+ => SequenceEqualsCI(second, first);
+
public static bool EqualsCS(ReadOnlySpan first, ReadOnlySpan second)
{
var len = first.Length;
@@ -139,6 +142,14 @@ public static bool EqualsCI(ReadOnlySpan first, ReadOnlySpan second)
return len <= MaxBytesHashed ? HashUC(first) == HashUC(second) : SequenceEqualsCI(first, second);
}
+ public static bool EqualsCI(ReadOnlySpan first, ReadOnlySpan second)
+ {
+ var len = first.Length;
+ if (len != second.Length) return false;
+ // for very short values, the UC hash performs CI equality
+ return len <= MaxBytesHashed ? HashUC(first) == HashUC(second) : SequenceEqualsCI(first, second);
+ }
+
public static unsafe bool SequenceEqualsCI(ReadOnlySpan first, ReadOnlySpan second)
{
var len = first.Length;
@@ -174,6 +185,41 @@ public static unsafe bool SequenceEqualsCI(ReadOnlySpan first, ReadOnlySpa
}
}
+ public static unsafe bool SequenceEqualsCI(ReadOnlySpan first, ReadOnlySpan second)
+ {
+ var len = first.Length;
+ if (len != second.Length) return false;
+
+ // OK, don't be clever (SIMD, etc); the purpose of FashHash is to compare RESP key tokens, which are
+ // typically relatively short, think 3-20 bytes. That wouldn't even touch a SIMD vector, so:
+ // just loop (the exact thing we'd need to do *anyway* in a SIMD implementation, to mop up the non-SIMD
+ // trailing bytes).
+ fixed (char* firstPtr = &MemoryMarshal.GetReference(first))
+ {
+ fixed (byte* secondPtr = &MemoryMarshal.GetReference(second))
+ {
+ const int CS_MASK = 0b0101_1111;
+ for (int i = 0; i < len; i++)
+ {
+ int x = (byte)firstPtr[i];
+ var xCI = x & CS_MASK;
+ if (xCI >= 'A' & xCI <= 'Z')
+ {
+ // alpha mismatch
+ if (xCI != (secondPtr[i] & CS_MASK)) return false;
+ }
+ else if (x != secondPtr[i])
+ {
+ // non-alpha mismatch
+ return false;
+ }
+ }
+
+ return true;
+ }
+ }
+ }
+
public static void Hash(scoped ReadOnlySpan value, out long cs, out long uc)
{
cs = HashCS(value);
diff --git a/src/RESPite/Shared/FrameworkShims.Encoding.cs b/src/RESPite/Shared/FrameworkShims.Encoding.cs
index 2f2c2e89d..b5937dd17 100644
--- a/src/RESPite/Shared/FrameworkShims.Encoding.cs
+++ b/src/RESPite/Shared/FrameworkShims.Encoding.cs
@@ -1,3 +1,5 @@
+using System.Runtime.InteropServices;
+
#if !NET
// ReSharper disable once CheckNamespace
namespace System.Text
@@ -7,7 +9,7 @@ internal static class EncodingExtensions
public static unsafe int GetBytes(this Encoding encoding, ReadOnlySpan source, Span destination)
{
if (source.IsEmpty) return 0;
- fixed (byte* bPtr = destination)
+ fixed (byte* bPtr = &MemoryMarshal.GetReference(destination))
{
fixed (char* cPtr = source)
{
@@ -19,9 +21,9 @@ public static unsafe int GetBytes(this Encoding encoding, ReadOnlySpan sou
public static unsafe int GetChars(this Encoding encoding, ReadOnlySpan source, Span destination)
{
if (source.IsEmpty) return 0;
- fixed (byte* bPtr = source)
+ fixed (byte* bPtr = &MemoryMarshal.GetReference(source))
{
- fixed (char* cPtr = destination)
+ fixed (char* cPtr = &MemoryMarshal.GetReference(destination))
{
return encoding.GetChars(bPtr, source.Length, cPtr, destination.Length);
}
@@ -31,7 +33,7 @@ public static unsafe int GetChars(this Encoding encoding, ReadOnlySpan sou
public static unsafe int GetCharCount(this Encoding encoding, ReadOnlySpan source)
{
if (source.IsEmpty) return 0;
- fixed (byte* bPtr = source)
+ fixed (byte* bPtr = &MemoryMarshal.GetReference(source))
{
return encoding.GetCharCount(bPtr, source.Length);
}
@@ -40,7 +42,7 @@ public static unsafe int GetCharCount(this Encoding encoding, ReadOnlySpan
public static unsafe string GetString(this Encoding encoding, ReadOnlySpan source)
{
if (source.IsEmpty) return "";
- fixed (byte* bPtr = source)
+ fixed (byte* bPtr = &MemoryMarshal.GetReference(source))
{
return encoding.GetString(bPtr, source.Length);
}
diff --git a/src/RESPite/Shared/FrameworkShims.Sockets.cs b/src/RESPite/Shared/FrameworkShims.Sockets.cs
new file mode 100644
index 000000000..25eccd0a2
--- /dev/null
+++ b/src/RESPite/Shared/FrameworkShims.Sockets.cs
@@ -0,0 +1,163 @@
+#if !NET
+using System.Diagnostics;
+using System.Diagnostics.CodeAnalysis;
+using System.Runtime.CompilerServices;
+using System.Runtime.InteropServices;
+
+// ReSharper disable once CheckNamespace
+namespace System.Net.Sockets;
+
+internal static class SocketExtensions
+{
+ internal static async ValueTask ConnectAsync(this Socket socket, EndPoint remoteEP, CancellationToken cancellationToken = default)
+ {
+ // this API is only used during handshake, *not* core IO, so: we're not concerned about alloc overhead
+ using var args = new SocketAwaitableEventArgs(SocketFlags.None, cancellationToken);
+ args.RemoteEndPoint = remoteEP;
+ if (!socket.ConnectAsync(args))
+ {
+ args.Complete();
+ }
+ await args; // .ConfigureAwait(false) does not apply here
+ }
+
+ internal static async ValueTask SendAsync(this Socket socket, ReadOnlyMemory buffer, SocketFlags socketFlags, CancellationToken cancellationToken = default)
+ {
+ // this API is only used during handshake, *not* core IO, so: we're not concerned about alloc overhead
+ using var args = new SocketAwaitableEventArgs(socketFlags, cancellationToken);
+ args.SetBuffer(buffer);
+ if (!socket.SendAsync(args))
+ {
+ args.Complete();
+ }
+
+ return await args; // .ConfigureAwait(false) does not apply here
+ }
+
+ internal static async ValueTask ReceiveAsync(this Socket socket, Memory buffer, SocketFlags socketFlags, CancellationToken cancellationToken = default)
+ {
+ // this API is only used during handshake, *not* core IO, so: we're not concerned about alloc overhead
+ using var args = new SocketAwaitableEventArgs(socketFlags, cancellationToken);
+ args.SetBuffer(buffer);
+ if (!socket.ReceiveAsync(args))
+ {
+ args.Complete();
+ }
+
+ return await args; // .ConfigureAwait(false) does not apply here
+ }
+
+ ///
+ /// Awaitable SocketAsyncEventArgs, where awaiting the args yields either the BytesTransferred or throws the relevant socket exception,
+ /// plus support for cancellation via .
+ ///
+ private sealed class SocketAwaitableEventArgs : SocketAsyncEventArgs, ICriticalNotifyCompletion, IDisposable
+ {
+ public new void Dispose()
+ {
+ cancelRegistration.Dispose();
+ base.Dispose();
+ }
+
+ private CancellationTokenRegistration cancelRegistration;
+ public SocketAwaitableEventArgs(SocketFlags socketFlags, CancellationToken cancellationToken)
+ {
+ SocketFlags = socketFlags;
+ if (cancellationToken.CanBeCanceled)
+ {
+ cancellationToken.ThrowIfCancellationRequested();
+ cancelRegistration = cancellationToken.Register(Timeout);
+ }
+ }
+
+ public void SetBuffer(ReadOnlyMemory buffer)
+ {
+ if (!MemoryMarshal.TryGetArray(buffer, out var segment)) ThrowNotSupported();
+ SetBuffer(segment.Array ?? [], segment.Offset, segment.Count);
+
+ [DoesNotReturn]
+ static void ThrowNotSupported() => throw new NotSupportedException("Only array-backed buffers are supported");
+ }
+
+ public void Timeout() => Abort(SocketError.TimedOut);
+
+ public void Abort(SocketError error)
+ {
+ _forcedError = error;
+ OnCompleted(this);
+ }
+
+ private volatile SocketError _forcedError; // Success = 0, no field init required
+
+ // ReSharper disable once InconsistentNaming
+ private static readonly Action _callbackCompleted = () => { };
+
+ private Action? _callback;
+
+ public SocketAwaitableEventArgs GetAwaiter() => this;
+
+ ///
+ /// Indicates whether the current operation is complete; used as part of "await".
+ ///
+ public bool IsCompleted => ReferenceEquals(_callback, _callbackCompleted);
+
+ ///
+ /// Gets the result of the async operation is complete; used as part of "await".
+ ///
+ public int GetResult()
+ {
+ Debug.Assert(ReferenceEquals(_callback, _callbackCompleted));
+
+ _callback = null;
+
+ var error = _forcedError;
+ if (error is SocketError.Success) error = SocketError;
+ if (error is not SocketError.Success) ThrowSocketException(error);
+
+ return BytesTransferred;
+
+ static void ThrowSocketException(SocketError e) => throw new SocketException((int)e);
+ }
+
+ ///
+ /// Schedules a continuation for this operation; used as part of "await".
+ ///
+ public void OnCompleted(Action continuation)
+ {
+ if (ReferenceEquals(Volatile.Read(ref _callback), _callbackCompleted)
+ || ReferenceEquals(Interlocked.CompareExchange(ref _callback, continuation, null), _callbackCompleted))
+ {
+ // this is the rare "kinda already complete" case; push to worker to prevent possible stack dive,
+ // but prefer the custom scheduler when possible
+ RunOnThreadPool(continuation);
+ }
+ }
+
+ ///
+ /// Schedules a continuation for this operation; used as part of "await".
+ ///
+ public void UnsafeOnCompleted(Action continuation) => OnCompleted(continuation);
+
+ ///
+ /// Marks the operation as complete - this should be invoked whenever a SocketAsyncEventArgs operation returns false.
+ ///
+ public void Complete() => OnCompleted(this);
+
+ private static void RunOnThreadPool(Action action)
+ => ThreadPool.QueueUserWorkItem(static state => ((Action)state).Invoke(), action);
+
+ ///
+ /// Invoked automatically when an operation completes asynchronously.
+ ///
+ protected override void OnCompleted(SocketAsyncEventArgs e)
+ {
+ var continuation = Interlocked.Exchange(ref _callback, _callbackCompleted);
+ if (continuation is not null)
+ {
+ // continue on the thread-pool
+ RunOnThreadPool(continuation);
+ }
+ }
+ }
+}
+#endif
diff --git a/src/StackExchange.Redis/APITypes/LatencyHistoryEntry.cs b/src/StackExchange.Redis/APITypes/LatencyHistoryEntry.cs
index 003708e6a..0daa00377 100644
--- a/src/StackExchange.Redis/APITypes/LatencyHistoryEntry.cs
+++ b/src/StackExchange.Redis/APITypes/LatencyHistoryEntry.cs
@@ -1,4 +1,5 @@
using System;
+using RESPite.Messages;
namespace StackExchange.Redis;
@@ -11,18 +12,14 @@ public readonly struct LatencyHistoryEntry
private sealed class Processor : ArrayResultProcessor
{
- protected override bool TryParse(in RawResult raw, out LatencyHistoryEntry parsed)
+ protected override bool TryParse(ref RespReader reader, out LatencyHistoryEntry parsed)
{
- if (raw.Resp2TypeArray == ResultType.Array)
+ if (reader.IsAggregate
+ && reader.TryMoveNext() && reader.IsScalar && reader.TryReadInt64(out var timestamp)
+ && reader.TryMoveNext() && reader.IsScalar && reader.TryReadInt64(out var duration))
{
- var items = raw.GetItems();
- if (items.Length >= 2
- && items[0].TryGetInt64(out var timestamp)
- && items[1].TryGetInt64(out var duration))
- {
- parsed = new LatencyHistoryEntry(timestamp, duration);
- return true;
- }
+ parsed = new LatencyHistoryEntry(timestamp, duration);
+ return true;
}
parsed = default;
return false;
diff --git a/src/StackExchange.Redis/APITypes/LatencyLatestEntry.cs b/src/StackExchange.Redis/APITypes/LatencyLatestEntry.cs
index d1bc70e42..c1f012495 100644
--- a/src/StackExchange.Redis/APITypes/LatencyLatestEntry.cs
+++ b/src/StackExchange.Redis/APITypes/LatencyLatestEntry.cs
@@ -1,4 +1,5 @@
using System;
+using RESPite.Messages;
namespace StackExchange.Redis;
@@ -11,17 +12,17 @@ public readonly struct LatencyLatestEntry
private sealed class Processor : ArrayResultProcessor
{
- protected override bool TryParse(in RawResult raw, out LatencyLatestEntry parsed)
+ protected override bool TryParse(ref RespReader reader, out LatencyLatestEntry parsed)
{
- if (raw.Resp2TypeArray == ResultType.Array)
+ if (reader.IsAggregate && reader.TryMoveNext() && reader.IsScalar)
{
- var items = raw.GetItems();
- if (items.Length >= 4
- && items[1].TryGetInt64(out var timestamp)
- && items[2].TryGetInt64(out var duration)
- && items[3].TryGetInt64(out var maxDuration))
+ var eventName = reader.ReadString()!;
+
+ if (reader.TryMoveNext() && reader.IsScalar && reader.TryReadInt64(out var timestamp)
+ && reader.TryMoveNext() && reader.IsScalar && reader.TryReadInt64(out var duration)
+ && reader.TryMoveNext() && reader.IsScalar && reader.TryReadInt64(out var maxDuration))
{
- parsed = new LatencyLatestEntry(items[0].GetString()!, timestamp, duration, maxDuration);
+ parsed = new LatencyLatestEntry(eventName, timestamp, duration, maxDuration);
return true;
}
}
diff --git a/src/StackExchange.Redis/ArrayGrepRequest.cs b/src/StackExchange.Redis/ArrayGrepRequest.cs
index f3cbc0109..f3142f168 100644
--- a/src/StackExchange.Redis/ArrayGrepRequest.cs
+++ b/src/StackExchange.Redis/ArrayGrepRequest.cs
@@ -226,7 +226,7 @@ internal Message CreateMessage(int db, RedisKey key, CommandFlags flags)
public abstract class Predicate
{
internal virtual int ArgCount => 2;
- internal abstract void WriteTo(PhysicalConnection physical);
+ internal abstract void WriteTo(in MessageWriter writer);
private protected Predicate() { }
///
@@ -261,10 +261,10 @@ private sealed class ExactPredicate(RedisValue value) : Predicate
{
public override string ToString() => $"EXACT '{value}'";
- internal override void WriteTo(PhysicalConnection physical)
+ internal override void WriteTo(in MessageWriter writer)
{
- physical.WriteRaw("$5\r\nEXACT\r\n"u8);
- physical.WriteBulkString(value);
+ writer.WriteRaw("$5\r\nEXACT\r\n"u8);
+ writer.WriteBulkString(value);
}
}
@@ -272,10 +272,10 @@ private sealed class MatchPredicate(string pattern) : Predicate
{
public override string ToString() => $"MATCH '{pattern}'";
- internal override void WriteTo(PhysicalConnection physical)
+ internal override void WriteTo(in MessageWriter writer)
{
- physical.WriteRaw("$5\r\nMATCH\r\n"u8);
- physical.WriteBulkString(pattern);
+ writer.WriteRaw("$5\r\nMATCH\r\n"u8);
+ writer.WriteBulkString(pattern);
}
}
@@ -283,10 +283,10 @@ private sealed class GlobPredicate(string pattern) : Predicate
{
public override string ToString() => $"GLOB '{pattern}'";
- internal override void WriteTo(PhysicalConnection physical)
+ internal override void WriteTo(in MessageWriter writer)
{
- physical.WriteRaw("$4\r\nGLOB\r\n"u8);
- physical.WriteBulkString(pattern);
+ writer.WriteRaw("$4\r\nGLOB\r\n"u8);
+ writer.WriteBulkString(pattern);
}
}
@@ -294,10 +294,10 @@ private sealed class RegexPredicate(string re) : Predicate
{
public override string ToString() => $"RE '{re}'";
- internal override void WriteTo(PhysicalConnection physical)
+ internal override void WriteTo(in MessageWriter writer)
{
- physical.WriteRaw("$2\r\nRE\r\n"u8);
- physical.WriteBulkString(re);
+ writer.WriteRaw("$2\r\nRE\r\n"u8);
+ writer.WriteBulkString(re);
}
}
}
@@ -325,42 +325,42 @@ public override int ArgCount
}
}
- protected override void WriteImpl(PhysicalConnection physical)
+ protected override void WriteImpl(in MessageWriter writer)
{
- physical.WriteHeader(Command, ArgCount);
- physical.WriteBulkString(key);
+ writer.WriteHeader(Command, ArgCount);
+ writer.WriteBulkString(key);
var index = request.Start;
if (index.HasValue)
{
- physical.WriteBulkString(index.GetValueOrDefault().Value);
+ writer.WriteBulkString(index.GetValueOrDefault().Value);
}
else
{
- physical.WriteRaw("$1\r\n-\r\n"u8);
+ writer.WriteRaw("$1\r\n-\r\n"u8);
}
index = request.End;
if (index.HasValue)
{
- physical.WriteBulkString(index.GetValueOrDefault().Value);
+ writer.WriteBulkString(index.GetValueOrDefault().Value);
}
else
{
- physical.WriteRaw("$1\r\n+\r\n"u8);
+ writer.WriteRaw("$1\r\n+\r\n"u8);
}
var pCount = request.Count;
for (int i = 0; i < pCount; i++)
{
- request[i].WriteTo(physical);
+ request[i].WriteTo(in writer);
}
- if (request.IsIntersection) physical.WriteRaw("$3\r\nAND\r\n"u8);
- if (request.IsCaseSensitive) physical.WriteRaw("$6\r\nNOCASE\r\n"u8);
- if (request.IncludeValues) physical.WriteRaw("$10\r\nWITHVALUES\r\n"u8);
+ if (request.IsIntersection) writer.WriteRaw("$3\r\nAND\r\n"u8);
+ if (request.IsCaseSensitive) writer.WriteRaw("$6\r\nNOCASE\r\n"u8);
+ if (request.IncludeValues) writer.WriteRaw("$10\r\nWITHVALUES\r\n"u8);
var limit = request.Limit;
if (limit.HasValue)
{
- physical.WriteRaw("$5\r\nLIMIT\r\n"u8);
- physical.WriteBulkString(limit.GetValueOrDefault());
+ writer.WriteRaw("$5\r\nLIMIT\r\n"u8);
+ writer.WriteBulkString(limit.GetValueOrDefault());
}
}
}
diff --git a/src/StackExchange.Redis/AwaitableMutex.cs b/src/StackExchange.Redis/AwaitableMutex.cs
new file mode 100644
index 000000000..9bc054804
--- /dev/null
+++ b/src/StackExchange.Redis/AwaitableMutex.cs
@@ -0,0 +1,22 @@
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace StackExchange.Redis;
+
+// abstract over the concept of awaiable mutex between platforms
+internal readonly partial struct AwaitableMutex : IDisposable
+{
+ // ReSharper disable once ConvertToAutoProperty
+ public partial int TimeoutMilliseconds { get; }
+ public static AwaitableMutex Create(int timeoutMilliseconds) => new(timeoutMilliseconds);
+
+ // define the API first here (think .h file)
+ private partial AwaitableMutex(int timeoutMilliseconds);
+ public partial void Dispose();
+ public partial bool IsAvailable { get; }
+ public partial bool TryTakeInstant();
+ public partial ValueTask TryTakeAsync(CancellationToken cancellationToken = default);
+ public partial bool TryTakeSync();
+ public partial void Release();
+}
diff --git a/src/StackExchange.Redis/AwaitableMutex.net.cs b/src/StackExchange.Redis/AwaitableMutex.net.cs
new file mode 100644
index 000000000..501b6af36
--- /dev/null
+++ b/src/StackExchange.Redis/AwaitableMutex.net.cs
@@ -0,0 +1,36 @@
+using System.Threading;
+using System.Threading.Tasks;
+
+#if NET
+namespace StackExchange.Redis;
+
+internal partial struct AwaitableMutex
+{
+ private readonly int _timeoutMilliseconds;
+
+ // note: this does not guarantee "fairness", but that's OK for our use-case - we mostly just want
+ // a sync+async awaitable mutex, which this does; the .NET Framework version has a hand-written
+ // implementation (see .netfx.cx for reasons), which *is* fair, but we'd rather not pay that overhead
+ // here. Good-enough-is.
+ private readonly SemaphoreSlim _mutex;
+
+ private partial AwaitableMutex(int timeoutMilliseconds)
+ {
+ _timeoutMilliseconds = timeoutMilliseconds;
+ _mutex = new(1, 1);
+ }
+
+ public partial void Dispose() => _mutex?.Dispose();
+ public partial bool IsAvailable => _mutex.CurrentCount != 0;
+ public partial int TimeoutMilliseconds => _timeoutMilliseconds;
+
+ public partial bool TryTakeInstant() => _mutex.Wait(0);
+
+ public partial ValueTask TryTakeAsync(CancellationToken cancellationToken)
+ => new(_mutex.WaitAsync(_timeoutMilliseconds, cancellationToken));
+
+ public partial bool TryTakeSync() => _mutex.Wait(_timeoutMilliseconds);
+
+ public partial void Release() => _mutex.Release();
+}
+#endif
diff --git a/src/StackExchange.Redis/AwaitableMutex.netfx.cs b/src/StackExchange.Redis/AwaitableMutex.netfx.cs
new file mode 100644
index 000000000..7267f8043
--- /dev/null
+++ b/src/StackExchange.Redis/AwaitableMutex.netfx.cs
@@ -0,0 +1,389 @@
+using System;
+using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Tasks;
+
+#if !NET
+namespace StackExchange.Redis;
+
+/*
+Compensating for the fact that netfx SemaphoreSlim is kinda janky (https://blog.marcgravell.com/2019/02/fun-with-spiral-of-death.html).
+
+This uses a simple queue of sync/async callers, and assumes a reasonable caller (the original MutexSlim is more defensive, as
+a general purpose public API).
+*/
+
+internal partial struct AwaitableMutex
+{
+ private readonly State _state;
+
+ private partial AwaitableMutex(int timeoutMilliseconds)
+ {
+ _state = new(timeoutMilliseconds);
+ }
+
+ public partial void Dispose() => _state?.Dispose();
+
+ public partial bool IsAvailable => _state.IsAvailable;
+ public partial int TimeoutMilliseconds => _state.TimeoutMilliseconds;
+
+ public partial bool TryTakeInstant() => _state.TryTakeInstant();
+
+ public partial ValueTask TryTakeAsync(CancellationToken cancellationToken)
+ => _state.TryTakeAsync(cancellationToken);
+
+ public partial bool TryTakeSync() => _state.TryTakeSync();
+
+ public partial void Release() => _state.Release();
+
+ private sealed class State : IDisposable
+ {
+ private readonly Queue _queue = new();
+ private bool _isHeld;
+ private bool _isDisposed;
+
+ public State(int timeoutMilliseconds)
+ {
+ if (timeoutMilliseconds < Timeout.Infinite) ThrowOutOfRangeException();
+ TimeoutMilliseconds = timeoutMilliseconds;
+
+ static void ThrowOutOfRangeException() => throw new ArgumentOutOfRangeException(nameof(timeoutMilliseconds));
+ }
+
+ public int TimeoutMilliseconds { get; }
+
+ public bool IsAvailable
+ {
+ get
+ {
+ lock (_queue)
+ {
+ return !_isDisposed && !_isHeld && _queue.Count == 0;
+ }
+ }
+ }
+
+ public bool TryTakeInstant()
+ {
+ bool lockTaken = false;
+ try
+ {
+ Monitor.TryEnter(_queue, 0, ref lockTaken);
+ if (!lockTaken) return false;
+
+ return TryTakeInsideLock();
+ }
+ finally
+ {
+ if (lockTaken) Monitor.Exit(_queue);
+ }
+ }
+
+ public bool TryTakeSync()
+ {
+ bool lockTaken = false;
+ try
+ {
+ // try to acquire uncontested lock - that way we can avoid checking the time
+ Monitor.TryEnter(_queue, 0, ref lockTaken);
+ if (lockTaken && TryTakeInsideLock()) return true;
+ }
+ finally
+ {
+ if (lockTaken) Monitor.Exit(_queue);
+ }
+
+ return TryTakeSyncSlow();
+ }
+
+ public ValueTask TryTakeAsync(CancellationToken cancellationToken)
+ {
+ bool lockTaken = false;
+ try
+ {
+ // try to acquire uncontested lock - that way we can avoid allocating the pending caller
+ Monitor.TryEnter(_queue, 0, ref lockTaken);
+ if (lockTaken)
+ {
+ if (_isDisposed) return DisposedAsync();
+ if (cancellationToken.IsCancellationRequested)
+ {
+ return CanceledAsync(cancellationToken);
+ }
+
+ if (TryTakeInsideLockCore()) return new ValueTask(true);
+ }
+ }
+ finally
+ {
+ if (lockTaken) Monitor.Exit(_queue);
+ }
+
+ return TryTakeAsyncSlow(cancellationToken);
+ }
+
+ private ValueTask TryTakeAsyncSlow(CancellationToken cancellationToken)
+ {
+ lock (_queue)
+ {
+ if (_isDisposed) return DisposedAsync();
+ if (cancellationToken.IsCancellationRequested)
+ {
+ return CanceledAsync(cancellationToken);
+ }
+
+ if (TryTakeInsideLockCore()) return new ValueTask(true);
+ if (TimeoutMilliseconds == 0) return new ValueTask(false);
+ if (cancellationToken.IsCancellationRequested) return CanceledAsync(cancellationToken);
+
+ var pending = new AsyncPendingCaller(TimeoutMilliseconds, cancellationToken);
+ _queue.Enqueue(pending);
+ return new ValueTask(pending.Task);
+ }
+ }
+
+ public void Release()
+ {
+ lock (_queue)
+ {
+ ThrowIfDisposed();
+ if (!_isHeld) ThrowNotHeld();
+
+ while (_queue.Count != 0)
+ {
+ if (_queue.Dequeue().TryGrant()) return;
+ }
+
+ _isHeld = false;
+ }
+
+ static void ThrowNotHeld() => throw new SemaphoreFullException();
+ }
+
+ private bool TryTakeInsideLock()
+ {
+ ThrowIfDisposed();
+ return TryTakeInsideLockCore();
+ }
+
+ private bool TryTakeInsideLockCore()
+ {
+ if (_isHeld || _queue.Count != 0) return false;
+ _isHeld = true;
+ return true;
+ }
+
+ private bool TryTakeSyncSlow()
+ {
+ if (TimeoutMilliseconds == 0) return false;
+
+ var start = GetTime();
+ SyncPendingCaller? pending = null;
+ bool lockTaken = false;
+ try
+ {
+ Monitor.TryEnter(_queue, TimeoutMilliseconds, ref lockTaken);
+ if (!lockTaken) return false;
+ if (TryTakeInsideLock()) return true;
+
+ var remaining = GetRemainingTimeout(start, TimeoutMilliseconds);
+ if (remaining == 0) return false;
+
+ pending = new SyncPendingCaller(start, TimeoutMilliseconds);
+ _queue.Enqueue(pending);
+ }
+ finally
+ {
+ if (lockTaken) Monitor.Exit(_queue);
+ }
+
+ return pending!.Wait();
+ }
+
+ public void Dispose()
+ {
+ if (_isDisposed) return;
+ _isDisposed = true;
+
+ lock (_queue)
+ {
+ _isHeld = false;
+ while (_queue.Count != 0)
+ {
+ _queue.Dequeue().Abort();
+ }
+ }
+ }
+
+ private void ThrowIfDisposed()
+ {
+ if (_isDisposed) ThrowDisposed();
+ }
+
+ private static void ThrowDisposed() => throw new ObjectDisposedException(nameof(AwaitableMutex));
+
+ private static ValueTask DisposedAsync()
+ => new(Task.FromException(new ObjectDisposedException(nameof(AwaitableMutex))));
+
+ private static ValueTask CanceledAsync(CancellationToken cancellationToken)
+ => new(Task.FromCanceled(cancellationToken));
+
+ private static uint GetTime() => (uint)Environment.TickCount;
+
+ private static int GetRemainingTimeout(uint startTime, int originalTimeoutMilliseconds)
+ {
+ if (originalTimeoutMilliseconds == Timeout.Infinite) return Timeout.Infinite;
+
+ var elapsedMilliseconds = GetTime() - startTime;
+ if (elapsedMilliseconds > int.MaxValue) return 0;
+
+ var remaining = originalTimeoutMilliseconds - (int)elapsedMilliseconds;
+ return remaining <= 0 ? 0 : remaining;
+ }
+
+ private interface IPendingCaller
+ {
+ bool TryGrant();
+ void Abort();
+ }
+
+ private sealed class SyncPendingCaller : IPendingCaller
+ {
+ private readonly uint _start;
+ private readonly int _timeoutMilliseconds;
+ private bool _isComplete;
+ private bool _wasGranted;
+ private bool _wasAborted;
+
+ public SyncPendingCaller(uint start, int timeoutMilliseconds)
+ {
+ _start = start;
+ _timeoutMilliseconds = timeoutMilliseconds;
+ }
+
+ public bool Wait()
+ {
+ lock (this)
+ {
+ while (!_isComplete)
+ {
+ var remaining = GetRemainingTimeout(_start, _timeoutMilliseconds);
+ if (remaining == 0)
+ {
+ _isComplete = true;
+ return false;
+ }
+
+ if (remaining == Timeout.Infinite)
+ {
+ Monitor.Wait(this);
+ }
+ else
+ {
+ Monitor.Wait(this, remaining);
+ }
+ }
+
+ if (_wasAborted) ThrowDisposed();
+ return _wasGranted;
+ }
+ }
+
+ public bool TryGrant()
+ {
+ lock (this)
+ {
+ if (_isComplete) return false;
+ _wasGranted = true;
+ _isComplete = true;
+ Monitor.Pulse(this);
+ return true;
+ }
+ }
+
+ public void Abort()
+ {
+ lock (this)
+ {
+ if (_isComplete) return;
+ _wasAborted = true;
+ _isComplete = true;
+ Monitor.Pulse(this);
+ }
+ }
+ }
+
+ private sealed class AsyncPendingCaller : TaskCompletionSource, IPendingCaller
+ {
+ private static readonly TimerCallback s_onTimeout = state => ((AsyncPendingCaller)state!).TryComplete(CompletionState.TimedOut);
+ private static readonly Action