Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1318,6 +1318,7 @@ private synchronized void onConnected(ProtocolMessage message) {
maxIdleInterval = connectionDetails.maxIdleInterval;
connectionStateTtl = connectionDetails.connectionStateTtl;
maxMessageSize = connectionDetails.maxMessageSize;
siteCode = connectionDetails.siteCode; // CD2j

/* set the clientId resolved from token, if any */
String clientId = connectionDetails.clientId;
Expand Down Expand Up @@ -2033,6 +2034,7 @@ private boolean isFatalError(ErrorInfo err) {
private CMConnectivityListener connectivityListener;
private long connectionStateTtl = Defaults.connectionStateTtl;
public int maxMessageSize = Defaults.maxMessageSize;
public String siteCode; // CD2j
long maxIdleInterval = Defaults.maxIdleInterval;
private int disconnectedRetryAttempt = 0;

Expand Down
10 changes: 10 additions & 0 deletions lib/src/main/java/io/ably/lib/types/ConnectionDetails.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,13 @@ public class ConnectionDetails {
* Spec: CD2f, RTN14e, DF1a
*/
public Long connectionStateTtl;
/**
* An opaque string identifying the server instance that the client is connected to.
* Used as a key in siteTimeserials maps for LiveObjects operations.
* <p>
* Spec: CD2j
*/
public String siteCode;

ConnectionDetails() {
maxIdleInterval = Defaults.maxIdleInterval;
Expand Down Expand Up @@ -114,6 +121,9 @@ ConnectionDetails readMsgpack(MessageUnpacker unpacker) throws IOException {
case "connectionStateTtl":
connectionStateTtl = unpacker.unpackLong();
break;
case "siteCode":
siteCode = unpacker.unpackString();
break;
default:
Log.v(TAG, "Unexpected field: " + fieldName);
unpacker.skipValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import io.ably.lib.objects.type.map.LiveMapValue
import io.ably.lib.realtime.ChannelState
import io.ably.lib.types.AblyException
import io.ably.lib.types.ProtocolMessage
import io.ably.lib.types.PublishResult
import io.ably.lib.util.Log
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
Expand All @@ -31,6 +32,12 @@ internal class DefaultRealtimeObjects(internal val channelName: String, internal

internal var state = ObjectsState.Initialized

/**
* Set of serials for operations applied locally upon ACK, awaiting deduplication of the server echo.
* @spec RTO7b, RTO7b1
*/
internal val appliedOnAckSerials = mutableSetOf<String>()

/**
* @spec RTO4 - Used for handling object messages and object sync messages
*/
Expand Down Expand Up @@ -125,13 +132,12 @@ internal class DefaultRealtimeObjects(internal val channelName: String, internal
)
)

// RTO11g - Publish the message
publish(arrayOf(msg))
// RTO11i - publish and apply locally on ACK
publishAndApply(arrayOf(msg))

// RTO11h - Check if object already exists in pool, otherwise create a zero-value object using the sequential scope
return objectsPool.get(objectId) as? LiveMap ?: withContext(sequentialScope.coroutineContext) {
objectsPool.createZeroValueObjectIfNotExists(objectId) as LiveMap
}
// RTO11h2 - Return existing object if found after apply
return objectsPool.get(objectId) as? LiveMap
?: throw serverError("createMap: MAP_CREATE was not applied as expected; objectId=$objectId") // RTO11h3d
}

Comment on lines +139 to 142
Copy link

Copilot AI Mar 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

createMapAsync now assumes publishAndApply always applies the MAP_CREATE locally and throws if the object isn't in the pool. However publishAndApply explicitly returns early when siteCode or PublishResult.serials is unavailable (it logs and defers application until the channel echo), which will make createMapAsync throw even though the operation may still succeed later. Consider falling back to the previous behavior (create/return a zero-value object in the sequential scope, or otherwise ensure an object is returned when local apply isn't possible) instead of throwing here.

Suggested change
return objectsPool.get(objectId) as? LiveMap
?: throw serverError("createMap: MAP_CREATE was not applied as expected; objectId=$objectId") // RTO11h3d
}
val liveMap = objectsPool.get(objectId) as? LiveMap
if (liveMap != null) {
return liveMap
}
// In some cases (e.g., missing siteCode/serials) the apply may be deferred
// until the channel echo is processed. Wait briefly for the object to appear
// in the pool before giving up.
val awaitedLiveMap = awaitLiveMapFromPool(objectId)
return awaitedLiveMap
?: throw serverError("createMap: MAP_CREATE was not applied as expected; objectId=$objectId") // RTO11h3d
}
/**
* Wait for a LiveMap with the given objectId to appear in the objects pool.
* Returns the LiveMap if it appears within [timeoutMillis], or null otherwise.
*/
private suspend fun awaitLiveMapFromPool(
objectId: String,
timeoutMillis: Long = 5000L,
): LiveMap? {
return try {
withTimeout(timeoutMillis) {
while (true) {
val map = objectsPool.get(objectId) as? LiveMap
if (map != null) {
return@withTimeout map
}
delay(10L)
}
}
} catch (e: TimeoutCancellationException) {
null
}
}

Copilot uses AI. Check for mistakes.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@coderabbitai can you check above suggestion thoroughly and let me know about details

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sacOO7, here's a thorough breakdown:


Is the Copilot concern valid?

Yes. The core concern is real. publishAndApply has two explicit early-return paths that skip local application:

// RTO20c1 — siteCode missing (older server / not yet set)
if (siteCode == null) {
    Log.e(tag, "RTO20c1: siteCode not available; operations will be applied when echoed")
    return
}
// RTO20c2 — serials missing or wrong count
if (serials == null || serials.size != objectMessages.size) {
    Log.e(tag, "RTO20c2: PublishResult.serials unavailable or wrong length; operations will be applied when echoed")
    return
}

In both cases publishAndApply returns without putting anything into objectsPool. The message is published and will eventually be echoed back on the channel, but createMapAsync immediately follows with:

return objectsPool.get(objectId) as? LiveMap
    ?: throw serverError("createMap: MAP_CREATE was not applied as expected; objectId=$objectId")

This will always throw in the fallback cases, even though the operation was published successfully and will eventually be applied. This is a regression — the old code handled this gracefully by creating a zero-value object.


Issues with the Copilot suggested fix

The polling approach has several problems:

Issue Detail
Busy-wait delay(10L) in a while(true) loop polls the pool every 10 ms. A reactive approach (e.g., a CompletableDeferred<LiveMap> or a StateFlow on the pool) would be far more efficient.
Arbitrary timeout The hardcoded 5000L ms is a guess — no spec basis. Under slow networks or high load it may still expire before the echo arrives.
Worst-case UX In the fallback scenario the caller already waited for publish ACK; now they wait another 5 s and then get an error anyway.
No observable improvement over previous behaviour The old code immediately returned a zero-value object that got updated on echo. The suggestion delays and then throws — strictly worse.
Still can't distinguish paths publishAndApply returns Unit; the suggestion doesn't fix the root cause (caller can't know whether local apply happened).

Recommended approach

The cleanest fix is to make publishAndApply return a Boolean indicating whether local application happened, and fall back to the previous behaviour in createMapAsync/createCounterAsync when it returns false:

// In publishAndApply — return Boolean
internal suspend fun publishAndApply(objectMessages: Array<ObjectMessage>): Boolean {
    val publishResult = publish(objectMessages)

    val siteCode = adapter.connectionManager.siteCode
    if (siteCode == null) {
        Log.e(tag, "RTO20c1: siteCode not available; operations will be applied when echoed")
        return false // <-- caller can fall back
    }
    val serials = publishResult.serials
    if (serials == null || serials.size != objectMessages.size) {
        Log.e(tag, "RTO20c2: serials unavailable or wrong length; operations will be applied when echoed")
        return false // <-- caller can fall back
    }
    // ... build synthetic messages and apply ...
    withContext(sequentialScope.coroutineContext) {
        objectsManager.applyAckResult(syntheticMessages)
    }
    return true
}

Then in createMapAsync:

val applied = publishAndApply(arrayOf(msg))

if (applied) {
    // RTO11h2 – fast path: object was applied locally
    return objectsPool.get(objectId) as? LiveMap
        ?: throw serverError("createMap: MAP_CREATE was not applied as expected; objectId=$objectId")
}

// Fallback (RTO20c1/RTO20c2): local apply deferred to channel echo;
// return a zero-value object that will be updated when the echo arrives
return withContext(sequentialScope.coroutineContext) {
    objectsPool.createZeroValueObjectIfNotExists(objectId) as LiveMap
}

This approach:

  • Preserves the throw for the normal (ACK-applied) path — so the spec contract holds.
  • Falls back gracefully to the previous behaviour when the server doesn't supply the required info.
  • Avoids polling, hardcoded timeouts, and busy-waits entirely.
  • Is self-documenting about which path was taken.

The same pattern applies symmetrically to createCounterAsync.

private suspend fun createCounterAsync(initialValue: Number): LiveCounter {
Expand Down Expand Up @@ -161,13 +167,12 @@ internal class DefaultRealtimeObjects(internal val channelName: String, internal
)
)

// RTO12g - Publish the message
publish(arrayOf(msg))
// RTO12i - publish and apply locally on ACK
publishAndApply(arrayOf(msg))

// RTO12h - Check if object already exists in pool, otherwise create a zero-value object using the sequential scope
return objectsPool.get(objectId) as? LiveCounter ?: withContext(sequentialScope.coroutineContext) {
objectsPool.createZeroValueObjectIfNotExists(objectId) as LiveCounter
}
// RTO12h2 - Return existing object if found after apply
return objectsPool.get(objectId) as? LiveCounter
?: throw serverError("createCounter: COUNTER_CREATE was not applied as expected; objectId=$objectId") // RTO12h3d
Comment on lines +174 to +175
Copy link

Copilot AI Mar 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

createCounterAsync throws if the counter object isn't present in the pool immediately after publishAndApply, but publishAndApply can legitimately return without applying locally (e.g. missing siteCode / missing PublishResult.serials) and rely on the channel echo instead. This makes counter creation fail in scenarios where the system is intentionally designed to fall back to echo-based application; consider reinstating a fallback that returns/creates a zero-value object when local apply can't happen.

Suggested change
return objectsPool.get(objectId) as? LiveCounter
?: throw serverError("createCounter: COUNTER_CREATE was not applied as expected; objectId=$objectId") // RTO12h3d
(objectsPool.get(objectId) as? LiveCounter)?.let { return it }
// In some configurations, publishAndApply may not apply the change locally and instead
// rely on the channel echo to populate the objectsPool (e.g. missing siteCode /
// missing PublishResult.serials). In that case, wait briefly for the echo-based apply.
val echoAppliedCounter = withTimeoutOrNull(5_000L) {
while (true) {
val counter = objectsPool.get(objectId) as? LiveCounter
if (counter != null) {
return@withTimeoutOrNull counter
}
delay(10L)
}
}
if (echoAppliedCounter != null) {
return echoAppliedCounter
}
throw serverError("createCounter: COUNTER_CREATE was not applied as expected; objectId=$objectId") // RTO12h3d

Copilot uses AI. Check for mistakes.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as #1194 (comment)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess we should fix this too

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we don't do this based on spec, we throw exception if we wasn't ably to apply locally (it means something went wrong, and sdk user can decide for themselves what to do next)

}

/**
Expand All @@ -182,15 +187,55 @@ internal class DefaultRealtimeObjects(internal val channelName: String, internal
/**
* Spec: RTO15
*/
internal suspend fun publish(objectMessages: Array<ObjectMessage>) {
internal suspend fun publish(objectMessages: Array<ObjectMessage>): PublishResult {
// RTO15b, RTL6c - Ensure that the channel is in a valid state for publishing
adapter.throwIfUnpublishableState(channelName)
adapter.ensureMessageSizeWithinLimit(objectMessages)
// RTO15e - Must construct the ProtocolMessage as per RTO15e1, RTO15e2, RTO15e3
val protocolMessage = ProtocolMessage(ProtocolMessage.Action.`object`, channelName)
protocolMessage.state = objectMessages
// RTO15f, RTO15g - Send the ProtocolMessage using the adapter and capture success/failure
adapter.sendAsync(protocolMessage)
return adapter.sendAsync(protocolMessage) // RTO15h
}

/**
* Publishes the given object messages and, upon receiving the ACK, immediately applies them
* locally as synthetic inbound messages using the assigned serial and connection's siteCode.
*
* Spec: RTO20
*/
internal suspend fun publishAndApply(objectMessages: Array<ObjectMessage>) {
// RTO20b - publish, propagate failure
val publishResult = publish(objectMessages)

// RTO20c - validate required info
val siteCode = adapter.connectionManager.siteCode
if (siteCode == null) {
Log.e(tag, "RTO20c1: siteCode not available; operations will be applied when echoed")
return
}
val serials = publishResult.serials
if (serials == null || serials.size != objectMessages.size) {
Log.e(tag, "RTO20c2: PublishResult.serials unavailable or wrong length; operations will be applied when echoed")
return
}

// RTO20d - create synthetic inbound ObjectMessages
val syntheticMessages = mutableListOf<ObjectMessage>()
objectMessages.forEachIndexed { i, msg ->
val serial = serials[i]
if (serial == null) {
Log.d(tag, "RTO20d1: serial null at index $i (conflated), skipping")
return@forEachIndexed
}
syntheticMessages.add(msg.copy(serial = serial, siteCode = siteCode)) // RTO20d2a, RTO20d2b, RTO20d3
}
if (syntheticMessages.isEmpty()) return

// RTO20e, RTO20f - dispatch to sequential scope for ordering
withContext(sequentialScope.coroutineContext) {
objectsManager.applyAckResult(syntheticMessages) // suspends if SYNCING (RTO20e), applies on SYNCED (RTO20f)
}
}

/**
Expand Down Expand Up @@ -268,11 +313,20 @@ internal class DefaultRealtimeObjects(internal val channelName: String, internal
objectsManager.clearBufferedObjectOperations() // RTO4b5
// defer the state change event until the next tick if we started a new sequence just now due to being in initialized state.
// this allows any event listeners to process the start of the new sequence event that was emitted earlier during this event loop.
objectsManager.endSync(fromInitializedState) // RTO4b4
objectsManager.endSync() // RTO4b4
}
}
ChannelState.detached,
ChannelState.suspended,
ChannelState.failed -> {
val errorReason = try { adapter.getChannel(channelName).reason } catch (e: Exception) { null }
val error = ablyException(
"publishAndApply could not be applied locally: channel entered $state whilst waiting for objects sync",
ErrorCode.PublishAndApplyFailedDueToChannelState,
HttpStatusCode.BadRequest,
cause = errorReason?.let { AblyException.fromErrorInfo(it) }
)
objectsManager.failBufferedAcks(error) // RTO20e1
// do not emit data update events as the actual current state of Objects data is unknown when we're in these channel states
objectsPool.clearObjectsData(false)
objectsManager.clearSyncObjectsDataPool()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ internal enum class ErrorCode(public val code: Int) {
// Channel mode and state validation error codes
ChannelModeRequired(40_024),
ChannelStateError(90_001),
PublishAndApplyFailedDueToChannelState(92_008),
}

internal enum class HttpStatusCode(public val code: Int) {
Expand Down
77 changes: 61 additions & 16 deletions liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import io.ably.lib.objects.type.BaseRealtimeObject
import io.ably.lib.objects.type.ObjectUpdate
import io.ably.lib.objects.type.livecounter.DefaultLiveCounter
import io.ably.lib.objects.type.livemap.DefaultLiveMap
import io.ably.lib.types.AblyException
import io.ably.lib.util.Log
import kotlinx.coroutines.CompletableDeferred

/**
* @spec RTO5 - Processes OBJECT and OBJECT_SYNC messages during sync sequences
Expand All @@ -21,6 +23,7 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
* @spec RTO7 - Buffered object operations during sync
*/
private val bufferedObjectOperations = mutableListOf<ObjectMessage>() // RTO7a
private var syncCompletionWaiter: CompletableDeferred<Unit>? = null

/**
* Handles object messages (non-sync messages).
Expand All @@ -39,7 +42,7 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
}

// Apply messages immediately if synced
applyObjectMessages(objectMessages) // RTO8b
applyObjectMessages(objectMessages, ObjectsOperationSource.CHANNEL) // RTO8b
}

/**
Expand All @@ -62,7 +65,7 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
if (syncTracker.hasSyncEnded()) {
// defer the state change event until the next tick if this was a new sync sequence
// to allow any event listeners to process the start of the new sequence event that was emitted earlier during this event loop.
endSync(isNewSync)
endSync()
}
}

Expand All @@ -78,25 +81,47 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
bufferedObjectOperations.clear() // RTO5a2b
syncObjectsDataPool.clear() // RTO5a2a
currentSyncId = syncId
stateChange(ObjectsState.Syncing, false)
syncCompletionWaiter = CompletableDeferred()
stateChange(ObjectsState.Syncing)
}

/**
* Ends the current sync sequence.
*
* @spec RTO5c - Applies sync data and buffered operations
*/
internal fun endSync(deferStateEvent: Boolean) {
internal fun endSync() {
Log.v(tag, "Ending sync sequence")
applySync()
// should apply buffered object operations after we applied the sync.
// can use regular non-sync object.operation logic
applyObjectMessages(bufferedObjectOperations) // RTO5c6

bufferedObjectOperations.clear() // RTO5c5
syncObjectsDataPool.clear() // RTO5c4
currentSyncId = null // RTO5c3
stateChange(ObjectsState.Synced, deferStateEvent)
applySync() // RTO5c1/2/7
applyObjectMessages(bufferedObjectOperations, ObjectsOperationSource.CHANNEL) // RTO5c6
bufferedObjectOperations.clear() // RTO5c5
syncObjectsDataPool.clear() // RTO5c4
currentSyncId = null // RTO5c3
realtimeObjects.appliedOnAckSerials.clear() // RTO5c9
stateChange(ObjectsState.Synced) // RTO5c8
syncCompletionWaiter?.complete(Unit)
syncCompletionWaiter = null
}

/**
* Called from publishAndApply (via withContext sequentialScope).
* If SYNCED: apply immediately with LOCAL source.
* If not SYNCED: suspend until endSync transitions to SYNCED (RTO20e), then apply.
*/
internal suspend fun applyAckResult(messages: List<ObjectMessage>) {
if (realtimeObjects.state != ObjectsState.Synced) {
if (syncCompletionWaiter == null) syncCompletionWaiter = CompletableDeferred()
syncCompletionWaiter?.await() // suspends; resumes after endSync transitions to SYNCED (RTO20e1)
}
applyObjectMessages(messages, ObjectsOperationSource.LOCAL) // RTO20f
}
Comment on lines +111 to +117
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Potential hang if applyAckResult called in Initialized state.

If state != Synced and syncCompletionWaiter == null (e.g., Initialized state before any sync), line 113 creates a new CompletableDeferred that will never be completed—startNewSync replaces it with a fresh instance and endSync completes only the waiter it knows about.

Consider either throwing if state is Initialized, or ensuring the waiter created here is eventually completed.

💡 Proposed defensive check
   internal suspend fun applyAckResult(messages: List<ObjectMessage>) {
     if (realtimeObjects.state != ObjectsState.Synced) {
-      if (syncCompletionWaiter == null) syncCompletionWaiter = CompletableDeferred()
+      val waiter = syncCompletionWaiter
+        ?: throw clientError("applyAckResult called before sync started")
-      syncCompletionWaiter?.await() // suspends; resumes after endSync transitions to SYNCED (RTO20e1)
+      waiter.await() // suspends; resumes after endSync transitions to SYNCED (RTO20e1)
     }
     applyObjectMessages(messages, ObjectsOperationSource.LOCAL) // RTO20f
   }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt` around
lines 111 - 117, The applyAckResult method can hang because it creates a new
syncCompletionWaiter when state != Synced (e.g., Initialized) but that waiter is
never completed; update applyAckResult in ObjectsManager to not create a new
CompletableDeferred unconditionally—either (A) throw an explicit error when
realtimeObjects.state == ObjectsState.Initialized to fail fast, or (B) only
await an existing syncCompletionWaiter (i.e., if (syncCompletionWaiter != null)
await it) instead of assigning a new one; reference applyAckResult,
syncCompletionWaiter, startNewSync and endSync when making the change so the
waiter lifecycle remains consistent.


/**
* Fails all pending apply waiters.
* Called when the channel enters DETACHED/SUSPENDED/FAILED (RTO20e1).
*/
internal fun failBufferedAcks(error: AblyException) {
syncCompletionWaiter?.completeExceptionally(error)
}

/**
Expand Down Expand Up @@ -162,7 +187,10 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
*
* @spec RTO9 - Creates zero-value objects if they don't exist
*/
private fun applyObjectMessages(objectMessages: List<ObjectMessage>) {
private fun applyObjectMessages(
objectMessages: List<ObjectMessage>,
source: ObjectsOperationSource = ObjectsOperationSource.CHANNEL,
) {
// RTO9a
for (objectMessage in objectMessages) {
if (objectMessage.operation == null) {
Expand All @@ -177,14 +205,30 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
Log.w(tag, "Object operation action is unknown, skipping message: ${objectMessage.id}")
continue
}

// RTO9a3 - skip operations already applied on ACK (discard without taking any further action).
// This check comes before zero-value object creation (RTO9a2a1) so that no zero-value object is
// created for an objectId not yet in the pool when the echo is being discarded.
// Note: siteTimeserials is NOT updated here intentionally — updating it to the echo's serial would
// incorrectly reject older-but-unprocessed operations from the same site that arrive after the echo.
if (objectMessage.serial != null &&
realtimeObjects.appliedOnAckSerials.contains(objectMessage.serial)) {
Log.d(tag, "RTO9a3: serial ${objectMessage.serial} already applied on ACK; discarding echo")
realtimeObjects.appliedOnAckSerials.remove(objectMessage.serial)
continue // discard without taking any further action
}

// RTO9a2a - we can receive an op for an object id we don't have yet in the pool. instead of buffering such operations,
// we can create a zero-value object for the provided object id and apply the operation to that zero-value object.
// this also means that all objects are capable of applying the corresponding *_CREATE ops on themselves,
// since they need to be able to eventually initialize themselves from that *_CREATE op.
// so to simplify operations handling, we always try to create a zero-value object in the pool first,
// and then we can always apply the operation on the existing object in the pool.
val obj = realtimeObjects.objectsPool.createZeroValueObjectIfNotExists(objectOperation.objectId) // RTO9a2a1
obj.applyObject(objectMessage) // RTO9a2a2, RTO9a2a3
val applied = obj.applyObject(objectMessage, source) // RTO9a2a2, RTO9a2a3
if (source == ObjectsOperationSource.LOCAL && applied && objectMessage.serial != null) {
realtimeObjects.appliedOnAckSerials.add(objectMessage.serial) // RTO9a2a4
}
}
}

Expand Down Expand Up @@ -228,7 +272,7 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
*
* @spec RTO2 - Emits state change events for syncing and synced states
*/
private fun stateChange(newState: ObjectsState, deferEvent: Boolean) {
private fun stateChange(newState: ObjectsState) {
if (realtimeObjects.state == newState) {
return
}
Expand All @@ -240,6 +284,7 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
}

internal fun dispose() {
syncCompletionWaiter?.cancel()
syncObjectsDataPool.clear()
bufferedObjectOperations.clear()
disposeObjectsStateListeners()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package io.ably.lib.objects

/** @spec RTO22 */
internal enum class ObjectsOperationSource {
LOCAL, // RTO22a - applied upon receipt of ACK
CHANNEL // RTO22b - received over a Realtime channel
}
Loading
Loading