diff --git a/README.md b/README.md index b72f1dde..7aa13fe9 100644 --- a/README.md +++ b/README.md @@ -313,6 +313,59 @@ Debug.Log("Connected to " + room.Name); +## Asynchronous programming: coroutines, async/await, and UniTask + +The SDK exposes three interchangeable styles for awaiting asynchronous operations. Coroutines, async/await and UniTask. + +**1. Coroutines (default, no dependency)** — shown throughout this README. + +**2. async/await (no dependency)** — every operation returns an awaitable instruction (`ConnectInstruction`, `PublishTrackInstruction`, `PerformRpcInstruction`, the stream read instructions, …), so you can `await` it directly. As with coroutines, you inspect success/failure on the instruction (`IsError`) — `await` does not throw. Continuations resume on Unity's main thread. + +```cs +async void Start() +{ + var room = new Room(); + var connect = room.Connect("ws://localhost:7880", "", new RoomOptions()); + await connect; + if (!connect.IsError) + Debug.Log("Connected to " + room.Name); +} +``` + +> Use `async void` only for top-level event handlers (e.g. button callbacks); its exceptions surface to Unity's log rather than to a caller. Prefer `async Task`/`async UniTaskVoid` elsewhere. + +**3. UniTask (optional)** — install [UniTask](https://github.com/Cysharp/UniTask) (`com.cysharp.unitask`). The SDK auto-detects it via the `LIVEKIT_UNITASK` scripting define and enables the `LiveKit.UniTask` assembly, which adds `CancellationToken` support, composition, and async streams. + +Cancellation (abandon-awaiter semantics — the underlying request is not cancelled on the wire): + +```cs +await room.Connect("ws://localhost:7880", "", new RoomOptions()) + .AsUniTask(cancellationToken); +``` + +Run operations in parallel: + +```cs +await UniTask.WhenAll( + room.LocalParticipant.PublishTrack(cameraTrack, cameraOptions).AsUniTask(ct), + room.LocalParticipant.PublishTrack(microphoneTrack, microphoneOptions).AsUniTask(ct)); +``` + +Consume an incremental stream with `await foreach`. The sequence ends at end-of-stream; if the stream ends with an error it throws a `StreamError`: + +```cs +try +{ + await foreach (var chunk in reader.ReadIncremental().AsAsyncEnumerable(ct)) + Process(chunk); +} +catch (StreamError e) +{ + Debug.LogError(e.Message); +} +``` + + ### Publishing microphone diff --git a/Runtime/Scripts/AssemblyInfo.cs b/Runtime/Scripts/AssemblyInfo.cs index e667b32e..9e802523 100644 --- a/Runtime/Scripts/AssemblyInfo.cs +++ b/Runtime/Scripts/AssemblyInfo.cs @@ -2,3 +2,4 @@ [assembly: InternalsVisibleTo("EditModeTests")] [assembly: InternalsVisibleTo("PlayModeTests")] +[assembly: InternalsVisibleTo("PlayModeTests.UniTask")] diff --git a/Runtime/Scripts/DataStream.cs b/Runtime/Scripts/DataStream.cs index 68b18b5a..131ccf56 100644 --- a/Runtime/Scripts/DataStream.cs +++ b/Runtime/Scripts/DataStream.cs @@ -95,7 +95,13 @@ public abstract class ReadIncrementalInstructionBase : StreamYieldInst /// public bool IsError => Error != null; - protected TContent LatestChunk + /// + /// The chunk from the most recent completed read. Throws the captured + /// if the last read errored. Public so the optional + /// UniTask async-enumerable adapter can read it generically; the typed + /// Bytes/Text accessors on the concrete readers delegate here. + /// + public TContent LatestChunk { get { diff --git a/Runtime/Scripts/Internal/YieldInstruction.cs b/Runtime/Scripts/Internal/YieldInstruction.cs index 748e884d..80a29010 100644 --- a/Runtime/Scripts/Internal/YieldInstruction.cs +++ b/Runtime/Scripts/Internal/YieldInstruction.cs @@ -1,4 +1,6 @@ using System; +using System.Runtime.CompilerServices; +using System.Threading; using UnityEngine; namespace LiveKit @@ -13,10 +15,79 @@ public class YieldInstruction : CustomYieldInstruction private volatile bool _isDone; private volatile bool _isError; - public bool IsDone { get => _isDone; protected set => _isDone = value; } + // Sentinel published once completion has fired so any continuation registered + // afterwards runs inline instead of being silently dropped. + private static readonly Action s_completedSentinel = () => { }; + private Action? _continuation; + + public bool IsDone + { + get => _isDone; + protected set + { + _isDone = value; + if (value) InvokeContinuation(); + } + } public bool IsError { get => _isError; protected set => _isError = value; } public override bool keepWaiting => !_isDone; + + /// + /// Returns an awaiter so callers can await this instruction directly. + /// + /// + /// The awaiter completes when becomes true. As with the + /// coroutine path, success vs. failure is inspected on the instruction itself + /// ( and any subclass-specific result fields); GetResult + /// does not throw. + /// + public YieldInstructionAwaiter GetAwaiter() => new YieldInstructionAwaiter(this); + + internal void RegisterContinuation(Action continuation) + { + // Race between completion-side (FFI thread writes sentinel) and await-side + // (registers continuation): CompareExchange decides who wrote first. + // null -> we won, completion will invoke our continuation later + // sentinel -> completion already fired; invoke inline + // other -> a second awaiter beat us here, which we don't support + var prev = Interlocked.CompareExchange(ref _continuation, continuation, null); + if (prev == null) return; + if (ReferenceEquals(prev, s_completedSentinel)) + { + continuation(); + return; + } + throw new InvalidOperationException( + "YieldInstruction does not support multiple awaiters; await it only once."); + } + + private void InvokeContinuation() + { + var prev = Interlocked.Exchange(ref _continuation, s_completedSentinel); + if (prev != null && !ReferenceEquals(prev, s_completedSentinel)) + { + prev(); + } + } + } + + public readonly struct YieldInstructionAwaiter : INotifyCompletion + { + private readonly YieldInstruction _instruction; + + internal YieldInstructionAwaiter(YieldInstruction instruction) + { + _instruction = instruction; + } + + public bool IsCompleted => _instruction.IsDone; + + public void OnCompleted(Action continuation) => _instruction.RegisterContinuation(continuation); + + // Intentionally a no-op. Parity with the coroutine path: callers inspect IsError + // and subclass-specific result fields on the instruction itself. + public void GetResult() { } } public class StreamYieldInstruction : CustomYieldInstruction @@ -28,12 +99,37 @@ public class StreamYieldInstruction : CustomYieldInstruction private volatile bool _isEos; private volatile bool _isCurrentReadDone; + private static readonly Action s_completedSentinel = () => { }; + private Action? _continuation; + /// /// True if the stream has reached the end. /// - public bool IsEos { get => _isEos; protected set => _isEos = value; } + public bool IsEos + { + get => _isEos; + protected set + { + _isEos = value; + if (value) InvokeContinuation(); + } + } - internal bool IsCurrentReadDone { get => _isCurrentReadDone; set => _isCurrentReadDone = value; } + /// + /// True once a chunk is ready for the current read (before is + /// called for the next one). Public getter mirrors the sibling + /// DataTrack.ReadFrameInstruction.IsCurrentReadDone; the setter stays internal + /// because only the SDK's stream readers advance this state. + /// + public bool IsCurrentReadDone + { + get => _isCurrentReadDone; + internal set + { + _isCurrentReadDone = value; + if (value) InvokeContinuation(); + } + } public override bool keepWaiting => !_isCurrentReadDone && !_isEos; @@ -50,6 +146,54 @@ public override void Reset() throw new InvalidOperationException("Cannot reset after end of stream"); } _isCurrentReadDone = false; + // Drop the sentinel published by the previous completion so the next awaiter + // can install a fresh continuation. Safe because Reset is only called after the + // previous read's await has already resumed. + Volatile.Write(ref _continuation, null); } + + /// + /// Returns an awaiter that completes when the next chunk is ready or the stream ends. + /// Call between iterations to await the following chunk. + /// + public StreamYieldInstructionAwaiter GetAwaiter() => new StreamYieldInstructionAwaiter(this); + + internal void RegisterContinuation(Action continuation) + { + var prev = Interlocked.CompareExchange(ref _continuation, continuation, null); + if (prev == null) return; + if (ReferenceEquals(prev, s_completedSentinel)) + { + continuation(); + return; + } + throw new InvalidOperationException( + "StreamYieldInstruction does not support multiple concurrent awaiters; await it once per chunk."); + } + + private void InvokeContinuation() + { + var prev = Interlocked.Exchange(ref _continuation, s_completedSentinel); + if (prev != null && !ReferenceEquals(prev, s_completedSentinel)) + { + prev(); + } + } + } + + public readonly struct StreamYieldInstructionAwaiter : INotifyCompletion + { + private readonly StreamYieldInstruction _instruction; + + internal StreamYieldInstructionAwaiter(StreamYieldInstruction instruction) + { + _instruction = instruction; + } + + public bool IsCompleted => _instruction.IsCurrentReadDone || _instruction.IsEos; + + public void OnCompleted(Action continuation) => _instruction.RegisterContinuation(continuation); + + public void GetResult() { } } } diff --git a/Runtime/Scripts/UniTask.meta b/Runtime/Scripts/UniTask.meta new file mode 100644 index 00000000..45727607 --- /dev/null +++ b/Runtime/Scripts/UniTask.meta @@ -0,0 +1,8 @@ +fileFormatVersion: 2 +guid: 7f5f50e598f7646458d6958db6c7246a +folderAsset: yes +DefaultImporter: + externalObjects: {} + userData: + assetBundleName: + assetBundleVariant: diff --git a/Runtime/Scripts/UniTask/StreamReaderUniTaskExtensions.cs b/Runtime/Scripts/UniTask/StreamReaderUniTaskExtensions.cs new file mode 100644 index 00000000..f55a9165 --- /dev/null +++ b/Runtime/Scripts/UniTask/StreamReaderUniTaskExtensions.cs @@ -0,0 +1,79 @@ +#if LIVEKIT_UNITASK +using System.Threading; +using Cysharp.Threading.Tasks; +using Cysharp.Threading.Tasks.Linq; + +namespace LiveKit +{ + /// + /// Exposes the SDK's incremental stream readers as + /// so chunks can be consumed with await foreach. Available only when the + /// com.cysharp.unitask package is installed (gated by LIVEKIT_UNITASK). + /// + public static class StreamReaderUniTaskExtensions + { + /// + /// Adapts an incremental stream read into an async sequence of chunks. Works for both + /// (byte[]) and + /// (string). + /// + /// + /// Iteration ends when the stream reaches end-of-stream. If the stream ends with an + /// error, the enumerable throws that (idiomatic for + /// await foreach; this is the one place the UniTask surface throws rather than + /// exposing IsError). Cancellation (via the token or the enumerator) surfaces as + /// with abandon-awaiter semantics — the + /// underlying FFI read is not cancelled on the wire. + /// + /// Like the coroutine consumer, this delivers the current chunk on the iteration where + /// end-of-stream is also observed, then stops. Chunks buffered beyond the + /// current one when end-of-stream arrives are not drainable — a pre-existing limitation + /// of the reader (its Reset() is disallowed past end-of-stream), not specific to + /// this adapter. + /// + public static IUniTaskAsyncEnumerable AsAsyncEnumerable( + this ReadIncrementalInstructionBase instruction, + CancellationToken cancellationToken = default) + { + if (instruction == null) throw new System.ArgumentNullException(nameof(instruction)); + + return UniTaskAsyncEnumerable.Create(async (writer, token) => + { + // The enumerator hands us its own token; honor both it and the caller's. + using var linked = CancellationTokenSource.CreateLinkedTokenSource(token, cancellationToken); + var ct = linked.Token; + + while (true) + { + // Completes when a chunk is ready (IsCurrentReadDone) or the stream ends (IsEos). + await instruction.AsUniTask(ct); + + if (instruction.IsCurrentReadDone) + { + var chunk = instruction.LatestChunk; + await writer.YieldAsync(chunk); + + // Re-check IsEos AFTER yielding: end-of-stream may have arrived while + // the consumer was suspended. Reset() throws once IsEos is set, so this + // re-check (not a value captured before the yield) is what keeps the + // loop safe — mirroring the coroutine consumer's "if (IsEos) break; + // else Reset()" ordering. + if (instruction.IsEos) + { + if (instruction.IsError) throw instruction.Error; + return; + } + + instruction.Reset(); + continue; + } + + // Not IsCurrentReadDone => end-of-stream with nothing left to read. + if (instruction.IsError) throw instruction.Error; + return; + } + }); + } + } +} +#endif diff --git a/Runtime/Scripts/UniTask/StreamReaderUniTaskExtensions.cs.meta b/Runtime/Scripts/UniTask/StreamReaderUniTaskExtensions.cs.meta new file mode 100644 index 00000000..883d5c63 --- /dev/null +++ b/Runtime/Scripts/UniTask/StreamReaderUniTaskExtensions.cs.meta @@ -0,0 +1,11 @@ +fileFormatVersion: 2 +guid: ba02c0c61aa014db28635be5e1cf6e64 +MonoImporter: + externalObjects: {} + serializedVersion: 2 + defaultReferences: [] + executionOrder: 0 + icon: {instanceID: 0} + userData: + assetBundleName: + assetBundleVariant: diff --git a/Runtime/Scripts/UniTask/YieldInstructionUniTaskExtensions.cs b/Runtime/Scripts/UniTask/YieldInstructionUniTaskExtensions.cs new file mode 100644 index 00000000..84747d49 --- /dev/null +++ b/Runtime/Scripts/UniTask/YieldInstructionUniTaskExtensions.cs @@ -0,0 +1,93 @@ +#if LIVEKIT_UNITASK +using System.Threading; +using Cysharp.Threading.Tasks; + +namespace LiveKit +{ + /// + /// Bridges the SDK's / + /// surface to UniTask, adding support. Available only when + /// the com.cysharp.unitask package is installed; the assembly is otherwise excluded + /// via a defineConstraint on LIVEKIT_UNITASK. + /// + public static class YieldInstructionUniTaskExtensions + { + /// + /// Wraps the instruction as a . The task completes when the + /// instruction's transitions to true, or + /// faults with if the token fires + /// first. + /// + /// + /// Cancellation has "abandon awaiter" semantics: the underlying FFI request keeps + /// running and any result is discarded. Wire-level cancellation is not yet + /// supported. Error inspection stays on the instruction itself — the awaiter does + /// not throw on , matching the existing + /// yield return / await behavior. + /// + public static UniTask AsUniTask(this YieldInstruction instruction, CancellationToken cancellationToken = default) + { + if (instruction == null) throw new System.ArgumentNullException(nameof(instruction)); + if (instruction.IsDone) return UniTask.CompletedTask; + if (cancellationToken.IsCancellationRequested) return UniTask.FromCanceled(cancellationToken); + + var source = new UniTaskCompletionSource(); + CancellationTokenRegistration registration = default; + + if (cancellationToken.CanBeCanceled) + { + registration = cancellationToken.Register(static state => + { + var s = (UniTaskCompletionSource)state; + s.TrySetCanceled(); + }, source); + } + + // YieldInstruction.RegisterContinuation fires the callback exactly once and is + // race-safe between FFI-thread completion and main-thread registration. Either + // TrySetResult or TrySetCanceled wins; the loser is a no-op. + instruction.GetAwaiter().OnCompleted(() => + { + registration.Dispose(); + source.TrySetResult(); + }); + + return source.Task; + } + + /// + /// UniTask-bridged equivalent of awaiting a once. + /// Call between chunks; each + /// AsUniTask call awaits the next chunk or end-of-stream. + /// + public static UniTask AsUniTask(this StreamYieldInstruction instruction, CancellationToken cancellationToken = default) + { + if (instruction == null) throw new System.ArgumentNullException(nameof(instruction)); + // GetAwaiter().IsCompleted folds together IsCurrentReadDone || IsEos and is + // the only public way to check the combined state from outside the LiveKit asm. + if (instruction.GetAwaiter().IsCompleted) return UniTask.CompletedTask; + if (cancellationToken.IsCancellationRequested) return UniTask.FromCanceled(cancellationToken); + + var source = new UniTaskCompletionSource(); + CancellationTokenRegistration registration = default; + + if (cancellationToken.CanBeCanceled) + { + registration = cancellationToken.Register(static state => + { + var s = (UniTaskCompletionSource)state; + s.TrySetCanceled(); + }, source); + } + + instruction.GetAwaiter().OnCompleted(() => + { + registration.Dispose(); + source.TrySetResult(); + }); + + return source.Task; + } + } +} +#endif diff --git a/Runtime/Scripts/UniTask/YieldInstructionUniTaskExtensions.cs.meta b/Runtime/Scripts/UniTask/YieldInstructionUniTaskExtensions.cs.meta new file mode 100644 index 00000000..42453b89 --- /dev/null +++ b/Runtime/Scripts/UniTask/YieldInstructionUniTaskExtensions.cs.meta @@ -0,0 +1,11 @@ +fileFormatVersion: 2 +guid: 1264e1f4f8a8d4ad9ab94cdc2909a3a1 +MonoImporter: + externalObjects: {} + serializedVersion: 2 + defaultReferences: [] + executionOrder: 0 + icon: {instanceID: 0} + userData: + assetBundleName: + assetBundleVariant: diff --git a/Runtime/Scripts/UniTask/livekit.unity.Runtime.UniTask.asmdef b/Runtime/Scripts/UniTask/livekit.unity.Runtime.UniTask.asmdef new file mode 100644 index 00000000..a1e3e218 --- /dev/null +++ b/Runtime/Scripts/UniTask/livekit.unity.Runtime.UniTask.asmdef @@ -0,0 +1,26 @@ +{ + "name": "LiveKit.UniTask", + "rootNamespace": "LiveKit.UniTaskExtensions", + "references": [ + "LiveKit", + "UniTask", + "UniTask.Linq" + ], + "includePlatforms": [], + "excludePlatforms": [], + "allowUnsafeCode": false, + "overrideReferences": false, + "precompiledReferences": [], + "autoReferenced": true, + "defineConstraints": [ + "LIVEKIT_UNITASK" + ], + "versionDefines": [ + { + "name": "com.cysharp.unitask", + "expression": "2.0.0", + "define": "LIVEKIT_UNITASK" + } + ], + "noEngineReferences": false +} diff --git a/Runtime/Scripts/UniTask/livekit.unity.Runtime.UniTask.asmdef.meta b/Runtime/Scripts/UniTask/livekit.unity.Runtime.UniTask.asmdef.meta new file mode 100644 index 00000000..3a7e76a5 --- /dev/null +++ b/Runtime/Scripts/UniTask/livekit.unity.Runtime.UniTask.asmdef.meta @@ -0,0 +1,7 @@ +fileFormatVersion: 2 +guid: a7fbb1537932e48f4a28030ab7a3ac51 +AssemblyDefinitionImporter: + externalObjects: {} + userData: + assetBundleName: + assetBundleVariant: diff --git a/Tests/PlayMode/RoomTests.cs b/Tests/PlayMode/RoomTests.cs index 1a871e5c..9cdc3031 100644 --- a/Tests/PlayMode/RoomTests.cs +++ b/Tests/PlayMode/RoomTests.cs @@ -1,5 +1,7 @@ using System.Collections; +using System.Threading.Tasks; using NUnit.Framework; +using UnityEngine; using UnityEngine.TestTools; using LiveKit.Proto; using LiveKit.PlayModeTests.Utils; @@ -26,6 +28,56 @@ public IEnumerator Connect_FailsWithInvalidUrl() Assert.IsNotNull(context.ConnectionError, "Expected connection to fail"); } + // Deterministic coverage of the GetAwaiter surface added in Stage 1, using a + // synthetic instruction so the awaiter logic is exercised without the FFI. These + // are intentionally NOT [Category("E2E")] — they need no dev server. The real + // connect-fail path stays covered by Connect_FailsWithInvalidUrl above; an earlier + // E2E variant of these was flaky because the FFI emits its error log asynchronously, + // which races LogAssert in the frame after the await has already resumed. + private sealed class TestYieldInstruction : YieldInstruction + { + public void Complete() => IsDone = true; + public void CompleteWithError() { IsError = true; IsDone = true; } + } + + // OnCompleted path: await registers a continuation while the instruction is still + // pending, then completion fires it and IsError is visible on resume. + [UnityTest] + public IEnumerator GetAwaiter_ResumesOnCompletion_AndSurfacesIsError() + { + var instruction = new TestYieldInstruction(); + var awaitTask = AwaitInstruction(instruction); + Assert.IsFalse(awaitTask.IsCompleted, "Awaiter must not resume before IsDone"); + + instruction.CompleteWithError(); + yield return new WaitUntil(() => awaitTask.IsCompleted); + + Assert.IsNull(awaitTask.Exception, awaitTask.Exception?.ToString()); + Assert.IsTrue(instruction.IsDone, "Awaiter resumed, so IsDone must be observable"); + Assert.IsTrue(instruction.IsError, "IsError must be visible on resume"); + } + + // IsCompleted fast path: instruction is already done before it is awaited, so the + // awaiter completes without ever registering a continuation. + [UnityTest] + public IEnumerator GetAwaiter_CompletesImmediately_WhenAlreadyDone() + { + var instruction = new TestYieldInstruction(); + instruction.Complete(); + + var awaitTask = AwaitInstruction(instruction); + yield return new WaitUntil(() => awaitTask.IsCompleted); + + Assert.IsNull(awaitTask.Exception, awaitTask.Exception?.ToString()); + Assert.IsTrue(instruction.IsDone); + Assert.IsFalse(instruction.IsError); + } + + private static async Task AwaitInstruction(YieldInstruction instruction) + { + await instruction; + } + [UnityTest, Category("E2E")] public IEnumerator RoomName_MatchesProvided() { diff --git a/Tests/PlayMode/UniTask.meta b/Tests/PlayMode/UniTask.meta new file mode 100644 index 00000000..bc43a857 --- /dev/null +++ b/Tests/PlayMode/UniTask.meta @@ -0,0 +1,8 @@ +fileFormatVersion: 2 +guid: 1ef4cd187b61c4a388e674497c3ac63d +folderAsset: yes +DefaultImporter: + externalObjects: {} + userData: + assetBundleName: + assetBundleVariant: diff --git a/Tests/PlayMode/UniTask/LiveKit.PlayModeTests.UniTask.asmdef b/Tests/PlayMode/UniTask/LiveKit.PlayModeTests.UniTask.asmdef new file mode 100644 index 00000000..db1f62ad --- /dev/null +++ b/Tests/PlayMode/UniTask/LiveKit.PlayModeTests.UniTask.asmdef @@ -0,0 +1,32 @@ +{ + "name": "PlayModeTests.UniTask", + "rootNamespace": "LiveKit.PlayModeTests.UniTask", + "references": [ + "UnityEngine.TestRunner", + "UnityEditor.TestRunner", + "LiveKit", + "LiveKit.UniTask", + "UniTask", + "UniTask.Linq" + ], + "includePlatforms": [], + "excludePlatforms": [], + "allowUnsafeCode": false, + "overrideReferences": true, + "precompiledReferences": [ + "nunit.framework.dll" + ], + "autoReferenced": false, + "defineConstraints": [ + "UNITY_INCLUDE_TESTS", + "LIVEKIT_UNITASK" + ], + "versionDefines": [ + { + "name": "com.cysharp.unitask", + "expression": "2.0.0", + "define": "LIVEKIT_UNITASK" + } + ], + "noEngineReferences": false +} diff --git a/Tests/PlayMode/UniTask/LiveKit.PlayModeTests.UniTask.asmdef.meta b/Tests/PlayMode/UniTask/LiveKit.PlayModeTests.UniTask.asmdef.meta new file mode 100644 index 00000000..ba465dbf --- /dev/null +++ b/Tests/PlayMode/UniTask/LiveKit.PlayModeTests.UniTask.asmdef.meta @@ -0,0 +1,7 @@ +fileFormatVersion: 2 +guid: 693ef0d1937d94c97aa2969770e3b59c +AssemblyDefinitionImporter: + externalObjects: {} + userData: + assetBundleName: + assetBundleVariant: diff --git a/Tests/PlayMode/UniTask/RoomUniTaskTests.cs b/Tests/PlayMode/UniTask/RoomUniTaskTests.cs new file mode 100644 index 00000000..69dce085 --- /dev/null +++ b/Tests/PlayMode/UniTask/RoomUniTaskTests.cs @@ -0,0 +1,71 @@ +#if LIVEKIT_UNITASK +using System.Threading; +using Cysharp.Threading.Tasks; +using NUnit.Framework; +using UnityEngine.TestTools; + +namespace LiveKit.PlayModeTests.UniTaskBridge +{ + public class RoomUniTaskTests + { + // Synthetic instruction used by the unit tests below — they verify the + // AsUniTask extension's behavior directly against the public setter contract + // (IsError then IsDone, mirroring the production completion order in + // Room.cs / Participant.cs / Track.cs) without needing the FFI. + private sealed class TestInstruction : YieldInstruction + { + public void Complete() => IsDone = true; + public void CompleteWithError() { IsError = true; IsDone = true; } + } + + // AsUniTask must complete when IsDone transitions to true, with the + // instruction's IsError visible on resume — parity with the await path + // covered by the Stage 1 Connect_FailsWithInvalidUrl_Awaitable test. + [UnityTest] + public System.Collections.IEnumerator AsUniTask_CompletesOnIsDone() => UniTask.ToCoroutine(async () => + { + var instruction = new TestInstruction(); + var task = instruction.AsUniTask(); + Assert.IsFalse(instruction.IsDone, "Sanity: instruction must not be done before Complete()"); + + instruction.CompleteWithError(); + await task; + + Assert.IsTrue(instruction.IsDone, "UniTask should not resume before IsDone"); + Assert.IsTrue(instruction.IsError, "Error state must be visible on resume"); + }); + + // Cancellation has abandon-awaiter semantics: the UniTask faults with + // OperationCanceledException, but the underlying request is not aborted. + // The synthetic instruction is never completed — only the token fires. + [UnityTest] + public System.Collections.IEnumerator AsUniTask_Cancellation_ThrowsOperationCanceled() => UniTask.ToCoroutine(async () => + { + var instruction = new TestInstruction(); + using var cts = new CancellationTokenSource(); + + var task = instruction.AsUniTask(cts.Token); + cts.Cancel(); + + bool threw = false; + try + { + await task; + } + catch (System.OperationCanceledException) + { + threw = true; + } + + Assert.IsTrue(threw, "Expected OperationCanceledException when token was cancelled"); + Assert.IsFalse(instruction.IsDone, "Abandon-awaiter semantics: underlying instruction is untouched"); + }); + + // End-to-end coverage of the FFI path is handled by the migrated Meet sample + // (Samples~/Meet/Assets/Runtime/MeetManager.cs). An additional E2E test here + // was tried and removed: FFI error logs arrive asynchronously and their delivery + // window races UniTask's synchronous resume, so the LogAssert tracking was + // brittle across test order. The unit tests above cover the extension's logic. + } +} +#endif diff --git a/Tests/PlayMode/UniTask/RoomUniTaskTests.cs.meta b/Tests/PlayMode/UniTask/RoomUniTaskTests.cs.meta new file mode 100644 index 00000000..588a85ab --- /dev/null +++ b/Tests/PlayMode/UniTask/RoomUniTaskTests.cs.meta @@ -0,0 +1,11 @@ +fileFormatVersion: 2 +guid: 8fbde2a93f51d461ba697e4688c30e13 +MonoImporter: + externalObjects: {} + serializedVersion: 2 + defaultReferences: [] + executionOrder: 0 + icon: {instanceID: 0} + userData: + assetBundleName: + assetBundleVariant: diff --git a/Tests/PlayMode/UniTask/StreamUniTaskTests.cs b/Tests/PlayMode/UniTask/StreamUniTaskTests.cs new file mode 100644 index 00000000..bb07f4b6 --- /dev/null +++ b/Tests/PlayMode/UniTask/StreamUniTaskTests.cs @@ -0,0 +1,142 @@ +#if LIVEKIT_UNITASK +using System; +using System.Collections.Generic; +using System.Threading; +using Cysharp.Threading.Tasks; +using LiveKit.Internal; +using NUnit.Framework; +using UnityEngine.TestTools; + +namespace LiveKit.PlayModeTests.UniTaskBridge +{ + public class StreamUniTaskTests + { + // Synthetic incremental reader that drives the base chunk/EoS machinery directly, + // with no FFI — the same seam used by the EditMode DataStreamIncrementalReadTests. + // FfiHandle is public; new FfiHandle(IntPtr.Zero) is a valid dummy handle. + private sealed class TestIncrementalReader : ReadIncrementalInstructionBase + { + public TestIncrementalReader(FfiHandle h) : base(h) { } + public void PushChunk(string content) => OnChunk(content); + public void PushEos(LiveKit.Proto.StreamError error = null) => OnEos(error); + } + + // Chunks pushed and consumed one at a time arrive in order; the sequence ends when + // EoS is observed. Manual enumeration interleaves push/pull so EoS only follows a + // fully drained queue (matching how chunks arrive over time in production). + [UnityTest] + public System.Collections.IEnumerator AsAsyncEnumerable_DeliversChunksInOrder_ThenStops() => UniTask.ToCoroutine(async () => + { + using var handle = new FfiHandle(IntPtr.Zero); + var reader = new TestIncrementalReader(handle); + + var e = reader.AsAsyncEnumerable().GetAsyncEnumerator(); + try + { + reader.PushChunk("A"); + Assert.IsTrue(await e.MoveNextAsync(), "Expected chunk A"); + Assert.AreEqual("A", e.Current); + + reader.PushChunk("B"); + Assert.IsTrue(await e.MoveNextAsync(), "Expected chunk B"); + Assert.AreEqual("B", e.Current); + + reader.PushChunk("C"); + Assert.IsTrue(await e.MoveNextAsync(), "Expected chunk C"); + Assert.AreEqual("C", e.Current); + + reader.PushEos(); + Assert.IsFalse(await e.MoveNextAsync(), "Enumeration must end at EoS"); + } + finally + { + await e.DisposeAsync(); + } + }); + + // The current chunk is delivered even when EoS is already set at the time it is read, + // then the sequence ends. (Chunks buffered beyond the current one when EoS arrives are + // not drainable — a pre-existing reader limitation, asserted here for clarity.) + [UnityTest] + public System.Collections.IEnumerator AsAsyncEnumerable_DeliversFinalChunkThenEos() => UniTask.ToCoroutine(async () => + { + using var handle = new FfiHandle(IntPtr.Zero); + var reader = new TestIncrementalReader(handle); + + reader.PushChunk("only"); + reader.PushEos(); + + var observed = new List(); + await foreach (var chunk in reader.AsAsyncEnumerable()) + observed.Add(chunk); + + CollectionAssert.AreEqual(new[] { "only" }, observed); + }); + + // A chunk delivered before the stream errors is observed; the subsequent error EoS + // then surfaces as a thrown StreamError. Manual enumeration models the real timeline + // (chunk arrives, is consumed, THEN the error ends the stream) — note that once the + // error is set, LatestChunk itself throws, so the error must follow chunk delivery. + [UnityTest] + public System.Collections.IEnumerator AsAsyncEnumerable_ThrowsStreamError_AfterDeliveringChunk() => UniTask.ToCoroutine(async () => + { + using var handle = new FfiHandle(IntPtr.Zero); + var reader = new TestIncrementalReader(handle); + + var e = reader.AsAsyncEnumerable().GetAsyncEnumerator(); + try + { + reader.PushChunk("partial"); + Assert.IsTrue(await e.MoveNextAsync(), "Expected the pre-error chunk"); + Assert.AreEqual("partial", e.Current); + + reader.PushEos(new LiveKit.Proto.StreamError { Description = "boom" }); + + StreamError caught = null; + try + { + await e.MoveNextAsync(); + } + catch (StreamError ex) + { + caught = ex; + } + + Assert.IsNotNull(caught, "Expected the error EoS to throw a StreamError"); + Assert.AreEqual("boom", caught.Message); + } + finally + { + await e.DisposeAsync(); + } + }); + + // A cancelled token surfaces as OperationCanceledException with abandon-awaiter + // semantics: nothing is observed and the underlying reader is untouched. + [UnityTest] + public System.Collections.IEnumerator AsAsyncEnumerable_Cancellation_ThrowsOperationCanceled() => UniTask.ToCoroutine(async () => + { + using var handle = new FfiHandle(IntPtr.Zero); + var reader = new TestIncrementalReader(handle); + using var cts = new CancellationTokenSource(); + cts.Cancel(); + + var observed = new List(); + bool threw = false; + try + { + await foreach (var chunk in reader.AsAsyncEnumerable(cts.Token)) + observed.Add(chunk); + } + catch (OperationCanceledException) + { + threw = true; + } + + Assert.IsTrue(threw, "Expected OperationCanceledException for a cancelled token"); + CollectionAssert.IsEmpty(observed); + Assert.IsFalse(reader.IsEos, "Abandon-awaiter semantics: reader state is untouched"); + }); + } +} +#endif diff --git a/Tests/PlayMode/UniTask/StreamUniTaskTests.cs.meta b/Tests/PlayMode/UniTask/StreamUniTaskTests.cs.meta new file mode 100644 index 00000000..a10435c8 --- /dev/null +++ b/Tests/PlayMode/UniTask/StreamUniTaskTests.cs.meta @@ -0,0 +1,11 @@ +fileFormatVersion: 2 +guid: a2e6312b068f8432fa2b267f28d3e10b +MonoImporter: + externalObjects: {} + serializedVersion: 2 + defaultReferences: [] + executionOrder: 0 + icon: {instanceID: 0} + userData: + assetBundleName: + assetBundleVariant: