diff --git a/.changeset/new-tomatoes-grow.md b/.changeset/new-tomatoes-grow.md new file mode 100644 index 000000000..76dcf00f5 --- /dev/null +++ b/.changeset/new-tomatoes-grow.md @@ -0,0 +1,5 @@ +--- +"client-sdk-android": patch +--- + +Clarified documentation regarding data stream receivers and errors diff --git a/.changeset/tender-dolls-reflect.md b/.changeset/tender-dolls-reflect.md new file mode 100644 index 000000000..2d14ac4bf --- /dev/null +++ b/.changeset/tender-dolls-reflect.md @@ -0,0 +1,5 @@ +--- +"client-sdk-android": patch +--- + +Fix `BaseStreamReceiver.readAll` leaking thrown exceptions diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/room/datastream/StreamException.kt b/livekit-android-sdk/src/main/java/io/livekit/android/room/datastream/StreamException.kt index 9c5ccb57d..1e51135bc 100644 --- a/livekit-android-sdk/src/main/java/io/livekit/android/room/datastream/StreamException.kt +++ b/livekit-android-sdk/src/main/java/io/livekit/android/room/datastream/StreamException.kt @@ -1,5 +1,5 @@ /* - * Copyright 2025 LiveKit, Inc. + * Copyright 2025-2026 LiveKit, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -63,7 +63,7 @@ sealed class StreamException(message: String? = null) : Exception(message) { class FileInfoUnavailableException : StreamException() /** - * + * Encryption of the data chunks did not match the declared encryption type. */ class EncryptionTypeMismatch(message: String? = null) : StreamException(message) } diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/room/datastream/incoming/BaseStreamReceiver.kt b/livekit-android-sdk/src/main/java/io/livekit/android/room/datastream/incoming/BaseStreamReceiver.kt index d8a7a6ad1..2c72aaecb 100644 --- a/livekit-android-sdk/src/main/java/io/livekit/android/room/datastream/incoming/BaseStreamReceiver.kt +++ b/livekit-android-sdk/src/main/java/io/livekit/android/room/datastream/incoming/BaseStreamReceiver.kt @@ -1,5 +1,5 @@ /* - * Copyright 2025 LiveKit, Inc. + * Copyright 2025-2026 LiveKit, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,14 +16,42 @@ package io.livekit.android.room.datastream.incoming +import io.livekit.android.room.datastream.StreamException import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.catch import kotlinx.coroutines.flow.first import kotlinx.coroutines.flow.fold +/** + * Base class for reading incoming data streams. + * + * @see [flow] + * @see [readNext] + * @see [readAll] + */ abstract class BaseStreamReceiver(private val source: Channel) { + /** + * A [Flow] of stream data as it arrives. + * + * Collect this flow to process incoming data incrementally. The flow completes normally when + * the stream receives all the data without error. If the stream ends abnormally, the + * flow fails with a [StreamException] after all buffered chunks have been emitted. + * + * Example: + * ``` + * reader.flow + * .catch { e -> + * if (e is StreamException) { + * handleStreamError(e) + * } else { + * throw e + * } + * } + * .collect { chunk -> process(chunk) } + * ``` + */ abstract val flow: Flow internal fun close(error: Exception?) { @@ -34,7 +62,8 @@ abstract class BaseStreamReceiver(private val source: Channel) { * Suspends and waits for the next piece of data. * * @return the next available piece of data. - * @throws NoSuchElementException when the stream is closed and no more data is available. + * @throws NoSuchElementException when the stream is closed normally and no more data is available. + * @throws StreamException when the stream is closed abnormally. */ suspend fun readNext(): T { return flow.first() @@ -42,10 +71,14 @@ abstract class BaseStreamReceiver(private val source: Channel) { /** * Suspends and waits for all available data until the stream is closed. + * + * Exceptions are swallowed; this returns all data received before the stream closed, + * whether normally or abnormally. + * + * @return A list of all data received before the stream closed. */ suspend fun readAll(): List { - flow.catch { } - return flow.fold(mutableListOf()) { acc, value -> + return flow.catch { }.fold(mutableListOf()) { acc, value -> acc.add(value) return@fold acc } diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/room/datastream/incoming/ByteStreamReceiver.kt b/livekit-android-sdk/src/main/java/io/livekit/android/room/datastream/incoming/ByteStreamReceiver.kt index c2d4f2ac9..857b6391a 100644 --- a/livekit-android-sdk/src/main/java/io/livekit/android/room/datastream/incoming/ByteStreamReceiver.kt +++ b/livekit-android-sdk/src/main/java/io/livekit/android/room/datastream/incoming/ByteStreamReceiver.kt @@ -1,5 +1,5 @@ /* - * Copyright 2025 LiveKit, Inc. + * Copyright 2025-2026 LiveKit, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,12 +16,21 @@ package io.livekit.android.room.datastream.incoming +import io.livekit.android.room.Room import io.livekit.android.room.datastream.ByteStreamInfo import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.receiveAsFlow +/** + * Receiver for an incoming byte stream. + * + * Provided to [ByteStreamHandler] callbacks registered through [Room]. + * + * @see Room.registerByteStreamHandler + */ class ByteStreamReceiver( + /** Metadata for this stream. */ val info: ByteStreamInfo, channel: Channel, ) : BaseStreamReceiver(channel) { diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/room/datastream/incoming/TextStreamReceiver.kt b/livekit-android-sdk/src/main/java/io/livekit/android/room/datastream/incoming/TextStreamReceiver.kt index 6cb629e51..de1d51f76 100644 --- a/livekit-android-sdk/src/main/java/io/livekit/android/room/datastream/incoming/TextStreamReceiver.kt +++ b/livekit-android-sdk/src/main/java/io/livekit/android/room/datastream/incoming/TextStreamReceiver.kt @@ -1,5 +1,5 @@ /* - * Copyright 2025 LiveKit, Inc. + * Copyright 2025-2026 LiveKit, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,13 +16,23 @@ package io.livekit.android.room.datastream.incoming +import io.livekit.android.room.Room import io.livekit.android.room.datastream.TextStreamInfo import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.receiveAsFlow +/** + * Receiver for an incoming text stream. + * + * Chunks are decoded as UTF-8 strings. Provided to [TextStreamHandler] callbacks registered + * through [Room]. + * + * @see Room.registerTextStreamHandler + */ class TextStreamReceiver( + /** Metadata for this stream. */ val info: TextStreamInfo, source: Channel, ) : BaseStreamReceiver(source) { diff --git a/livekit-android-test/src/test/java/io/livekit/android/room/datastream/StreamReaderTest.kt b/livekit-android-test/src/test/java/io/livekit/android/room/datastream/StreamReaderTest.kt index 6b2fcff26..30fb984fa 100644 --- a/livekit-android-test/src/test/java/io/livekit/android/room/datastream/StreamReaderTest.kt +++ b/livekit-android-test/src/test/java/io/livekit/android/room/datastream/StreamReaderTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2025 LiveKit, Inc. + * Copyright 2025-2026 LiveKit, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -88,6 +88,36 @@ class StreamReaderTest : BaseTest() { } } + @Test + fun readAllSwallowsStreamException() = runTest { + val errorChannel = IncomingDataStreamManagerImpl.createChannelForStreamReceiver() + errorChannel.trySend(ByteArray(1) { 0 }) + errorChannel.trySend(ByteArray(1) { 1 }) + errorChannel.trySend(ByteArray(1) { 2 }) + errorChannel.close(StreamException.AbnormalEndException("reason")) + val errorReader = ByteStreamReceiver( + ByteStreamInfo( + id = "id", + topic = "topic", + timestampMs = 3, + totalSize = null, + attributes = mapOf(), + mimeType = "mime", + name = null, + encryptionType = LivekitModels.Encryption.Type.NONE, + ), + errorChannel, + ) + + runBlocking { + val data = errorReader.readAll() + assertEquals(3, data.size) + for (i in 0..2) { + assertEquals(i, data[i][0].toInt()) + } + } + } + @Test fun overreadThrows() = runTest { var threwOnce = false