Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/new-tomatoes-grow.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"client-sdk-android": patch
---

Clarified documentation regarding data stream receivers and errors
5 changes: 5 additions & 0 deletions .changeset/tender-dolls-reflect.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"client-sdk-android": patch
---

Fix `BaseStreamReceiver.readAll` leaking thrown exceptions
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2025 LiveKit, Inc.
* Copyright 2025-2026 LiveKit, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -63,7 +63,7 @@ sealed class StreamException(message: String? = null) : Exception(message) {
class FileInfoUnavailableException : StreamException()

/**
*
* Encryption of the data chunks did not match the declared encryption type.
*/
class EncryptionTypeMismatch(message: String? = null) : StreamException(message)
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2025 LiveKit, Inc.
* Copyright 2025-2026 LiveKit, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,14 +16,42 @@

package io.livekit.android.room.datastream.incoming

import io.livekit.android.room.datastream.StreamException
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.fold

/**
* Base class for reading incoming data streams.
*
* @see [flow]
* @see [readNext]
* @see [readAll]
*/
abstract class BaseStreamReceiver<T>(private val source: Channel<ByteArray>) {

/**
* A [Flow] of stream data as it arrives.
*
* Collect this flow to process incoming data incrementally. The flow completes normally when
* the stream receives all the data without error. If the stream ends abnormally, the
* flow fails with a [StreamException] after all buffered chunks have been emitted.
*
* Example:
* ```
* reader.flow
* .catch { e ->
* if (e is StreamException) {
* handleStreamError(e)
* } else {
* throw e
* }
* }
* .collect { chunk -> process(chunk) }
* ```
*/
abstract val flow: Flow<T>

internal fun close(error: Exception?) {
Expand All @@ -34,18 +62,23 @@ abstract class BaseStreamReceiver<T>(private val source: Channel<ByteArray>) {
* Suspends and waits for the next piece of data.
*
* @return the next available piece of data.
* @throws NoSuchElementException when the stream is closed and no more data is available.
* @throws NoSuchElementException when the stream is closed normally and no more data is available.
* @throws StreamException when the stream is closed abnormally.
*/
suspend fun readNext(): T {
return flow.first()
}

/**
* Suspends and waits for all available data until the stream is closed.
*
* Exceptions are swallowed; this returns all data received before the stream closed,
* whether normally or abnormally.
*
* @return A list of all data received before the stream closed.
*/
suspend fun readAll(): List<T> {
flow.catch { }
return flow.fold(mutableListOf()) { acc, value ->
return flow.catch { }.fold(mutableListOf()) { acc, value ->
acc.add(value)
return@fold acc
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2025 LiveKit, Inc.
* Copyright 2025-2026 LiveKit, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,12 +16,21 @@

package io.livekit.android.room.datastream.incoming

import io.livekit.android.room.Room
import io.livekit.android.room.datastream.ByteStreamInfo
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.receiveAsFlow

/**
* Receiver for an incoming byte stream.
*
* Provided to [ByteStreamHandler] callbacks registered through [Room].
*
* @see Room.registerByteStreamHandler
*/
class ByteStreamReceiver(
/** Metadata for this stream. */
val info: ByteStreamInfo,
channel: Channel<ByteArray>,
) : BaseStreamReceiver<ByteArray>(channel) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2025 LiveKit, Inc.
* Copyright 2025-2026 LiveKit, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,13 +16,23 @@

package io.livekit.android.room.datastream.incoming

import io.livekit.android.room.Room
import io.livekit.android.room.datastream.TextStreamInfo
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.receiveAsFlow

/**
* Receiver for an incoming text stream.
*
* Chunks are decoded as UTF-8 strings. Provided to [TextStreamHandler] callbacks registered
* through [Room].
*
* @see Room.registerTextStreamHandler
*/
class TextStreamReceiver(
/** Metadata for this stream. */
val info: TextStreamInfo,
source: Channel<ByteArray>,
) : BaseStreamReceiver<String>(source) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2025 LiveKit, Inc.
* Copyright 2025-2026 LiveKit, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -88,6 +88,36 @@ class StreamReaderTest : BaseTest() {
}
}

@Test
fun readAllSwallowsStreamException() = runTest {
val errorChannel = IncomingDataStreamManagerImpl.createChannelForStreamReceiver()
errorChannel.trySend(ByteArray(1) { 0 })
errorChannel.trySend(ByteArray(1) { 1 })
errorChannel.trySend(ByteArray(1) { 2 })
errorChannel.close(StreamException.AbnormalEndException("reason"))
val errorReader = ByteStreamReceiver(
ByteStreamInfo(
id = "id",
topic = "topic",
timestampMs = 3,
totalSize = null,
attributes = mapOf(),
mimeType = "mime",
name = null,
encryptionType = LivekitModels.Encryption.Type.NONE,
),
errorChannel,
)

runBlocking {
val data = errorReader.readAll()
assertEquals(3, data.size)
for (i in 0..2) {
assertEquals(i, data[i][0].toInt())
}
}
}

@Test
fun overreadThrows() = runTest {
var threwOnce = false
Expand Down