From 85e5bf5b714531f6abe0d485b5f4deb9f23b8ceb Mon Sep 17 00:00:00 2001 From: davidliu Date: Fri, 12 Jun 2026 01:42:18 +0900 Subject: [PATCH 1/5] Update docs for stream receivers and errors --- .changeset/new-tomatoes-grow.md | 5 +++ .../room/datastream/StreamException.kt | 4 +-- .../datastream/incoming/BaseStreamReceiver.kt | 36 +++++++++++++++++-- .../datastream/incoming/ByteStreamReceiver.kt | 11 +++++- .../datastream/incoming/TextStreamReceiver.kt | 12 ++++++- 5 files changed, 62 insertions(+), 6 deletions(-) create mode 100644 .changeset/new-tomatoes-grow.md diff --git a/.changeset/new-tomatoes-grow.md b/.changeset/new-tomatoes-grow.md new file mode 100644 index 000000000..76dcf00f5 --- /dev/null +++ b/.changeset/new-tomatoes-grow.md @@ -0,0 +1,5 @@ +--- +"client-sdk-android": patch +--- + +Clarified documentation regarding data stream receivers and errors diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/room/datastream/StreamException.kt b/livekit-android-sdk/src/main/java/io/livekit/android/room/datastream/StreamException.kt index 9c5ccb57d..1e51135bc 100644 --- a/livekit-android-sdk/src/main/java/io/livekit/android/room/datastream/StreamException.kt +++ b/livekit-android-sdk/src/main/java/io/livekit/android/room/datastream/StreamException.kt @@ -1,5 +1,5 @@ /* - * Copyright 2025 LiveKit, Inc. + * Copyright 2025-2026 LiveKit, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -63,7 +63,7 @@ sealed class StreamException(message: String? = null) : Exception(message) { class FileInfoUnavailableException : StreamException() /** - * + * Encryption of the data chunks did not match the declared encryption type. */ class EncryptionTypeMismatch(message: String? = null) : StreamException(message) } diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/room/datastream/incoming/BaseStreamReceiver.kt b/livekit-android-sdk/src/main/java/io/livekit/android/room/datastream/incoming/BaseStreamReceiver.kt index d8a7a6ad1..fa9527283 100644 --- a/livekit-android-sdk/src/main/java/io/livekit/android/room/datastream/incoming/BaseStreamReceiver.kt +++ b/livekit-android-sdk/src/main/java/io/livekit/android/room/datastream/incoming/BaseStreamReceiver.kt @@ -1,5 +1,5 @@ /* - * Copyright 2025 LiveKit, Inc. + * Copyright 2025-2026 LiveKit, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,14 +16,42 @@ package io.livekit.android.room.datastream.incoming +import io.livekit.android.room.datastream.StreamException import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.catch import kotlinx.coroutines.flow.first import kotlinx.coroutines.flow.fold +/** + * Base class for reading incoming data streams. + * + * @see [flow] + * @see [readNext] + * @see [readAll] + */ abstract class BaseStreamReceiver(private val source: Channel) { + /** + * A [Flow] of stream data as it arrives. + * + * Collect this flow to process incoming data incrementally. The flow completes normally when + * the stream receives all the data without error. If the stream ends abnormally, the + * flow fails with a [StreamException] after all buffered chunks have been emitted. + * + * Example: + * ``` + * reader.flow + * .catch { e -> + * if (e is StreamException) { + * handleStreamError(e) + * } else { + * throw e + * } + * } + * .collect { chunk -> process(chunk) } + * ``` + */ abstract val flow: Flow internal fun close(error: Exception?) { @@ -34,7 +62,8 @@ abstract class BaseStreamReceiver(private val source: Channel) { * Suspends and waits for the next piece of data. * * @return the next available piece of data. - * @throws NoSuchElementException when the stream is closed and no more data is available. + * @throws NoSuchElementException when the stream is closed normally and no more data is available. + * @throws StreamException when the stream is closed abnormally. */ suspend fun readNext(): T { return flow.first() @@ -42,6 +71,9 @@ abstract class BaseStreamReceiver(private val source: Channel) { /** * Suspends and waits for all available data until the stream is closed. + * + * [StreamException]s are swallowed; this returns all data received before the stream closed, + * whether normally or abnormally. */ suspend fun readAll(): List { flow.catch { } diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/room/datastream/incoming/ByteStreamReceiver.kt b/livekit-android-sdk/src/main/java/io/livekit/android/room/datastream/incoming/ByteStreamReceiver.kt index c2d4f2ac9..857b6391a 100644 --- a/livekit-android-sdk/src/main/java/io/livekit/android/room/datastream/incoming/ByteStreamReceiver.kt +++ b/livekit-android-sdk/src/main/java/io/livekit/android/room/datastream/incoming/ByteStreamReceiver.kt @@ -1,5 +1,5 @@ /* - * Copyright 2025 LiveKit, Inc. + * Copyright 2025-2026 LiveKit, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,12 +16,21 @@ package io.livekit.android.room.datastream.incoming +import io.livekit.android.room.Room import io.livekit.android.room.datastream.ByteStreamInfo import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.receiveAsFlow +/** + * Receiver for an incoming byte stream. + * + * Provided to [ByteStreamHandler] callbacks registered through [Room]. + * + * @see Room.registerByteStreamHandler + */ class ByteStreamReceiver( + /** Metadata for this stream. */ val info: ByteStreamInfo, channel: Channel, ) : BaseStreamReceiver(channel) { diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/room/datastream/incoming/TextStreamReceiver.kt b/livekit-android-sdk/src/main/java/io/livekit/android/room/datastream/incoming/TextStreamReceiver.kt index 6cb629e51..de1d51f76 100644 --- a/livekit-android-sdk/src/main/java/io/livekit/android/room/datastream/incoming/TextStreamReceiver.kt +++ b/livekit-android-sdk/src/main/java/io/livekit/android/room/datastream/incoming/TextStreamReceiver.kt @@ -1,5 +1,5 @@ /* - * Copyright 2025 LiveKit, Inc. + * Copyright 2025-2026 LiveKit, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,13 +16,23 @@ package io.livekit.android.room.datastream.incoming +import io.livekit.android.room.Room import io.livekit.android.room.datastream.TextStreamInfo import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.receiveAsFlow +/** + * Receiver for an incoming text stream. + * + * Chunks are decoded as UTF-8 strings. Provided to [TextStreamHandler] callbacks registered + * through [Room]. + * + * @see Room.registerTextStreamHandler + */ class TextStreamReceiver( + /** Metadata for this stream. */ val info: TextStreamInfo, source: Channel, ) : BaseStreamReceiver(source) { From 9ce4625351dec638e8e199fe1aa2a7f93ff0f5a1 Mon Sep 17 00:00:00 2001 From: davidliu Date: Fri, 12 Jun 2026 17:58:44 +0900 Subject: [PATCH 2/5] Fix BaseStreamReceiver.readAll leaking thrown exceptions Also add isClosed and closeException --- .changeset/early-beers-enjoy.md | 5 + .changeset/selfish-drinks-play.md | 5 + .../datastream/incoming/BaseStreamReceiver.kt | 34 +++- .../room/datastream/StreamReaderTest.kt | 178 +++++++++++++++++- 4 files changed, 218 insertions(+), 4 deletions(-) create mode 100644 .changeset/early-beers-enjoy.md create mode 100644 .changeset/selfish-drinks-play.md diff --git a/.changeset/early-beers-enjoy.md b/.changeset/early-beers-enjoy.md new file mode 100644 index 000000000..f6ddd1df8 --- /dev/null +++ b/.changeset/early-beers-enjoy.md @@ -0,0 +1,5 @@ +--- +"client-sdk-android": patch +--- + +Add `closeException` and `isClosed` helpers to `BaseStreamReceiver` diff --git a/.changeset/selfish-drinks-play.md b/.changeset/selfish-drinks-play.md new file mode 100644 index 000000000..2d14ac4bf --- /dev/null +++ b/.changeset/selfish-drinks-play.md @@ -0,0 +1,5 @@ +--- +"client-sdk-android": patch +--- + +Fix `BaseStreamReceiver.readAll` leaking thrown exceptions diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/room/datastream/incoming/BaseStreamReceiver.kt b/livekit-android-sdk/src/main/java/io/livekit/android/room/datastream/incoming/BaseStreamReceiver.kt index fa9527283..c7dbb71e3 100644 --- a/livekit-android-sdk/src/main/java/io/livekit/android/room/datastream/incoming/BaseStreamReceiver.kt +++ b/livekit-android-sdk/src/main/java/io/livekit/android/room/datastream/incoming/BaseStreamReceiver.kt @@ -17,6 +17,7 @@ package io.livekit.android.room.datastream.incoming import io.livekit.android.room.datastream.StreamException +import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.catch @@ -32,6 +33,34 @@ import kotlinx.coroutines.flow.fold */ abstract class BaseStreamReceiver(private val source: Channel) { + private var closeCause: Throwable? = null + + init { + @OptIn(ExperimentalCoroutinesApi::class) + source.invokeOnClose { cause -> + closeCause = cause + } + } + + /** + * The [StreamException] this stream was closed with, or `null` if it closed normally or is still open. + * + * Note: Buffered data may still be available even if this is set. + * + * @see [isClosed] + */ + val closeException: StreamException? + get() = closeCause as? StreamException + + /** + * True if the stream is closed and all buffered data has been drained. + * + * @see [closeException] + */ + @OptIn(ExperimentalCoroutinesApi::class) + val isClosed: Boolean + get() = source.isClosedForReceive + /** * A [Flow] of stream data as it arrives. * @@ -73,11 +102,10 @@ abstract class BaseStreamReceiver(private val source: Channel) { * Suspends and waits for all available data until the stream is closed. * * [StreamException]s are swallowed; this returns all data received before the stream closed, - * whether normally or abnormally. + * whether normally or abnormally. Use [closeException] to check the cause of the stream closure. */ suspend fun readAll(): List { - flow.catch { } - return flow.fold(mutableListOf()) { acc, value -> + return flow.catch { }.fold(mutableListOf()) { acc, value -> acc.add(value) return@fold acc } diff --git a/livekit-android-test/src/test/java/io/livekit/android/room/datastream/StreamReaderTest.kt b/livekit-android-test/src/test/java/io/livekit/android/room/datastream/StreamReaderTest.kt index 6b2fcff26..5781fab80 100644 --- a/livekit-android-test/src/test/java/io/livekit/android/room/datastream/StreamReaderTest.kt +++ b/livekit-android-test/src/test/java/io/livekit/android/room/datastream/StreamReaderTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2025 LiveKit, Inc. + * Copyright 2025-2026 LiveKit, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,6 +24,8 @@ import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.runBlocking import livekit.LivekitModels import org.junit.Assert.assertEquals +import org.junit.Assert.assertFalse +import org.junit.Assert.assertNull import org.junit.Assert.assertTrue import org.junit.Before import org.junit.Test @@ -56,6 +58,7 @@ class StreamReaderTest : BaseTest() { @Test fun buffersDataUntilSubscribed() = runTest { + assertFalse(reader.isClosed) var count = 0 runBlocking { reader.flow.collect { @@ -65,6 +68,7 @@ class StreamReaderTest : BaseTest() { } assertEquals(3, count) + assertTrue(reader.isClosed) } @Test @@ -79,6 +83,8 @@ class StreamReaderTest : BaseTest() { @Test fun readAll() = runTest { + assertNull(reader.closeException) + assertFalse(reader.isClosed) runBlocking { val data = reader.readAll() assertEquals(3, data.size) @@ -86,6 +92,160 @@ class StreamReaderTest : BaseTest() { assertEquals(i, data[i][0].toInt()) } } + assertNull(reader.closeException) + assertTrue(reader.isClosed) + } + + @Test + fun readAllSwallowsStreamException() = runTest { + val errorChannel = IncomingDataStreamManagerImpl.createChannelForStreamReceiver() + val errorReader = createReader(errorChannel) + errorChannel.trySend(ByteArray(1) { 0 }) + errorChannel.trySend(ByteArray(1) { 1 }) + errorChannel.trySend(ByteArray(1) { 2 }) + val error = StreamException.AbnormalEndException("reason") + errorChannel.close(error) + + runBlocking { + assertEquals(error, errorReader.closeException) + val data = errorReader.readAll() + assertEquals(3, data.size) + for (i in 0..2) { + assertEquals(i, data[i][0].toInt()) + } + assertEquals(error, errorReader.closeException) + } + } + + @Test + fun isClosedFalseWhileOpen() = runTest { + val openChannel = IncomingDataStreamManagerImpl.createChannelForStreamReceiver() + val openReader = createReader(openChannel) + + assertFalse(openReader.isClosed) + } + + @Test + fun isClosedFalseWhenClosedWithBufferedData() = runTest { + val bufferedChannel = IncomingDataStreamManagerImpl.createChannelForStreamReceiver() + val bufferedReader = createReader(bufferedChannel) + bufferedChannel.trySend(ByteArray(1) { 0 }) + bufferedChannel.trySend(ByteArray(1) { 1 }) + bufferedChannel.close() + + assertFalse(bufferedReader.isClosed) + } + + @Test + fun isClosedTrueAfterReadAll() = runTest { + val drainedChannel = IncomingDataStreamManagerImpl.createChannelForStreamReceiver() + val drainedReader = createReader(drainedChannel) + drainedChannel.trySend(ByteArray(1) { 0 }) + drainedChannel.close() + + runBlocking { + drainedReader.readAll() + } + + assertTrue(drainedReader.isClosed) + } + + @Test + fun isClosedTrueAfterReadNextDrainsChannel() = runTest { + val drainedChannel = IncomingDataStreamManagerImpl.createChannelForStreamReceiver() + val drainedReader = createReader(drainedChannel) + drainedChannel.trySend(ByteArray(1) { 0 }) + drainedChannel.close() + + runBlocking { + drainedReader.readNext() + } + + assertTrue(drainedReader.isClosed) + } + + @Test + fun isClosedTrueWhenClosedWithNoData() = runTest { + val emptyChannel = IncomingDataStreamManagerImpl.createChannelForStreamReceiver() + val emptyReader = createReader(emptyChannel) + emptyChannel.close() + + assertTrue(emptyReader.isClosed) + } + + @Test + fun isClosedTrueAfterReadAllDespiteAbnormalClose() = runTest { + val errorChannel = IncomingDataStreamManagerImpl.createChannelForStreamReceiver() + val errorReader = createReader(errorChannel) + errorChannel.trySend(ByteArray(1) { 0 }) + errorChannel.close(StreamException.AbnormalEndException("reason")) + + assertFalse(errorReader.isClosed) + + runBlocking { + errorReader.readAll() + } + + assertTrue(errorReader.isClosed) + assertTrue(errorReader.closeException is StreamException.AbnormalEndException) + } + + @Test + fun closeExceptionNullWhileOpen() = runTest { + val openChannel = IncomingDataStreamManagerImpl.createChannelForStreamReceiver() + val openReader = createReader(openChannel) + + assertNull(openReader.closeException) + assertFalse(openReader.isClosed) + } + + @Test + fun closeExceptionNullOnNormalClose() = runTest { + val normalChannel = IncomingDataStreamManagerImpl.createChannelForStreamReceiver() + val normalReader = createReader(normalChannel) + normalChannel.trySend(ByteArray(1) { 0 }) + normalChannel.close() + + assertNull(normalReader.closeException) + } + + @Test + fun closeExceptionSetOnAbnormalClose() = runTest { + val errorChannel = IncomingDataStreamManagerImpl.createChannelForStreamReceiver() + val errorReader = createReader(errorChannel) + val error = StreamException.LengthExceededException() + errorChannel.close(error) + + assertEquals(error, errorReader.closeException) + } + + @Test + fun closeExceptionSetWhenChannelAlreadyClosed() = runTest { + val errorChannel = IncomingDataStreamManagerImpl.createChannelForStreamReceiver() + val error = StreamException.TerminatedException() + errorChannel.close(error) + val errorReader = createReader(errorChannel) + + assertEquals(error, errorReader.closeException) + } + + @Test + fun closeExceptionAvailableBeforeDrain() = runTest { + val errorChannel = IncomingDataStreamManagerImpl.createChannelForStreamReceiver() + val errorReader = createReader(errorChannel) + errorChannel.trySend(ByteArray(1) { 0 }) + errorChannel.trySend(ByteArray(1) { 1 }) + val error = StreamException.IncompleteException() + errorChannel.close(error) + + assertEquals(error, errorReader.closeException) + assertFalse(errorReader.isClosed) + + runBlocking { + assertEquals(2, errorReader.readAll().size) + } + assertEquals(error, errorReader.closeException) + assertTrue(errorReader.isClosed) } @Test @@ -104,4 +264,20 @@ class StreamReaderTest : BaseTest() { assertTrue(threwOnce) } + + private fun createReader(channel: Channel): ByteStreamReceiver { + return ByteStreamReceiver( + ByteStreamInfo( + id = "id", + topic = "topic", + timestampMs = 3, + totalSize = null, + attributes = mapOf(), + mimeType = "mime", + name = null, + encryptionType = LivekitModels.Encryption.Type.NONE, + ), + channel, + ) + } } From 04018bf7aa6e6ba4b68527da53c7bb362a5c5828 Mon Sep 17 00:00:00 2001 From: davidliu Date: Fri, 12 Jun 2026 18:21:02 +0900 Subject: [PATCH 3/5] Revert "Fix BaseStreamReceiver.readAll leaking thrown exceptions" This reverts commit 9ce4625351dec638e8e199fe1aa2a7f93ff0f5a1. --- .changeset/early-beers-enjoy.md | 5 - .changeset/selfish-drinks-play.md | 5 - .../datastream/incoming/BaseStreamReceiver.kt | 34 +--- .../room/datastream/StreamReaderTest.kt | 178 +----------------- 4 files changed, 4 insertions(+), 218 deletions(-) delete mode 100644 .changeset/early-beers-enjoy.md delete mode 100644 .changeset/selfish-drinks-play.md diff --git a/.changeset/early-beers-enjoy.md b/.changeset/early-beers-enjoy.md deleted file mode 100644 index f6ddd1df8..000000000 --- a/.changeset/early-beers-enjoy.md +++ /dev/null @@ -1,5 +0,0 @@ ---- -"client-sdk-android": patch ---- - -Add `closeException` and `isClosed` helpers to `BaseStreamReceiver` diff --git a/.changeset/selfish-drinks-play.md b/.changeset/selfish-drinks-play.md deleted file mode 100644 index 2d14ac4bf..000000000 --- a/.changeset/selfish-drinks-play.md +++ /dev/null @@ -1,5 +0,0 @@ ---- -"client-sdk-android": patch ---- - -Fix `BaseStreamReceiver.readAll` leaking thrown exceptions diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/room/datastream/incoming/BaseStreamReceiver.kt b/livekit-android-sdk/src/main/java/io/livekit/android/room/datastream/incoming/BaseStreamReceiver.kt index c7dbb71e3..fa9527283 100644 --- a/livekit-android-sdk/src/main/java/io/livekit/android/room/datastream/incoming/BaseStreamReceiver.kt +++ b/livekit-android-sdk/src/main/java/io/livekit/android/room/datastream/incoming/BaseStreamReceiver.kt @@ -17,7 +17,6 @@ package io.livekit.android.room.datastream.incoming import io.livekit.android.room.datastream.StreamException -import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.catch @@ -33,34 +32,6 @@ import kotlinx.coroutines.flow.fold */ abstract class BaseStreamReceiver(private val source: Channel) { - private var closeCause: Throwable? = null - - init { - @OptIn(ExperimentalCoroutinesApi::class) - source.invokeOnClose { cause -> - closeCause = cause - } - } - - /** - * The [StreamException] this stream was closed with, or `null` if it closed normally or is still open. - * - * Note: Buffered data may still be available even if this is set. - * - * @see [isClosed] - */ - val closeException: StreamException? - get() = closeCause as? StreamException - - /** - * True if the stream is closed and all buffered data has been drained. - * - * @see [closeException] - */ - @OptIn(ExperimentalCoroutinesApi::class) - val isClosed: Boolean - get() = source.isClosedForReceive - /** * A [Flow] of stream data as it arrives. * @@ -102,10 +73,11 @@ abstract class BaseStreamReceiver(private val source: Channel) { * Suspends and waits for all available data until the stream is closed. * * [StreamException]s are swallowed; this returns all data received before the stream closed, - * whether normally or abnormally. Use [closeException] to check the cause of the stream closure. + * whether normally or abnormally. */ suspend fun readAll(): List { - return flow.catch { }.fold(mutableListOf()) { acc, value -> + flow.catch { } + return flow.fold(mutableListOf()) { acc, value -> acc.add(value) return@fold acc } diff --git a/livekit-android-test/src/test/java/io/livekit/android/room/datastream/StreamReaderTest.kt b/livekit-android-test/src/test/java/io/livekit/android/room/datastream/StreamReaderTest.kt index 5781fab80..6b2fcff26 100644 --- a/livekit-android-test/src/test/java/io/livekit/android/room/datastream/StreamReaderTest.kt +++ b/livekit-android-test/src/test/java/io/livekit/android/room/datastream/StreamReaderTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2025-2026 LiveKit, Inc. + * Copyright 2025 LiveKit, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,8 +24,6 @@ import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.runBlocking import livekit.LivekitModels import org.junit.Assert.assertEquals -import org.junit.Assert.assertFalse -import org.junit.Assert.assertNull import org.junit.Assert.assertTrue import org.junit.Before import org.junit.Test @@ -58,7 +56,6 @@ class StreamReaderTest : BaseTest() { @Test fun buffersDataUntilSubscribed() = runTest { - assertFalse(reader.isClosed) var count = 0 runBlocking { reader.flow.collect { @@ -68,7 +65,6 @@ class StreamReaderTest : BaseTest() { } assertEquals(3, count) - assertTrue(reader.isClosed) } @Test @@ -83,8 +79,6 @@ class StreamReaderTest : BaseTest() { @Test fun readAll() = runTest { - assertNull(reader.closeException) - assertFalse(reader.isClosed) runBlocking { val data = reader.readAll() assertEquals(3, data.size) @@ -92,160 +86,6 @@ class StreamReaderTest : BaseTest() { assertEquals(i, data[i][0].toInt()) } } - assertNull(reader.closeException) - assertTrue(reader.isClosed) - } - - @Test - fun readAllSwallowsStreamException() = runTest { - val errorChannel = IncomingDataStreamManagerImpl.createChannelForStreamReceiver() - val errorReader = createReader(errorChannel) - errorChannel.trySend(ByteArray(1) { 0 }) - errorChannel.trySend(ByteArray(1) { 1 }) - errorChannel.trySend(ByteArray(1) { 2 }) - val error = StreamException.AbnormalEndException("reason") - errorChannel.close(error) - - runBlocking { - assertEquals(error, errorReader.closeException) - val data = errorReader.readAll() - assertEquals(3, data.size) - for (i in 0..2) { - assertEquals(i, data[i][0].toInt()) - } - assertEquals(error, errorReader.closeException) - } - } - - @Test - fun isClosedFalseWhileOpen() = runTest { - val openChannel = IncomingDataStreamManagerImpl.createChannelForStreamReceiver() - val openReader = createReader(openChannel) - - assertFalse(openReader.isClosed) - } - - @Test - fun isClosedFalseWhenClosedWithBufferedData() = runTest { - val bufferedChannel = IncomingDataStreamManagerImpl.createChannelForStreamReceiver() - val bufferedReader = createReader(bufferedChannel) - bufferedChannel.trySend(ByteArray(1) { 0 }) - bufferedChannel.trySend(ByteArray(1) { 1 }) - bufferedChannel.close() - - assertFalse(bufferedReader.isClosed) - } - - @Test - fun isClosedTrueAfterReadAll() = runTest { - val drainedChannel = IncomingDataStreamManagerImpl.createChannelForStreamReceiver() - val drainedReader = createReader(drainedChannel) - drainedChannel.trySend(ByteArray(1) { 0 }) - drainedChannel.close() - - runBlocking { - drainedReader.readAll() - } - - assertTrue(drainedReader.isClosed) - } - - @Test - fun isClosedTrueAfterReadNextDrainsChannel() = runTest { - val drainedChannel = IncomingDataStreamManagerImpl.createChannelForStreamReceiver() - val drainedReader = createReader(drainedChannel) - drainedChannel.trySend(ByteArray(1) { 0 }) - drainedChannel.close() - - runBlocking { - drainedReader.readNext() - } - - assertTrue(drainedReader.isClosed) - } - - @Test - fun isClosedTrueWhenClosedWithNoData() = runTest { - val emptyChannel = IncomingDataStreamManagerImpl.createChannelForStreamReceiver() - val emptyReader = createReader(emptyChannel) - emptyChannel.close() - - assertTrue(emptyReader.isClosed) - } - - @Test - fun isClosedTrueAfterReadAllDespiteAbnormalClose() = runTest { - val errorChannel = IncomingDataStreamManagerImpl.createChannelForStreamReceiver() - val errorReader = createReader(errorChannel) - errorChannel.trySend(ByteArray(1) { 0 }) - errorChannel.close(StreamException.AbnormalEndException("reason")) - - assertFalse(errorReader.isClosed) - - runBlocking { - errorReader.readAll() - } - - assertTrue(errorReader.isClosed) - assertTrue(errorReader.closeException is StreamException.AbnormalEndException) - } - - @Test - fun closeExceptionNullWhileOpen() = runTest { - val openChannel = IncomingDataStreamManagerImpl.createChannelForStreamReceiver() - val openReader = createReader(openChannel) - - assertNull(openReader.closeException) - assertFalse(openReader.isClosed) - } - - @Test - fun closeExceptionNullOnNormalClose() = runTest { - val normalChannel = IncomingDataStreamManagerImpl.createChannelForStreamReceiver() - val normalReader = createReader(normalChannel) - normalChannel.trySend(ByteArray(1) { 0 }) - normalChannel.close() - - assertNull(normalReader.closeException) - } - - @Test - fun closeExceptionSetOnAbnormalClose() = runTest { - val errorChannel = IncomingDataStreamManagerImpl.createChannelForStreamReceiver() - val errorReader = createReader(errorChannel) - val error = StreamException.LengthExceededException() - errorChannel.close(error) - - assertEquals(error, errorReader.closeException) - } - - @Test - fun closeExceptionSetWhenChannelAlreadyClosed() = runTest { - val errorChannel = IncomingDataStreamManagerImpl.createChannelForStreamReceiver() - val error = StreamException.TerminatedException() - errorChannel.close(error) - val errorReader = createReader(errorChannel) - - assertEquals(error, errorReader.closeException) - } - - @Test - fun closeExceptionAvailableBeforeDrain() = runTest { - val errorChannel = IncomingDataStreamManagerImpl.createChannelForStreamReceiver() - val errorReader = createReader(errorChannel) - errorChannel.trySend(ByteArray(1) { 0 }) - errorChannel.trySend(ByteArray(1) { 1 }) - val error = StreamException.IncompleteException() - errorChannel.close(error) - - assertEquals(error, errorReader.closeException) - assertFalse(errorReader.isClosed) - - runBlocking { - assertEquals(2, errorReader.readAll().size) - } - assertEquals(error, errorReader.closeException) - assertTrue(errorReader.isClosed) } @Test @@ -264,20 +104,4 @@ class StreamReaderTest : BaseTest() { assertTrue(threwOnce) } - - private fun createReader(channel: Channel): ByteStreamReceiver { - return ByteStreamReceiver( - ByteStreamInfo( - id = "id", - topic = "topic", - timestampMs = 3, - totalSize = null, - attributes = mapOf(), - mimeType = "mime", - name = null, - encryptionType = LivekitModels.Encryption.Type.NONE, - ), - channel, - ) - } } From 4bbf4cd851b4233620e3ecb28f41877139c6efa4 Mon Sep 17 00:00:00 2001 From: davidliu Date: Fri, 12 Jun 2026 18:29:47 +0900 Subject: [PATCH 4/5] Fix `BaseStreamReceiver.readAll` leaking thrown exceptions --- .changeset/tender-dolls-reflect.md | 5 +++ .../datastream/incoming/BaseStreamReceiver.kt | 7 +++-- .../room/datastream/StreamReaderTest.kt | 31 +++++++++++++++++++ 3 files changed, 40 insertions(+), 3 deletions(-) create mode 100644 .changeset/tender-dolls-reflect.md diff --git a/.changeset/tender-dolls-reflect.md b/.changeset/tender-dolls-reflect.md new file mode 100644 index 000000000..2d14ac4bf --- /dev/null +++ b/.changeset/tender-dolls-reflect.md @@ -0,0 +1,5 @@ +--- +"client-sdk-android": patch +--- + +Fix `BaseStreamReceiver.readAll` leaking thrown exceptions diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/room/datastream/incoming/BaseStreamReceiver.kt b/livekit-android-sdk/src/main/java/io/livekit/android/room/datastream/incoming/BaseStreamReceiver.kt index fa9527283..2c72aaecb 100644 --- a/livekit-android-sdk/src/main/java/io/livekit/android/room/datastream/incoming/BaseStreamReceiver.kt +++ b/livekit-android-sdk/src/main/java/io/livekit/android/room/datastream/incoming/BaseStreamReceiver.kt @@ -72,12 +72,13 @@ abstract class BaseStreamReceiver(private val source: Channel) { /** * Suspends and waits for all available data until the stream is closed. * - * [StreamException]s are swallowed; this returns all data received before the stream closed, + * Exceptions are swallowed; this returns all data received before the stream closed, * whether normally or abnormally. + * + * @return A list of all data received before the stream closed. */ suspend fun readAll(): List { - flow.catch { } - return flow.fold(mutableListOf()) { acc, value -> + return flow.catch { }.fold(mutableListOf()) { acc, value -> acc.add(value) return@fold acc } diff --git a/livekit-android-test/src/test/java/io/livekit/android/room/datastream/StreamReaderTest.kt b/livekit-android-test/src/test/java/io/livekit/android/room/datastream/StreamReaderTest.kt index 6b2fcff26..a3a3ec70c 100644 --- a/livekit-android-test/src/test/java/io/livekit/android/room/datastream/StreamReaderTest.kt +++ b/livekit-android-test/src/test/java/io/livekit/android/room/datastream/StreamReaderTest.kt @@ -16,6 +16,7 @@ package io.livekit.android.room.datastream +import io.livekit.android.room.datastream.StreamException import io.livekit.android.room.datastream.incoming.ByteStreamReceiver import io.livekit.android.room.datastream.incoming.IncomingDataStreamManagerImpl import io.livekit.android.test.BaseTest @@ -88,6 +89,36 @@ class StreamReaderTest : BaseTest() { } } + @Test + fun readAllSwallowsStreamException() = runTest { + val errorChannel = IncomingDataStreamManagerImpl.createChannelForStreamReceiver() + errorChannel.trySend(ByteArray(1) { 0 }) + errorChannel.trySend(ByteArray(1) { 1 }) + errorChannel.trySend(ByteArray(1) { 2 }) + errorChannel.close(StreamException.AbnormalEndException("reason")) + val errorReader = ByteStreamReceiver( + ByteStreamInfo( + id = "id", + topic = "topic", + timestampMs = 3, + totalSize = null, + attributes = mapOf(), + mimeType = "mime", + name = null, + encryptionType = LivekitModels.Encryption.Type.NONE, + ), + errorChannel, + ) + + runBlocking { + val data = errorReader.readAll() + assertEquals(3, data.size) + for (i in 0..2) { + assertEquals(i, data[i][0].toInt()) + } + } + } + @Test fun overreadThrows() = runTest { var threwOnce = false From 9ec9e7b45ea850c9452f49640dbfa8be079eb97c Mon Sep 17 00:00:00 2001 From: davidliu Date: Fri, 12 Jun 2026 18:32:39 +0900 Subject: [PATCH 5/5] spotless --- .../io/livekit/android/room/datastream/StreamReaderTest.kt | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/livekit-android-test/src/test/java/io/livekit/android/room/datastream/StreamReaderTest.kt b/livekit-android-test/src/test/java/io/livekit/android/room/datastream/StreamReaderTest.kt index a3a3ec70c..30fb984fa 100644 --- a/livekit-android-test/src/test/java/io/livekit/android/room/datastream/StreamReaderTest.kt +++ b/livekit-android-test/src/test/java/io/livekit/android/room/datastream/StreamReaderTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2025 LiveKit, Inc. + * Copyright 2025-2026 LiveKit, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,7 +16,6 @@ package io.livekit.android.room.datastream -import io.livekit.android.room.datastream.StreamException import io.livekit.android.room.datastream.incoming.ByteStreamReceiver import io.livekit.android.room.datastream.incoming.IncomingDataStreamManagerImpl import io.livekit.android.test.BaseTest