Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,59 @@ Debug.Log("Connected to " + room.Name);



## Asynchronous programming: coroutines, async/await, and UniTask

The SDK exposes three interchangeable styles for awaiting asynchronous operations. Coroutines, async/await and UniTask.

**1. Coroutines (default, no dependency)** — shown throughout this README.

**2. async/await (no dependency)** — every operation returns an awaitable instruction (`ConnectInstruction`, `PublishTrackInstruction`, `PerformRpcInstruction`, the stream read instructions, …), so you can `await` it directly. As with coroutines, you inspect success/failure on the instruction (`IsError`) — `await` does not throw. Continuations resume on Unity's main thread.

```cs
async void Start()
{
var room = new Room();
var connect = room.Connect("ws://localhost:7880", "<join-token>", new RoomOptions());
await connect;
if (!connect.IsError)
Debug.Log("Connected to " + room.Name);
}
```

> Use `async void` only for top-level event handlers (e.g. button callbacks); its exceptions surface to Unity's log rather than to a caller. Prefer `async Task`/`async UniTaskVoid` elsewhere.

**3. UniTask (optional)** — install [UniTask](https://github.com/Cysharp/UniTask) (`com.cysharp.unitask`). The SDK auto-detects it via the `LIVEKIT_UNITASK` scripting define and enables the `LiveKit.UniTask` assembly, which adds `CancellationToken` support, composition, and async streams.

Cancellation (abandon-awaiter semantics — the underlying request is not cancelled on the wire):

```cs
await room.Connect("ws://localhost:7880", "<join-token>", new RoomOptions())
.AsUniTask(cancellationToken);
```

Run operations in parallel:

```cs
await UniTask.WhenAll(
room.LocalParticipant.PublishTrack(cameraTrack, cameraOptions).AsUniTask(ct),
room.LocalParticipant.PublishTrack(microphoneTrack, microphoneOptions).AsUniTask(ct));
```

Consume an incremental stream with `await foreach`. The sequence ends at end-of-stream; if the stream ends with an error it throws a `StreamError`:

```cs
try
{
await foreach (var chunk in reader.ReadIncremental().AsAsyncEnumerable(ct))
Process(chunk);
}
catch (StreamError e)
{
Debug.LogError(e.Message);
}
```


### Publishing microphone


Expand Down
1 change: 1 addition & 0 deletions Runtime/Scripts/AssemblyInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@

[assembly: InternalsVisibleTo("EditModeTests")]
[assembly: InternalsVisibleTo("PlayModeTests")]
[assembly: InternalsVisibleTo("PlayModeTests.UniTask")]
8 changes: 7 additions & 1 deletion Runtime/Scripts/DataStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,13 @@ public abstract class ReadIncrementalInstructionBase<TContent> : StreamYieldInst
/// </summary>
public bool IsError => Error != null;

protected TContent LatestChunk
/// <summary>
/// The chunk from the most recent completed read. Throws the captured
/// <see cref="StreamError"/> if the last read errored. Public so the optional
/// UniTask async-enumerable adapter can read it generically; the typed
/// <c>Bytes</c>/<c>Text</c> accessors on the concrete readers delegate here.
/// </summary>
public TContent LatestChunk
{
get
{
Expand Down
150 changes: 147 additions & 3 deletions Runtime/Scripts/Internal/YieldInstruction.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
using System;
using System.Runtime.CompilerServices;
using System.Threading;
using UnityEngine;

namespace LiveKit
Expand All @@ -13,10 +15,79 @@ public class YieldInstruction : CustomYieldInstruction
private volatile bool _isDone;
private volatile bool _isError;

public bool IsDone { get => _isDone; protected set => _isDone = value; }
// Sentinel published once completion has fired so any continuation registered
// afterwards runs inline instead of being silently dropped.
private static readonly Action s_completedSentinel = () => { };
private Action? _continuation;

public bool IsDone
{
get => _isDone;
protected set
{
_isDone = value;
if (value) InvokeContinuation();
}
}
public bool IsError { get => _isError; protected set => _isError = value; }

public override bool keepWaiting => !_isDone;

/// <summary>
/// Returns an awaiter so callers can <c>await</c> this instruction directly.
/// </summary>
/// <remarks>
/// The awaiter completes when <see cref="IsDone"/> becomes true. As with the
/// coroutine path, success vs. failure is inspected on the instruction itself
/// (<see cref="IsError"/> and any subclass-specific result fields); <c>GetResult</c>
/// does not throw.
/// </remarks>
public YieldInstructionAwaiter GetAwaiter() => new YieldInstructionAwaiter(this);

internal void RegisterContinuation(Action continuation)
{
// Race between completion-side (FFI thread writes sentinel) and await-side
// (registers continuation): CompareExchange decides who wrote first.
// null -> we won, completion will invoke our continuation later
// sentinel -> completion already fired; invoke inline
// other -> a second awaiter beat us here, which we don't support
var prev = Interlocked.CompareExchange(ref _continuation, continuation, null);
if (prev == null) return;
if (ReferenceEquals(prev, s_completedSentinel))
{
continuation();
return;
}
throw new InvalidOperationException(
"YieldInstruction does not support multiple awaiters; await it only once.");
}

private void InvokeContinuation()
{
var prev = Interlocked.Exchange(ref _continuation, s_completedSentinel);
if (prev != null && !ReferenceEquals(prev, s_completedSentinel))
{
prev();
}
}
}

public readonly struct YieldInstructionAwaiter : INotifyCompletion
{
private readonly YieldInstruction _instruction;

internal YieldInstructionAwaiter(YieldInstruction instruction)
{
_instruction = instruction;
}

public bool IsCompleted => _instruction.IsDone;

public void OnCompleted(Action continuation) => _instruction.RegisterContinuation(continuation);

// Intentionally a no-op. Parity with the coroutine path: callers inspect IsError
// and subclass-specific result fields on the instruction itself.
public void GetResult() { }
}

public class StreamYieldInstruction : CustomYieldInstruction
Expand All @@ -28,12 +99,37 @@ public class StreamYieldInstruction : CustomYieldInstruction
private volatile bool _isEos;
private volatile bool _isCurrentReadDone;

private static readonly Action s_completedSentinel = () => { };
private Action? _continuation;

/// <summary>
/// True if the stream has reached the end.
/// </summary>
public bool IsEos { get => _isEos; protected set => _isEos = value; }
public bool IsEos
{
get => _isEos;
protected set
{
_isEos = value;
if (value) InvokeContinuation();
}
}

internal bool IsCurrentReadDone { get => _isCurrentReadDone; set => _isCurrentReadDone = value; }
/// <summary>
/// True once a chunk is ready for the current read (before <see cref="Reset"/> is
/// called for the next one). Public getter mirrors the sibling
/// <c>DataTrack.ReadFrameInstruction.IsCurrentReadDone</c>; the setter stays internal
/// because only the SDK's stream readers advance this state.
/// </summary>
public bool IsCurrentReadDone
{
get => _isCurrentReadDone;
internal set
{
_isCurrentReadDone = value;
if (value) InvokeContinuation();
}
}

public override bool keepWaiting => !_isCurrentReadDone && !_isEos;

Expand All @@ -50,6 +146,54 @@ public override void Reset()
throw new InvalidOperationException("Cannot reset after end of stream");
}
_isCurrentReadDone = false;
// Drop the sentinel published by the previous completion so the next awaiter
// can install a fresh continuation. Safe because Reset is only called after the
// previous read's await has already resumed.
Volatile.Write(ref _continuation, null);
}

/// <summary>
/// Returns an awaiter that completes when the next chunk is ready or the stream ends.
/// Call <see cref="Reset"/> between iterations to await the following chunk.
/// </summary>
public StreamYieldInstructionAwaiter GetAwaiter() => new StreamYieldInstructionAwaiter(this);

internal void RegisterContinuation(Action continuation)
{
var prev = Interlocked.CompareExchange(ref _continuation, continuation, null);
if (prev == null) return;
if (ReferenceEquals(prev, s_completedSentinel))
{
continuation();
return;
}
throw new InvalidOperationException(
"StreamYieldInstruction does not support multiple concurrent awaiters; await it once per chunk.");
}

private void InvokeContinuation()
{
var prev = Interlocked.Exchange(ref _continuation, s_completedSentinel);
if (prev != null && !ReferenceEquals(prev, s_completedSentinel))
{
prev();
}
}
}

public readonly struct StreamYieldInstructionAwaiter : INotifyCompletion
{
private readonly StreamYieldInstruction _instruction;

internal StreamYieldInstructionAwaiter(StreamYieldInstruction instruction)
{
_instruction = instruction;
}

public bool IsCompleted => _instruction.IsCurrentReadDone || _instruction.IsEos;

public void OnCompleted(Action continuation) => _instruction.RegisterContinuation(continuation);

public void GetResult() { }
}
}
8 changes: 8 additions & 0 deletions Runtime/Scripts/UniTask.meta

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

79 changes: 79 additions & 0 deletions Runtime/Scripts/UniTask/StreamReaderUniTaskExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
#if LIVEKIT_UNITASK
using System.Threading;
using Cysharp.Threading.Tasks;
using Cysharp.Threading.Tasks.Linq;

namespace LiveKit
{
/// <summary>
/// Exposes the SDK's incremental stream readers as <see cref="IUniTaskAsyncEnumerable{T}"/>
/// so chunks can be consumed with <c>await foreach</c>. Available only when the
/// <c>com.cysharp.unitask</c> package is installed (gated by <c>LIVEKIT_UNITASK</c>).
/// </summary>
public static class StreamReaderUniTaskExtensions
{
/// <summary>
/// Adapts an incremental stream read into an async sequence of chunks. Works for both
/// <see cref="ByteStreamReader.ReadIncrementalInstruction"/> (<c>byte[]</c>) and
/// <see cref="TextStreamReader.ReadIncrementalInstruction"/> (<c>string</c>).
/// </summary>
/// <remarks>
/// Iteration ends when the stream reaches end-of-stream. If the stream ends with an
/// error, the enumerable throws that <see cref="StreamError"/> (idiomatic for
/// <c>await foreach</c>; this is the one place the UniTask surface throws rather than
/// exposing <c>IsError</c>). Cancellation (via the token or the enumerator) surfaces as
/// <see cref="System.OperationCanceledException"/> with abandon-awaiter semantics — the
/// underlying FFI read is not cancelled on the wire.
///
/// Like the coroutine consumer, this delivers the current chunk on the iteration where
/// end-of-stream is also observed, then stops. Chunks buffered <em>beyond</em> the
/// current one when end-of-stream arrives are not drainable — a pre-existing limitation
/// of the reader (its <c>Reset()</c> is disallowed past end-of-stream), not specific to
/// this adapter.
/// </remarks>
public static IUniTaskAsyncEnumerable<TChunk> AsAsyncEnumerable<TChunk>(
this ReadIncrementalInstructionBase<TChunk> instruction,
CancellationToken cancellationToken = default)
{
if (instruction == null) throw new System.ArgumentNullException(nameof(instruction));

return UniTaskAsyncEnumerable.Create<TChunk>(async (writer, token) =>
{
// The enumerator hands us its own token; honor both it and the caller's.
using var linked = CancellationTokenSource.CreateLinkedTokenSource(token, cancellationToken);
var ct = linked.Token;

while (true)
{
// Completes when a chunk is ready (IsCurrentReadDone) or the stream ends (IsEos).
await instruction.AsUniTask(ct);

if (instruction.IsCurrentReadDone)
{
var chunk = instruction.LatestChunk;
await writer.YieldAsync(chunk);

// Re-check IsEos AFTER yielding: end-of-stream may have arrived while
// the consumer was suspended. Reset() throws once IsEos is set, so this
// re-check (not a value captured before the yield) is what keeps the
// loop safe — mirroring the coroutine consumer's "if (IsEos) break;
// else Reset()" ordering.
if (instruction.IsEos)
{
if (instruction.IsError) throw instruction.Error;
return;
}

instruction.Reset();
continue;
}

// Not IsCurrentReadDone => end-of-stream with nothing left to read.
if (instruction.IsError) throw instruction.Error;
return;
}
});
}
}
}
#endif
11 changes: 11 additions & 0 deletions Runtime/Scripts/UniTask/StreamReaderUniTaskExtensions.cs.meta

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading