dataconnect(chore): rewrite bidirectional gRPC connection to use a custom class rather than generated stubs#8179
dataconnect(chore): rewrite bidirectional gRPC connection to use a custom class rather than generated stubs#8179dconeybe wants to merge 51 commits into
Conversation
… is sent after last unsubscription for a query`
…ors can subscribe after old collectors are cancelled`
… dataConnect.close()
…of IllegalStateException when the stream has been closed to help with unwinding subscribe flows
…ith ExecuteQuery and ExecuteResponse.
…rary and generated code (will be adapted later)
…ogle's internal antigravity version, with a new "docs" directory recording the decisions and designs). NOTE: The code needs to be reviewed by a human!
…nCompletion() to make sure that the subscription is unsubscribed.
…tion is re-established after being closed due to unsubscriptions`
…equent flow subscriptions return fresh data`
…he replay cache to new subscribers using sequence numbers.
…r flow subscriptions do not return data from in-flight subscribe`
…: `later flow subscriptions do not return data from in-flight subscribe`
… add it back later once it is implemented)
…sta: `flow reports error if server completes the RPC mid-stream`
…and is not superseded by other tests
… 4: Late Subscribers Hang Indefinitely on Completed Stream
…micReference access to state, which is already done serially
diff --git a/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/DataConnectBidiConnectStream.kt b/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/DataConnectBidiConnectStream.kt index 3dfa3da..272d847 100644 --- a/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/DataConnectBidiConnectStream.kt +++ b/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/DataConnectBidiConnectStream.kt @@ -268,33 +268,25 @@ internal class DataConnectBidiConnectStream( inner class Subscriber { private var state: State = State.NotReady(pendingSubscribe = false) - private var subscribed = false - suspend fun setOutgoingRequests(outgoingRequests: SendChannel<StreamRequestProto>) { - val currentState = state - val wasPendingSubscribe = - when (currentState) { - is State.NotReady -> { - state = State.Ready(outgoingRequests) - currentState.pendingSubscribe - } - is State.Ready -> { - check(currentState.outgoingRequests === outgoingRequests) { - "internal error n99tc8qe2t: setOutgoingRequests() has already been called " + - "with a different object" - } - false + when (val currentState = state) { + is State.NotReady -> { + val readyState = State.Ready(outgoingRequests) + state = readyState + if (currentState.pendingSubscribe) { + mutex.withLock { subscribe(readyState) } } } - - if (wasPendingSubscribe) { - mutex.withLock { subscribe(outgoingRequests) } + is State.Ready -> + check(currentState.outgoingRequests === outgoingRequests) { + "internal error n99tc8qe2t: setOutgoingRequests() has already been called " + + "with a different object" + } } } suspend fun subscribe(): Boolean { - val currentState = state - return when (currentState) { + return when (val currentState = state) { is State.NotReady -> { check(!currentState.pendingSubscribe) { "internal error szx94f63tz: subscribe() called when already subscribed" @@ -304,13 +296,13 @@ internal class DataConnectBidiConnectStream( } is State.Ready -> mutex.withLock { - return subscribe(currentState.outgoingRequests) + return subscribe(currentState) } } } - private fun subscribe(outgoingRequests: SendChannel<StreamRequestProto>): Boolean { - check(!subscribed) { + private fun subscribe(readyState: State.Ready): Boolean { + check(!readyState.subscribed) { "internal error hkjgvhnk27: subscribe() called when already subscribed " + "(is concurrent access to the SubscriptionStateManager properly serialized " + "with a mutex?)" @@ -323,11 +315,11 @@ internal class DataConnectBidiConnectStream( resumeStreamRequest } - val sendResult = outgoingRequests.trySend(streamRequest) + val sendResult = readyState.outgoingRequests.trySend(streamRequest) return when { sendResult.isSuccess -> { - subscribed = true + readyState.subscribed = true subscriberCount++ true } @@ -342,25 +334,21 @@ internal class DataConnectBidiConnectStream( } suspend fun unsubscribe() { - val currentState = state - when (currentState) { - is State.NotReady -> { + when (val currentState = state) { + is State.NotReady -> if (currentState.pendingSubscribe) { state = State.NotReady(pendingSubscribe = false) } - } - is State.Ready -> { - mutex.withLock { unsubscribe(currentState.outgoingRequests) } - } + is State.Ready -> mutex.withLock { unsubscribe(currentState) } } } - private fun unsubscribe(outgoingRequests: SendChannel<StreamRequestProto>) { - if (!subscribed) { + private fun unsubscribe(readyState: State.Ready) { + if (!readyState.subscribed) { return } - subscribed = false + readyState.subscribed = false subscriberCount-- check(subscriberCount >= 0) { "internal error hpn3qsj746: subscriberCount should never be less than zero, " + @@ -368,7 +356,7 @@ internal class DataConnectBidiConnectStream( } if (subscriberCount == 0) { - val sendResult = outgoingRequests.trySend(cancelStreamRequest) + val sendResult = readyState.outgoingRequests.trySend(cancelStreamRequest) if (sendResult.isFailure && !sendResult.isClosed) { error( "internal error mxcsq556tv: outgoingRequests.trySend(cancel) " + @@ -412,8 +400,12 @@ internal class DataConnectBidiConnectStream( override fun toString() = "NotReady(pendingSubscribe=$pendingSubscribe)" } - class Ready(val outgoingRequests: SendChannel<StreamRequestProto>) : State { - override fun toString() = "Ready" + class Ready( + val outgoingRequests: SendChannel<StreamRequestProto>, + // NOTE: @volatile is applied to `subscribed` so that toString() can safely read its value. + @volatile var subscribed: Boolean = false, + ) : State { + override fun toString() = "Ready(subscribed=$subscribed)" } } }
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here. |
📝 PRs merging into main branchOur main branch should always be in a releasable state. If you are working on a larger change, or if you don't want this change to see the light of the day just yet, consider using a feature branch first, and only merge into the main branch when the code complete and ready to be released. |
|
/gemini review |
There was a problem hiding this comment.
Code Review
This pull request refactors the bidirectional gRPC stream management by introducing a custom GrpcBidiFlow utility, replacing the standard coroutine stubs. This change enables better lifecycle correlation and allows the use of SharingStarted.WhileSubscribed with a zero replay expiration to ensure connections are self-cleaning and stale data is cleared. Additionally, sequence number filtering is implemented to protect late subscribers from stale replayed messages. The PR also includes several Architecture Decision Records (ADRs) documenting these design choices and adds comprehensive tests for connection robustness. Feedback identifies a typo in an error message within GrpcBidiFlow and suggests a minor cleanup for redundant logging output.
…ts}` -> `"initRequestsLists.size=${initRequestsLists.size}` [skip actions]
Summary
Rewires the bidirectional gRPC connection for Firebase Data Connect realtime queries to use a custom
GrpcBidiFlowimplementation, enabling better lifecycle management and resource cleanup.Highlights
GrpcBidiFlowto reliably correlate request and response streams in bidirectional gRPC calls, resolving a core limitation of the standard gRPC Kotlin stubs.SharingStarted.WhileSubscribed()inDataConnectBidiConnectStreamto ensure connections are lazily opened and automatically closed when no subscribers remain.WhileSubscribedconfiguration.Changelog
View Changelog
GrpcBidiFlow.replayExpirationMillis = 0for fresh data.GrpcBidiFlowandWhileSubscribedsharing strategy.connect()to return aDataConnectBidiConnectStreamand support lazy initialization.