Skip to content

[AIT-276] feat: introduce ACK-based local application of LiveObjects ops#1194

Open
ttypic wants to merge 3 commits intomainfrom
AIT-276/apply-on-ack
Open

[AIT-276] feat: introduce ACK-based local application of LiveObjects ops#1194
ttypic wants to merge 3 commits intomainfrom
AIT-276/apply-on-ack

Conversation

@ttypic
Copy link
Contributor

@ttypic ttypic commented Feb 26, 2026

introduce ACK-based local application of LiveObjects ops

  • Implemented synthetic ACK-based object message application logic (publishAndApply).
  • Added buffering and order-preserving application of ACK results during sync (applyAckResult).
  • Introduced ObjectsOperationSource enum to distinguish operation sources (ACK vs. channel).
  • Updated applyObject and related object-specific management functions to utilize the source enum.
  • Enhanced tests for ACK-based application and updated handling of unsupported operations (returns false instead of throwing).

Spec: ably/specification#419
JS: ably/ably-js#2155

Summary by CodeRabbit

  • New Features

    • App now propagates a server site identifier and applies local object changes when the server ACKs publishes, ensuring immediate consistent local state.
  • Bug Fixes

    • Improved failure reporting when channel state prevents publish-and-apply.
    • Stronger deduplication and ordering between locally ACK-applied changes and channel updates.
  • Tests

    • Expanded test coverage for ACK handling, LOCAL vs CHANNEL semantics, buffering/syncing and publish-and-apply flows.

@coderabbitai
Copy link

coderabbitai bot commented Feb 26, 2026

Walkthrough

Adds connection siteCode propagation and implements a publish-and-apply ACK flow that synthesizes LOCAL inbound messages on server ACK; introduces source-aware operation handling (LOCAL vs CHANNEL), ACK buffering/deduplication, changes apply methods to return Boolean success, and updates managers and tests accordingly.

Changes

Cohort / File(s) Summary
Connection Infrastructure
lib/src/main/java/io/ably/lib/transport/ConnectionManager.java, lib/src/main/java/io/ably/lib/types/ConnectionDetails.java
Added siteCode field to ConnectionManager and ConnectionDetails; deserializes siteCode and sets it when connected.
Publish & Local-Apply Flow
liveobjects/src/main/kotlin/io/ably/lib/objects/DefaultRealtimeObjects.kt
Added publishAndApply() and changed publish() to return PublishResult; on ACK validates site/serial, synthesizes LOCAL inbound messages, records applied serials for dedupe.
ACK Buffering & Ordering
liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt, liveobjects/src/test/kotlin/io/ably/lib/objects/unit/TestHelpers.kt
Added syncCompletionWaiter, applyAckResult(), failBufferedAcks(), endSync() changes to apply buffered ACKs and clear ACK-tracking; test helper exposes sync waiter for tests.
Operation Source & Signatures
liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsOperationSource.kt, liveobjects/src/main/kotlin/io/ably/lib/objects/type/BaseRealtimeObject.kt
Introduced ObjectsOperationSource (LOCAL, CHANNEL); applyObject/applyObjectOperation now accept a source and return Boolean indicating success/applicability.
LiveMap & LiveCounter managers
liveobjects/src/main/kotlin/io/ably/lib/objects/type/livemap/..., liveobjects/src/main/kotlin/io/ably/lib/objects/type/livecounter/...
Replaced direct publish with publishAndApply in create/set/inc/remove flows; applyOperation implementations now return Boolean, perform per-branch update/notification, and return false for unsupported ops.
Error Codes
liveobjects/src/main/kotlin/io/ably/lib/objects/ErrorCodes.kt
Added PublishAndApplyFailedDueToChannelState(92008) for failures when channel state prevents publish-and-apply.
Tests
liveobjects/src/test/kotlin/io/ably/lib/objects/unit/...
Updated many tests to pass ObjectsOperationSource, adapted expectations to Boolean returns, added tests for LOCAL vs CHANNEL semantics, ACK deduplication, buffering, and publish-and-apply flows; added test scaffolding and teardown handling.
sequenceDiagram
    participant Client
    participant DefaultRealtimeObjects
    participant Channel
    participant Server
    participant ObjectsManager

    Client->>DefaultRealtimeObjects: publishAndApply(objectMessages)
    DefaultRealtimeObjects->>Channel: publish(message)
    Channel->>Server: send
    Server-->>DefaultRealtimeObjects: ACK (PublishResult with serial, siteCode)
    DefaultRealtimeObjects->>DefaultRealtimeObjects: validate siteCode & serial
    DefaultRealtimeObjects->>ObjectsManager: synthesize LOCAL ObjectMessage(s)
    ObjectsManager->>ObjectsManager: apply messages (LOCAL), dedupe via appliedOnAckSerials
    ObjectsManager-->>Client: complete publish/apply result
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Poem

🐇 I found a siteCode shining on the wire,

ACKs hopped back and set my state afire,
Serials remembered, duplicates kept thin,
LOCAL echoes stitched the world within,
A small rabbit cheers — objects all sync in.

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 17.11% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title clearly and specifically summarizes the main change: introducing ACK-based local application of LiveObjects operations, which is the primary objective reflected throughout the changeset.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
  • 📝 Generate docstrings (stacked PR)
  • 📝 Generate docstrings (commit on current branch)
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch AIT-276/apply-on-ack

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@github-actions github-actions bot temporarily deployed to staging/pull/1194/features February 26, 2026 11:56 Inactive
@github-actions github-actions bot temporarily deployed to staging/pull/1194/javadoc February 26, 2026 11:58 Inactive
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick comments (4)
liveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livemap/DefaultLiveMapTest.kt (1)

102-105: Consider asserting the returned apply status as part of these scenarios.
Since applyObject() now returns Boolean, asserting it here would improve contract-level coverage.

Suggested test tightening
-    liveMap.applyObject(message, ObjectsOperationSource.CHANNEL)
+    val applied = liveMap.applyObject(message, ObjectsOperationSource.CHANNEL)
+    assertEquals(false, applied)

...
-    liveMap.applyObject(message, ObjectsOperationSource.CHANNEL)
+    val applied = liveMap.applyObject(message, ObjectsOperationSource.CHANNEL)
+    assertEquals(true, applied)

Also applies to: 132-135

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@liveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livemap/DefaultLiveMapTest.kt`
around lines 102 - 105, The test should assert the Boolean return value from
liveMap.applyObject(...) to cover the contract; update DefaultLiveMapTest to
capture the result of applyObject(message, ObjectsOperationSource.CHANNEL) (and
the other applyObject call around lines 132-135) and add assertions like
assertFalse or assertTrue as appropriate based on the scenario before verifying
siteTimeserials, so the test checks both the returned apply status and the
side-effect on liveMap.siteTimeserials.
liveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livecounter/DefaultLiveCounterTest.kt (1)

92-95: Consider asserting applyObject() boolean result in these tests.
These cases already validate side effects; also asserting the returned Boolean would protect the new apply contract from regressions.

Suggested test tightening
-    liveCounter.applyObject(message, ObjectsOperationSource.CHANNEL)
+    val applied = liveCounter.applyObject(message, ObjectsOperationSource.CHANNEL)
+    assertEquals(false, applied)

...
-    liveCounter.applyObject(message, ObjectsOperationSource.CHANNEL)
+    val applied = liveCounter.applyObject(message, ObjectsOperationSource.CHANNEL)
+    assertEquals(true, applied)

Also applies to: 119-122

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@liveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livecounter/DefaultLiveCounterTest.kt`
around lines 92 - 95, The tests call
DefaultLiveCounterTest::liveCounter.applyObject(message,
ObjectsOperationSource.CHANNEL) but don’t assert its boolean return; capture the
result and assert it matches the expected outcome (e.g., assertFalse(result)
when the operation is skipped and the siteTimeserials remains "serial2",
assertTrue(result) when the operation should be applied). Update both
occurrences (the call around the siteTimeserials "serial2" check and the similar
block at lines 119-122) to assign the return value from
liveCounter.applyObject(...) to a variable and assert the Boolean equals the
expected applied/skipped state.
liveobjects/src/main/kotlin/io/ably/lib/objects/DefaultRealtimeObjects.kt (2)

319-330: Address swallowed exception warning from static analysis.

The try-catch on line 323 catches and swallows exceptions when accessing adapter.getChannel(channelName).reason. While the exception is used as an optional cause for the created AblyException, if an unexpected exception occurs (not just the channel not being available), it's silently ignored.

Consider being more specific about what exceptions to catch, or log the swallowed exception:

🔧 Proposed fix to log swallowed exception
-          val errorReason = try { adapter.getChannel(channelName).reason } catch (e: Exception) { null }
+          val errorReason = try {
+            adapter.getChannel(channelName).reason
+          } catch (e: Exception) {
+            Log.d(tag, "Could not retrieve channel reason during state change handling", e)
+            null
+          }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@liveobjects/src/main/kotlin/io/ably/lib/objects/DefaultRealtimeObjects.kt`
around lines 319 - 330, The try-catch around
adapter.getChannel(channelName).reason swallows any Exception, hiding unexpected
errors; update the code in DefaultRealtimeObjects (around adapter.getChannel,
channelName, errorReason, ablyException, objectsManager.failBufferedAcks) to
avoid silent swallowing by either catching only the expected exception type(s)
thrown when a channel is absent or by logging the caught exception before
returning null; ensure the catch block records the exception (e.g., logger.warn
or logger.error with the exception) so unexpected failures are visible while
still allowing errorReason to be null for building the AblyException.

212-221: Early returns silently degrade to echo-based application.

When siteCode is null or serials is unavailable/wrong length, the function logs an error and returns early, causing operations to be applied only when echoed from the server. This is a reasonable fallback, but consider whether callers should be aware of this degraded behavior.

If this degradation is intentional and acceptable (operations still eventually apply), the current approach is fine. If callers need to know about the failure to apply locally, consider returning a result indicating the outcome.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@liveobjects/src/main/kotlin/io/ably/lib/objects/DefaultRealtimeObjects.kt`
around lines 212 - 221, The current early-return behavior when
adapter.connectionManager.siteCode is null or publishResult.serials is
null/length-mismatch (logged via Log.e with tag) silently degrades to echo-based
application; change the function to surface this outcome to callers instead of
only logging and returning—either by returning a status/result (e.g., boolean or
enum) indicating "appliedLocally", "degradedToEcho", or an error, or by invoking
the existing callback/throwing a descriptive exception; update all references to
siteCode, publishResult.serials, and objectMessages handling so callers can
react to the degraded path rather than being unaware.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Nitpick comments:
In `@liveobjects/src/main/kotlin/io/ably/lib/objects/DefaultRealtimeObjects.kt`:
- Around line 319-330: The try-catch around
adapter.getChannel(channelName).reason swallows any Exception, hiding unexpected
errors; update the code in DefaultRealtimeObjects (around adapter.getChannel,
channelName, errorReason, ablyException, objectsManager.failBufferedAcks) to
avoid silent swallowing by either catching only the expected exception type(s)
thrown when a channel is absent or by logging the caught exception before
returning null; ensure the catch block records the exception (e.g., logger.warn
or logger.error with the exception) so unexpected failures are visible while
still allowing errorReason to be null for building the AblyException.
- Around line 212-221: The current early-return behavior when
adapter.connectionManager.siteCode is null or publishResult.serials is
null/length-mismatch (logged via Log.e with tag) silently degrades to echo-based
application; change the function to surface this outcome to callers instead of
only logging and returning—either by returning a status/result (e.g., boolean or
enum) indicating "appliedLocally", "degradedToEcho", or an error, or by invoking
the existing callback/throwing a descriptive exception; update all references to
siteCode, publishResult.serials, and objectMessages handling so callers can
react to the degraded path rather than being unaware.

In
`@liveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livecounter/DefaultLiveCounterTest.kt`:
- Around line 92-95: The tests call
DefaultLiveCounterTest::liveCounter.applyObject(message,
ObjectsOperationSource.CHANNEL) but don’t assert its boolean return; capture the
result and assert it matches the expected outcome (e.g., assertFalse(result)
when the operation is skipped and the siteTimeserials remains "serial2",
assertTrue(result) when the operation should be applied). Update both
occurrences (the call around the siteTimeserials "serial2" check and the similar
block at lines 119-122) to assign the return value from
liveCounter.applyObject(...) to a variable and assert the Boolean equals the
expected applied/skipped state.

In
`@liveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livemap/DefaultLiveMapTest.kt`:
- Around line 102-105: The test should assert the Boolean return value from
liveMap.applyObject(...) to cover the contract; update DefaultLiveMapTest to
capture the result of applyObject(message, ObjectsOperationSource.CHANNEL) (and
the other applyObject call around lines 132-135) and add assertions like
assertFalse or assertTrue as appropriate based on the scenario before verifying
siteTimeserials, so the test checks both the returned apply status and the
side-effect on liveMap.siteTimeserials.

ℹ️ Review info

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Disabled knowledge base sources:

  • Jira integration is disabled

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 3298bcc and fa4d3f9.

📒 Files selected for processing (16)
  • lib/src/main/java/io/ably/lib/transport/ConnectionManager.java
  • lib/src/main/java/io/ably/lib/types/ConnectionDetails.java
  • liveobjects/src/main/kotlin/io/ably/lib/objects/DefaultRealtimeObjects.kt
  • liveobjects/src/main/kotlin/io/ably/lib/objects/ErrorCodes.kt
  • liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt
  • liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsOperationSource.kt
  • liveobjects/src/main/kotlin/io/ably/lib/objects/type/BaseRealtimeObject.kt
  • liveobjects/src/main/kotlin/io/ably/lib/objects/type/livecounter/DefaultLiveCounter.kt
  • liveobjects/src/main/kotlin/io/ably/lib/objects/type/livecounter/LiveCounterManager.kt
  • liveobjects/src/main/kotlin/io/ably/lib/objects/type/livemap/DefaultLiveMap.kt
  • liveobjects/src/main/kotlin/io/ably/lib/objects/type/livemap/LiveMapManager.kt
  • liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsManagerTest.kt
  • liveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livecounter/DefaultLiveCounterTest.kt
  • liveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livecounter/LiveCounterManagerTest.kt
  • liveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livemap/DefaultLiveMapTest.kt
  • liveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livemap/LiveMapManagerTest.kt

@ttypic ttypic requested a review from sacOO7 February 27, 2026 11:59
@github-actions github-actions bot temporarily deployed to staging/pull/1194/features March 2, 2026 12:09 Inactive
@github-actions github-actions bot temporarily deployed to staging/pull/1194/javadoc March 2, 2026 12:10 Inactive
ttypic added 2 commits March 3, 2026 09:24
…operations

- Implemented synthetic ACK-based object message application logic (`publishAndApply`).
- Added buffering and order-preserving application of ACK results during sync (`applyAckResult`).
- Introduced `ObjectsOperationSource` enum to distinguish operation sources (ACK vs. channel).
- Updated `applyObject` and related object-specific management functions to utilize the source enum.
- Enhanced tests for ACK-based application and updated handling of unsupported operations (returns `false` instead of throwing).
- Improved handling of skipped operations on ACK (RTO9a3) with detailed comments on echo discard logic.
- Added extensive unit tests for `ObjectsManager`, `LiveMapManager`, and `DefaultLiveCounter` covering edge cases like buffering, tombstoning, and operation deduplication.
- Introduced helper properties for buffered ACKs and object operations in tests.
@ttypic ttypic force-pushed the AIT-276/apply-on-ack branch from a53b95d to f57632f Compare March 3, 2026 09:24
@github-actions github-actions bot temporarily deployed to staging/pull/1194/features March 3, 2026 09:25 Inactive
@github-actions github-actions bot temporarily deployed to staging/pull/1194/javadoc March 3, 2026 09:27 Inactive
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Nitpick comments (1)
liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsManagerTest.kt (1)

165-176: Assert the expected ObjectsOperationSource explicitly in these verifications.

Using any() here can miss regressions where ObjectsManager forwards the wrong source.

💡 Proposed test tightening
     verify(exactly = 1) {
-      testObject1.applyObject(objectMessage1, any())
+      testObject1.applyObject(objectMessage1, ObjectsOperationSource.CHANNEL)
     }
@@
     verify(exactly = 1) {
-      testObject2.applyObject(objectMessage2, any())
+      testObject2.applyObject(objectMessage2, ObjectsOperationSource.CHANNEL)
     }
@@
     verify(exactly = 1) {
-      testObject3.applyObject(objectMessage3, any())
+      testObject3.applyObject(objectMessage3, ObjectsOperationSource.CHANNEL)
     }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsManagerTest.kt`
around lines 165 - 176, The verifications for applyObject use any() for the
source which can hide regressions; update each verify call on
testObject1.applyObject, testObject2.applyObject, and testObject3.applyObject to
assert the specific ObjectsOperationSource expected (e.g.,
OBJECTS_OPERATION_SOURCE_SYNC or the concrete enum/value used in your code)
instead of any(), so the test explicitly checks that ObjectsManager forwards the
correct source when syncing objects from the pool.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@liveobjects/src/main/kotlin/io/ably/lib/objects/DefaultRealtimeObjects.kt`:
- Around line 213-221: The code in DefaultRealtimeObjects.kt currently silently
returns when siteCode or publishResult.serials are null (the block checking
siteCode and serials), which causes downstream createMapAsync/createCounterAsync
to fail later; instead, change the behavior to fail fast or wait for echo: when
siteCode == null or serials == null/size mismatch, throw a clear exception (e.g.
IllegalStateException) or return a failed Future/CompletableDeferred so the
caller (createMapAsync/createCounterAsync) receives an explicit error; include
context (siteCode/publishResult info and objectMessages size) in the error
message so callers can handle retry/blocking for echo rather than proceeding
silently.

In `@liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt`:
- Around line 231-236: The ACK-echo dedup block that checks objectMessage.serial
against realtimeObjects.appliedOnAckSerials must only run for channel echoes;
update the conditional to also require the message source be a channel (e.g.
objectMessage.source == MessageSource.CHANNEL or the appropriate enum/constant
used in this codebase) before logging, removing the serial, and continuing, so
LOCAL replays do not clear dedup markers; keep the existing serial check and
remove/continue behavior but nest or extend the if to include the source check
(referencing objectMessage.serial, objectMessage.source, and
realtimeObjects.appliedOnAckSerials).

---

Nitpick comments:
In
`@liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsManagerTest.kt`:
- Around line 165-176: The verifications for applyObject use any() for the
source which can hide regressions; update each verify call on
testObject1.applyObject, testObject2.applyObject, and testObject3.applyObject to
assert the specific ObjectsOperationSource expected (e.g.,
OBJECTS_OPERATION_SOURCE_SYNC or the concrete enum/value used in your code)
instead of any(), so the test explicitly checks that ObjectsManager forwards the
correct source when syncing objects from the pool.

ℹ️ Review info

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Disabled knowledge base sources:

  • Jira integration is disabled

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between fa4d3f9 and f57632f.

📒 Files selected for processing (17)
  • lib/src/main/java/io/ably/lib/transport/ConnectionManager.java
  • lib/src/main/java/io/ably/lib/types/ConnectionDetails.java
  • liveobjects/src/main/kotlin/io/ably/lib/objects/DefaultRealtimeObjects.kt
  • liveobjects/src/main/kotlin/io/ably/lib/objects/ErrorCodes.kt
  • liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt
  • liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsOperationSource.kt
  • liveobjects/src/main/kotlin/io/ably/lib/objects/type/BaseRealtimeObject.kt
  • liveobjects/src/main/kotlin/io/ably/lib/objects/type/livecounter/DefaultLiveCounter.kt
  • liveobjects/src/main/kotlin/io/ably/lib/objects/type/livecounter/LiveCounterManager.kt
  • liveobjects/src/main/kotlin/io/ably/lib/objects/type/livemap/DefaultLiveMap.kt
  • liveobjects/src/main/kotlin/io/ably/lib/objects/type/livemap/LiveMapManager.kt
  • liveobjects/src/test/kotlin/io/ably/lib/objects/unit/TestHelpers.kt
  • liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsManagerTest.kt
  • liveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livecounter/DefaultLiveCounterTest.kt
  • liveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livecounter/LiveCounterManagerTest.kt
  • liveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livemap/DefaultLiveMapTest.kt
  • liveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livemap/LiveMapManagerTest.kt
🚧 Files skipped from review as they are similar to previous changes (4)
  • lib/src/main/java/io/ably/lib/transport/ConnectionManager.java
  • liveobjects/src/main/kotlin/io/ably/lib/objects/ErrorCodes.kt
  • liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsOperationSource.kt
  • lib/src/main/java/io/ably/lib/types/ConnectionDetails.java

Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Introduces ACK-based local application for LiveObjects operations so local state updates happen on publish ACKs (with correct deduplication against subsequent channel echoes) and preserves ordering across sync sequences.

Changes:

  • Added publishAndApply flow that synthesizes inbound object messages on publish ACK and applies them locally.
  • Buffered ACK results during sync and applied them before buffered channel operations to ensure correct ordering/deduplication.
  • Introduced ObjectsOperationSource and updated object-application pipeline to respect LOCAL vs CHANNEL semantics; expanded tests accordingly.

Reviewed changes

Copilot reviewed 17 out of 17 changed files in this pull request and generated 3 comments.

Show a summary per file
File Description
liveobjects/src/main/kotlin/io/ably/lib/objects/type/livemap/LiveMapManager.kt applyOperation now returns Boolean and treats unsupported ops as non-fatal (false).
liveobjects/src/main/kotlin/io/ably/lib/objects/type/livemap/DefaultLiveMap.kt Uses publishAndApply; propagates Boolean from operation application.
liveobjects/src/main/kotlin/io/ably/lib/objects/type/livecounter/LiveCounterManager.kt Mirrors LiveMap changes for counters; unsupported ops return false.
liveobjects/src/main/kotlin/io/ably/lib/objects/type/livecounter/DefaultLiveCounter.kt Uses publishAndApply; propagates Boolean from operation application.
liveobjects/src/main/kotlin/io/ably/lib/objects/type/BaseRealtimeObject.kt Adds ObjectsOperationSource parameter and returns Boolean to support LOCAL-vs-CHANNEL behavior.
liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsOperationSource.kt New enum distinguishing LOCAL (ACK) vs CHANNEL sources.
liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt Buffers ACK results during sync, applies ACKs before buffered channel ops, and performs echo dedup via appliedOnAckSerials.
liveobjects/src/main/kotlin/io/ably/lib/objects/ErrorCodes.kt Adds PublishAndApplyFailedDueToChannelState error code.
liveobjects/src/main/kotlin/io/ably/lib/objects/DefaultRealtimeObjects.kt Implements publishAndApply, tracks appliedOnAckSerials, and fails buffered ACKs on channel state changes.
lib/src/main/java/io/ably/lib/types/ConnectionDetails.java Adds siteCode to connection details (decoded from msgpack).
lib/src/main/java/io/ably/lib/transport/ConnectionManager.java Stores siteCode from ConnectionDetails for use by LiveObjects.
liveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livemap/LiveMapManagerTest.kt Updates tests for boolean return semantics and unsupported ops returning false.
liveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livemap/DefaultLiveMapTest.kt Adds/updates tests for LOCAL vs CHANNEL application and return values.
liveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livecounter/LiveCounterManagerTest.kt Updates tests for boolean return semantics and unsupported ops returning false.
liveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livecounter/DefaultLiveCounterTest.kt Adds/updates tests for LOCAL vs CHANNEL application and return values.
liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsManagerTest.kt Adds comprehensive ACK buffering/dedup/order tests and teardown disposal for created instances.
liveobjects/src/test/kotlin/io/ably/lib/objects/unit/TestHelpers.kt Exposes bufferedAcks for tests.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +139 to 142
return objectsPool.get(objectId) as? LiveMap
?: throw serverError("createMap: MAP_CREATE was not applied as expected; objectId=$objectId") // RTO11h3d
}

Copy link

Copilot AI Mar 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

createMapAsync now assumes publishAndApply always applies the MAP_CREATE locally and throws if the object isn't in the pool. However publishAndApply explicitly returns early when siteCode or PublishResult.serials is unavailable (it logs and defers application until the channel echo), which will make createMapAsync throw even though the operation may still succeed later. Consider falling back to the previous behavior (create/return a zero-value object in the sequential scope, or otherwise ensure an object is returned when local apply isn't possible) instead of throwing here.

Suggested change
return objectsPool.get(objectId) as? LiveMap
?: throw serverError("createMap: MAP_CREATE was not applied as expected; objectId=$objectId") // RTO11h3d
}
val liveMap = objectsPool.get(objectId) as? LiveMap
if (liveMap != null) {
return liveMap
}
// In some cases (e.g., missing siteCode/serials) the apply may be deferred
// until the channel echo is processed. Wait briefly for the object to appear
// in the pool before giving up.
val awaitedLiveMap = awaitLiveMapFromPool(objectId)
return awaitedLiveMap
?: throw serverError("createMap: MAP_CREATE was not applied as expected; objectId=$objectId") // RTO11h3d
}
/**
* Wait for a LiveMap with the given objectId to appear in the objects pool.
* Returns the LiveMap if it appears within [timeoutMillis], or null otherwise.
*/
private suspend fun awaitLiveMapFromPool(
objectId: String,
timeoutMillis: Long = 5000L,
): LiveMap? {
return try {
withTimeout(timeoutMillis) {
while (true) {
val map = objectsPool.get(objectId) as? LiveMap
if (map != null) {
return@withTimeout map
}
delay(10L)
}
}
} catch (e: TimeoutCancellationException) {
null
}
}

Copilot uses AI. Check for mistakes.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@coderabbitai can you check above suggestion thoroughly and let me know about details

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sacOO7, here's a thorough breakdown:


Is the Copilot concern valid?

Yes. The core concern is real. publishAndApply has two explicit early-return paths that skip local application:

// RTO20c1 — siteCode missing (older server / not yet set)
if (siteCode == null) {
    Log.e(tag, "RTO20c1: siteCode not available; operations will be applied when echoed")
    return
}
// RTO20c2 — serials missing or wrong count
if (serials == null || serials.size != objectMessages.size) {
    Log.e(tag, "RTO20c2: PublishResult.serials unavailable or wrong length; operations will be applied when echoed")
    return
}

In both cases publishAndApply returns without putting anything into objectsPool. The message is published and will eventually be echoed back on the channel, but createMapAsync immediately follows with:

return objectsPool.get(objectId) as? LiveMap
    ?: throw serverError("createMap: MAP_CREATE was not applied as expected; objectId=$objectId")

This will always throw in the fallback cases, even though the operation was published successfully and will eventually be applied. This is a regression — the old code handled this gracefully by creating a zero-value object.


Issues with the Copilot suggested fix

The polling approach has several problems:

Issue Detail
Busy-wait delay(10L) in a while(true) loop polls the pool every 10 ms. A reactive approach (e.g., a CompletableDeferred<LiveMap> or a StateFlow on the pool) would be far more efficient.
Arbitrary timeout The hardcoded 5000L ms is a guess — no spec basis. Under slow networks or high load it may still expire before the echo arrives.
Worst-case UX In the fallback scenario the caller already waited for publish ACK; now they wait another 5 s and then get an error anyway.
No observable improvement over previous behaviour The old code immediately returned a zero-value object that got updated on echo. The suggestion delays and then throws — strictly worse.
Still can't distinguish paths publishAndApply returns Unit; the suggestion doesn't fix the root cause (caller can't know whether local apply happened).

Recommended approach

The cleanest fix is to make publishAndApply return a Boolean indicating whether local application happened, and fall back to the previous behaviour in createMapAsync/createCounterAsync when it returns false:

// In publishAndApply — return Boolean
internal suspend fun publishAndApply(objectMessages: Array<ObjectMessage>): Boolean {
    val publishResult = publish(objectMessages)

    val siteCode = adapter.connectionManager.siteCode
    if (siteCode == null) {
        Log.e(tag, "RTO20c1: siteCode not available; operations will be applied when echoed")
        return false // <-- caller can fall back
    }
    val serials = publishResult.serials
    if (serials == null || serials.size != objectMessages.size) {
        Log.e(tag, "RTO20c2: serials unavailable or wrong length; operations will be applied when echoed")
        return false // <-- caller can fall back
    }
    // ... build synthetic messages and apply ...
    withContext(sequentialScope.coroutineContext) {
        objectsManager.applyAckResult(syntheticMessages)
    }
    return true
}

Then in createMapAsync:

val applied = publishAndApply(arrayOf(msg))

if (applied) {
    // RTO11h2 – fast path: object was applied locally
    return objectsPool.get(objectId) as? LiveMap
        ?: throw serverError("createMap: MAP_CREATE was not applied as expected; objectId=$objectId")
}

// Fallback (RTO20c1/RTO20c2): local apply deferred to channel echo;
// return a zero-value object that will be updated when the echo arrives
return withContext(sequentialScope.coroutineContext) {
    objectsPool.createZeroValueObjectIfNotExists(objectId) as LiveMap
}

This approach:

  • Preserves the throw for the normal (ACK-applied) path — so the spec contract holds.
  • Falls back gracefully to the previous behaviour when the server doesn't supply the required info.
  • Avoids polling, hardcoded timeouts, and busy-waits entirely.
  • Is self-documenting about which path was taken.

The same pattern applies symmetrically to createCounterAsync.

Comment on lines +174 to +175
return objectsPool.get(objectId) as? LiveCounter
?: throw serverError("createCounter: COUNTER_CREATE was not applied as expected; objectId=$objectId") // RTO12h3d
Copy link

Copilot AI Mar 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

createCounterAsync throws if the counter object isn't present in the pool immediately after publishAndApply, but publishAndApply can legitimately return without applying locally (e.g. missing siteCode / missing PublishResult.serials) and rely on the channel echo instead. This makes counter creation fail in scenarios where the system is intentionally designed to fall back to echo-based application; consider reinstating a fallback that returns/creates a zero-value object when local apply can't happen.

Suggested change
return objectsPool.get(objectId) as? LiveCounter
?: throw serverError("createCounter: COUNTER_CREATE was not applied as expected; objectId=$objectId") // RTO12h3d
(objectsPool.get(objectId) as? LiveCounter)?.let { return it }
// In some configurations, publishAndApply may not apply the change locally and instead
// rely on the channel echo to populate the objectsPool (e.g. missing siteCode /
// missing PublishResult.serials). In that case, wait briefly for the echo-based apply.
val echoAppliedCounter = withTimeoutOrNull(5_000L) {
while (true) {
val counter = objectsPool.get(objectId) as? LiveCounter
if (counter != null) {
return@withTimeoutOrNull counter
}
delay(10L)
}
}
if (echoAppliedCounter != null) {
return echoAppliedCounter
}
throw serverError("createCounter: COUNTER_CREATE was not applied as expected; objectId=$objectId") // RTO12h3d

Copilot uses AI. Check for mistakes.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as #1194 (comment)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess we should fix this too

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we don't do this based on spec, we throw exception if we wasn't ably to apply locally (it means something went wrong, and sdk user can decide for themselves what to do next)

Comment on lines 31 to 33
@Test
fun `(RTO5) ObjectsManager should handle object sync messages`() {
val defaultRealtimeObjects = getDefaultRealtimeObjectsWithMockedDeps()
Copy link

Copilot AI Mar 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test class now disposes DefaultRealtimeObjects instances created via makeRealtimeObjects, but other tests in the same file still create instances via getDefaultRealtimeObjectsWithMockedDeps() which also starts background coroutines (incomingObjectsHandler) and are not disposed/tracked. To avoid leaked coroutines between tests, consider routing all instance creation through the tracking helper (or adding the mocked-deps instances to testInstances and disposing them in tearDown).

Suggested change
@Test
fun `(RTO5) ObjectsManager should handle object sync messages`() {
val defaultRealtimeObjects = getDefaultRealtimeObjectsWithMockedDeps()
private fun makeRealtimeObjectsWithMockedDeps(): DefaultRealtimeObjects {
return getDefaultRealtimeObjectsWithMockedDeps().also { testInstances.add(it) }
}
@Test
fun `(RTO5) ObjectsManager should handle object sync messages`() {
val defaultRealtimeObjects = makeRealtimeObjectsWithMockedDeps()

Copilot uses AI. Check for mistakes.
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copilot encountered an error and was unable to review this pull request. You can try again by re-requesting a review.

Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 17 out of 17 changed files in this pull request and generated no new comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

@sacOO7
Copy link
Collaborator

sacOO7 commented Mar 10, 2026

Few comments

  1. endSync ordering: appliedOnAckSerials.clear() placed before buffered channel messages -- violates spec RTO5c
    ably-js (_endSync): Follows the spec ordering: applies buffered channel messages first, then clears appliedOnAckSerials, then emits synced.
  2. bufferedAcks use can be avoided
    Will share you the full feedback

@github-actions github-actions bot temporarily deployed to staging/pull/1194/features March 10, 2026 21:32 Inactive
@github-actions github-actions bot temporarily deployed to staging/pull/1194/javadoc March 10, 2026 21:34 Inactive
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt (1)

77-85: ⚠️ Potential issue | 🟠 Major

Do not replace an outstanding syncCompletionWaiter.

applyAckResult() suspends on the current deferred. If a new sync starts before that one completes, Line 84 swaps in a different waiter and the suspended ACK coroutine is left waiting on the old one forever. Reuse the existing incomplete waiter, or fail it before replacing it.

🔧 Suggested fix
-    syncCompletionWaiter = CompletableDeferred()
+    if (syncCompletionWaiter == null || syncCompletionWaiter?.isCompleted == true) {
+      syncCompletionWaiter = CompletableDeferred()
+    }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt` around
lines 77 - 85, startNewSync currently overwrites syncCompletionWaiter causing
any coroutine in applyAckResult waiting on the old deferred to hang; instead, in
startNewSync check the existing syncCompletionWaiter: if it's still active reuse
it (do not replace), otherwise create a new CompletableDeferred; alternatively,
if you must replace, completeExceptionally or cancel the old waiter before
assigning a new CompletableDeferred to syncCompletionWaiter so applyAckResult
won't remain suspended. Locate startNewSync and syncCompletionWaiter and
implement one of these approaches to ensure applyAckResult's suspended awaits
are resolved.
♻️ Duplicate comments (2)
liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt (1)

209-219: ⚠️ Potential issue | 🟠 Major

Restrict ACK-echo deduplication to channel messages.

This branch still runs for ObjectsOperationSource.LOCAL. A local replay can remove the serial from appliedOnAckSerials, so the later channel echo is no longer filtered and can be applied twice.

🔧 Suggested fix
-      if (objectMessage.serial != null &&
-        realtimeObjects.appliedOnAckSerials.contains(objectMessage.serial)) {
+      if (source == ObjectsOperationSource.CHANNEL &&
+        objectMessage.serial != null &&
+        realtimeObjects.appliedOnAckSerials.remove(objectMessage.serial)
+      ) {
         Log.d(tag, "RTO9a3: serial ${objectMessage.serial} already applied on ACK; discarding echo")
-        realtimeObjects.appliedOnAckSerials.remove(objectMessage.serial)
         continue // discard without taking any further action
       }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt` around
lines 209 - 219, The ACK-echo deduplication block in ObjectsManager (checking
realtimeObjects.appliedOnAckSerials against objectMessage.serial) must only run
for channel-originated messages to avoid local replays removing serials and
letting channel echoes apply twice; modify the conditional around that block to
also require objectMessage.source == ObjectsOperationSource.CHANNEL (or
equivalent enum value) so the serial removal and continue are only executed for
channel messages while preserving existing behavior for LOCAL and other sources.
liveobjects/src/main/kotlin/io/ably/lib/objects/DefaultRealtimeObjects.kt (1)

211-220: ⚠️ Potential issue | 🟠 Major

Fail publishAndApply() explicitly when ACK metadata is missing.

createMapAsync()/createCounterAsync() assume this method has applied the operation locally and immediately read from objectsPool afterward. Returning here turns a successful publish into a later "not applied as expected" failure, and a caller retry can duplicate the create on the wire. Surface the failure here instead of silently falling back.

🔧 Suggested fix
     val siteCode = adapter.connectionManager.siteCode
     if (siteCode == null) {
-      Log.e(tag, "RTO20c1: siteCode not available; operations will be applied when echoed")
-      return
+      throw serverError("RTO20c1: siteCode unavailable; cannot apply ACK locally")
     }
     val serials = publishResult.serials
     if (serials == null || serials.size != objectMessages.size) {
-      Log.e(tag, "RTO20c2: PublishResult.serials unavailable or wrong length; operations will be applied when echoed")
-      return
+      throw serverError(
+        "RTO20c2: PublishResult.serials unavailable or wrong length; " +
+          "expected=${objectMessages.size}, actual=${serials?.size ?: "null"}"
+      )
     }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@liveobjects/src/main/kotlin/io/ably/lib/objects/DefaultRealtimeObjects.kt`
around lines 211 - 220, The check in publishAndApply (in
DefaultRealtimeObjects.kt) currently logs and returns when
adapter.connectionManager.siteCode or publishResult.serials is missing, which
hides publish failures from createMapAsync/createCounterAsync; instead fail the
publish explicitly by returning/propagating an error (e.g., throw an exception
or complete the returned future/Deferred exceptionally) when siteCode == null or
when publishResult.serials is null or of the wrong length; update the code paths
in publishAndApply so callers like createMapAsync and createCounterAsync see the
failure rather than proceeding to read objectsPool, referencing publishAndApply,
publishResult.serials, adapter.connectionManager.siteCode, and objectsPool to
locate where to change the control flow.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In
`@liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsManagerTest.kt`:
- Around line 24-29: The test cleanup only disposes instances tracked in
testInstances, but DefaultRealtimeObjects created via
getDefaultRealtimeObjectsWithMockedDeps() are not added and leak background
coroutines; update the test setup so every DefaultRealtimeObjects created is
registered for disposal—either modify getDefaultRealtimeObjectsWithMockedDeps()
to append created DefaultRealtimeObjects to the shared testInstances list or
ensure callers always call makeRealtimeObjects() instead of directly
constructing objects—so that the teardown routine disposes/cleans up all
DefaultRealtimeObjects (references: DefaultRealtimeObjects, makeRealtimeObjects,
getDefaultRealtimeObjectsWithMockedDeps, testInstances).

---

Outside diff comments:
In `@liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt`:
- Around line 77-85: startNewSync currently overwrites syncCompletionWaiter
causing any coroutine in applyAckResult waiting on the old deferred to hang;
instead, in startNewSync check the existing syncCompletionWaiter: if it's still
active reuse it (do not replace), otherwise create a new CompletableDeferred;
alternatively, if you must replace, completeExceptionally or cancel the old
waiter before assigning a new CompletableDeferred to syncCompletionWaiter so
applyAckResult won't remain suspended. Locate startNewSync and
syncCompletionWaiter and implement one of these approaches to ensure
applyAckResult's suspended awaits are resolved.

---

Duplicate comments:
In `@liveobjects/src/main/kotlin/io/ably/lib/objects/DefaultRealtimeObjects.kt`:
- Around line 211-220: The check in publishAndApply (in
DefaultRealtimeObjects.kt) currently logs and returns when
adapter.connectionManager.siteCode or publishResult.serials is missing, which
hides publish failures from createMapAsync/createCounterAsync; instead fail the
publish explicitly by returning/propagating an error (e.g., throw an exception
or complete the returned future/Deferred exceptionally) when siteCode == null or
when publishResult.serials is null or of the wrong length; update the code paths
in publishAndApply so callers like createMapAsync and createCounterAsync see the
failure rather than proceeding to read objectsPool, referencing publishAndApply,
publishResult.serials, adapter.connectionManager.siteCode, and objectsPool to
locate where to change the control flow.

In `@liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt`:
- Around line 209-219: The ACK-echo deduplication block in ObjectsManager
(checking realtimeObjects.appliedOnAckSerials against objectMessage.serial) must
only run for channel-originated messages to avoid local replays removing serials
and letting channel echoes apply twice; modify the conditional around that block
to also require objectMessage.source == ObjectsOperationSource.CHANNEL (or
equivalent enum value) so the serial removal and continue are only executed for
channel messages while preserving existing behavior for LOCAL and other sources.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: edba32a5-48a9-4429-a369-7d78f273741a

📥 Commits

Reviewing files that changed from the base of the PR and between f57632f and 708fe84.

📒 Files selected for processing (4)
  • liveobjects/src/main/kotlin/io/ably/lib/objects/DefaultRealtimeObjects.kt
  • liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt
  • liveobjects/src/test/kotlin/io/ably/lib/objects/unit/TestHelpers.kt
  • liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsManagerTest.kt
🚧 Files skipped from review as they are similar to previous changes (1)
  • liveobjects/src/test/kotlin/io/ably/lib/objects/unit/TestHelpers.kt

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt`:
- Around line 111-117: The applyAckResult method can hang because it creates a
new syncCompletionWaiter when state != Synced (e.g., Initialized) but that
waiter is never completed; update applyAckResult in ObjectsManager to not create
a new CompletableDeferred unconditionally—either (A) throw an explicit error
when realtimeObjects.state == ObjectsState.Initialized to fail fast, or (B) only
await an existing syncCompletionWaiter (i.e., if (syncCompletionWaiter != null)
await it) instead of assigning a new one; reference applyAckResult,
syncCompletionWaiter, startNewSync and endSync when making the change so the
waiter lifecycle remains consistent.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 518bbd87-2192-43ce-b7ef-0b0146efcec7

📥 Commits

Reviewing files that changed from the base of the PR and between 708fe84 and d732e91.

📒 Files selected for processing (5)
  • liveobjects/src/main/kotlin/io/ably/lib/objects/DefaultRealtimeObjects.kt
  • liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt
  • liveobjects/src/test/kotlin/io/ably/lib/objects/unit/TestHelpers.kt
  • liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/DefaultRealtimeObjectsTest.kt
  • liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsManagerTest.kt
🚧 Files skipped from review as they are similar to previous changes (1)
  • liveobjects/src/test/kotlin/io/ably/lib/objects/unit/TestHelpers.kt

Comment on lines +111 to +117
internal suspend fun applyAckResult(messages: List<ObjectMessage>) {
if (realtimeObjects.state != ObjectsState.Synced) {
if (syncCompletionWaiter == null) syncCompletionWaiter = CompletableDeferred()
syncCompletionWaiter?.await() // suspends; resumes after endSync transitions to SYNCED (RTO20e1)
}
applyObjectMessages(messages, ObjectsOperationSource.LOCAL) // RTO20f
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Potential hang if applyAckResult called in Initialized state.

If state != Synced and syncCompletionWaiter == null (e.g., Initialized state before any sync), line 113 creates a new CompletableDeferred that will never be completed—startNewSync replaces it with a fresh instance and endSync completes only the waiter it knows about.

Consider either throwing if state is Initialized, or ensuring the waiter created here is eventually completed.

💡 Proposed defensive check
   internal suspend fun applyAckResult(messages: List<ObjectMessage>) {
     if (realtimeObjects.state != ObjectsState.Synced) {
-      if (syncCompletionWaiter == null) syncCompletionWaiter = CompletableDeferred()
+      val waiter = syncCompletionWaiter
+        ?: throw clientError("applyAckResult called before sync started")
-      syncCompletionWaiter?.await() // suspends; resumes after endSync transitions to SYNCED (RTO20e1)
+      waiter.await() // suspends; resumes after endSync transitions to SYNCED (RTO20e1)
     }
     applyObjectMessages(messages, ObjectsOperationSource.LOCAL) // RTO20f
   }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt` around
lines 111 - 117, The applyAckResult method can hang because it creates a new
syncCompletionWaiter when state != Synced (e.g., Initialized) but that waiter is
never completed; update applyAckResult in ObjectsManager to not create a new
CompletableDeferred unconditionally—either (A) throw an explicit error when
realtimeObjects.state == ObjectsState.Initialized to fail fast, or (B) only
await an existing syncCompletionWaiter (i.e., if (syncCompletionWaiter != null)
await it) instead of assigning a new one; reference applyAckResult,
syncCompletionWaiter, startNewSync and endSync when making the change so the
waiter lifecycle remains consistent.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Development

Successfully merging this pull request may close these issues.

3 participants