Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/new-shrimps-rest.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"client-sdk-android": patch
---

Properly update the server info after reconnect
1 change: 1 addition & 0 deletions livekit-android-sdk/detekt-baseline-release.xml
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
<ID>NestedBlockDepth:RTCEngine.kt$RTCEngine$private fun makeRTCConfig( serverResponse: Either&lt;JoinResponse, ReconnectResponse>, connectOptions: ConnectOptions, ): RTCConfiguration</ID>
<ID>NestedBlockDepth:Room.kt$Room$override suspend fun onPostReconnect(isFullReconnect: Boolean)</ID>
<ID>NestedBlockDepth:SignalClient.kt$SignalClient$override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?)</ID>
<ID>NestedBlockDepth:SignalClient.kt$SignalClient$private fun handleSignalResponse(ws: WebSocket, response: LivekitRtc.SignalResponse)</ID>
<ID>SwallowedException:FlowExt.kt$e: CancellationException</ID>
<ID>SwallowedException:LocalVideoTrack.kt$LocalVideoTrack$e: Exception</ID>
<ID>SwallowedException:TextureViewRenderer.kt$TextureViewRenderer$e: NotFoundException</ID>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,8 @@ internal constructor(
subscriber?.updateRTCConfig(rtcConfig)
publisher?.updateRTCConfig(rtcConfig)
lastMessageSeq = reconnectResponse.lastMessageSeq
} else {
LKLog.w { "Did not receive reconnect response" }
}
client.onReadyForResponses()
} catch (e: Exception) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,15 +98,8 @@ constructor(
internal var lastOptions: ConnectOptions? = null
private var lastRoomOptions: RoomOptions? = null

// join will always return a JoinResponse.
// reconnect will return a ReconnectResponse or a Unit if a different response was received.
@Volatile
private var joinContinuation: CancellableContinuation<
Either<
JoinResponse,
Either<ReconnectResponse, Unit>,
>,
>? = null
private var joinContinuation: CancellableContinuation<ConnectResult>? = null
private lateinit var coroutineScope: CloseableCoroutineScope

/**
Expand Down Expand Up @@ -141,8 +134,10 @@ constructor(
options: ConnectOptions = ConnectOptions(),
roomOptions: RoomOptions = RoomOptions(),
): JoinResponse {
val joinResponse = connect(url, token, options, roomOptions)
return (joinResponse as Either.Left).value
return when (val result = connect(url, token, options, roomOptions)) {
is ConnectResult.Join -> result.response
else -> throw IllegalStateException("Unexpected response during join: $result")
}
}

/**
Expand All @@ -151,25 +146,30 @@ constructor(
@Throws(Exception::class)
@VisibleForTesting
suspend fun reconnect(url: String, token: String, participantSid: String?): Either<ReconnectResponse, Unit> {
val reconnectResponse = connect(
url,
token,
(lastOptions ?: ConnectOptions()).copy()
.apply {
reconnect = true
this.participantSid = participantSid
},
lastRoomOptions ?: RoomOptions(),
)
return (reconnectResponse as Either.Right).value
return when (
val result = connect(
url,
token,
(lastOptions ?: ConnectOptions()).copy()
.apply {
reconnect = true
this.participantSid = participantSid
},
lastRoomOptions ?: RoomOptions(),
)
) {
is ConnectResult.Reconnect -> Either.Left(result.response)
is ConnectResult.OtherResponse -> Either.Right(Unit)
is ConnectResult.Join -> throw IllegalStateException("Unexpected join response during reconnect")
}
}

private suspend fun connect(
url: String,
token: String,
options: ConnectOptions,
roomOptions: RoomOptions,
): Either<JoinResponse, Either<ReconnectResponse, Unit>> {
): ConnectResult {
// Clean up any pre-existing connection.
close(reason = "Starting new connection", shouldClearQueuedRequests = false)

Expand Down Expand Up @@ -691,7 +691,7 @@ constructor(
edition = ServerInfo.Edition.fromProto(response.join.serverInfo.edition),
version = serverVersion
)
joinContinuation?.resumeWith(Result.success(Either.Left(response.join)))
joinContinuation?.resumeWith(Result.success(ConnectResult.Join(response.join)))
joinContinuation = null
} else if (response.hasLeave()) {
// Some reconnects may immediately send leave back without a join response first.
Expand All @@ -711,10 +711,21 @@ constructor(
startPingJob()

if (response.hasReconnect()) {
joinContinuation?.resumeWith(Result.success(Either.Right(Either.Left(response.reconnect))))
if (response.reconnect.hasServerInfo()) {
try {
serverVersion = Semver(response.reconnect.serverInfo.version)
} catch (t: Throwable) {
LKLog.w(t) { "Thrown while trying to parse server version from reconnect." }
}
serverInfo = ServerInfo(
edition = ServerInfo.Edition.fromProto(response.reconnect.serverInfo.edition),
version = serverVersion,
)
}
joinContinuation?.resumeWith(Result.success(ConnectResult.Reconnect(response.reconnect)))
joinContinuation = null
} else {
joinContinuation?.resumeWith(Result.success(Either.Right(Either.Right(Unit))))
joinContinuation?.resumeWith(Result.success(ConnectResult.OtherResponse))
joinContinuation = null
// Non-reconnect response, handle normally
shouldProcessMessage = true
Expand Down Expand Up @@ -829,7 +840,8 @@ constructor(
}

LivekitRtc.SignalResponse.MessageCase.RECONNECT -> {
// TODO
// Handshake-only message; handled in handleSignalResponse() before connection.
LKLog.d { "ignoring reconnect response received after connected" }
}

LivekitRtc.SignalResponse.MessageCase.SUBSCRIPTION_RESPONSE -> {
Expand Down Expand Up @@ -960,6 +972,16 @@ constructor(
fun onLocalTrackSubscribed(trackSubscribed: LivekitRtc.TrackSubscribed)
}

/**
* Result of waiting for the initial signal response after opening the WebSocket.
* Join always yields [Join]; reconnect yields [Reconnect] or [OtherResponse].
*/
private sealed class ConnectResult {
data class Join(val response: JoinResponse) : ConnectResult()
data class Reconnect(val response: ReconnectResponse) : ConnectResult()
data object OtherResponse : ConnectResult()
}

companion object {
const val CONNECT_QUERY_TOKEN = "access_token"
const val CONNECT_QUERY_RECONNECT = "reconnect"
Expand Down