diff --git a/.changeset/heavy-parrots-join.md b/.changeset/heavy-parrots-join.md new file mode 100644 index 000000000..8f544382a --- /dev/null +++ b/.changeset/heavy-parrots-join.md @@ -0,0 +1,5 @@ +--- +"client-sdk-android": patch +--- + +Change proguard rule for protobufs to official recommended rule, allowing unused protobuf classes to be removed with minification diff --git a/.changeset/wacky-turtles-thank.md b/.changeset/wacky-turtles-thank.md new file mode 100644 index 000000000..07bfa415f --- /dev/null +++ b/.changeset/wacky-turtles-thank.md @@ -0,0 +1,5 @@ +--- +"client-sdk-android": minor +--- + +Add support for RPC V2 diff --git a/gradle/livekit-protobuf.gradle b/gradle/livekit-protobuf.gradle new file mode 100644 index 000000000..9310e7e48 --- /dev/null +++ b/gradle/livekit-protobuf.gradle @@ -0,0 +1,72 @@ +// Shared protobuf setup for LiveKit client protos. +// +// Configure before applying: +// ext.livekitProtoIncludes = ['livekit_models.proto', ...] +// Optional (for modules that compile a subset but import others): +// ext.livekitProtoImportSrc = true + +if (!project.ext.has('livekitProtoIncludes')) { + throw new GradleException("ext.livekitProtoIncludes must be set before applying livekit-protobuf.gradle") +} + +def stagedProtoSrcDir = layout.buildDirectory.dir("staged-proto-src") +def stageProtoSources = tasks.register("stageProtoSources", Copy) { + from(generated.protoSrc) { + include project.ext.livekitProtoIncludes as String[] + } + into stagedProtoSrcDir +} + +android.sourceSets.main.proto { + srcDir stagedProtoSrcDir +} + +android.sourceSets.main.java { + srcDir "${protobuf.generatedFilesBaseDir}/main/javalite" +} + +configurations { + descriptorProtoSource +} + +dependencies { + descriptorProtoSource "com.google.protobuf:protobuf-java:${libs.versions.protobuf.get()}" +} + +def extractedImportProtosDir = layout.buildDirectory.dir("extracted-import-protos") +def extractProtoImports = tasks.register("extractProtoImports", Copy) { + into extractedImportProtosDir + from { zipTree(configurations.descriptorProtoSource.singleFile) } { + include "google/protobuf/descriptor.proto" + } + from(generated.protoSrc) { + include "logger/**" + } +} + +protobuf { + protoc { + // for apple m1, please add protoc_platform=osx-x86_64 in $HOME/.gradle/gradle.properties + if (project.hasProperty('protoc_platform')) { + artifact = "com.google.protobuf:protoc:${libs.versions.protobuf.get()}:${protoc_platform}" + } else { + artifact = "com.google.protobuf:protoc:${libs.versions.protobuf.get()}" + } + } + + generateProtoTasks { + all().each { task -> + task.dependsOn stageProtoSources + task.dependsOn extractProtoImports + task.addIncludeDir files(extractedImportProtosDir) + if (project.ext.has('livekitProtoImportSrc') && project.ext.livekitProtoImportSrc) { + task.addIncludeDir files(generated.protoSrc) + } + task.builtins { + java { + option "lite" + } + } + } + } +} diff --git a/livekit-android-sdk/build.gradle b/livekit-android-sdk/build.gradle index 706ffa27d..dbc69f736 100644 --- a/livekit-android-sdk/build.gradle +++ b/livekit-android-sdk/build.gradle @@ -30,18 +30,6 @@ android { } } - sourceSets { - main { - proto { - srcDir generated.protoSrc - exclude '*/*.proto' // only use top-level protos. - } - java { - srcDir "${protobuf.generatedFilesBaseDir}/main/javalite" - } - } - } - testOptions { unitTests { includeAndroidResources = true @@ -74,26 +62,12 @@ android { } -protobuf { - protoc { - // for apple m1, please add protoc_platform=osx-x86_64 in $HOME/.gradle/gradle.properties - if (project.hasProperty('protoc_platform')) { - artifact = "com.google.protobuf:protoc:${libs.versions.protobuf.get()}:${protoc_platform}" - } else { - artifact = "com.google.protobuf:protoc:${libs.versions.protobuf.get()}" - } - } - - generateProtoTasks { - all().each { task -> - task.builtins { - java { - option "lite" - } - } - } - } -} +ext.livekitProtoIncludes = [ + 'livekit_models.proto', + 'livekit_rtc.proto', + 'livekit_metrics.proto', +] +apply from: rootProject.file('gradle/livekit-protobuf.gradle') jacoco { toolVersion = "0.8.14" diff --git a/livekit-android-sdk/consumer-rules.pro b/livekit-android-sdk/consumer-rules.pro index 7808d2f90..f5c4cd720 100644 --- a/livekit-android-sdk/consumer-rules.pro +++ b/livekit-android-sdk/consumer-rules.pro @@ -42,4 +42,6 @@ # Protobuf ######################################### --keep class * extends com.google.protobuf.GeneratedMessageLite { *; } +-keepclassmembers class * extends com.google.protobuf.GeneratedMessageLite { + ; +} diff --git a/livekit-android-sdk/detekt-baseline-release.xml b/livekit-android-sdk/detekt-baseline-release.xml index 913ef5b22..418e8ea50 100644 --- a/livekit-android-sdk/detekt-baseline-release.xml +++ b/livekit-android-sdk/detekt-baseline-release.xml @@ -31,31 +31,19 @@ EmptyFunctionBlock:RTCEngine.kt$RTCEngine${ } HasPlatformType:DataChannelManager.kt$DataChannelManager$@get:FlowObservable var state by flowDelegate(dataChannel.state()) private set IgnoredReturnValue:BaseStreamReceiver.kt$BaseStreamReceiver$catch { } - InstanceOfCheckForException:LocalParticipant.kt$LocalParticipant$e is RpcError + IgnoredReturnValue:RpcServerManager.kt$RpcServerManager$publishRpcAck(callerIdentity, requestId) + InstanceOfCheckForException:RpcServerManager.kt$RpcServerManager$e is RpcError LargeClass:LocalParticipant.kt$LocalParticipant : ParticipantOutgoingDataStreamManagerRpcManager LargeClass:RTCEngine.kt$RTCEngine : Listener LargeClass:Room.kt$Room : ListenerParticipantListenerRpcManagerIncomingDataStreamManager LargeClass:SignalClient.kt$SignalClient : WebSocketListener - LongMethod:LocalParticipant.kt$LocalParticipant$@Throws(TrackException.PublishException::class) private suspend fun publishTrackImpl( track: Track, options: TrackPublishOptions, requestConfig: AddTrackRequest.Builder.() -> Unit, encodings: List<RtpParameters.Encoding> = emptyList(), publishListener: PublishListener? = null, ): LocalTrackPublication? - LongMethod:LocalParticipant.kt$LocalParticipant$override suspend fun performRpc( destinationIdentity: Identity, method: String, payload: String, responseTimeout: Duration, ): String - LongMethod:LocalParticipant.kt$LocalParticipant$private fun publishAdditionalCodecForTrack(track: LocalVideoTrack, codec: VideoCodec, options: VideoTrackPublishOptions) - LongMethod:LocalParticipant.kt$LocalParticipant$private suspend fun handleIncomingRpcRequest( callerIdentity: Identity, requestId: String, method: String, payload: String, responseTimeout: Duration, version: Int, ) - LongMethod:LocalParticipant.kt$LocalParticipant$private suspend fun setTrackEnabled( source: Track.Source, enabled: Boolean, screenCaptureParams: ScreenCaptureParams? = null, ): Boolean - LongMethod:LocalParticipant.kt$LocalParticipant$suspend fun publishVideoTrack( track: LocalVideoTrack, options: VideoTrackPublishOptions = VideoTrackPublishOptions( null, if (track.options.isScreencast) screenShareTrackPublishDefaults else videoTrackPublishDefaults, ), publishListener: PublishListener? = null, ): Boolean - LongMethod:PeerConnectionTransport.kt$PeerConnectionTransport$private suspend fun createAndSendOffer(constraints: MediaConstraints = MediaConstraints()) LongMethod:RTCEngine.kt$RTCEngine$@Synchronized @VisibleForTesting(otherwise = VisibleForTesting.PACKAGE_PRIVATE) fun reconnect() - LongMethod:RTCEngine.kt$RTCEngine$fun onMessage(dataChannel: DataChannel, buffer: DataChannel.Buffer?) - LongMethod:RTCEngine.kt$RTCEngine$private suspend fun configure(joinResponse: JoinResponse, connectOptions: ConnectOptions) - LongMethod:Room.kt$Room$@Synchronized private fun getOrCreateRemoteParticipant( identity: Participant.Identity, info: LivekitModels.ParticipantInfo, ): RemoteParticipant LongMethod:Room.kt$Room$@Throws(Exception::class) suspend fun connect(url: String, token: String, options: ConnectOptions = ConnectOptions()) - LongMethod:Room.kt$Room$private fun setupLocalParticipantEventHandling() - LongMethod:SignalClient.kt$SignalClient$private fun handleSignalResponseImpl(ws: WebSocket, response: LivekitRtc.SignalResponse) LongParameterList:AudioBufferCallbackDispatcher.kt$AudioBufferCallback$(buffer: ByteBuffer, audioFormat: Int, channelCount: Int, sampleRate: Int, bytesRead: Int, captureTimeNs: Long) LongParameterList:KeyProvider.kt$BaseKeyProvider$( ratchetSalt: String = defaultRatchetSalt, uncryptedMagicBytes: String = defaultMagicBytes, ratchetWindowSize: Int = defaultRatchetWindowSize, override var enableSharedKey: Boolean = true, failureTolerance: Int = defaultFailureTolerance, keyRingSize: Int = defaultKeyRingSize, discardFrameWhenCryptorNotReady: Boolean = defaultDiscardFrameWhenCryptorNotReady, keyDerivationAlgorithm: FrameCryptorKeyDerivationAlgorithm = defaultKeyDerivationAlgorithm, ) LongParameterList:LiveKitOverrides.kt$AudioOptions$( /** * Override the default output [AudioType]. * * This affects the audio routing and how the audio is handled. Default is [AudioType.CallAudioType]. * * Note: if [audioHandler] is also passed, the values from [audioOutputType] will not be reflected in it, * and must be set yourself. */ val audioOutputType: AudioType? = null, /** * Override the default [AudioHandler]. * * Default is [AudioSwitchHandler]. * * Use [NoAudioHandler] to turn off automatic audio handling or * [AudioFocusHandler] to get simple audio focus handling. */ val audioHandler: AudioHandler? = null, /** * Override the default [AudioDeviceModule]. * * If a non-null value is passed, the library does not * take ownership of the object and will not release it upon [Room.release]. * It is the responsibility of the owner to call [AudioDeviceModule.release] when finished * with it to prevent memory leaks. */ val audioDeviceModule: AudioDeviceModule? = null, /** * Called after default setup to allow for customizations on the [JavaAudioDeviceModule]. * * Not used if [audioDeviceModule] is provided. * * Note: We require setting the [JavaAudioDeviceModule.Builder.setSamplesReadyCallback] to provide * support for [LocalAudioTrack.addSink]. If you wish to grab the audio samples * from the local microphone track, use [LocalAudioTrack.addSink] instead of setting your own * callback. */ val javaAudioDeviceModuleCustomizer: ((builder: JavaAudioDeviceModule.Builder) -> Unit)? = null, /** * On Android 11+, the audio mode will reset itself from [AudioManager.MODE_IN_COMMUNICATION] if * there is no audio playback or capture for 6 seconds (for example when joining a room with * no speakers and the local mic is muted.) This mode reset will cause unexpected * behavior when trying to change the volume, causing it to not properly change the volume. * * We use a workaround by playing a silent audio track to keep the communication mode from * resetting. * * Setting this flag to true will disable the workaround. * * This flag is a no-op when the audio mode is set to anything other than * [AudioManager.MODE_IN_COMMUNICATION]. */ val disableCommunicationModeWorkaround: Boolean = false, /** * Options for processing the mic and incoming audio. */ val audioProcessorOptions: AudioProcessorOptions? = null, /** * Devices may take some time initializing the audio stack for recording. * Prewarming allows starting up the underlying audio recording prior to publish, letting * the audio device be ready immediately when the track is fully published. * * If set to true, disables audio recording prewarming (and the related * [LocalAudioTrack.prewarm] function), and audio resources are only used while the * track is connected and published. Defaults to false. */ val disableAudioPrewarming: Boolean = false, ) LongParameterList:LocalAudioTrack.kt$LocalAudioTrack$( @Assisted name: String, @Assisted mediaTrack: livekit.org.webrtc.AudioTrack, @Assisted private val options: LocalAudioTrackOptions, private val audioProcessingController: AudioProcessingController, @Named(InjectionNames.DISPATCHER_DEFAULT) private val dispatcher: CoroutineDispatcher, @Named(InjectionNames.LOCAL_AUDIO_RECORD_SAMPLES_DISPATCHER) private val audioRecordSamplesDispatcher: AudioRecordSamplesDispatcher, @Named(InjectionNames.LOCAL_AUDIO_BUFFER_CALLBACK_DISPATCHER) private val audioBufferCallbackDispatcher: AudioBufferCallbackDispatcher, private val audioRecordPrewarmer: AudioRecordPrewarmer, rtcThreadToken: RTCThreadToken, ) - LongParameterList:LocalParticipant.kt$LocalParticipant$( @Assisted internal var dynacast: Boolean, internal val engine: RTCEngine, private val peerConnectionFactory: PeerConnectionFactory, private val context: Context, private val eglBase: EglBase, private val screencastVideoTrackFactory: LocalScreencastVideoTrack.Factory, private val videoTrackFactory: LocalVideoTrack.Factory, private val audioTrackFactory: LocalAudioTrack.Factory, private val defaultsManager: DefaultsManager, @Named(InjectionNames.DISPATCHER_DEFAULT) coroutineDispatcher: CoroutineDispatcher, @Named(InjectionNames.SENDER) private val capabilitiesGetter: CapabilitiesGetter, private val outgoingDataStreamManager: OutgoingDataStreamManager, ) - LongParameterList:LocalParticipant.kt$LocalParticipant$( callerIdentity: Identity, requestId: String, method: String, payload: String, responseTimeout: Duration, version: Int, ) + LongParameterList:LocalParticipant.kt$LocalParticipant$( @Assisted internal var dynacast: Boolean, internal val engine: RTCEngine, private val peerConnectionFactory: PeerConnectionFactory, private val context: Context, private val eglBase: EglBase, private val screencastVideoTrackFactory: LocalScreencastVideoTrack.Factory, private val videoTrackFactory: LocalVideoTrack.Factory, private val audioTrackFactory: LocalAudioTrack.Factory, private val defaultsManager: DefaultsManager, @Named(InjectionNames.DISPATCHER_DEFAULT) coroutineDispatcher: CoroutineDispatcher, @Named(InjectionNames.SENDER) private val capabilitiesGetter: CapabilitiesGetter, private val outgoingDataStreamManager: OutgoingDataStreamManager, private val rpcClientManager: RpcClientManager, private val rpcServerManager: RpcServerManager, ) LongParameterList:LocalScreencastVideoTrack.kt$LocalScreencastVideoTrack$( @Assisted capturer: VideoCapturer, @Assisted source: VideoSource, @Assisted name: String, @Assisted options: LocalVideoTrackOptions, @Assisted rtcTrack: livekit.org.webrtc.VideoTrack, @Assisted mediaProjectionCallback: MediaProjectionCallback, peerConnectionFactory: PeerConnectionFactory, context: Context, eglBase: EglBase, defaultsManager: DefaultsManager, videoTrackFactory: LocalVideoTrack.Factory, rtcThreadToken: RTCThreadToken, ) LongParameterList:LocalScreencastVideoTrack.kt$LocalScreencastVideoTrack.Companion$( mediaProjectionPermissionResultData: Intent, peerConnectionFactory: PeerConnectionFactory, context: Context, name: String, options: LocalVideoTrackOptions, rootEglBase: EglBase, screencastVideoTrackFactory: Factory, videoProcessor: VideoProcessor?, onStop: (Track) -> Unit, ) LongParameterList:LocalScreencastVideoTrack.kt$LocalScreencastVideoTrack.Factory$( capturer: VideoCapturer, source: VideoSource, name: String, options: LocalVideoTrackOptions, rtcTrack: livekit.org.webrtc.VideoTrack, mediaProjectionCallback: MediaProjectionCallback, ) @@ -71,7 +59,7 @@ LongParameterList:RTCStatsExt.kt$( trackIdentifier: String, ssrcs: Set<Long?>, codecIds: Set<String?>, localCandidateId: String?, remoteCandidateId: String?, statsMap: Map<String, RTCStats>, ) LongParameterList:RemoteParticipant.kt$RemoteParticipant$( mediaTrack: MediaStreamTrack, sid: String, statsGetter: RTCStatsGetter, receiver: RtpReceiver, autoManageVideo: Boolean = false, triesLeft: Int = 20, ) LongParameterList:RemoteParticipant.kt$RemoteParticipant$( sid: Sid, identity: Identity? = null, internal val signalClient: SignalClient, private val ioDispatcher: CoroutineDispatcher, defaultDispatcher: CoroutineDispatcher, private val audioTrackFactory: RemoteAudioTrack.Factory, private val videoTrackFactory: RemoteVideoTrack.Factory, ) - LongParameterList:Room.kt$Room$( @Assisted private val context: Context, internal val engine: RTCEngine, private val eglBase: EglBase, localParticipantFactory: LocalParticipant.Factory, private val defaultsManager: DefaultsManager, @Named(InjectionNames.DISPATCHER_DEFAULT) private val defaultDispatcher: CoroutineDispatcher, @Named(InjectionNames.DISPATCHER_IO) private val ioDispatcher: CoroutineDispatcher, /** * The [AudioHandler] for setting up the audio as need. * * By default, this is an instance of [AudioSwitchHandler]. * * This can be substituted for your own custom implementation through * [LiveKitOverrides.audioOptions] when creating the room with [LiveKit.create]. * * @see [audioSwitchHandler] * @see [AudioSwitchHandler] */ val audioHandler: AudioHandler, private val closeableManager: CloseableManager, private val e2EEManagerFactory: E2EEManager.Factory, private val communicationWorkaround: CommunicationWorkaround, val audioProcessingController: AudioProcessingController, /** * A holder for objects that are used internally within LiveKit. */ val lkObjects: LKObjects, networkCallbackManagerFactory: NetworkCallbackManagerFactory, private val audioDeviceModule: AudioDeviceModule, private val regionUrlProviderFactory: RegionUrlProvider.Factory, private val connectionWarmer: ConnectionWarmer, private val audioRecordPrewarmer: AudioRecordPrewarmer, private val incomingDataStreamManager: IncomingDataStreamManager, private val remoteParticipantFactory: RemoteParticipant.Factory, ) + LongParameterList:Room.kt$Room$( @Assisted private val context: Context, internal val engine: RTCEngine, private val eglBase: EglBase, localParticipantFactory: LocalParticipant.Factory, private val defaultsManager: DefaultsManager, @Named(InjectionNames.DISPATCHER_DEFAULT) private val defaultDispatcher: CoroutineDispatcher, @Named(InjectionNames.DISPATCHER_IO) private val ioDispatcher: CoroutineDispatcher, /** * The [AudioHandler] for setting up the audio as need. * * By default, this is an instance of [AudioSwitchHandler]. * * This can be substituted for your own custom implementation through * [LiveKitOverrides.audioOptions] when creating the room with [LiveKit.create]. * * @see [audioSwitchHandler] * @see [AudioSwitchHandler] */ val audioHandler: AudioHandler, private val closeableManager: CloseableManager, private val e2EEManagerFactory: E2EEManager.Factory, private val communicationWorkaround: CommunicationWorkaround, val audioProcessingController: AudioProcessingController, /** * A holder for objects that are used internally within LiveKit. */ val lkObjects: LKObjects, networkCallbackManagerFactory: NetworkCallbackManagerFactory, private val audioDeviceModule: AudioDeviceModule, private val regionUrlProviderFactory: RegionUrlProvider.Factory, private val connectionWarmer: ConnectionWarmer, private val audioRecordPrewarmer: AudioRecordPrewarmer, private val incomingDataStreamManager: IncomingDataStreamManager, private val rpcClientManager: RpcClientManager, private val rpcServerManager: RpcServerManager, private val remoteParticipantFactory: RemoteParticipant.Factory, ) MapGetWithNotNullAssertionOperator:LocalParticipant.kt$LocalParticipant$sourcePubLocks[source]!! NestedBlockDepth:ByteStreamSender.kt$@CheckResult suspend fun ByteStreamSender.write(source: Source): Result<Unit> NestedBlockDepth:LocalParticipant.kt$LocalParticipant$@Throws(TrackException.PublishException::class) private suspend fun publishTrackImpl( track: Track, options: TrackPublishOptions, requestConfig: AddTrackRequest.Builder.() -> Unit, encodings: List<RtpParameters.Encoding> = emptyList(), publishListener: PublishListener? = null, ): LocalTrackPublication? @@ -103,6 +91,7 @@ TooManyFunctions:RTCMetricsManager.kt$io.livekit.android.room.metrics.RTCMetricsManager.kt TooManyFunctions:RTCModule.kt$RTCModule TooManyFunctions:Room.kt$Room : ListenerParticipantListenerRpcManagerIncomingDataStreamManager + TooManyFunctions:RpcServerManager.kt$RpcServerManager TooManyFunctions:SignalClient.kt$SignalClient : WebSocketListener TooManyFunctions:SignalClient.kt$SignalClient$Listener TooManyFunctions:SimulcastVideoEncoderFactoryWrapper.kt$SimulcastVideoEncoderFactoryWrapper$StreamEncoderWrapper : VideoEncoder diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/ConnectOptions.kt b/livekit-android-sdk/src/main/java/io/livekit/android/ConnectOptions.kt index ef87f12ca..b7fe11b86 100644 --- a/livekit-android-sdk/src/main/java/io/livekit/android/ConnectOptions.kt +++ b/livekit-android-sdk/src/main/java/io/livekit/android/ConnectOptions.kt @@ -1,5 +1,5 @@ /* - * Copyright 2023-2024 LiveKit, Inc. + * Copyright 2023-2026 LiveKit, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,7 @@ package io.livekit.android +import io.livekit.android.room.ClientProtocolVersion import io.livekit.android.room.ProtocolVersion import io.livekit.android.room.Room import livekit.org.webrtc.PeerConnection @@ -53,6 +54,13 @@ data class ConnectOptions( * the protocol version to use with the server. */ val protocolVersion: ProtocolVersion = ProtocolVersion.v13, + + /** + * The client protocol version to advertise to other participants in the room + * for peer-to-peer feature negotiation (RPC v2, etc.). Defaults to the latest + * version supported by this SDK build. + */ + val clientProtocol: ClientProtocolVersion = ClientProtocolVersion.DATA_STREAM_RPC, ) { internal var reconnect: Boolean = false internal var participantSid: String? = null diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/events/RoomEvent.kt b/livekit-android-sdk/src/main/java/io/livekit/android/events/RoomEvent.kt index 0181b0cb6..4f2ca30b5 100644 --- a/livekit-android-sdk/src/main/java/io/livekit/android/events/RoomEvent.kt +++ b/livekit-android-sdk/src/main/java/io/livekit/android/events/RoomEvent.kt @@ -1,5 +1,5 @@ /* - * Copyright 2023-2025 LiveKit, Inc. + * Copyright 2023-2026 LiveKit, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -328,6 +328,7 @@ fun LivekitModels.DisconnectReason?.convert(): DisconnectReason { LivekitModels.DisconnectReason.SIP_TRUNK_FAILURE -> DisconnectReason.SIP_TRUNK_FAILURE LivekitModels.DisconnectReason.CONNECTION_TIMEOUT -> DisconnectReason.CONNECTION_TIMEOUT LivekitModels.DisconnectReason.MEDIA_FAILURE -> DisconnectReason.MEDIA_FAILURE + LivekitModels.DisconnectReason.AGENT_ERROR, LivekitModels.DisconnectReason.UNKNOWN_REASON, LivekitModels.DisconnectReason.UNRECOGNIZED, null, diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/room/Room.kt b/livekit-android-sdk/src/main/java/io/livekit/android/room/Room.kt index f79eb9688..0b8b1f6d1 100644 --- a/livekit-android-sdk/src/main/java/io/livekit/android/room/Room.kt +++ b/livekit-android-sdk/src/main/java/io/livekit/android/room/Room.kt @@ -61,7 +61,11 @@ import io.livekit.android.room.participant.RpcHandler import io.livekit.android.room.participant.VideoTrackPublishDefaults import io.livekit.android.room.participant.publishTracksInfo import io.livekit.android.room.provisions.LKObjects +import io.livekit.android.room.rpc.RPC_REQUEST_DATA_STREAM_TOPIC +import io.livekit.android.room.rpc.RPC_RESPONSE_DATA_STREAM_TOPIC +import io.livekit.android.room.rpc.RpcClientManager import io.livekit.android.room.rpc.RpcManager +import io.livekit.android.room.rpc.RpcServerManager import io.livekit.android.room.track.LocalAudioTrackOptions import io.livekit.android.room.track.LocalTrackPublication import io.livekit.android.room.track.LocalVideoTrackOptions @@ -146,6 +150,8 @@ constructor( private val connectionWarmer: ConnectionWarmer, private val audioRecordPrewarmer: AudioRecordPrewarmer, private val incomingDataStreamManager: IncomingDataStreamManager, + private val rpcClientManager: RpcClientManager, + private val rpcServerManager: RpcServerManager, private val remoteParticipantFactory: RemoteParticipant.Factory, ) : RTCEngine.Listener, ParticipantListener, RpcManager, IncomingDataStreamManager by incomingDataStreamManager { @@ -155,6 +161,27 @@ constructor( init { engine.listener = this + + // Register SDK-internal text-stream handlers for the RPC v2 transport. These reserve + // the topics `lk.rpc_request` and `lk.rpc_response` from user-level handler registration. + incomingDataStreamManager.registerTextStreamHandler(RPC_REQUEST_DATA_STREAM_TOPIC) { receiver, fromIdentity -> + coroutineScope.launch { + rpcServerManager.handleIncomingDataStream(receiver, fromIdentity) + } + } + incomingDataStreamManager.registerTextStreamHandler(RPC_RESPONSE_DATA_STREAM_TOPIC) { receiver, fromIdentity -> + coroutineScope.launch { + rpcClientManager.handleIncomingDataStreamResponse(receiver, fromIdentity) + } + } + + // Wire each manager's clientProtocol lookup via the remote-participants store. + val getRemoteClientProtocol: (Participant.Identity) -> Int = { id -> + remoteParticipants[id]?.clientProtocol + ?: ClientProtocolVersion.DEFAULT.value + } + rpcClientManager.getRemoteClientProtocol = getRemoteClientProtocol + rpcServerManager.getRemoteClientProtocol = getRemoteClientProtocol } enum class State { @@ -454,7 +481,7 @@ constructor( roomOptions = getCurrentRoomOptions() // Setup local participant. - localParticipant.reinitialize() + localParticipant.reinitialize(options) setupLocalParticipantEventHandling() if (roomOptions.e2eeOptions != null) { diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/room/SignalClient.kt b/livekit-android-sdk/src/main/java/io/livekit/android/room/SignalClient.kt index 913481df8..bc9b50603 100644 --- a/livekit-android-sdk/src/main/java/io/livekit/android/room/SignalClient.kt +++ b/livekit-android-sdk/src/main/java/io/livekit/android/room/SignalClient.kt @@ -173,7 +173,7 @@ constructor( // Clean up any pre-existing connection. close(reason = "Starting new connection", shouldClearQueuedRequests = false) - val wsUrlString = "${url.toWebsocketUrl()}/rtc${createConnectionParams(getClientInfo(), options, roomOptions)}" + val wsUrlString = "${url.toWebsocketUrl()}/rtc${createConnectionParams(getClientInfo(options.clientProtocol), options, roomOptions)}" isReconnecting = options.reconnect LKLog.i { "connecting to $wsUrlString" } @@ -240,6 +240,7 @@ constructor( addParam(CONNECT_QUERY_OS, clientInfo.os) addParam(CONNECT_QUERY_OS_VERSION, clientInfo.osVersion) addParam(CONNECT_QUERY_NETWORK_TYPE, networkInfo.getNetworkType().protoName) + addParam(CONNECT_QUERY_CLIENT_PROTOCOL, options.clientProtocol.value.toString()) return queryBuilder.toString() } @@ -856,6 +857,18 @@ constructor( LivekitRtc.SignalResponse.MessageCase.SUBSCRIBED_AUDIO_CODEC_UPDATE -> { // TODO } + + LivekitRtc.SignalResponse.MessageCase.PUBLISH_DATA_TRACK_RESPONSE -> { + // TODO + } + + LivekitRtc.SignalResponse.MessageCase.UNPUBLISH_DATA_TRACK_RESPONSE -> { + // TODO + } + + LivekitRtc.SignalResponse.MessageCase.DATA_TRACK_SUBSCRIBER_HANDLES -> { + // TODO + } } } @@ -959,6 +972,7 @@ constructor( const val CONNECT_QUERY_OS_VERSION = "os_version" const val CONNECT_QUERY_NETWORK_TYPE = "network" const val CONNECT_QUERY_PARTICIPANT_SID = "sid" + const val CONNECT_QUERY_CLIENT_PROTOCOL = "client_protocol" const val SD_TYPE_ANSWER = "answer" const val SD_TYPE_OFFER = "offer" @@ -1012,6 +1026,27 @@ enum class ProtocolVersion(val value: Int) { v13(13), } +/** + * The protocol version this SDK advertises to **peers** (other participants) for + * client-to-client feature negotiation (RPC v2, etc.). Distinct from [ProtocolVersion], + * which tracks the signaling protocol between client and server. + * + * Sent to the server during the join handshake via the `client_protocol` connection + * query parameter and `ClientInfo.client_protocol`; the server then populates + * `ParticipantInfo.client_protocol` for other peers in the room to read. + */ +@Suppress("unused") +enum class ClientProtocolVersion(val value: Int) { + /** Initial client protocol. RPC v1 only (15 KB packet payload limit). */ + DEFAULT(0), + + /** + * RPC v2: request and success-response payloads are carried over text data streams + * instead of inline packets, lifting the 15 KB payload limit. + */ + DATA_STREAM_RPC(1), +} + class ServerInfo( val edition: Edition, val version: Semver?, diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/room/datastream/outgoing/BaseStreamSender.kt b/livekit-android-sdk/src/main/java/io/livekit/android/room/datastream/outgoing/BaseStreamSender.kt index 9463bf1f6..c8b4db607 100644 --- a/livekit-android-sdk/src/main/java/io/livekit/android/room/datastream/outgoing/BaseStreamSender.kt +++ b/livekit-android-sdk/src/main/java/io/livekit/android/room/datastream/outgoing/BaseStreamSender.kt @@ -72,6 +72,11 @@ typealias DataChunker = (data: T, chunkSize: Int) -> List * On success, [block] should still attempt to close [sender] when the stream is * finished normally. If it is left open, any exceptions thrown by [sender.close] * will be ignored. + * + * Any exceptions thrown within [block] will be caught and returned in the result. + * + * @return A successful [Result] object containing the return value of [block], or + * a failure if any exceptions were thrown. */ @CheckResult suspend inline fun , R> useStreamSender( diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/room/participant/LocalParticipant.kt b/livekit-android-sdk/src/main/java/io/livekit/android/room/participant/LocalParticipant.kt index 259b6fd01..5daa840db 100644 --- a/livekit-android-sdk/src/main/java/io/livekit/android/room/participant/LocalParticipant.kt +++ b/livekit-android-sdk/src/main/java/io/livekit/android/room/participant/LocalParticipant.kt @@ -22,14 +22,15 @@ import android.content.Intent import androidx.annotation.CheckResult import androidx.annotation.VisibleForTesting import com.google.protobuf.ByteString -import com.vdurmont.semver4j.Semver import dagger.assisted.Assisted import dagger.assisted.AssistedFactory import dagger.assisted.AssistedInject +import io.livekit.android.ConnectOptions import io.livekit.android.audio.ScreenAudioCapturer import io.livekit.android.dagger.CapabilitiesGetter import io.livekit.android.dagger.InjectionNames import io.livekit.android.events.ParticipantEvent +import io.livekit.android.room.ClientProtocolVersion import io.livekit.android.room.ConnectionState import io.livekit.android.room.DefaultsManager import io.livekit.android.room.RTCEngine @@ -37,7 +38,9 @@ import io.livekit.android.room.Room import io.livekit.android.room.TrackBitrateInfo import io.livekit.android.room.datastream.outgoing.OutgoingDataStreamManager import io.livekit.android.room.isSVCCodec +import io.livekit.android.room.rpc.RpcClientManager import io.livekit.android.room.rpc.RpcManager +import io.livekit.android.room.rpc.RpcServerManager import io.livekit.android.room.track.DataPublishReliability import io.livekit.android.room.track.LocalAudioTrack import io.livekit.android.room.track.LocalAudioTrackOptions @@ -56,7 +59,6 @@ import io.livekit.android.room.track.screencapture.ScreenCaptureParams import io.livekit.android.room.util.EncodingUtils import io.livekit.android.rpc.RpcError import io.livekit.android.util.LKLog -import io.livekit.android.util.byteLength import io.livekit.android.util.flow import io.livekit.android.util.rethrowIfCancellationSignal import io.livekit.android.webrtc.sortVideoCodecPreferences @@ -64,9 +66,7 @@ import kotlinx.coroutines.CoroutineDispatcher import kotlinx.coroutines.Job import kotlinx.coroutines.async import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.delay import kotlinx.coroutines.launch -import kotlinx.coroutines.suspendCancellableCoroutine import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock import livekit.LivekitModels @@ -86,14 +86,10 @@ import livekit.org.webrtc.SurfaceTextureHelper import livekit.org.webrtc.VideoCapturer import livekit.org.webrtc.VideoProcessor import java.util.Collections -import java.util.UUID import javax.inject.Named -import kotlin.coroutines.resume import kotlin.math.max import kotlin.math.min import kotlin.time.Duration -import kotlin.time.Duration.Companion.milliseconds -import kotlin.time.Duration.Companion.seconds class LocalParticipant @AssistedInject @@ -113,6 +109,8 @@ internal constructor( @Named(InjectionNames.SENDER) private val capabilitiesGetter: CapabilitiesGetter, private val outgoingDataStreamManager: OutgoingDataStreamManager, + private val rpcClientManager: RpcClientManager, + private val rpcServerManager: RpcServerManager, ) : Participant(Sid(""), null, coroutineDispatcher), OutgoingDataStreamManager by outgoingDataStreamManager, RpcManager { @@ -132,10 +130,6 @@ internal constructor( private val jobs = mutableMapOf() - private val rpcHandlers = Collections.synchronizedMap(mutableMapOf()) // methodName to handler - private val pendingAcks = Collections.synchronizedMap(mutableMapOf()) // requestId to pending ack - private val pendingResponses = Collections.synchronizedMap(mutableMapOf()) // requestId to pending response - // For ensuring that only one caller can execute setTrackEnabled at a time. // Without it, there's a potential to create multiple of the same source, // Camera has deadlock issues with multiple CameraCapturers trying to activate/stop. @@ -146,6 +140,11 @@ internal constructor( private var defaultAudioTrack: LocalAudioTrack? = null private var defaultVideoTrack: LocalVideoTrack? = null + internal fun reinitialize(connectOptions: ConnectOptions) { + reinitialize() + clientProtocol = connectOptions.clientProtocol.value + } + /** * Returns the default audio track, or creates one if it doesn't exist. * @exception SecurityException will be thrown if [Manifest.permission.RECORD_AUDIO] permission is missing. @@ -1033,11 +1032,8 @@ internal constructor( return engine.sendData(dataPacket) } - override fun registerRpcMethod( - method: String, - handler: RpcHandler, - ) { - this.rpcHandlers[method] = handler + override fun registerRpcMethod(method: String, handler: RpcHandler) { + rpcServerManager.registerRpcMethod(method, handler) } /** @@ -1045,48 +1041,41 @@ internal constructor( * * @param method The name of the RPC method to unregister */ - override fun unregisterRpcMethod( - method: String, - ) { - this.rpcHandlers.remove(method) + override fun unregisterRpcMethod(method: String) { + rpcServerManager.unregisterRpcMethod(method) } internal fun handleDataPacket(packet: DataPacket) { when { + // v1 RPC request packet.hasRpcRequest() -> { val rpcRequest = packet.rpcRequest scope.launch { - handleIncomingRpcRequest( + rpcServerManager.handleIncomingRpcRequest( callerIdentity = Identity(packet.participantIdentity), - requestId = rpcRequest.id, - method = rpcRequest.method, - payload = rpcRequest.payload, - responseTimeout = rpcRequest.responseTimeoutMs.toUInt().toLong().milliseconds, - version = rpcRequest.version, + rpcRequest = rpcRequest, ) } } packet.hasRpcResponse() -> { val rpcResponse = packet.rpcResponse - var payload: String? = null - var error: RpcError? = null - - if (rpcResponse.hasPayload()) { - payload = rpcResponse.payload - } else if (rpcResponse.hasError()) { - error = RpcError.fromProto(rpcResponse.error) + if (rpcResponse.hasError()) { + rpcClientManager.handleIncomingRpcResponseFailure( + rpcResponse.requestId, + RpcError.fromProto(rpcResponse.error), + ) + } else { + // v1 RPC response + rpcClientManager.handleIncomingRpcResponseSuccess( + rpcResponse.requestId, + if (rpcResponse.hasPayload()) rpcResponse.payload else "", + ) } - handleIncomingRpcResponse( - requestId = rpcResponse.requestId, - payload = payload, - error = error, - ) } packet.hasRpcAck() -> { - val rpcAck = packet.rpcAck - handleIncomingRpcAck(rpcAck.requestId) + rpcClientManager.handleIncomingRpcAck(packet.rpcAck.requestId) } } } @@ -1097,311 +1086,10 @@ internal constructor( payload: String, responseTimeout: Duration, maxRoundTripLatency: Duration, - ): String = coroutineScope { - // Minimum allowed effective timeout to ensure the RPC lifecycle always has at least - // one second to complete, even after accounting for round-trip latency. - val minEffectiveTimeout = 1.seconds - - if (payload.byteLength() > RpcError.MAX_V1_PAYLOAD_BYTES) { - throw RpcError.BuiltinRpcError.REQUEST_PAYLOAD_TOO_LARGE.create() - } - - val serverVersion = engine.serverVersion - ?: throw RpcError.BuiltinRpcError.SEND_FAILED.create(data = "Not connected.") - - if (serverVersion < Semver("1.8.0")) { - throw RpcError.BuiltinRpcError.UNSUPPORTED_SERVER.create() - } - - val requestId = UUID.randomUUID().toString() - // Ensure the effective response timeout is not less than 1 second - val effectiveTimeout = (responseTimeout - maxRoundTripLatency).coerceAtLeast(minEffectiveTimeout) - val result = publishRpcRequest( - destinationIdentity = destinationIdentity, - requestId = requestId, - method = method, - payload = payload, - responseTimeout = effectiveTimeout, - ) - - if (result.isFailure) { - val exception = result.exceptionOrNull() as? RpcError - ?: RpcError.BuiltinRpcError.SEND_FAILED.create(data = "Error while sending rpc request.", cause = result.exceptionOrNull()) - throw exception - } - - val responsePayload = suspendCancellableCoroutine { continuation -> - var ackTimeoutJob: Job? = null - var responseTimeoutJob: Job? = null - - fun cleanup() { - ackTimeoutJob?.cancel() - responseTimeoutJob?.cancel() - pendingAcks.remove(requestId) - pendingResponses.remove(requestId) - } - - continuation.invokeOnCancellation { cleanup() } - - ackTimeoutJob = launch { - delay(maxRoundTripLatency) - val receivedAck = pendingAcks.remove(requestId) == null - if (!receivedAck) { - pendingResponses.remove(requestId) - continuation.cancel(RpcError.BuiltinRpcError.CONNECTION_TIMEOUT.create()) - } - } - pendingAcks[requestId] = PendingRpcAck( - participantIdentity = destinationIdentity, - onResolve = { ackTimeoutJob.cancel() }, - ) - - responseTimeoutJob = launch { - delay(responseTimeout) - val receivedResponse = pendingResponses.remove(requestId) == null - if (!receivedResponse) { - continuation.cancel(RpcError.BuiltinRpcError.RESPONSE_TIMEOUT.create()) - } - } - responseTimeoutJob // workaround for lint marking this unused. used in cleanup() - - pendingResponses[requestId] = PendingRpcResponse( - participantIdentity = destinationIdentity, - onResolve = { payload, error -> - if (pendingAcks.containsKey(requestId)) { - LKLog.i { "RPC response received before ack, id: $requestId" } - } - cleanup() - - if (error != null) { - continuation.cancel(error) - } else { - continuation.resume(payload ?: "") - } - }, - ) - } - return@coroutineScope responsePayload - } - - @CheckResult - private suspend fun rpcSendData(dataPacket: DataPacket): Result { - val result = engine.sendData(dataPacket) - - return if (result.isFailure) { - Result.failure(RpcError.BuiltinRpcError.SEND_FAILED.create(cause = result.exceptionOrNull())) - } else { - result - } - } - - @CheckResult - private suspend fun publishRpcRequest( - destinationIdentity: Identity, - requestId: String, - method: String, - payload: String, - responseTimeout: Duration = 10.seconds, - ): Result { - if (payload.byteLength() > RpcError.MAX_V1_PAYLOAD_BYTES) { - return Result.failure(RpcError.BuiltinRpcError.REQUEST_PAYLOAD_TOO_LARGE.create()) - } - - val dataPacket = with(DataPacket.newBuilder()) { - addDestinationIdentities(destinationIdentity.value) - kind = DataPacket.Kind.RELIABLE - rpcRequest = with(LivekitModels.RpcRequest.newBuilder()) { - this.id = requestId - this.method = method - this.payload = payload - this.responseTimeoutMs = responseTimeout.inWholeMilliseconds.toUInt().toInt() - this.version = RpcManager.RPC_VERSION - build() - } - build() - } - - return rpcSendData(dataPacket) - } - - @CheckResult - private suspend fun publishRpcResponse( - destinationIdentity: Identity, - requestId: String, - payload: String?, - error: RpcError?, - ): Result { - if (payload.byteLength() > RpcError.MAX_V1_PAYLOAD_BYTES) { - return Result.failure(RpcError.BuiltinRpcError.RESPONSE_PAYLOAD_TOO_LARGE.create()) - } - - val dataPacket = with(DataPacket.newBuilder()) { - addDestinationIdentities(destinationIdentity.value) - kind = DataPacket.Kind.RELIABLE - rpcResponse = with(LivekitModels.RpcResponse.newBuilder()) { - this.requestId = requestId - if (error != null) { - this.error = error.toProto() - } else { - this.payload = payload ?: "" - } - build() - } - build() - } - - return rpcSendData(dataPacket) - } - - @CheckResult - private suspend fun publishRpcAck( - destinationIdentity: Identity, - requestId: String, - ): Result { - val dataPacket = with(DataPacket.newBuilder()) { - addDestinationIdentities(destinationIdentity.value) - kind = DataPacket.Kind.RELIABLE - rpcAck = with(LivekitModels.RpcAck.newBuilder()) { - this.requestId = requestId - build() - } - build() - } - - return rpcSendData(dataPacket) - } - - private fun handleIncomingRpcAck(requestId: String) { - val handler = this.pendingAcks.remove(requestId) - if (handler != null) { - handler.onResolve() - } else { - LKLog.e { "Ack received for unexpected RPC request, id = $requestId" } - } - } - - private fun handleIncomingRpcResponse( - requestId: String, - payload: String?, - error: RpcError?, - ) { - val handler = this.pendingResponses.remove(requestId) - if (handler != null) { - handler.onResolve(payload, error) - } else { - LKLog.e { "Response received for unexpected RPC request, id = $requestId" } - } - } - - private suspend fun handleIncomingRpcRequest( - callerIdentity: Identity, - requestId: String, - method: String, - payload: String, - responseTimeout: Duration, - version: Int, - ) { - publishRpcAck(callerIdentity, requestId).also { result -> - if (result.isFailure) { - LKLog.w(result.exceptionOrNull()) { "Error sending ack for request $requestId." } - return - } - } - - if (version != RpcManager.RPC_VERSION) { - publishRpcResponse( - destinationIdentity = callerIdentity, - requestId = requestId, - payload = null, - error = RpcError.BuiltinRpcError.UNSUPPORTED_VERSION.create(), - ).also { result -> - if (result.isFailure) { - LKLog.w(result.exceptionOrNull()) { "Error sending error response for request $requestId." } - } - } - - return - } - - val handler = this.rpcHandlers[method] - - if (handler == null) { - publishRpcResponse( - destinationIdentity = callerIdentity, - requestId = requestId, - payload = null, - error = RpcError.BuiltinRpcError.UNSUPPORTED_METHOD.create(), - ).also { result -> - if (result.isFailure) { - LKLog.w(result.exceptionOrNull()) { "Error sending error response for request $requestId." } - } - } - - return - } - - var responseError: RpcError? = null - var responsePayload: String? = null - - try { - val response = handler.invoke( - RpcInvocationData( - requestId = requestId, - callerIdentity = callerIdentity, - payload = payload, - responseTimeout = responseTimeout, - ), - ) - - if (response.byteLength() > RpcError.MAX_V1_PAYLOAD_BYTES) { - responseError = RpcError.BuiltinRpcError.RESPONSE_PAYLOAD_TOO_LARGE.create() - LKLog.w { "RPC Response payload too large for $method" } - } else { - responsePayload = response - } - } catch (e: Exception) { - e.rethrowIfCancellationSignal() - if (e is RpcError) { - responseError = e - } else { - LKLog.w(e) { "Uncaught error returned by RPC handler for $method. Returning APPLICATION_ERROR instead." } - responseError = RpcError.BuiltinRpcError.APPLICATION_ERROR.create() - } - } - - publishRpcResponse( - destinationIdentity = callerIdentity, - requestId = requestId, - payload = responsePayload, - error = responseError, - ).also { result -> - if (result.isFailure) { - LKLog.w(result.exceptionOrNull()) { "Error sending error response for request $requestId." } - } - } - } + ): String = rpcClientManager.performRpc(destinationIdentity, method, payload, responseTimeout, maxRoundTripLatency) internal fun handleParticipantDisconnect(identity: Identity) { - synchronized(pendingAcks) { - val acksIterator = pendingAcks.iterator() - while (acksIterator.hasNext()) { - val (_, ack) = acksIterator.next() - if (ack.participantIdentity == identity) { - acksIterator.remove() - } - } - } - - synchronized(pendingResponses) { - val responsesIterator = pendingResponses.iterator() - while (responsesIterator.hasNext()) { - val (_, response) = responsesIterator.next() - if (response.participantIdentity == identity) { - responsesIterator.remove() - response.onResolve(null, RpcError.BuiltinRpcError.RECIPIENT_DISCONNECTED.create()) - } - } - } + rpcClientManager.handleParticipantDisconnect(identity) } /** @@ -1693,6 +1381,7 @@ internal constructor( override fun dispose() { cleanup() enabledPublishVideoCodecs.clear() + clientProtocol = ClientProtocolVersion.DEFAULT.value super.dispose() } @@ -2001,13 +1690,3 @@ data class RpcInvocationData( */ val responseTimeout: Duration, ) - -private data class PendingRpcAck( - val onResolve: () -> Unit, - val participantIdentity: Participant.Identity, -) - -private data class PendingRpcResponse( - val onResolve: (payload: String?, error: RpcError?) -> Unit, - val participantIdentity: Participant.Identity, -) diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/room/participant/Participant.kt b/livekit-android-sdk/src/main/java/io/livekit/android/room/participant/Participant.kt index 09443c067..c3a6298b9 100644 --- a/livekit-android-sdk/src/main/java/io/livekit/android/room/participant/Participant.kt +++ b/livekit-android-sdk/src/main/java/io/livekit/android/room/participant/Participant.kt @@ -22,6 +22,7 @@ import io.livekit.android.events.BroadcastEventBus import io.livekit.android.events.ParticipantEvent import io.livekit.android.events.RoomEvent import io.livekit.android.events.TrackEvent +import io.livekit.android.room.ClientProtocolVersion import io.livekit.android.room.track.LocalTrackPublication import io.livekit.android.room.track.RemoteTrackPublication import io.livekit.android.room.track.Track @@ -267,6 +268,15 @@ open class Participant( var kind by flowDelegate(Kind.UNKNOWN) internal set + /** + * The protocol version this participant's client advertises for peer-to-peer features + * (RPC v2, etc.). See [ClientProtocolVersion] for known values. + */ + @FlowObservable + @get:FlowObservable + var clientProtocol: Int by flowDelegate(ClientProtocolVersion.DEFAULT.value) + internal set + /** * @suppress */ @@ -438,6 +448,7 @@ open class Participant( attributes = info.attributesMap agentAttributes = AgentAttributes.fromStringMap(info.attributesMap) state = State.fromProto(info.state) + clientProtocol = info.clientProtocol } override fun equals(other: Any?): Boolean { @@ -514,6 +525,8 @@ open class Participant( INGRESS, EGRESS, SIP, + CONNECTOR, + BRIDGE, UNKNOWN, ; @@ -528,7 +541,10 @@ open class Participant( LivekitModels.ParticipantInfo.Kind.INGRESS -> INGRESS LivekitModels.ParticipantInfo.Kind.EGRESS -> EGRESS LivekitModels.ParticipantInfo.Kind.SIP -> SIP - LivekitModels.ParticipantInfo.Kind.UNRECOGNIZED -> UNKNOWN + LivekitModels.ParticipantInfo.Kind.CONNECTOR -> CONNECTOR + LivekitModels.ParticipantInfo.Kind.BRIDGE -> BRIDGE + LivekitModels.ParticipantInfo.Kind.UNRECOGNIZED, + -> UNKNOWN } } } diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/room/rpc/RpcClientManager.kt b/livekit-android-sdk/src/main/java/io/livekit/android/room/rpc/RpcClientManager.kt new file mode 100644 index 000000000..db1e3b0d5 --- /dev/null +++ b/livekit-android-sdk/src/main/java/io/livekit/android/room/rpc/RpcClientManager.kt @@ -0,0 +1,334 @@ +/* + * Copyright 2026 LiveKit, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.livekit.android.room.rpc + +import androidx.annotation.CheckResult +import com.vdurmont.semver4j.Semver +import io.livekit.android.room.ClientProtocolVersion +import io.livekit.android.room.RTCEngine +import io.livekit.android.room.datastream.StreamTextOptions +import io.livekit.android.room.datastream.incoming.TextStreamReceiver +import io.livekit.android.room.datastream.outgoing.OutgoingDataStreamManager +import io.livekit.android.room.datastream.outgoing.useStreamSender +import io.livekit.android.room.participant.Participant.Identity +import io.livekit.android.rpc.RpcError +import io.livekit.android.util.LKLog +import io.livekit.android.util.byteLength +import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import livekit.LivekitModels +import livekit.LivekitModels.DataPacket +import java.util.Collections +import java.util.UUID +import javax.inject.Inject +import javax.inject.Singleton +import kotlin.time.Duration +import kotlin.time.Duration.Companion.seconds + +/** + * Manages the **caller** (client) side of RPC: + * - issuing `performRpc` calls, + * - tracking pending acks and responses by request id, + * - routing incoming acks / responses (whether via v1 packet or v2 data stream) to the + * awaiting coroutines. + * + * Lives on `Room` (one instance per room session, injected by Dagger). + * + * @suppress + */ +@Singleton +class RpcClientManager @Inject constructor( + private val engine: RTCEngine, + private val outgoingDataStreamManager: OutgoingDataStreamManager, +) { + + /** + * Late-bound: returns the advertised `clientProtocol` of the remote participant with the + * given identity, or [ClientProtocolVersion.DEFAULT] if unknown. Set by `Room` once the + * participant store is available. + */ + var getRemoteClientProtocol: (Identity) -> Int = { ClientProtocolVersion.DEFAULT.value } + + private val pendingAcks = Collections.synchronizedMap(mutableMapOf()) + private val pendingResponses = Collections.synchronizedMap(mutableMapOf()) + + /** + * @throws RpcError + */ + suspend fun performRpc( + destinationIdentity: Identity, + method: String, + payload: String, + responseTimeout: Duration, + maxRoundTripLatency: Duration, + ): String = coroutineScope { + val minEffectiveTimeout = 1.seconds + + val remoteClientProtocol = getRemoteClientProtocol(destinationIdentity) + + // The 15 KB packet limit only applies to v1; v2 streams chunk transparently. + if (remoteClientProtocol < ClientProtocolVersion.DATA_STREAM_RPC.value && + payload.byteLength() > MAX_V1_PAYLOAD_BYTES + ) { + throw RpcError.BuiltinRpcError.REQUEST_PAYLOAD_TOO_LARGE.create() + } + + val serverVersion = engine.serverVersion + ?: throw RpcError.BuiltinRpcError.SEND_FAILED.create(data = "Not connected.") + if (serverVersion < Semver("1.8.0")) { + throw RpcError.BuiltinRpcError.UNSUPPORTED_SERVER.create() + } + + val requestId = UUID.randomUUID().toString() + val effectiveTimeout = (responseTimeout - maxRoundTripLatency).coerceAtLeast(minEffectiveTimeout) + + val responseDeferred = CompletableDeferred() + + val ackTimeoutJob = launch { + delay(maxRoundTripLatency) + if (pendingAcks.remove(requestId) != null) { + pendingResponses.remove(requestId) + responseDeferred.completeExceptionally( + RpcError.BuiltinRpcError.CONNECTION_TIMEOUT.create(), + ) + } + } + + val responseTimeoutJob = launch { + delay(responseTimeout) + if (pendingResponses.remove(requestId) != null) { + responseDeferred.completeExceptionally( + RpcError.BuiltinRpcError.RESPONSE_TIMEOUT.create(), + ) + } + } + + // Register pending state BEFORE the suspending publish so a response that arrives + // during the publish path is routed correctly (spec #12). + pendingAcks[requestId] = PendingRpcAck(destinationIdentity) { + ackTimeoutJob.cancel() + } + pendingResponses[requestId] = PendingRpcResponse(destinationIdentity) { responsePayload, error -> + if (pendingAcks.remove(requestId) != null) { + LKLog.i { "RPC response received before ack, id: $requestId" } + ackTimeoutJob.cancel() + } + responseTimeoutJob.cancel() + if (error != null) { + responseDeferred.completeExceptionally(error) + } else { + responseDeferred.complete(responsePayload ?: "") + } + } + + val publishResult = if (remoteClientProtocol >= ClientProtocolVersion.DATA_STREAM_RPC.value) { + publishRpcRequestV2(destinationIdentity, requestId, method, payload, effectiveTimeout) + } else { + publishRpcRequestV1(destinationIdentity, requestId, method, payload, effectiveTimeout) + } + + if (publishResult.isFailure) { + pendingAcks.remove(requestId) + pendingResponses.remove(requestId) + ackTimeoutJob.cancel() + responseTimeoutJob.cancel() + val cause = publishResult.exceptionOrNull() + val exception = cause as? RpcError + ?: RpcError.BuiltinRpcError.SEND_FAILED.create( + data = "Error while sending rpc request.", + cause = cause, + ) + throw exception + } + + try { + responseDeferred.await() + } finally { + pendingAcks.remove(requestId) + pendingResponses.remove(requestId) + ackTimeoutJob.cancel() + responseTimeoutJob.cancel() + } + } + + @CheckResult + private suspend fun publishRpcRequestV1( + destinationIdentity: Identity, + requestId: String, + method: String, + payload: String, + responseTimeout: Duration, + ): Result { + val dataPacket = with(DataPacket.newBuilder()) { + addDestinationIdentities(destinationIdentity.value) + kind = DataPacket.Kind.RELIABLE + rpcRequest = with(LivekitModels.RpcRequest.newBuilder()) { + this.id = requestId + this.method = method + this.payload = payload + this.responseTimeoutMs = responseTimeout.inWholeMilliseconds.toUInt().toInt() + this.version = RPC_VERSION_V1 + build() + } + build() + } + return rpcSendData(dataPacket) + } + + @CheckResult + private suspend fun publishRpcRequestV2( + destinationIdentity: Identity, + requestId: String, + method: String, + payload: String, + responseTimeout: Duration, + ): Result { + val sender = try { + outgoingDataStreamManager.streamText( + StreamTextOptions( + topic = RPC_REQUEST_DATA_STREAM_TOPIC, + destinationIdentities = listOf(destinationIdentity), + attributes = mapOf( + RpcRequestAttrs.REQUEST_ID to requestId, + RpcRequestAttrs.METHOD to method, + RpcRequestAttrs.RESPONSE_TIMEOUT_MS to responseTimeout.inWholeMilliseconds.toString(), + RpcRequestAttrs.VERSION to RPC_VERSION_V2.toString(), + ), + ), + ) + } catch (e: Throwable) { + return Result.failure(e) + } + return useStreamSender(sender) { + write(payload).getOrThrow() + close() + } + } + + @CheckResult + private suspend fun rpcSendData(dataPacket: DataPacket): Result { + val result = engine.sendData(dataPacket) + return if (result.isFailure) { + Result.failure( + RpcError.BuiltinRpcError.SEND_FAILED.create(cause = result.exceptionOrNull()), + ) + } else { + result + } + } + + fun handleIncomingRpcAck(requestId: String) { + val handler = pendingAcks.remove(requestId) + if (handler != null) { + handler.onResolve() + } else { + LKLog.e { "Ack received for unexpected RPC request, id = $requestId" } + } + } + + fun handleIncomingRpcResponseSuccess(requestId: String, payload: String) { + val handler = pendingResponses.remove(requestId) + if (handler != null) { + handler.onResolve(payload, null) + } else { + LKLog.e { "Response received for unexpected RPC request, id = $requestId" } + } + } + + fun handleIncomingRpcResponseFailure(requestId: String, error: RpcError) { + val handler = pendingResponses.remove(requestId) + if (handler != null) { + handler.onResolve(null, error) + } else { + LKLog.e { "Error response received for unexpected RPC request, id = $requestId" } + } + } + + /** + * Handle an incoming v2 RPC response data stream on topic `lk.rpc_response`. + */ + suspend fun handleIncomingDataStreamResponse(receiver: TextStreamReceiver, fromIdentity: Identity) { + val requestId = receiver.info.attributes[RpcRequestAttrs.REQUEST_ID] + if (requestId.isNullOrEmpty()) { + LKLog.w { "RPC response stream malformed: ${RpcRequestAttrs.REQUEST_ID} not set." } + return + } + + // Validate sender identity matches the expected destination of the pending request. + // (Spec #14: a response data stream from an unexpected sender MUST NOT resolve the + // pending request.) + val pending = pendingResponses[requestId] + if (pending != null && pending.participantIdentity != fromIdentity) { + LKLog.w { + "RPC response stream for $requestId arrived from unexpected sender " + + "${fromIdentity.value}, expected ${pending.participantIdentity.value}. Ignoring." + } + return + } + + val payload = try { + receiver.readAll().joinToString(separator = "") + } catch (e: Throwable) { + LKLog.w(e) { "Error reading RPC response payload for $requestId" } + handleIncomingRpcResponseFailure( + requestId, + RpcError.BuiltinRpcError.APPLICATION_ERROR.create( + data = "Error reading RPC response payload", + cause = e, + ), + ) + return + } + + handleIncomingRpcResponseSuccess(requestId, payload) + } + + fun handleParticipantDisconnect(identity: Identity) { + synchronized(pendingAcks) { + val acksIterator = pendingAcks.iterator() + while (acksIterator.hasNext()) { + val (_, ack) = acksIterator.next() + if (ack.participantIdentity == identity) { + acksIterator.remove() + } + } + } + + synchronized(pendingResponses) { + val responsesIterator = pendingResponses.iterator() + while (responsesIterator.hasNext()) { + val (_, response) = responsesIterator.next() + if (response.participantIdentity == identity) { + responsesIterator.remove() + response.onResolve(null, RpcError.BuiltinRpcError.RECIPIENT_DISCONNECTED.create()) + } + } + } + } + + private data class PendingRpcAck( + val participantIdentity: Identity, + val onResolve: () -> Unit, + ) + + private data class PendingRpcResponse( + val participantIdentity: Identity, + val onResolve: (payload: String?, error: RpcError?) -> Unit, + ) +} diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/room/rpc/RpcConstants.kt b/livekit-android-sdk/src/main/java/io/livekit/android/room/rpc/RpcConstants.kt new file mode 100644 index 000000000..d2017fcbb --- /dev/null +++ b/livekit-android-sdk/src/main/java/io/livekit/android/room/rpc/RpcConstants.kt @@ -0,0 +1,37 @@ +/* + * Copyright 2026 LiveKit, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.livekit.android.room.rpc + +/** The maximum payload size for v1 RPC requests and responses in bytes. */ +internal const val MAX_V1_PAYLOAD_BYTES = 15 * 1024 // 15KB + +/** Version of RPC backed by inline `RpcRequest` / `RpcResponse` packets. */ +internal const val RPC_VERSION_V1 = 1 + +/** Version of RPC backed by data streams for request and success-response payloads. */ +internal const val RPC_VERSION_V2 = 2 + +internal const val RPC_REQUEST_DATA_STREAM_TOPIC = "lk.rpc_request" +internal const val RPC_RESPONSE_DATA_STREAM_TOPIC = "lk.rpc_response" + +/** Attribute keys attached to v2 RPC request data streams. */ +internal object RpcRequestAttrs { + const val REQUEST_ID = "lk.rpc_request_id" + const val METHOD = "lk.rpc_request_method" + const val RESPONSE_TIMEOUT_MS = "lk.rpc_request_response_timeout_ms" + const val VERSION = "lk.rpc_request_version" +} diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/room/rpc/RpcServerManager.kt b/livekit-android-sdk/src/main/java/io/livekit/android/room/rpc/RpcServerManager.kt new file mode 100644 index 000000000..70cd473f1 --- /dev/null +++ b/livekit-android-sdk/src/main/java/io/livekit/android/room/rpc/RpcServerManager.kt @@ -0,0 +1,345 @@ +/* + * Copyright 2026 LiveKit, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.livekit.android.room.rpc + +import androidx.annotation.CheckResult +import io.livekit.android.room.ClientProtocolVersion +import io.livekit.android.room.RTCEngine +import io.livekit.android.room.datastream.StreamTextOptions +import io.livekit.android.room.datastream.incoming.TextStreamReceiver +import io.livekit.android.room.datastream.outgoing.OutgoingDataStreamManager +import io.livekit.android.room.datastream.outgoing.useStreamSender +import io.livekit.android.room.participant.Participant.Identity +import io.livekit.android.room.participant.RpcHandler +import io.livekit.android.room.participant.RpcInvocationData +import io.livekit.android.rpc.RpcError +import io.livekit.android.util.LKLog +import io.livekit.android.util.byteLength +import io.livekit.android.util.rethrowIfCancellationSignal +import livekit.LivekitModels +import livekit.LivekitModels.DataPacket +import java.util.Collections +import javax.inject.Inject +import javax.inject.Singleton +import kotlin.time.Duration.Companion.milliseconds + +/** + * Manages the **handler** (server) side of RPC: + * - the registered method handlers, + * - dispatching v1 (`RpcRequest` packet) and v2 (data-stream) incoming requests through to + * the handlers, and + * - sending the response back as either a v1 packet (errors, or v1 callers) or a v2 data + * stream (success responses to v2 callers). + * + * Lives on `Room` (one instance per room session, injected by Dagger). + * + * @suppress + */ +@Singleton +class RpcServerManager @Inject constructor( + private val engine: RTCEngine, + private val outgoingDataStreamManager: OutgoingDataStreamManager, +) { + + /** + * Late-bound: returns the advertised `clientProtocol` of the remote participant with the + * given identity, or [ClientProtocolVersion.DEFAULT] if unknown. Set by `Room` once the + * participant store is available. + */ + var getRemoteClientProtocol: (Identity) -> Int = { ClientProtocolVersion.DEFAULT.value } + + private val rpcHandlers = Collections.synchronizedMap(mutableMapOf()) + + fun registerRpcMethod(method: String, handler: RpcHandler) { + rpcHandlers[method] = handler + } + + fun unregisterRpcMethod(method: String) { + rpcHandlers.remove(method) + } + + /** + * Handle an incoming v1 `RpcRequest` packet. + */ + suspend fun handleIncomingRpcRequest( + callerIdentity: Identity, + rpcRequest: LivekitModels.RpcRequest, + ) { + val requestId = rpcRequest.id + val responseTimeout = rpcRequest.responseTimeoutMs.toUInt().toLong().milliseconds + + publishRpcAck(callerIdentity, requestId).also { result -> + if (result.isFailure) { + LKLog.w(result.exceptionOrNull()) { "Error sending ack for request $requestId." } + return + } + } + + if (rpcRequest.version != RPC_VERSION_V1) { + sendErrorResponse(callerIdentity, requestId, RpcError.BuiltinRpcError.UNSUPPORTED_VERSION.create()) + return + } + + runHandlerAndSendResponse( + callerIdentity = callerIdentity, + requestId = requestId, + method = rpcRequest.method, + payload = rpcRequest.payload, + responseTimeoutMs = responseTimeout.inWholeMilliseconds, + ) + } + + /** + * Handle an incoming v2 RPC request data stream on topic `lk.rpc_request`. + */ + suspend fun handleIncomingDataStream( + receiver: TextStreamReceiver, + callerIdentity: Identity, + ) { + val attrs = receiver.info.attributes + val requestId = attrs[RpcRequestAttrs.REQUEST_ID] + val method = attrs[RpcRequestAttrs.METHOD] + val responseTimeoutMs = attrs[RpcRequestAttrs.RESPONSE_TIMEOUT_MS]?.toLongOrNull() + val version = attrs[RpcRequestAttrs.VERSION]?.toIntOrNull() + + if (requestId.isNullOrEmpty()) { + LKLog.w { "RPC v2 request stream malformed: ${RpcRequestAttrs.REQUEST_ID} not set." } + return + } + + if (method.isNullOrEmpty() || responseTimeoutMs == null || version == null) { + LKLog.w { + "RPC v2 request stream malformed for $requestId: " + + "method=$method, responseTimeoutMs=$responseTimeoutMs, version=$version" + } + publishRpcAck(callerIdentity, requestId) + sendErrorResponse( + callerIdentity, + requestId, + RpcError.BuiltinRpcError.APPLICATION_ERROR.create(data = "RPC request stream malformed"), + ) + return + } + + publishRpcAck(callerIdentity, requestId).also { result -> + if (result.isFailure) { + LKLog.w(result.exceptionOrNull()) { "Error sending ack for request $requestId." } + return + } + } + + if (version != RPC_VERSION_V2) { + sendErrorResponse(callerIdentity, requestId, RpcError.BuiltinRpcError.UNSUPPORTED_VERSION.create()) + return + } + + val payload = try { + receiver.readAll().joinToString(separator = "") + } catch (e: Throwable) { + LKLog.w(e) { "Error reading RPC request payload for $requestId" } + sendErrorResponse( + callerIdentity, + requestId, + RpcError.BuiltinRpcError.APPLICATION_ERROR.create( + data = "Error reading RPC request payload", + cause = e, + ), + ) + return + } + + runHandlerAndSendResponse( + callerIdentity = callerIdentity, + requestId = requestId, + method = method, + payload = payload, + responseTimeoutMs = responseTimeoutMs, + ) + } + + private suspend fun runHandlerAndSendResponse( + callerIdentity: Identity, + requestId: String, + method: String, + payload: String, + responseTimeoutMs: Long, + ) { + val handler = rpcHandlers[method] + if (handler == null) { + sendErrorResponse(callerIdentity, requestId, RpcError.BuiltinRpcError.UNSUPPORTED_METHOD.create()) + return + } + + val response: String = try { + handler.invoke( + RpcInvocationData( + requestId = requestId, + callerIdentity = callerIdentity, + payload = payload, + responseTimeout = responseTimeoutMs.milliseconds, + ), + ) + } catch (e: Throwable) { + e.rethrowIfCancellationSignal() + val responseError = if (e is RpcError) { + e + } else { + LKLog.w(e) { + "Uncaught error returned by RPC handler for $method. Returning APPLICATION_ERROR instead." + } + RpcError.BuiltinRpcError.APPLICATION_ERROR.create() + } + sendErrorResponse(callerIdentity, requestId, responseError) + return + } + + sendSuccessResponse(callerIdentity, requestId, response) + } + + /** + * Send a successful RPC response. Chooses v2 (data stream) when the caller advertises + * [ClientProtocolVersion.DATA_STREAM_RPC] or higher, otherwise sends a v1 packet (and + * enforces the 15 KB size cap, since v1 has no chunking). + */ + private suspend fun sendSuccessResponse( + callerIdentity: Identity, + requestId: String, + payload: String, + ) { + val callerProtocol = getRemoteClientProtocol(callerIdentity) + + if (callerProtocol >= ClientProtocolVersion.DATA_STREAM_RPC.value) { + val publishResult = publishRpcResponseV2(callerIdentity, requestId, payload) + if (publishResult.isFailure) { + LKLog.w(publishResult.exceptionOrNull()) { + "Error sending v2 response stream for $requestId" + } + } + return + } + + // v1 caller: payload-size guard required (no chunking). + if (payload.byteLength() > MAX_V1_PAYLOAD_BYTES) { + LKLog.w { "RPC v1 response payload too large for request $requestId." } + sendErrorResponse( + callerIdentity, + requestId, + RpcError.BuiltinRpcError.RESPONSE_PAYLOAD_TOO_LARGE.create(), + ) + return + } + + publishRpcResponseV1(callerIdentity, requestId, payload, null).also { result -> + if (result.isFailure) { + LKLog.w(result.exceptionOrNull()) { "Error sending response for request $requestId." } + } + } + } + + /** + * Error responses always travel as v1 packets, regardless of either side's protocol. + */ + private suspend fun sendErrorResponse( + callerIdentity: Identity, + requestId: String, + error: RpcError, + ) { + publishRpcResponseV1(callerIdentity, requestId, null, error).also { result -> + if (result.isFailure) { + LKLog.w(result.exceptionOrNull()) { + "Error sending error response for request $requestId." + } + } + } + } + + @CheckResult + private suspend fun publishRpcResponseV1( + callerIdentity: Identity, + requestId: String, + payload: String?, + error: RpcError?, + ): Result { + val dataPacket = with(DataPacket.newBuilder()) { + addDestinationIdentities(callerIdentity.value) + kind = DataPacket.Kind.RELIABLE + rpcResponse = with(LivekitModels.RpcResponse.newBuilder()) { + this.requestId = requestId + if (error != null) { + this.error = error.toProto() + } else { + this.payload = payload ?: "" + } + build() + } + build() + } + return rpcSendData(dataPacket) + } + + @CheckResult + private suspend fun publishRpcResponseV2( + callerIdentity: Identity, + requestId: String, + payload: String, + ): Result { + val sender = try { + outgoingDataStreamManager.streamText( + StreamTextOptions( + topic = RPC_RESPONSE_DATA_STREAM_TOPIC, + destinationIdentities = listOf(callerIdentity), + attributes = mapOf(RpcRequestAttrs.REQUEST_ID to requestId), + ), + ) + } catch (e: Throwable) { + return Result.failure(e) + } + return useStreamSender(sender) { + write(payload).getOrThrow() + close() + } + } + + @CheckResult + private suspend fun publishRpcAck( + callerIdentity: Identity, + requestId: String, + ): Result { + val dataPacket = with(DataPacket.newBuilder()) { + addDestinationIdentities(callerIdentity.value) + kind = DataPacket.Kind.RELIABLE + rpcAck = with(LivekitModels.RpcAck.newBuilder()) { + this.requestId = requestId + build() + } + build() + } + return rpcSendData(dataPacket) + } + + @CheckResult + private suspend fun rpcSendData(dataPacket: DataPacket): Result { + val result = engine.sendData(dataPacket) + return if (result.isFailure) { + Result.failure( + RpcError.BuiltinRpcError.SEND_FAILED.create(cause = result.exceptionOrNull()), + ) + } else { + result + } + } +} diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/rpc/RpcError.kt b/livekit-android-sdk/src/main/java/io/livekit/android/rpc/RpcError.kt index 551a5ece3..4e58d6ded 100644 --- a/livekit-android-sdk/src/main/java/io/livekit/android/rpc/RpcError.kt +++ b/livekit-android-sdk/src/main/java/io/livekit/android/rpc/RpcError.kt @@ -16,7 +16,7 @@ package io.livekit.android.rpc -import io.livekit.android.rpc.RpcError.Companion.MAX_V1_PAYLOAD_BYTES +import io.livekit.android.room.rpc.MAX_V1_PAYLOAD_BYTES import io.livekit.android.util.truncateBytes import livekit.LivekitModels @@ -83,11 +83,6 @@ data class RpcError( companion object { const val MAX_MESSAGE_BYTES = 256 - /** - * The maximum payload size for v1 RPC requests and responses in bytes. - */ - const val MAX_V1_PAYLOAD_BYTES = 15 * 1024 // 15KB - fun fromProto(proto: LivekitModels.RpcError): RpcError { return RpcError( code = proto.code, diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/stats/ClientInfo.kt b/livekit-android-sdk/src/main/java/io/livekit/android/stats/ClientInfo.kt index 5981a5506..19e1b941f 100644 --- a/livekit-android-sdk/src/main/java/io/livekit/android/stats/ClientInfo.kt +++ b/livekit-android-sdk/src/main/java/io/livekit/android/stats/ClientInfo.kt @@ -1,5 +1,5 @@ /* - * Copyright 2023 LiveKit, Inc. + * Copyright 2023-2026 LiveKit, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,10 +18,13 @@ package io.livekit.android.stats import android.os.Build import io.livekit.android.BuildConfig +import io.livekit.android.room.ClientProtocolVersion import io.livekit.android.room.SignalClient import livekit.LivekitModels -internal fun getClientInfo() = with(LivekitModels.ClientInfo.newBuilder()) { +internal fun getClientInfo( + clientProtocol: ClientProtocolVersion = ClientProtocolVersion.DATA_STREAM_RPC, +) = with(LivekitModels.ClientInfo.newBuilder()) { sdk = LivekitModels.ClientInfo.SDK.ANDROID version = BuildConfig.VERSION_NAME os = SignalClient.SDK_TYPE @@ -30,5 +33,6 @@ internal fun getClientInfo() = with(LivekitModels.ClientInfo.newBuilder()) { val vendor = Build.MANUFACTURER ?: "" val model = Build.MODEL ?: "" deviceModel = ("$vendor $model").trim() + this.clientProtocol = clientProtocol.value build() } diff --git a/livekit-android-test/build.gradle b/livekit-android-test/build.gradle index a83b3c969..1629569f0 100644 --- a/livekit-android-test/build.gradle +++ b/livekit-android-test/build.gradle @@ -3,6 +3,7 @@ plugins { id 'com.android.library' id 'kotlin-android' id 'kotlin-kapt' + id 'com.google.protobuf' } android { @@ -41,6 +42,16 @@ android { } } +ext.livekitProtoIncludes = [ + 'livekit_token_source.proto', + 'livekit_room.proto', + 'livekit_agent_dispatch.proto', + 'livekit_agent.proto', + 'livekit_egress.proto', +] +ext.livekitProtoImportSrc = true +apply from: rootProject.file('gradle/livekit-protobuf.gradle') + // Allow this module's Kotlin compilations to resolve `internal` APIs from :livekit-android-sdk // https://kotlinlang.org/docs/visibility-modifiers.html#modules // diff --git a/livekit-android-test/src/main/java/io/livekit/android/test/mock/MockDataChannel.kt b/livekit-android-test/src/main/java/io/livekit/android/test/mock/MockDataChannel.kt index 61dcd6d0c..8dfa7e50d 100644 --- a/livekit-android-test/src/main/java/io/livekit/android/test/mock/MockDataChannel.kt +++ b/livekit-android-test/src/main/java/io/livekit/android/test/mock/MockDataChannel.kt @@ -27,6 +27,13 @@ class MockDataChannel(private val label: String?) : DataChannel(1L) { var sentPayloads = mutableListOf() var sendResult = true + /** + * Optional synchronous callback invoked from inside [send] for every outgoing buffer. + * Useful for tests that need to inject incoming traffic while the sending coroutine is + * mid-flight (e.g. RPC v2 "response arrives during publish" scenarios). + */ + var onSend: ((Buffer) -> Unit)? = null + /** * When true, [send] advances the buffer's position to its limit, mirroring * the real WebRTC wrapper which drains the buffer via `ByteBuffer.get(byte[])`. @@ -112,6 +119,7 @@ class MockDataChannel(private val label: String?) : DataChannel(1L) { if (!consumeSentBuffer) { buffer.data.position(savedPos) } + onSend?.invoke(buffer) return sendResult } diff --git a/livekit-android-test/src/test/java/io/livekit/android/proto/ProtoConverterTest.kt b/livekit-android-test/src/test/java/io/livekit/android/proto/ProtoConverterTest.kt index c63a85a08..8bd9a933f 100644 --- a/livekit-android-test/src/test/java/io/livekit/android/proto/ProtoConverterTest.kt +++ b/livekit-android-test/src/test/java/io/livekit/android/proto/ProtoConverterTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2025 LiveKit, Inc. + * Copyright 2025-2026 LiveKit, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -66,7 +66,7 @@ class ProtoConverterTest( ProtoConverterTestCase( LivekitModels.ParticipantPermission::class.java, ParticipantPermission::class.java, - whitelist = listOf("agent"), + whitelist = listOf("agent", "canManageAgentSession"), ), ProtoConverterTestCase( LivekitRtc.RegionSettings::class.java, @@ -80,7 +80,7 @@ class ProtoConverterTest( LivekitRtc.SessionDescription::class.java, SessionDescription::class.java, mapping = mapOf("sdp" to "description"), - whitelist = listOf("id"), + whitelist = listOf("id", "midToTrackId"), ), ProtoConverterTestCase( LivekitTokenSource.TokenSourceRequest::class.java, @@ -93,11 +93,12 @@ class ProtoConverterTest( ProtoConverterTestCase( LivekitRoom.RoomConfiguration::class.java, RoomConfiguration::class.java, - whitelist = listOf("egress"), + whitelist = listOf("egress", "tags"), ), ProtoConverterTestCase( LivekitAgentDispatch.RoomAgentDispatch::class.java, RoomAgentDispatch::class.java, + whitelist = listOf("restartPolicy", "deployment"), ), ) diff --git a/livekit-android-test/src/test/java/io/livekit/android/room/RTCEngineMockE2ETest.kt b/livekit-android-test/src/test/java/io/livekit/android/room/RTCEngineMockE2ETest.kt index e60af4921..19a9d1c66 100644 --- a/livekit-android-test/src/test/java/io/livekit/android/room/RTCEngineMockE2ETest.kt +++ b/livekit-android-test/src/test/java/io/livekit/android/room/RTCEngineMockE2ETest.kt @@ -422,7 +422,7 @@ class RTCEngineMockE2ETest : MockE2ETest() { val pubDataChannel = getPublisherPeerConnection() .dataChannels[RTCEngine.RELIABLE_DATA_CHANNEL_LABEL] as MockDataChannel - val oversizedPayload = ByteArray(65 * 1024) // See RTCEngine.MAX_DATA_PACKET_SIZE + val oversizedPayload = ByteArray(RTCEngine.MAX_DATA_PACKET_SIZE + 1) val result = room.localParticipant.publishData(oversizedPayload) assertTrue(result.isFailure) diff --git a/livekit-android-test/src/test/java/io/livekit/android/room/RoomTest.kt b/livekit-android-test/src/test/java/io/livekit/android/room/RoomTest.kt index a6ffc0d42..9a71cb630 100644 --- a/livekit-android-test/src/test/java/io/livekit/android/room/RoomTest.kt +++ b/livekit-android-test/src/test/java/io/livekit/android/room/RoomTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2023-2025 LiveKit, Inc. + * Copyright 2023-2026 LiveKit, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -136,6 +136,14 @@ class RoomTest { connectionWarmer = MockConnectionWarmer(), audioRecordPrewarmer = NoAudioRecordPrewarmer(), incomingDataStreamManager = IncomingDataStreamManagerImpl(), + rpcClientManager = io.livekit.android.room.rpc.RpcClientManager( + engine = rtcEngine, + outgoingDataStreamManager = Mockito.mock(io.livekit.android.room.datastream.outgoing.OutgoingDataStreamManager::class.java), + ), + rpcServerManager = io.livekit.android.room.rpc.RpcServerManager( + engine = rtcEngine, + outgoingDataStreamManager = Mockito.mock(io.livekit.android.room.datastream.outgoing.OutgoingDataStreamManager::class.java), + ), remoteParticipantFactory = TestRemoteParticipantFactory( rtcEngine = rtcEngine, ioDispatcher = coroutineRule.dispatcher, diff --git a/livekit-android-test/src/test/java/io/livekit/android/room/rpc/RpcV2MockE2ETest.kt b/livekit-android-test/src/test/java/io/livekit/android/room/rpc/RpcV2MockE2ETest.kt new file mode 100644 index 000000000..7df4841e7 --- /dev/null +++ b/livekit-android-test/src/test/java/io/livekit/android/room/rpc/RpcV2MockE2ETest.kt @@ -0,0 +1,942 @@ +/* + * Copyright 2026 LiveKit, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.livekit.android.room.rpc + +import com.google.protobuf.ByteString +import io.livekit.android.room.ClientProtocolVersion +import io.livekit.android.room.RTCEngine +import io.livekit.android.room.participant.Participant +import io.livekit.android.rpc.RpcError +import io.livekit.android.test.MockE2ETest +import io.livekit.android.test.mock.MockDataChannel +import io.livekit.android.test.mock.MockPeerConnection +import io.livekit.android.test.mock.TestData +import io.livekit.android.test.mock.TestData.REMOTE_PARTICIPANT +import io.livekit.android.test.util.toDataChannelBuffer +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.async +import livekit.LivekitModels +import livekit.LivekitModels.DataPacket +import livekit.LivekitModels.DataStream +import livekit.LivekitRtc +import org.junit.Assert.assertEquals +import org.junit.Assert.assertFalse +import org.junit.Assert.assertNotNull +import org.junit.Assert.assertNull +import org.junit.Assert.assertTrue +import org.junit.Test +import org.junit.runner.RunWith +import org.robolectric.RobolectricTestRunner + +@ExperimentalCoroutinesApi +@RunWith(RobolectricTestRunner::class) +class RpcV2MockE2ETest : MockE2ETest() { + + private lateinit var pubDataChannel: MockDataChannel + private lateinit var subDataChannel: MockDataChannel + + override suspend fun connect(joinResponse: LivekitRtc.SignalResponse) { + super.connect(joinResponse) + + val pubPeerConnection = component.rtcEngine().getPublisherPeerConnection() as MockPeerConnection + pubDataChannel = pubPeerConnection.dataChannels[RTCEngine.RELIABLE_DATA_CHANNEL_LABEL] as MockDataChannel + + val subPeerConnection = component.rtcEngine().getSubscriberPeerConnection() as MockPeerConnection + subDataChannel = MockDataChannel(RTCEngine.RELIABLE_DATA_CHANNEL_LABEL) + subPeerConnection.observer?.onDataChannel(subDataChannel) + } + + /** Add the test's remote participant as a v2 (data-stream-capable) client. */ + private fun simulateRemoteJoinAsV2() { + val participantUpdate = with(LivekitRtc.SignalResponse.newBuilder()) { + update = with(LivekitRtc.ParticipantUpdate.newBuilder()) { + val v2Remote = REMOTE_PARTICIPANT.toBuilder() + .setClientProtocol(ClientProtocolVersion.DATA_STREAM_RPC.value) + .build() + addParticipants(v2Remote) + build() + } + build() + } + simulateMessageFromServer(participantUpdate) + } + + /** Add the test's remote participant as a v1 (legacy) client. */ + private fun simulateRemoteJoinAsV1() { + simulateMessageFromServer(TestData.PARTICIPANT_JOIN) + } + + private fun parsePacket(buffer: livekit.org.webrtc.DataChannel.Buffer): DataPacket = + DataPacket.parseFrom(ByteString.copyFrom(buffer.data.duplicate())) + + /** + * Find the v2 RPC request stream in the buffers sent by the local participant. + * Returns the request attributes and the assembled UTF-8 payload, or null if no such + * stream is present. + */ + private fun collectOutgoingV2Stream(topic: String): Pair, String>? { + val packets = pubDataChannel.sentBuffers.map { parsePacket(it) } + val header = packets.firstOrNull { it.hasStreamHeader() && it.streamHeader.topic == topic } + ?: return null + val streamId = header.streamHeader.streamId + val chunks = packets + .filter { it.hasStreamChunk() && it.streamChunk.streamId == streamId } + .sortedBy { it.streamChunk.chunkIndex } + val payload = buildString { + for (chunk in chunks) { + append(chunk.streamChunk.content.toString(Charsets.UTF_8)) + } + } + return header.streamHeader.attributesMap to payload + } + + /** + * Simulate an inbound v2 RPC request stream landing on the subscriber data channel. + */ + private fun simulateIncomingRequestStream( + requestId: String, + method: String, + payload: String, + version: Int = RPC_VERSION_V2, + streamId: String = "stream-$requestId", + responseTimeoutMs: Long = 10_000, + fromIdentity: String = REMOTE_PARTICIPANT.identity, + ) { + simulateIncomingTextStream( + streamId = streamId, + topic = RPC_REQUEST_DATA_STREAM_TOPIC, + payload = payload, + attributes = mapOf( + RpcRequestAttrs.REQUEST_ID to requestId, + RpcRequestAttrs.METHOD to method, + RpcRequestAttrs.RESPONSE_TIMEOUT_MS to responseTimeoutMs.toString(), + RpcRequestAttrs.VERSION to version.toString(), + ), + fromIdentity = fromIdentity, + ) + } + + /** + * Simulate an inbound v2 RPC response stream landing on the subscriber data channel. + */ + private fun simulateIncomingResponseStream( + requestId: String, + payload: String, + streamId: String = "stream-resp-$requestId", + fromIdentity: String = REMOTE_PARTICIPANT.identity, + ) { + simulateIncomingTextStream( + streamId = streamId, + topic = RPC_RESPONSE_DATA_STREAM_TOPIC, + payload = payload, + attributes = mapOf(RpcRequestAttrs.REQUEST_ID to requestId), + fromIdentity = fromIdentity, + ) + } + + private fun simulateIncomingTextStream( + streamId: String, + topic: String, + payload: String, + attributes: Map, + fromIdentity: String, + ) { + val headerPacket = with(DataPacket.newBuilder()) { + participantIdentity = fromIdentity + streamHeader = with(DataStream.Header.newBuilder()) { + this.streamId = streamId + this.topic = topic + this.timestamp = 0L + putAllAttributes(attributes) + textHeader = with(DataStream.TextHeader.newBuilder()) { + operationType = DataStream.OperationType.CREATE + generated = false + build() + } + build() + } + build() + } + val chunkPacket = with(DataPacket.newBuilder()) { + participantIdentity = fromIdentity + streamChunk = with(DataStream.Chunk.newBuilder()) { + this.streamId = streamId + this.chunkIndex = 0L + this.content = ByteString.copyFromUtf8(payload) + build() + } + build() + } + val trailerPacket = with(DataPacket.newBuilder()) { + participantIdentity = fromIdentity + streamTrailer = with(DataStream.Trailer.newBuilder()) { + this.streamId = streamId + build() + } + build() + } + subDataChannel.simulateBufferReceived(headerPacket.toDataChannelBuffer()) + subDataChannel.simulateBufferReceived(chunkPacket.toDataChannelBuffer()) + subDataChannel.simulateBufferReceived(trailerPacket.toDataChannelBuffer()) + } + + private fun createAck(requestId: String) = with(DataPacket.newBuilder()) { + participantIdentity = REMOTE_PARTICIPANT.identity + rpcAck = with(LivekitModels.RpcAck.newBuilder()) { + this.requestId = requestId + build() + } + build() + }.toDataChannelBuffer() + + private fun createV1Response(requestId: String, payload: String? = null, error: RpcError? = null) = + with(DataPacket.newBuilder()) { + participantIdentity = REMOTE_PARTICIPANT.identity + rpcResponse = with(LivekitModels.RpcResponse.newBuilder()) { + this.requestId = requestId + if (error != null) { + this.error = error.toProto() + } else if (payload != null) this.payload = payload + build() + } + build() + }.toDataChannelBuffer() + + // ---------------------------- v2 → v2 --------------------------------------------------- + + @Test + fun caller_short_payload() = runTest { + connect() + simulateRemoteJoinAsV2() + + val rpcJob = async { + room.localParticipant.performRpc( + destinationIdentity = Participant.Identity(REMOTE_PARTICIPANT.identity), + method = "hello", + payload = "hi", + ) + } + + coroutineRule.dispatcher.scheduler.runCurrent() + + // No RpcRequest packet should be produced. + val packets = pubDataChannel.sentBuffers.map { parsePacket(it) } + assertFalse(packets.any { it.hasRpcRequest() }) + + // A v2 request stream should be present with the right attributes and payload. + val outgoing = collectOutgoingV2Stream(RPC_REQUEST_DATA_STREAM_TOPIC) + assertNotNull(outgoing) + val (attrs, payload) = outgoing!! + assertEquals("hello", attrs[RpcRequestAttrs.METHOD]) + assertEquals("2", attrs[RpcRequestAttrs.VERSION]) + assertTrue(!attrs[RpcRequestAttrs.REQUEST_ID].isNullOrEmpty()) + assertEquals("hi", payload) + + val requestId = attrs[RpcRequestAttrs.REQUEST_ID]!! + subDataChannel.simulateBufferReceived(createAck(requestId)) + simulateIncomingResponseStream(requestId, "bye") + coroutineRule.dispatcher.scheduler.advanceUntilIdle() + + assertEquals("bye", rpcJob.await()) + } + + @Test + fun caller_large_payload_20k_no_error() = runTest { + connect() + simulateRemoteJoinAsV2() + + val largePayload = "X".repeat(20_000) + val rpcJob = async { + room.localParticipant.performRpc( + destinationIdentity = Participant.Identity(REMOTE_PARTICIPANT.identity), + method = "echo", + payload = largePayload, + ) + } + coroutineRule.dispatcher.scheduler.runCurrent() + + val outgoing = collectOutgoingV2Stream(RPC_REQUEST_DATA_STREAM_TOPIC) + assertNotNull("expected a v2 request stream for a 20k payload", outgoing) + val (attrs, payload) = outgoing!! + assertEquals(largePayload, payload) + val requestId = attrs[RpcRequestAttrs.REQUEST_ID]!! + + subDataChannel.simulateBufferReceived(createAck(requestId)) + simulateIncomingResponseStream(requestId, largePayload) + coroutineRule.dispatcher.scheduler.advanceUntilIdle() + + assertEquals(largePayload, rpcJob.await()) + } + + @Test + fun handler_short_payload() = runTest { + connect() + simulateRemoteJoinAsV2() + + room.localParticipant.registerRpcMethod("hello") { "pong" } + simulateIncomingRequestStream("req-1", "hello", "ping") + coroutineRule.dispatcher.scheduler.advanceUntilIdle() + + val packets = pubDataChannel.sentBuffers.map { parsePacket(it) } + + // Ack travels as a v1 packet. + val ackPacket = packets.firstOrNull { it.hasRpcAck() } + assertNotNull(ackPacket) + assertEquals("req-1", ackPacket!!.rpcAck.requestId) + + // No v1 RpcResponse packet — success goes via stream. + assertFalse(packets.any { it.hasRpcResponse() }) + + // Response stream is published. + val outgoing = collectOutgoingV2Stream(RPC_RESPONSE_DATA_STREAM_TOPIC) + assertNotNull(outgoing) + val (attrs, payload) = outgoing!! + assertEquals("req-1", attrs[RpcRequestAttrs.REQUEST_ID]) + assertEquals("pong", payload) + } + + @Test + fun handler_large_payload_20k_no_error() = runTest { + connect() + simulateRemoteJoinAsV2() + + val largeResponse = "X".repeat(20_000) + room.localParticipant.registerRpcMethod("echo") { largeResponse } + simulateIncomingRequestStream("req-large", "echo", "ping") + coroutineRule.dispatcher.scheduler.advanceUntilIdle() + + val outgoing = collectOutgoingV2Stream(RPC_RESPONSE_DATA_STREAM_TOPIC) + assertNotNull("expected a v2 response stream for a 20k response", outgoing) + val (_, payload) = outgoing!! + assertEquals(largeResponse, payload) + + // No RESPONSE_PAYLOAD_TOO_LARGE — the v1 size check shouldn't fire for a v2 caller. + val packets = pubDataChannel.sentBuffers.map { parsePacket(it) } + val errorResponses = packets.filter { it.hasRpcResponse() && it.rpcResponse.hasError() } + assertTrue(errorResponses.isEmpty()) + } + + @Test + fun handler_unregistered_method_sends_ack_then_packet_error() = runTest { + connect() + simulateRemoteJoinAsV2() + + simulateIncomingRequestStream("req-x", "unknown-method", "") + coroutineRule.dispatcher.scheduler.advanceUntilIdle() + + val packets = pubDataChannel.sentBuffers.map { parsePacket(it) } + // Ack first + assertTrue(packets.any { it.hasRpcAck() && it.rpcAck.requestId == "req-x" }) + // Then a v1 error RpcResponse packet — no response stream. + val errorResponse = packets.firstOrNull { + it.hasRpcResponse() && it.rpcResponse.requestId == "req-x" && it.rpcResponse.hasError() + } + assertNotNull(errorResponse) + assertEquals( + RpcError.BuiltinRpcError.UNSUPPORTED_METHOD.create(), + RpcError.fromProto(errorResponse!!.rpcResponse.error), + ) + assertNull(collectOutgoingV2Stream(RPC_RESPONSE_DATA_STREAM_TOPIC)) + } + + @Test + fun handler_uncaught_exception_application_error_packet() = runTest { + connect() + simulateRemoteJoinAsV2() + + room.localParticipant.registerRpcMethod("boom") { + throw RuntimeException("oops") + } + simulateIncomingRequestStream("req-app-err", "boom", "") + coroutineRule.dispatcher.scheduler.advanceUntilIdle() + + val packets = pubDataChannel.sentBuffers.map { parsePacket(it) } + val errorResponse = packets.firstOrNull { + it.hasRpcResponse() && it.rpcResponse.requestId == "req-app-err" && it.rpcResponse.hasError() + } + assertNotNull(errorResponse) + assertEquals( + RpcError.BuiltinRpcError.APPLICATION_ERROR.create(), + RpcError.fromProto(errorResponse!!.rpcResponse.error), + ) + } + + @Test + fun handler_rpcerror_passthrough_packet() = runTest { + connect() + simulateRemoteJoinAsV2() + + val custom = RpcError(101, "custom error") + room.localParticipant.registerRpcMethod("err") { throw custom } + simulateIncomingRequestStream("req-custom", "err", "") + coroutineRule.dispatcher.scheduler.advanceUntilIdle() + + val packets = pubDataChannel.sentBuffers.map { parsePacket(it) } + val errorResponse = packets.firstOrNull { + it.hasRpcResponse() && it.rpcResponse.requestId == "req-custom" && it.rpcResponse.hasError() + } + assertNotNull(errorResponse) + assertEquals(custom, RpcError.fromProto(errorResponse!!.rpcResponse.error)) + } + + @Test + fun caller_response_timeout() = runTest { + connect() + simulateRemoteJoinAsV2() + + val rpcJob = async { + var thrown: Throwable? = null + try { + room.localParticipant.performRpc( + destinationIdentity = Participant.Identity(REMOTE_PARTICIPANT.identity), + method = "hello", + payload = "hi", + ) + } catch (e: Throwable) { + thrown = e + } + thrown + } + coroutineRule.dispatcher.scheduler.runCurrent() + + // Ack arrives so we get past the connection-timeout window. + val outgoing = collectOutgoingV2Stream(RPC_REQUEST_DATA_STREAM_TOPIC)!! + val requestId = outgoing.first[RpcRequestAttrs.REQUEST_ID]!! + subDataChannel.simulateBufferReceived(createAck(requestId)) + + coroutineRule.dispatcher.scheduler.advanceTimeBy(20_000) + assertEquals(RpcError.BuiltinRpcError.RESPONSE_TIMEOUT.create(), rpcJob.await()) + } + + @Test + fun caller_error_response_via_packet() = runTest { + connect() + simulateRemoteJoinAsV2() + + val customError = RpcError(101, "boom") + val rpcJob = async { + var thrown: Throwable? = null + try { + room.localParticipant.performRpc( + destinationIdentity = Participant.Identity(REMOTE_PARTICIPANT.identity), + method = "x", + payload = "p", + ) + } catch (e: Throwable) { + thrown = e + } + thrown + } + coroutineRule.dispatcher.scheduler.runCurrent() + + val outgoing = collectOutgoingV2Stream(RPC_REQUEST_DATA_STREAM_TOPIC)!! + val requestId = outgoing.first[RpcRequestAttrs.REQUEST_ID]!! + subDataChannel.simulateBufferReceived(createAck(requestId)) + subDataChannel.simulateBufferReceived(createV1Response(requestId, error = customError)) + coroutineRule.dispatcher.scheduler.advanceUntilIdle() + + assertEquals(customError, rpcJob.await()) + } + + @Test + fun caller_participant_disconnect() = runTest { + connect() + simulateRemoteJoinAsV2() + + val rpcJob = async { + var thrown: Throwable? = null + try { + room.localParticipant.performRpc( + destinationIdentity = Participant.Identity(REMOTE_PARTICIPANT.identity), + method = "x", + payload = "p", + ) + } catch (e: Throwable) { + thrown = e + } + thrown + } + coroutineRule.dispatcher.scheduler.runCurrent() + + simulateMessageFromServer(TestData.PARTICIPANT_DISCONNECT) + coroutineRule.dispatcher.scheduler.advanceUntilIdle() + + assertEquals( + RpcError.BuiltinRpcError.RECIPIENT_DISCONNECTED.create(), + rpcJob.await(), + ) + } + + /** + * Spec #11. The response arrives with no scheduler advancement between publish and reply. + * A follow-up `performRpc` then succeeds, proving no `pendingAcks` / `pendingResponses` + * entries were orphaned by the fast path. + */ + @Test + fun caller_fast_response_immediately_after_publish() = runTest { + connect() + simulateRemoteJoinAsV2() + + val rpcJob1 = async { + room.localParticipant.performRpc( + destinationIdentity = Participant.Identity(REMOTE_PARTICIPANT.identity), + method = "fast", + payload = "p1", + ) + } + coroutineRule.dispatcher.scheduler.runCurrent() + + val firstHeader = pubDataChannel.sentBuffers + .map { parsePacket(it) } + .first { it.hasStreamHeader() && it.streamHeader.topic == RPC_REQUEST_DATA_STREAM_TOPIC } + val requestId1 = firstHeader.streamHeader.attributesMap[RpcRequestAttrs.REQUEST_ID]!! + + // No advanceTimeBy / no scheduler tick between publish and reply. + subDataChannel.simulateBufferReceived(createAck(requestId1)) + simulateIncomingResponseStream(requestId1, "r1", streamId = "resp-1") + coroutineRule.dispatcher.scheduler.advanceUntilIdle() + assertEquals("r1", rpcJob1.await()) + + // A second back-to-back call works — proves no orphaned pending entries. + pubDataChannel.clearSentBuffers() + val rpcJob2 = async { + room.localParticipant.performRpc( + destinationIdentity = Participant.Identity(REMOTE_PARTICIPANT.identity), + method = "fast", + payload = "p2", + ) + } + coroutineRule.dispatcher.scheduler.runCurrent() + val secondHeader = pubDataChannel.sentBuffers + .map { parsePacket(it) } + .first { it.hasStreamHeader() && it.streamHeader.topic == RPC_REQUEST_DATA_STREAM_TOPIC } + val requestId2 = secondHeader.streamHeader.attributesMap[RpcRequestAttrs.REQUEST_ID]!! + assertTrue("second call must get a fresh request id", requestId2 != requestId1) + subDataChannel.simulateBufferReceived(createAck(requestId2)) + simulateIncomingResponseStream(requestId2, "r2", streamId = "resp-2") + coroutineRule.dispatcher.scheduler.advanceUntilIdle() + assertEquals("r2", rpcJob2.await()) + } + + /** + * Spec #12. The ack and response are delivered synchronously from inside the publish path, + * before `performRpc` finishes the suspending publish. Verifies that pending-response state + * is registered *before* publish, so a response that arrives mid-publish still matches. + */ + @Test + fun caller_response_arrives_during_publish() = runTest { + connect() + simulateRemoteJoinAsV2() + + var injected = false + pubDataChannel.onSend = { buffer -> + if (!injected) { + val packet = DataPacket.parseFrom(ByteString.copyFrom(buffer.data.duplicate())) + if (packet.hasStreamHeader() && + packet.streamHeader.topic == RPC_REQUEST_DATA_STREAM_TOPIC + ) { + injected = true + val requestId = packet.streamHeader.attributesMap[RpcRequestAttrs.REQUEST_ID]!! + // Inject ack + response stream synchronously, while the publishing coroutine + // is mid-flight (it hasn't returned from this `send` call yet). + subDataChannel.simulateBufferReceived(createAck(requestId)) + simulateIncomingResponseStream(requestId, "during", streamId = "resp-during") + } + } + } + + val rpcJob = async { + room.localParticipant.performRpc( + destinationIdentity = Participant.Identity(REMOTE_PARTICIPANT.identity), + method = "midflight", + payload = "p", + ) + } + coroutineRule.dispatcher.scheduler.advanceUntilIdle() + + assertEquals("during", rpcJob.await()) + } + + /** + * Spec #13. After CONNECTION_TIMEOUT fires, a delayed ack + response stream MUST NOT + * resolve the promise a second time or otherwise crash. + */ + @Test + fun caller_late_ack_after_connection_timeout() = runTest { + connect() + simulateRemoteJoinAsV2() + + val rpcJob = async { + var thrown: Throwable? = null + try { + room.localParticipant.performRpc( + destinationIdentity = Participant.Identity(REMOTE_PARTICIPANT.identity), + method = "late", + payload = "p", + ) + } catch (e: Throwable) { + thrown = e + } + thrown + } + coroutineRule.dispatcher.scheduler.runCurrent() + + val outgoing = collectOutgoingV2Stream(RPC_REQUEST_DATA_STREAM_TOPIC)!! + val requestId = outgoing.first[RpcRequestAttrs.REQUEST_ID]!! + + // No ack within the 7s round-trip window → CONNECTION_TIMEOUT. + coroutineRule.dispatcher.scheduler.advanceTimeBy(8_000) + assertEquals(RpcError.BuiltinRpcError.CONNECTION_TIMEOUT.create(), rpcJob.await()) + + // Now deliver a late ack + response. Must not throw or double-resolve. + subDataChannel.simulateBufferReceived(createAck(requestId)) + simulateIncomingResponseStream(requestId, "too-late", streamId = "resp-late") + coroutineRule.dispatcher.scheduler.advanceUntilIdle() + + // The original completion stays at CONNECTION_TIMEOUT; the rpcJob doesn't change. + assertEquals(RpcError.BuiltinRpcError.CONNECTION_TIMEOUT.create(), rpcJob.await()) + } + + @Test + fun caller_response_from_wrong_sender_ignored() = runTest { + connect() + simulateRemoteJoinAsV2() + + val rpcJob = async { + var thrown: Throwable? = null + try { + room.localParticipant.performRpc( + destinationIdentity = Participant.Identity(REMOTE_PARTICIPANT.identity), + method = "x", + payload = "p", + ) + } catch (e: Throwable) { + thrown = e + } + thrown + } + coroutineRule.dispatcher.scheduler.runCurrent() + + val outgoing = collectOutgoingV2Stream(RPC_REQUEST_DATA_STREAM_TOPIC)!! + val requestId = outgoing.first[RpcRequestAttrs.REQUEST_ID]!! + subDataChannel.simulateBufferReceived(createAck(requestId)) + // Response stream from a different sender identity — must be ignored. + simulateIncomingResponseStream(requestId, "spoofed", fromIdentity = "mallory") + coroutineRule.dispatcher.scheduler.advanceTimeBy(20_000) + + // Should have timed out instead of resolving with "spoofed". + assertEquals(RpcError.BuiltinRpcError.RESPONSE_TIMEOUT.create(), rpcJob.await()) + } + + @Test + fun caller_five_concurrent_calls_no_crosstalk() = runTest { + connect() + simulateRemoteJoinAsV2() + + val deferred = (1..5).map { idx -> + async { + room.localParticipant.performRpc( + destinationIdentity = Participant.Identity(REMOTE_PARTICIPANT.identity), + method = "echo", + payload = "req-$idx", + ) + } + } + coroutineRule.dispatcher.scheduler.runCurrent() + + // Five separate v2 request streams should have been produced. + val packets = pubDataChannel.sentBuffers.map { parsePacket(it) } + val headers = packets.filter { + it.hasStreamHeader() && it.streamHeader.topic == RPC_REQUEST_DATA_STREAM_TOPIC + } + assertEquals(5, headers.size) + + // Build a map of (requestId -> expected response based on the request payload). + val requestIdToResponse = headers.associate { header -> + val streamId = header.streamHeader.streamId + val requestId = header.streamHeader.attributesMap[RpcRequestAttrs.REQUEST_ID]!! + val chunks = packets + .filter { it.hasStreamChunk() && it.streamChunk.streamId == streamId } + .sortedBy { it.streamChunk.chunkIndex } + val requestPayload = buildString { + for (chunk in chunks) { + append(chunk.streamChunk.content.toString(Charsets.UTF_8)) + } + } + requestId to "resp-${requestPayload.removePrefix("req-")}" + } + + // Ack and respond to each. + for ((requestId, responsePayload) in requestIdToResponse) { + subDataChannel.simulateBufferReceived(createAck(requestId)) + simulateIncomingResponseStream(requestId, responsePayload) + } + coroutineRule.dispatcher.scheduler.advanceUntilIdle() + + // Each call resolves with its own response, not cross-talked. + for ((idx, d) in deferred.withIndex()) { + assertEquals("resp-${idx + 1}", d.await()) + } + } + + // ---------------------------- v1 ↔ v2 fallback ------------------------------------------ + + @Test + fun caller_to_v1_remote_uses_packet_not_stream() = runTest { + connect() + simulateRemoteJoinAsV1() + + val rpcJob = async { + room.localParticipant.performRpc( + destinationIdentity = Participant.Identity(REMOTE_PARTICIPANT.identity), + method = "hello", + payload = "hi", + ) + } + coroutineRule.dispatcher.scheduler.runCurrent() + + val packets = pubDataChannel.sentBuffers.map { parsePacket(it) } + val rpcRequest = packets.firstOrNull { it.hasRpcRequest() } + assertNotNull("expected a v1 RpcRequest packet to a v1 remote", rpcRequest) + assertNull(collectOutgoingV2Stream(RPC_REQUEST_DATA_STREAM_TOPIC)) + + val requestId = rpcRequest!!.rpcRequest.id + subDataChannel.simulateBufferReceived(createAck(requestId)) + subDataChannel.simulateBufferReceived(createV1Response(requestId, payload = "bye")) + coroutineRule.dispatcher.scheduler.advanceUntilIdle() + + assertEquals("bye", rpcJob.await()) + } + + @Test + fun handler_responds_to_v1_caller_via_packet() = runTest { + connect() + simulateRemoteJoinAsV1() + + room.localParticipant.registerRpcMethod("hello") { "world" } + // Feed a v1 RpcRequest packet (not a stream). + val v1Request = with(DataPacket.newBuilder()) { + participantIdentity = REMOTE_PARTICIPANT.identity + rpcRequest = with(LivekitModels.RpcRequest.newBuilder()) { + this.id = "req-v1" + this.method = "hello" + this.payload = "hi" + this.responseTimeoutMs = 10_000 + this.version = RPC_VERSION_V1 + build() + } + build() + } + subDataChannel.simulateBufferReceived(v1Request.toDataChannelBuffer()) + coroutineRule.dispatcher.scheduler.advanceUntilIdle() + + val packets = pubDataChannel.sentBuffers.map { parsePacket(it) } + // Ack + assertTrue(packets.any { it.hasRpcAck() && it.rpcAck.requestId == "req-v1" }) + // v1 success response packet (not a stream) + val response = packets.firstOrNull { + it.hasRpcResponse() && it.rpcResponse.requestId == "req-v1" + } + assertNotNull(response) + assertEquals("world", response!!.rpcResponse.payload) + assertNull(collectOutgoingV2Stream(RPC_RESPONSE_DATA_STREAM_TOPIC)) + } + + @Test + fun caller_to_v1_remote_rejects_large_payload() = runTest { + connect() + simulateRemoteJoinAsV1() + + var thrown: Throwable? = null + try { + room.localParticipant.performRpc( + destinationIdentity = Participant.Identity(REMOTE_PARTICIPANT.identity), + method = "echo", + payload = "X".repeat(20_000), + ) + } catch (e: Throwable) { + thrown = e + } + + assertEquals(RpcError.BuiltinRpcError.REQUEST_PAYLOAD_TOO_LARGE.create(), thrown) + // No packet or stream should have been produced. + val packets = pubDataChannel.sentBuffers.map { parsePacket(it) } + assertFalse(packets.any { it.hasRpcRequest() }) + assertNull(collectOutgoingV2Stream(RPC_REQUEST_DATA_STREAM_TOPIC)) + } + + /** Build a v1 `RpcRequest` packet from a v1 caller. */ + private fun v1RequestPacket( + requestId: String, + method: String, + payload: String, + responseTimeoutMs: Int = 10_000, + ) = with(DataPacket.newBuilder()) { + participantIdentity = REMOTE_PARTICIPANT.identity + rpcRequest = with(LivekitModels.RpcRequest.newBuilder()) { + this.id = requestId + this.method = method + this.payload = payload + this.responseTimeoutMs = responseTimeoutMs + this.version = RPC_VERSION_V1 + build() + } + build() + }.toDataChannelBuffer() + + /** + * Spec #18. Handler receives a v1 packet, handler throws a non-RpcError exception → v1 + * `RpcResponse` packet with `APPLICATION_ERROR`. + */ + @Test + fun v1_caller_handler_uncaught_exception_application_error_packet() = runTest { + connect() + simulateRemoteJoinAsV1() + + room.localParticipant.registerRpcMethod("boom") { + throw RuntimeException("oops") + } + subDataChannel.simulateBufferReceived(v1RequestPacket("req-v1-app", "boom", "")) + coroutineRule.dispatcher.scheduler.advanceUntilIdle() + + val packets = pubDataChannel.sentBuffers.map { parsePacket(it) } + val errorResponse = packets.firstOrNull { + it.hasRpcResponse() && it.rpcResponse.requestId == "req-v1-app" && it.rpcResponse.hasError() + } + assertNotNull(errorResponse) + assertEquals( + RpcError.BuiltinRpcError.APPLICATION_ERROR.create(), + RpcError.fromProto(errorResponse!!.rpcResponse.error), + ) + } + + /** + * Spec #19. Handler receives a v1 packet, handler throws an `RpcError` → v1 `RpcResponse` + * packet preserving the original code + message. + */ + @Test + fun v1_caller_handler_rpcerror_passthrough_packet() = runTest { + connect() + simulateRemoteJoinAsV1() + + val custom = RpcError(101, "custom v1 err") + room.localParticipant.registerRpcMethod("err") { throw custom } + subDataChannel.simulateBufferReceived(v1RequestPacket("req-v1-cust", "err", "")) + coroutineRule.dispatcher.scheduler.advanceUntilIdle() + + val packets = pubDataChannel.sentBuffers.map { parsePacket(it) } + val errorResponse = packets.firstOrNull { + it.hasRpcResponse() && it.rpcResponse.requestId == "req-v1-cust" && it.rpcResponse.hasError() + } + assertNotNull(errorResponse) + assertEquals(custom, RpcError.fromProto(errorResponse!!.rpcResponse.error)) + } + + /** + * Spec #21. Caller targets a v1 remote (so it sends v1 `RpcRequest` packets) and the + * response never arrives → `RESPONSE_TIMEOUT`. + */ + @Test + fun v1_caller_response_timeout() = runTest { + connect() + simulateRemoteJoinAsV1() + + val rpcJob = async { + var thrown: Throwable? = null + try { + room.localParticipant.performRpc( + destinationIdentity = Participant.Identity(REMOTE_PARTICIPANT.identity), + method = "hello", + payload = "hi", + ) + } catch (e: Throwable) { + thrown = e + } + thrown + } + coroutineRule.dispatcher.scheduler.runCurrent() + + // Ack so we get past the connection-timeout window. + val packets = pubDataChannel.sentBuffers.map { parsePacket(it) } + val rpcRequest = packets.first { it.hasRpcRequest() }.rpcRequest + subDataChannel.simulateBufferReceived(createAck(rpcRequest.id)) + + coroutineRule.dispatcher.scheduler.advanceTimeBy(20_000) + assertEquals(RpcError.BuiltinRpcError.RESPONSE_TIMEOUT.create(), rpcJob.await()) + } + + /** Spec #22. v1 caller receives an error `RpcResponse` packet → rejects with the error. */ + @Test + fun v1_caller_error_response_via_packet() = runTest { + connect() + simulateRemoteJoinAsV1() + + val customError = RpcError(101, "v1 boom") + val rpcJob = async { + var thrown: Throwable? = null + try { + room.localParticipant.performRpc( + destinationIdentity = Participant.Identity(REMOTE_PARTICIPANT.identity), + method = "x", + payload = "p", + ) + } catch (e: Throwable) { + thrown = e + } + thrown + } + coroutineRule.dispatcher.scheduler.runCurrent() + + val packets = pubDataChannel.sentBuffers.map { parsePacket(it) } + val rpcRequest = packets.first { it.hasRpcRequest() }.rpcRequest + subDataChannel.simulateBufferReceived(createAck(rpcRequest.id)) + subDataChannel.simulateBufferReceived(createV1Response(rpcRequest.id, error = customError)) + coroutineRule.dispatcher.scheduler.advanceUntilIdle() + + assertEquals(customError, rpcJob.await()) + } + + /** Spec #23. v1 caller, remote disconnects mid-call → `RECIPIENT_DISCONNECTED`. */ + @Test + fun v1_caller_participant_disconnect() = runTest { + connect() + simulateRemoteJoinAsV1() + + val rpcJob = async { + var thrown: Throwable? = null + try { + room.localParticipant.performRpc( + destinationIdentity = Participant.Identity(REMOTE_PARTICIPANT.identity), + method = "x", + payload = "p", + ) + } catch (e: Throwable) { + thrown = e + } + thrown + } + coroutineRule.dispatcher.scheduler.runCurrent() + + simulateMessageFromServer(TestData.PARTICIPANT_DISCONNECT) + coroutineRule.dispatcher.scheduler.advanceUntilIdle() + + assertEquals( + RpcError.BuiltinRpcError.RECIPIENT_DISCONNECTED.create(), + rpcJob.await(), + ) + } +} diff --git a/protocol b/protocol index 4c05a3325..8381f2180 160000 --- a/protocol +++ b/protocol @@ -1 +1 @@ -Subproject commit 4c05a3325ec35760bee1c0bfe57b7011604a124f +Subproject commit 8381f2180c45ab926b3ebf19df0608f1dadcac1e diff --git a/sample-app-common/src/main/java/io/livekit/android/sample/CallViewModel.kt b/sample-app-common/src/main/java/io/livekit/android/sample/CallViewModel.kt index 6e9678e11..b218278af 100644 --- a/sample-app-common/src/main/java/io/livekit/android/sample/CallViewModel.kt +++ b/sample-app-common/src/main/java/io/livekit/android/sample/CallViewModel.kt @@ -49,6 +49,7 @@ import io.livekit.android.room.track.LocalVideoTrack import io.livekit.android.room.track.Track import io.livekit.android.room.track.screencapture.ScreenCaptureParams import io.livekit.android.room.track.video.CameraCapturerUtils +import io.livekit.android.rpc.RpcError import io.livekit.android.sample.model.StressTest import io.livekit.android.sample.service.ForegroundService import io.livekit.android.util.LKLog @@ -144,6 +145,10 @@ class CallViewModel( private val mutablePermissionAllowed = MutableStateFlow(true) val permissionAllowed = mutablePermissionAllowed.hide() + // RPC tester state. Lives on the ViewModel so it survives dialog dismiss/reopen. + private val mutableHandlers = MutableStateFlow>(emptyList()) + val handlers: StateFlow> = mutableHandlers + init { CameraXHelper.createCameraProvider(ProcessLifecycleOwner.get()).let { @@ -335,6 +340,12 @@ class CallViewModel( override fun onCleared() { super.onCleared() + // Tear down any RPC handlers before releasing the room. + mutableHandlers.value.forEach { handler -> + runCatching { room.localParticipant.unregisterRpcMethod(handler.method) } + } + mutableHandlers.value = emptyList() + // Make sure to release any resources associated with LiveKit room.disconnect() room.release() @@ -384,6 +395,51 @@ class CallViewModel( } } + fun registerRpcHandler(method: String, initialResponse: String) { + if (method.isBlank()) return + val state = RpcHandlerState( + method = method, + staticResponse = MutableStateFlow(initialResponse), + invocations = MutableStateFlow(emptyList()), + ) + room.localParticipant.registerRpcMethod(method) { invocation -> + val record = RpcInvocationRecord( + timestamp = System.currentTimeMillis(), + caller = invocation.callerIdentity, + payload = invocation.payload, + ) + state.invocations.value = state.invocations.value + record + state.staticResponse.value + } + // Replace any prior entry for the same method (SDK overwrites anyway). + mutableHandlers.value = mutableHandlers.value.filterNot { it.method == method } + state + } + + fun unregisterRpcHandler(method: String) { + room.localParticipant.unregisterRpcMethod(method) + mutableHandlers.value = mutableHandlers.value.filterNot { it.method == method } + } + + fun updateStaticResponse(method: String, response: String) { + mutableHandlers.value.firstOrNull { it.method == method } + ?.staticResponse?.let { it.value = response } + } + + suspend fun performRpc( + destination: Participant.Identity, + method: String, + payload: String, + ): RpcRequestResult { + return try { + val response = room.localParticipant.performRpc(destination, method, payload) + RpcRequestResult.Success(response) + } catch (e: RpcError) { + RpcRequestResult.Error(e.code, e.message) + } catch (e: Throwable) { + RpcRequestResult.Error(null, e.message ?: e.toString()) + } + } + fun toggleSubscriptionPermissions() { mutablePermissionAllowed.value = !mutablePermissionAllowed.value room.localParticipant.setTrackSubscriptionPermissions(mutablePermissionAllowed.value) @@ -465,3 +521,20 @@ class CallViewModel( private fun LiveData.hide(): LiveData = this private fun MutableStateFlow.hide(): StateFlow = this + +data class RpcInvocationRecord( + val timestamp: Long, + val caller: Participant.Identity, + val payload: String, +) + +class RpcHandlerState( + val method: String, + val staticResponse: MutableStateFlow, + val invocations: MutableStateFlow>, +) + +sealed class RpcRequestResult { + data class Success(val response: String) : RpcRequestResult() + data class Error(val code: Int?, val message: String) : RpcRequestResult() +} diff --git a/sample-app-common/src/main/res/drawable/baseline_swap_horiz_24.xml b/sample-app-common/src/main/res/drawable/baseline_swap_horiz_24.xml new file mode 100644 index 000000000..db1e871a0 --- /dev/null +++ b/sample-app-common/src/main/res/drawable/baseline_swap_horiz_24.xml @@ -0,0 +1,10 @@ + + + diff --git a/sample-app/src/main/java/io/livekit/android/sample/CallActivity.kt b/sample-app/src/main/java/io/livekit/android/sample/CallActivity.kt index 5942b6d31..2cd2b85d5 100644 --- a/sample-app/src/main/java/io/livekit/android/sample/CallActivity.kt +++ b/sample-app/src/main/java/io/livekit/android/sample/CallActivity.kt @@ -35,6 +35,7 @@ import androidx.recyclerview.widget.LinearLayoutManager import com.xwray.groupie.GroupieAdapter import io.livekit.android.sample.common.R import io.livekit.android.sample.databinding.CallActivityBinding +import io.livekit.android.sample.dialog.RpcTestDialogFragment import io.livekit.android.sample.dialog.showAudioProcessorSwitchDialog import io.livekit.android.sample.dialog.showDebugMenuDialog import io.livekit.android.sample.dialog.showSelectAudioDeviceDialog @@ -219,6 +220,10 @@ class CallActivity : AppCompatActivity() { binding.debugMenu.setOnClickListener { showDebugMenuDialog(viewModel) } + + binding.rpcTest.setOnClickListener { + RpcTestDialogFragment().show(supportFragmentManager, "rpc_test") + } } override fun onResume() { diff --git a/sample-app/src/main/java/io/livekit/android/sample/dialog/RpcTestDialogFragment.kt b/sample-app/src/main/java/io/livekit/android/sample/dialog/RpcTestDialogFragment.kt new file mode 100644 index 000000000..4e943847d --- /dev/null +++ b/sample-app/src/main/java/io/livekit/android/sample/dialog/RpcTestDialogFragment.kt @@ -0,0 +1,285 @@ +/* + * Copyright 2026 LiveKit, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.livekit.android.sample.dialog + +import android.os.Bundle +import android.text.Editable +import android.text.TextWatcher +import android.view.LayoutInflater +import android.view.View +import android.view.ViewGroup +import android.widget.AdapterView +import android.widget.ArrayAdapter +import android.widget.Toast +import androidx.fragment.app.DialogFragment +import androidx.fragment.app.activityViewModels +import androidx.lifecycle.Lifecycle +import androidx.lifecycle.lifecycleScope +import androidx.lifecycle.repeatOnLifecycle +import androidx.recyclerview.widget.LinearLayoutManager +import com.xwray.groupie.GroupieAdapter +import com.xwray.groupie.viewbinding.BindableItem +import com.xwray.groupie.viewbinding.GroupieViewHolder +import io.livekit.android.room.participant.RemoteParticipant +import io.livekit.android.sample.CallViewModel +import io.livekit.android.sample.RpcHandlerState +import io.livekit.android.sample.RpcInvocationRecord +import io.livekit.android.sample.RpcRequestResult +import io.livekit.android.sample.databinding.DialogRpcTestBinding +import io.livekit.android.sample.databinding.ItemRpcHandlerBinding +import io.livekit.android.sample.databinding.ItemRpcInvocationBinding +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.cancel +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.launch +import java.text.SimpleDateFormat +import java.util.Date +import java.util.Locale + +private const val HELLO_PAYLOAD = "hello world" +private const val TWENTY_K_SIZE = 20 * 1024 + +private fun twentyKPayload(): String = "X".repeat(TWENTY_K_SIZE) + +class RpcTestDialogFragment : DialogFragment() { + + private var _binding: DialogRpcTestBinding? = null + private val binding get() = _binding!! + + private val viewModel: CallViewModel by activityViewModels() + + private val participantsList = mutableListOf() + private lateinit var participantSpinnerAdapter: ArrayAdapter + private val handlersAdapter = GroupieAdapter() + + override fun onCreateView( + inflater: LayoutInflater, + container: ViewGroup?, + savedInstanceState: Bundle?, + ): View { + _binding = DialogRpcTestBinding.inflate(inflater, container, false) + return binding.root + } + + override fun onViewCreated(view: View, savedInstanceState: Bundle?) { + super.onViewCreated(view, savedInstanceState) + + binding.closeButton.setOnClickListener { dismiss() } + + participantSpinnerAdapter = ArrayAdapter( + requireContext(), + android.R.layout.simple_spinner_item, + mutableListOf(), + ) + participantSpinnerAdapter.setDropDownViewResource(android.R.layout.simple_spinner_dropdown_item) + binding.participantSpinner.adapter = participantSpinnerAdapter + + binding.presetHello.setOnClickListener { binding.payloadEdit.setText(HELLO_PAYLOAD) } + binding.preset20k.setOnClickListener { binding.payloadEdit.setText(twentyKPayload()) } + + binding.sendButton.setOnClickListener { sendRpc() } + + binding.registerButton.setOnClickListener { + val topic = binding.topicEdit.text.toString().trim() + if (topic.isEmpty()) { + Toast.makeText(requireContext(), "Enter a topic", Toast.LENGTH_SHORT).show() + return@setOnClickListener + } + viewModel.registerRpcHandler(topic, initialResponse = "") + binding.topicEdit.text.clear() + } + + binding.handlersList.layoutManager = LinearLayoutManager(requireContext()) + binding.handlersList.adapter = handlersAdapter + + viewLifecycleOwner.lifecycleScope.launch { + repeatOnLifecycle(Lifecycle.State.STARTED) { + viewModel.participants + .map { list -> list.filterIsInstance() } + .collect { remotes -> + participantsList.clear() + participantsList.addAll(remotes) + participantSpinnerAdapter.clear() + participantSpinnerAdapter.addAll( + remotes.map { it.identity?.value ?: "(unknown)" }, + ) + } + } + } + + viewLifecycleOwner.lifecycleScope.launch { + repeatOnLifecycle(Lifecycle.State.STARTED) { + viewModel.handlers.collect { handlers -> + handlersAdapter.update( + handlers.map { RpcHandlerItem(it, viewModel) }, + ) + } + } + } + } + + private fun sendRpc() { + val pos = binding.participantSpinner.selectedItemPosition + if (pos == AdapterView.INVALID_POSITION || pos >= participantsList.size) { + Toast.makeText(requireContext(), "No participant selected", Toast.LENGTH_SHORT).show() + return + } + val identity = participantsList[pos].identity + if (identity == null) { + Toast.makeText(requireContext(), "Participant has no identity", Toast.LENGTH_SHORT).show() + return + } + val method = binding.methodEdit.text.toString().trim() + if (method.isEmpty()) { + Toast.makeText(requireContext(), "Enter a method name", Toast.LENGTH_SHORT).show() + return + } + val payload = binding.payloadEdit.text.toString() + + binding.sendButton.isEnabled = false + binding.responseText.visibility = View.GONE + binding.sendSpinner.visibility = View.VISIBLE + + viewLifecycleOwner.lifecycleScope.launch { + val result = viewModel.performRpc(identity, method, payload) + val b = _binding ?: return@launch + b.sendSpinner.visibility = View.GONE + b.sendButton.isEnabled = true + b.responseText.visibility = View.VISIBLE + b.responseText.text = when (result) { + is RpcRequestResult.Success -> "Success:\n${result.response}" + is RpcRequestResult.Error -> "Error: [${result.code ?: "?"}] ${result.message}" + } + } + } + + override fun onStart() { + super.onStart() + dialog?.window?.setLayout( + ViewGroup.LayoutParams.MATCH_PARENT, + ViewGroup.LayoutParams.MATCH_PARENT, + ) + } + + override fun onDestroyView() { + binding.handlersList.adapter = null + super.onDestroyView() + _binding = null + } +} + +private val invocationTimeFormat = SimpleDateFormat("HH:mm:ss.SSS", Locale.US) + +class RpcHandlerItem( + private val state: RpcHandlerState, + private val viewModel: CallViewModel, +) : BindableItem() { + + private var scope: CoroutineScope? = null + private var watcher: TextWatcher? = null + + override fun initializeViewBinding(view: View): ItemRpcHandlerBinding = + ItemRpcHandlerBinding.bind(view) + + override fun getLayout(): Int = io.livekit.android.sample.R.layout.item_rpc_handler + + override fun bind(viewBinding: ItemRpcHandlerBinding, position: Int) { + viewBinding.methodName.text = state.method + + viewBinding.unregisterButton.setOnClickListener { + viewModel.unregisterRpcHandler(state.method) + } + + // Bind the static-response edit. Remove any prior watcher before mutating text so + // we don't fire it during programmatic updates. + viewBinding.staticResponseEdit.removeTextChangedListener(watcher) + val current = state.staticResponse.value + if (viewBinding.staticResponseEdit.text?.toString() != current) { + viewBinding.staticResponseEdit.setText(current) + } + val newWatcher = object : TextWatcher { + override fun afterTextChanged(s: Editable?) { + viewModel.updateStaticResponse(state.method, s?.toString() ?: "") + } + + override fun beforeTextChanged(s: CharSequence?, start: Int, count: Int, after: Int) = Unit + override fun onTextChanged(s: CharSequence?, start: Int, before: Int, count: Int) = Unit + } + viewBinding.staticResponseEdit.addTextChangedListener(newWatcher) + watcher = newWatcher + + viewBinding.handlerPresetHello.setOnClickListener { + viewBinding.staticResponseEdit.setText(HELLO_PAYLOAD) + } + viewBinding.handlerPreset20k.setOnClickListener { + viewBinding.staticResponseEdit.setText(twentyKPayload()) + } + + val invocationsAdapter = GroupieAdapter() + viewBinding.invocationList.layoutManager = LinearLayoutManager(viewBinding.root.context) + viewBinding.invocationList.adapter = invocationsAdapter + + scope?.cancel() + val newScope = CoroutineScope(SupervisorJob() + Dispatchers.Main) + scope = newScope + newScope.launch { + state.invocations.collect { records -> + viewBinding.emptyLabel.visibility = if (records.isEmpty()) View.VISIBLE else View.GONE + invocationsAdapter.update(records.map { RpcInvocationItem(it) }) + } + } + } + + override fun unbind(viewHolder: GroupieViewHolder) { + scope?.cancel() + scope = null + viewHolder.binding.staticResponseEdit.removeTextChangedListener(watcher) + watcher = null + viewHolder.binding.invocationList.adapter = null + super.unbind(viewHolder) + } + + override fun isSameAs(other: com.xwray.groupie.Item<*>): Boolean = + other is RpcHandlerItem && other.state.method == state.method + + override fun hasSameContentAs(other: com.xwray.groupie.Item<*>): Boolean = + other is RpcHandlerItem && other.state === state +} + +class RpcInvocationItem(private val record: RpcInvocationRecord) : + BindableItem() { + + override fun initializeViewBinding(view: View): ItemRpcInvocationBinding = + ItemRpcInvocationBinding.bind(view) + + override fun getLayout(): Int = io.livekit.android.sample.R.layout.item_rpc_invocation + + override fun bind(viewBinding: ItemRpcInvocationBinding, position: Int) { + val time = invocationTimeFormat.format(Date(record.timestamp)) + viewBinding.timestamp.text = "$time ${record.caller.value}" + val bytes = record.payload.toByteArray(Charsets.UTF_8).size + viewBinding.byteLength.text = "${bytes}B" + viewBinding.payloadText.text = record.payload + } + + override fun isSameAs(other: com.xwray.groupie.Item<*>): Boolean = + other is RpcInvocationItem && + other.record.timestamp == record.timestamp && + other.record.caller == record.caller +} diff --git a/sample-app/src/main/res/layout/call_activity.xml b/sample-app/src/main/res/layout/call_activity.xml index e30e9dadd..a76c2088b 100644 --- a/sample-app/src/main/res/layout/call_activity.xml +++ b/sample-app/src/main/res/layout/call_activity.xml @@ -139,5 +139,15 @@ android:padding="@dimen/control_padding" android:src="@drawable/dots_horizontal_circle_outline" app:tint="@android:color/white" /> + + diff --git a/sample-app/src/main/res/layout/dialog_rpc_test.xml b/sample-app/src/main/res/layout/dialog_rpc_test.xml new file mode 100644 index 000000000..2c21f6455 --- /dev/null +++ b/sample-app/src/main/res/layout/dialog_rpc_test.xml @@ -0,0 +1,211 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +