Skip to content

dataconnect(chore): rewrite bidirectional gRPC connection to use a custom class rather than generated stubs#8179

Open
dconeybe wants to merge 51 commits into
mainfrom
dconeybe/dataconnect/RealtimeRewire4
Open

dataconnect(chore): rewrite bidirectional gRPC connection to use a custom class rather than generated stubs#8179
dconeybe wants to merge 51 commits into
mainfrom
dconeybe/dataconnect/RealtimeRewire4

Conversation

@dconeybe
Copy link
Copy Markdown
Contributor

@dconeybe dconeybe commented May 19, 2026

Summary

Rewires the bidirectional gRPC connection for Firebase Data Connect realtime queries to use a custom GrpcBidiFlow implementation, enabling better lifecycle management and resource cleanup.

Highlights

  • Introduced GrpcBidiFlow to reliably correlate request and response streams in bidirectional gRPC calls, resolving a core limitation of the standard gRPC Kotlin stubs.
  • Switched to SharingStarted.WhileSubscribed() in DataConnectBidiConnectStream to ensure connections are lazily opened and automatically closed when no subscribers remain.
  • Implemented sequence number filtering to prevent stale replayed data from affecting the local state on reconnection.
  • Updated extensive unit tests to verify lazy connection behavior, connection re-establishment, and robust error handling.
  • Added Architecture Decision Records (ADRs) to document the rationale behind the new connection model, sequence number filtering, and WhileSubscribed configuration.

Changelog

View Changelog
  • RealtimeTodo.md: Updated TODO list with new items regarding late subscribers.
  • README.md (docs): Added documentation on how to use and write Architecture Decision Records (ADRs).
  • adr-0001-rewire-bidi-connection-to-grpcbidiflow.md: Documented the decision to move to GrpcBidiFlow.
  • adr-0002-use-replay-expiration-millis-0-in-while-subscribed.md: Documented the use of replayExpirationMillis = 0 for fresh data.
  • adr-0003-sequence-number-filtering-for-stale-replayed-data.md: Documented sequence number filtering for stale data.
  • DataConnectBidiConnectStream.kt: Refactored to use GrpcBidiFlow and WhileSubscribed sharing strategy.
  • DataConnectGrpcRPCs.kt: Updated connect() to return a DataConnectBidiConnectStream and support lazy initialization.
  • GrpcBidiFlow.kt: Implemented custom bidirectional gRPC flow with explicit lifecycle correlation and listener support.
  • RealtimeQuerySubscriptionImpl.kt: Updated to handle lazy flows and proper unsubscription behavior.
  • IdStringGenerator.kt: Minor adjustments to support new request ID generation patterns.
  • DataConnectGrpcRPCsUnitTest.kt: Updated tests to reflect lazy connection initialization.
  • RealtimeQuerySubscriptionImplUnitTest.kt: Added comprehensive tests for connection lifecycle, re-establishment, and multi-subscriber scenarios.
  • InProcessDataConnectGrpcStreamingServer.kt: Enhanced testing server with utility methods for awaiting specific gRPC events.
  • TurbineUtils.kt: Added generic utility for awaiting items satisfying a predicate in Turbine.

dconeybe added 30 commits May 18, 2026 16:56
… is sent after last unsubscription for a query`
…ors can subscribe after old collectors are cancelled`
…of IllegalStateException when the stream has been closed to help with unwinding subscribe flows
…rary and generated code (will be adapted later)
…ogle's internal antigravity version, with a new "docs" directory recording the decisions and designs).

NOTE: The code needs to be reviewed by a human!
…nCompletion() to make sure that the subscription is unsubscribed.
…tion is re-established after being closed due to unsubscriptions`
…equent flow subscriptions return fresh data`
…he replay cache to new subscribers using sequence numbers.
…r flow subscriptions do not return data from in-flight subscribe`
…: `later flow subscriptions do not return data from in-flight subscribe`
dconeybe added 16 commits May 18, 2026 16:56
…sta: `flow reports error if server completes the RPC mid-stream`
… 4: Late Subscribers Hang Indefinitely on Completed Stream
…micReference access to state, which is already done serially
diff --git a/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/DataConnectBidiConnectStream.kt b/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/DataConnectBidiConnectStream.kt
index 3dfa3da..272d847 100644
--- a/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/DataConnectBidiConnectStream.kt
+++ b/firebase-dataconnect/src/main/kotlin/com/google/firebase/dataconnect/core/DataConnectBidiConnectStream.kt
@@ -268,33 +268,25 @@ internal class DataConnectBidiConnectStream(
     inner class Subscriber {
       private var state: State = State.NotReady(pendingSubscribe = false)

-      private var subscribed = false
-
       suspend fun setOutgoingRequests(outgoingRequests: SendChannel<StreamRequestProto>) {
-        val currentState = state
-        val wasPendingSubscribe =
-          when (currentState) {
-            is State.NotReady -> {
-              state = State.Ready(outgoingRequests)
-              currentState.pendingSubscribe
-            }
-            is State.Ready -> {
-              check(currentState.outgoingRequests === outgoingRequests) {
-                "internal error n99tc8qe2t: setOutgoingRequests() has already been called " +
-                  "with a different object"
-              }
-              false
+        when (val currentState = state) {
+          is State.NotReady -> {
+            val readyState = State.Ready(outgoingRequests)
+            state = readyState
+            if (currentState.pendingSubscribe) {
+              mutex.withLock { subscribe(readyState) }
             }
           }
-
-        if (wasPendingSubscribe) {
-          mutex.withLock { subscribe(outgoingRequests) }
+          is State.Ready ->
+            check(currentState.outgoingRequests === outgoingRequests) {
+              "internal error n99tc8qe2t: setOutgoingRequests() has already been called " +
+                "with a different object"
+            }
         }
       }

       suspend fun subscribe(): Boolean {
-        val currentState = state
-        return when (currentState) {
+        return when (val currentState = state) {
           is State.NotReady -> {
             check(!currentState.pendingSubscribe) {
               "internal error szx94f63tz: subscribe() called when already subscribed"
@@ -304,13 +296,13 @@ internal class DataConnectBidiConnectStream(
           }
           is State.Ready ->
             mutex.withLock {
-              return subscribe(currentState.outgoingRequests)
+              return subscribe(currentState)
             }
         }
       }

-      private fun subscribe(outgoingRequests: SendChannel<StreamRequestProto>): Boolean {
-        check(!subscribed) {
+      private fun subscribe(readyState: State.Ready): Boolean {
+        check(!readyState.subscribed) {
           "internal error hkjgvhnk27: subscribe() called when already subscribed " +
             "(is concurrent access to the SubscriptionStateManager properly serialized " +
             "with a mutex?)"
@@ -323,11 +315,11 @@ internal class DataConnectBidiConnectStream(
             resumeStreamRequest
           }

-        val sendResult = outgoingRequests.trySend(streamRequest)
+        val sendResult = readyState.outgoingRequests.trySend(streamRequest)

         return when {
           sendResult.isSuccess -> {
-            subscribed = true
+            readyState.subscribed = true
             subscriberCount++
             true
           }
@@ -342,25 +334,21 @@ internal class DataConnectBidiConnectStream(
       }

       suspend fun unsubscribe() {
-        val currentState = state
-        when (currentState) {
-          is State.NotReady -> {
+        when (val currentState = state) {
+          is State.NotReady ->
             if (currentState.pendingSubscribe) {
               state = State.NotReady(pendingSubscribe = false)
             }
-          }
-          is State.Ready -> {
-            mutex.withLock { unsubscribe(currentState.outgoingRequests) }
-          }
+          is State.Ready -> mutex.withLock { unsubscribe(currentState) }
         }
       }

-      private fun unsubscribe(outgoingRequests: SendChannel<StreamRequestProto>) {
-        if (!subscribed) {
+      private fun unsubscribe(readyState: State.Ready) {
+        if (!readyState.subscribed) {
           return
         }

-        subscribed = false
+        readyState.subscribed = false
         subscriberCount--
         check(subscriberCount >= 0) {
           "internal error hpn3qsj746: subscriberCount should never be less than zero, " +
@@ -368,7 +356,7 @@ internal class DataConnectBidiConnectStream(
         }

         if (subscriberCount == 0) {
-          val sendResult = outgoingRequests.trySend(cancelStreamRequest)
+          val sendResult = readyState.outgoingRequests.trySend(cancelStreamRequest)
           if (sendResult.isFailure && !sendResult.isClosed) {
             error(
               "internal error mxcsq556tv: outgoingRequests.trySend(cancel) " +
@@ -412,8 +400,12 @@ internal class DataConnectBidiConnectStream(
         override fun toString() = "NotReady(pendingSubscribe=$pendingSubscribe)"
       }

-      class Ready(val outgoingRequests: SendChannel<StreamRequestProto>) : State {
-        override fun toString() = "Ready"
+      class Ready(
+        val outgoingRequests: SendChannel<StreamRequestProto>,
+        // NOTE: @volatile is applied to `subscribed` so that toString() can safely read its value.
+        @volatile var subscribed: Boolean = false,
+      ) : State {
+        override fun toString() = "Ready(subscribed=$subscribed)"
       }
     }
   }
@gemini-code-assist
Copy link
Copy Markdown
Contributor

Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here.

@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented May 19, 2026

📝 PRs merging into main branch

Our main branch should always be in a releasable state. If you are working on a larger change, or if you don't want this change to see the light of the day just yet, consider using a feature branch first, and only merge into the main branch when the code complete and ready to be released.

@dconeybe
Copy link
Copy Markdown
Contributor Author

/gemini review

Copy link
Copy Markdown
Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request refactors the bidirectional gRPC stream management by introducing a custom GrpcBidiFlow utility, replacing the standard coroutine stubs. This change enables better lifecycle correlation and allows the use of SharingStarted.WhileSubscribed with a zero replay expiration to ensure connections are self-cleaning and stale data is cleared. Additionally, sequence number filtering is implemented to protect late subscribers from stale replayed messages. The PR also includes several Architecture Decision Records (ADRs) documenting these design choices and adds comprehensive tests for connection robustness. Feedback identifies a typo in an error message within GrpcBidiFlow and suggests a minor cleanup for redundant logging output.

@dconeybe dconeybe changed the title Dconeybe/dataconnect/realtime rewire4 dataconnect(chore): rewrite bidirectional gRPC connection to use a custom class rather than generated stubs May 19, 2026
@dconeybe dconeybe marked this pull request as ready for review May 19, 2026 00:53
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant