[AIT-276] feat: introduce ACK-based local application of LiveObjects ops #1194
base: main
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -12,6 +12,7 @@ import io.ably.lib.objects.type.map.LiveMapValue | |||||||||||||||||||||||||||||||||||||||||||||
| import io.ably.lib.realtime.ChannelState | ||||||||||||||||||||||||||||||||||||||||||||||
| import io.ably.lib.types.AblyException | ||||||||||||||||||||||||||||||||||||||||||||||
| import io.ably.lib.types.ProtocolMessage | ||||||||||||||||||||||||||||||||||||||||||||||
| import io.ably.lib.types.PublishResult | ||||||||||||||||||||||||||||||||||||||||||||||
| import io.ably.lib.util.Log | ||||||||||||||||||||||||||||||||||||||||||||||
| import kotlinx.coroutines.* | ||||||||||||||||||||||||||||||||||||||||||||||
| import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED | ||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -31,6 +32,12 @@ internal class DefaultRealtimeObjects(internal val channelName: String, internal | |||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||
| internal var state = ObjectsState.Initialized | ||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||
| /** | ||||||||||||||||||||||||||||||||||||||||||||||
| * Set of serials for operations applied locally upon ACK, awaiting deduplication of the server echo. | ||||||||||||||||||||||||||||||||||||||||||||||
| * @spec RTO7b, RTO7b1 | ||||||||||||||||||||||||||||||||||||||||||||||
| */ | ||||||||||||||||||||||||||||||||||||||||||||||
| internal val appliedOnAckSerials = mutableSetOf<String>() | ||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||
| /** | ||||||||||||||||||||||||||||||||||||||||||||||
| * @spec RTO4 - Used for handling object messages and object sync messages | ||||||||||||||||||||||||||||||||||||||||||||||
| */ | ||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -125,13 +132,12 @@ internal class DefaultRealtimeObjects(internal val channelName: String, internal | |||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||
| // RTO11g - Publish the message | ||||||||||||||||||||||||||||||||||||||||||||||
| publish(arrayOf(msg)) | ||||||||||||||||||||||||||||||||||||||||||||||
| // RTO11i - publish and apply locally on ACK | ||||||||||||||||||||||||||||||||||||||||||||||
| publishAndApply(arrayOf(msg)) | ||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||
| // RTO11h - Check if object already exists in pool, otherwise create a zero-value object using the sequential scope | ||||||||||||||||||||||||||||||||||||||||||||||
| return objectsPool.get(objectId) as? LiveMap ?: withContext(sequentialScope.coroutineContext) { | ||||||||||||||||||||||||||||||||||||||||||||||
| objectsPool.createZeroValueObjectIfNotExists(objectId) as LiveMap | ||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||
| // RTO11h2 - Return existing object if found after apply | ||||||||||||||||||||||||||||||||||||||||||||||
| return objectsPool.get(objectId) as? LiveMap | ||||||||||||||||||||||||||||||||||||||||||||||
| ?: throw serverError("createMap: MAP_CREATE was not applied as expected; objectId=$objectId") // RTO11h3d | ||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||
| private suspend fun createCounterAsync(initialValue: Number): LiveCounter { | ||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -161,13 +167,12 @@ internal class DefaultRealtimeObjects(internal val channelName: String, internal | |||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||
| // RTO12g - Publish the message | ||||||||||||||||||||||||||||||||||||||||||||||
| publish(arrayOf(msg)) | ||||||||||||||||||||||||||||||||||||||||||||||
| // RTO12i - publish and apply locally on ACK | ||||||||||||||||||||||||||||||||||||||||||||||
| publishAndApply(arrayOf(msg)) | ||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||
| // RTO12h - Check if object already exists in pool, otherwise create a zero-value object using the sequential scope | ||||||||||||||||||||||||||||||||||||||||||||||
| return objectsPool.get(objectId) as? LiveCounter ?: withContext(sequentialScope.coroutineContext) { | ||||||||||||||||||||||||||||||||||||||||||||||
| objectsPool.createZeroValueObjectIfNotExists(objectId) as LiveCounter | ||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||
| // RTO12h2 - Return existing object if found after apply | ||||||||||||||||||||||||||||||||||||||||||||||
| return objectsPool.get(objectId) as? LiveCounter | ||||||||||||||||||||||||||||||||||||||||||||||
| ?: throw serverError("createCounter: COUNTER_CREATE was not applied as expected; objectId=$objectId") // RTO12h3d | ||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines +174 to +175
|
||||||||||||||||||||||||||||||||||||||||||||||
| return objectsPool.get(objectId) as? LiveCounter | |
| ?: throw serverError("createCounter: COUNTER_CREATE was not applied as expected; objectId=$objectId") // RTO12h3d | |
| (objectsPool.get(objectId) as? LiveCounter)?.let { return it } | |
| // In some configurations, publishAndApply may not apply the change locally and instead | |
| // rely on the channel echo to populate the objectsPool (e.g. missing siteCode / | |
| // missing PublishResult.serials). In that case, wait briefly for the echo-based apply. | |
| val echoAppliedCounter = withTimeoutOrNull(5_000L) { | |
| while (true) { | |
| val counter = objectsPool.get(objectId) as? LiveCounter | |
| if (counter != null) { | |
| return@withTimeoutOrNull counter | |
| } | |
| delay(10L) | |
| } | |
| } | |
| if (echoAppliedCounter != null) { | |
| return echoAppliedCounter | |
| } | |
| throw serverError("createCounter: COUNTER_CREATE was not applied as expected; objectId=$objectId") // RTO12h3d |
Same as #1194 (comment)
I guess we should fix this too
We don't do this, per the spec: we throw an exception if we weren't able to apply locally (it means something went wrong, and the SDK user can decide for themselves what to do next).
ttypic marked this conversation as resolved.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -4,7 +4,9 @@ import io.ably.lib.objects.type.BaseRealtimeObject | |
| import io.ably.lib.objects.type.ObjectUpdate | ||
| import io.ably.lib.objects.type.livecounter.DefaultLiveCounter | ||
| import io.ably.lib.objects.type.livemap.DefaultLiveMap | ||
| import io.ably.lib.types.AblyException | ||
| import io.ably.lib.util.Log | ||
| import kotlinx.coroutines.CompletableDeferred | ||
|
|
||
| /** | ||
| * @spec RTO5 - Processes OBJECT and OBJECT_SYNC messages during sync sequences | ||
|
|
@@ -21,6 +23,7 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject | |
| * @spec RTO7 - Buffered object operations during sync | ||
| */ | ||
| private val bufferedObjectOperations = mutableListOf<ObjectMessage>() // RTO7a | ||
| private var syncCompletionWaiter: CompletableDeferred<Unit>? = null | ||
|
|
||
| /** | ||
| * Handles object messages (non-sync messages). | ||
|
|
@@ -39,7 +42,7 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject | |
| } | ||
|
|
||
| // Apply messages immediately if synced | ||
| applyObjectMessages(objectMessages) // RTO8b | ||
| applyObjectMessages(objectMessages, ObjectsOperationSource.CHANNEL) // RTO8b | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -62,7 +65,7 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject | |
| if (syncTracker.hasSyncEnded()) { | ||
| // defer the state change event until the next tick if this was a new sync sequence | ||
| // to allow any event listeners to process the start of the new sequence event that was emitted earlier during this event loop. | ||
| endSync(isNewSync) | ||
| endSync() | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -78,25 +81,47 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject | |
| bufferedObjectOperations.clear() // RTO5a2b | ||
| syncObjectsDataPool.clear() // RTO5a2a | ||
| currentSyncId = syncId | ||
| stateChange(ObjectsState.Syncing, false) | ||
| syncCompletionWaiter = CompletableDeferred() | ||
| stateChange(ObjectsState.Syncing) | ||
| } | ||
|
|
||
| /** | ||
| * Ends the current sync sequence. | ||
| * | ||
| * @spec RTO5c - Applies sync data and buffered operations | ||
| */ | ||
| internal fun endSync(deferStateEvent: Boolean) { | ||
| internal fun endSync() { | ||
| Log.v(tag, "Ending sync sequence") | ||
| applySync() | ||
| // should apply buffered object operations after we applied the sync. | ||
| // can use regular non-sync object.operation logic | ||
| applyObjectMessages(bufferedObjectOperations) // RTO5c6 | ||
|
|
||
| bufferedObjectOperations.clear() // RTO5c5 | ||
| syncObjectsDataPool.clear() // RTO5c4 | ||
| currentSyncId = null // RTO5c3 | ||
| stateChange(ObjectsState.Synced, deferStateEvent) | ||
| applySync() // RTO5c1/2/7 | ||
| applyObjectMessages(bufferedObjectOperations, ObjectsOperationSource.CHANNEL) // RTO5c6 | ||
| bufferedObjectOperations.clear() // RTO5c5 | ||
| syncObjectsDataPool.clear() // RTO5c4 | ||
| currentSyncId = null // RTO5c3 | ||
| realtimeObjects.appliedOnAckSerials.clear() // RTO5c9 | ||
| stateChange(ObjectsState.Synced) // RTO5c8 | ||
| syncCompletionWaiter?.complete(Unit) | ||
| syncCompletionWaiter = null | ||
| } | ||
|
|
||
| /** | ||
| * Called from publishAndApply (via withContext sequentialScope). | ||
| * If SYNCED: apply immediately with LOCAL source. | ||
| * If not SYNCED: suspend until endSync transitions to SYNCED (RTO20e), then apply. | ||
| */ | ||
| internal suspend fun applyAckResult(messages: List<ObjectMessage>) { | ||
| if (realtimeObjects.state != ObjectsState.Synced) { | ||
| if (syncCompletionWaiter == null) syncCompletionWaiter = CompletableDeferred() | ||
| syncCompletionWaiter?.await() // suspends; resumes after endSync transitions to SYNCED (RTO20e1) | ||
| } | ||
| applyObjectMessages(messages, ObjectsOperationSource.LOCAL) // RTO20f | ||
| } | ||
|
Comment on lines +111 to +117
Potential hang if … If … Consider either throwing if state is …
Proposed defensive check:
  internal suspend fun applyAckResult(messages: List<ObjectMessage>) {
    if (realtimeObjects.state != ObjectsState.Synced) {
-     if (syncCompletionWaiter == null) syncCompletionWaiter = CompletableDeferred()
+     val waiter = syncCompletionWaiter
+       ?: throw clientError("applyAckResult called before sync started")
-     syncCompletionWaiter?.await() // suspends; resumes after endSync transitions to SYNCED (RTO20e1)
+     waiter.await() // suspends; resumes after endSync transitions to SYNCED (RTO20e1)
    }
    applyObjectMessages(messages, ObjectsOperationSource.LOCAL) // RTO20f
  }
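The fail-fast idea in the proposed check can be illustrated outside the SDK. The following is a minimal sketch only: `SyncGate` and its methods are hypothetical names, and `java.util.concurrent.CompletableFuture` is swapped in for `CompletableDeferred` so that the example runs without the kotlinx.coroutines dependency. It shows a waiter that is created when a sync starts, completed when it ends, and refused (rather than silently created) when no sync is in flight.

```kotlin
import java.util.concurrent.CompletableFuture

// Hypothetical stand-in for the sync-completion waiter; not the SDK's class.
class SyncGate {
    private var waiter: CompletableFuture<Unit>? = null

    var synced = false
        private set

    // A new sync sequence begins: callers arriving from now on must wait.
    fun startSync() {
        synced = false
        waiter = CompletableFuture<Unit>()
    }

    // The sync sequence ends: release everyone waiting on the current waiter.
    fun endSync() {
        synced = true
        waiter?.complete(Unit)
        waiter = null
    }

    // Returns a future to wait on, or fails fast when no sync is in flight,
    // instead of creating a waiter that nothing would ever complete.
    fun whenSynced(): CompletableFuture<Unit> {
        if (synced) return CompletableFuture.completedFuture(Unit)
        return waiter ?: throw IllegalStateException("whenSynced called before sync started")
    }
}
```

The design choice here is that only `startSync` may create a waiter, so every waiter has a matching `endSync` that can complete it.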
||
|
|
||
| /** | ||
| * Fails all pending apply waiters. | ||
| * Called when the channel enters DETACHED/SUSPENDED/FAILED (RTO20e1). | ||
| */ | ||
| internal fun failBufferedAcks(error: AblyException) { | ||
| syncCompletionWaiter?.completeExceptionally(error) | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -162,7 +187,10 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject | |
| * | ||
| * @spec RTO9 - Creates zero-value objects if they don't exist | ||
| */ | ||
| private fun applyObjectMessages(objectMessages: List<ObjectMessage>) { | ||
| private fun applyObjectMessages( | ||
| objectMessages: List<ObjectMessage>, | ||
| source: ObjectsOperationSource = ObjectsOperationSource.CHANNEL, | ||
| ) { | ||
| // RTO9a | ||
| for (objectMessage in objectMessages) { | ||
| if (objectMessage.operation == null) { | ||
|
|
@@ -177,14 +205,30 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject | |
| Log.w(tag, "Object operation action is unknown, skipping message: ${objectMessage.id}") | ||
| continue | ||
| } | ||
|
|
||
| // RTO9a3 - skip operations already applied on ACK (discard without taking any further action). | ||
| // This check comes before zero-value object creation (RTO9a2a1) so that no zero-value object is | ||
| // created for an objectId not yet in the pool when the echo is being discarded. | ||
| // Note: siteTimeserials is NOT updated here intentionally — updating it to the echo's serial would | ||
| // incorrectly reject older-but-unprocessed operations from the same site that arrive after the echo. | ||
| if (objectMessage.serial != null && | ||
| realtimeObjects.appliedOnAckSerials.contains(objectMessage.serial)) { | ||
| Log.d(tag, "RTO9a3: serial ${objectMessage.serial} already applied on ACK; discarding echo") | ||
| realtimeObjects.appliedOnAckSerials.remove(objectMessage.serial) | ||
| continue // discard without taking any further action | ||
| } | ||
ttypic marked this conversation as resolved.
|
||
|
|
||
| // RTO9a2a - we can receive an op for an object id we don't have yet in the pool. instead of buffering such operations, | ||
| // we can create a zero-value object for the provided object id and apply the operation to that zero-value object. | ||
| // this also means that all objects are capable of applying the corresponding *_CREATE ops on themselves, | ||
| // since they need to be able to eventually initialize themselves from that *_CREATE op. | ||
| // so to simplify operations handling, we always try to create a zero-value object in the pool first, | ||
| // and then we can always apply the operation on the existing object in the pool. | ||
| val obj = realtimeObjects.objectsPool.createZeroValueObjectIfNotExists(objectOperation.objectId) // RTO9a2a1 | ||
| obj.applyObject(objectMessage) // RTO9a2a2, RTO9a2a3 | ||
| val applied = obj.applyObject(objectMessage, source) // RTO9a2a2, RTO9a2a3 | ||
| if (source == ObjectsOperationSource.LOCAL && applied && objectMessage.serial != null) { | ||
| realtimeObjects.appliedOnAckSerials.add(objectMessage.serial) // RTO9a2a4 | ||
| } | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -228,7 +272,7 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject | |
| * | ||
| * @spec RTO2 - Emits state change events for syncing and synced states | ||
| */ | ||
| private fun stateChange(newState: ObjectsState, deferEvent: Boolean) { | ||
| private fun stateChange(newState: ObjectsState) { | ||
| if (realtimeObjects.state == newState) { | ||
| return | ||
| } | ||
|
|
@@ -240,6 +284,7 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject | |
| } | ||
|
|
||
| internal fun dispose() { | ||
| syncCompletionWaiter?.cancel() | ||
| syncObjectsDataPool.clear() | ||
| bufferedObjectOperations.clear() | ||
| disposeObjectsStateListeners() | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,7 @@ | ||
| package io.ably.lib.objects | ||
|
|
||
| /** @spec RTO22 */ | ||
| internal enum class ObjectsOperationSource { | ||
| LOCAL, // RTO22a - applied upon receipt of ACK | ||
| CHANNEL // RTO22b - received over a Realtime channel | ||
| } |
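The echo deduplication in the diff above (the RTO9a3 check and the RTO9a2a4 bookkeeping) can be sketched in isolation. This is a simplified illustration under stated assumptions, not the SDK code: `OpSource`, `Op` and `EchoDeduplicator` are hypothetical names, every non-echo operation is treated as successfully applied, and the object pool is reduced to a list of applied operations.

```kotlin
// Hypothetical mirror of the LOCAL / CHANNEL distinction used in the diff.
enum class OpSource { LOCAL, CHANNEL }

// Hypothetical minimal operation: only the fields needed for deduplication.
data class Op(val serial: String?, val objectId: String)

class EchoDeduplicator {
    // Serials of operations applied locally on ACK, awaiting their channel echo.
    private val appliedOnAckSerials = mutableSetOf<String>()

    // Operations that were actually applied, in order.
    val applied = mutableListOf<Op>()

    fun apply(op: Op, source: OpSource) {
        val serial = op.serial
        // Echo of an operation already applied on ACK: forget the serial and discard it.
        if (serial != null && appliedOnAckSerials.remove(serial)) return

        applied.add(op)

        // Remember locally applied serials so that the later echo can be skipped.
        if (source == OpSource.LOCAL && serial != null) appliedOnAckSerials.add(serial)
    }

    fun pendingEchoes(): Int = appliedOnAckSerials.size
}
```

Applying an operation with `OpSource.LOCAL` and then receiving the same serial with `OpSource.CHANNEL` leaves a single entry in `applied`, while an unrelated channel operation is applied as usual.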
createMapAsync now assumes publishAndApply always applies the MAP_CREATE locally and throws if the object isn't in the pool. However, publishAndApply explicitly returns early when siteCode or PublishResult.serials is unavailable (it logs and defers application until the channel echo), which will make createMapAsync throw even though the operation may still succeed later. Consider falling back to the previous behavior (create/return a zero-value object in the sequential scope, or otherwise ensure an object is returned when local apply isn't possible) instead of throwing here.
@coderabbitai can you check the above suggestion thoroughly and let me know the details?
@sacOO7, here's a thorough breakdown:

Is the Copilot concern valid?
Yes. The core concern is real. publishAndApply has two explicit early-return paths that skip local application:
In both cases publishAndApply returns without putting anything into objectsPool. The message is published and will eventually be echoed back on the channel, but createMapAsync immediately follows with:
This will always throw in the fallback cases, even though the operation was published successfully and will eventually be applied. This is a regression: the old code handled this gracefully by creating a zero-value object.

Issues with the Copilot suggested fix
The polling approach has several problems:
- delay(10L) in a while(true) loop polls the pool every 10 ms. A reactive approach (e.g., a CompletableDeferred<LiveMap> or a StateFlow on the pool) would be far more efficient.
- 5000L ms is a guess with no spec basis. Under slow networks or high load it may still expire before the echo arrives.
- publishAndApply returns Unit; the suggestion doesn't fix the root cause (the caller can't know whether local apply happened).

Recommended approach
The cleanest fix is to make publishAndApply return a Boolean indicating whether local application happened, and fall back to the previous behaviour in createMapAsync/createCounterAsync when it returns false:
Then in createMapAsync:
This approach:
The same pattern applies symmetrically to createCounterAsync.
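The recommended approach can be sketched roughly as follows. This illustrates the reviewer's suggestion only and is not code from the pull request: `PoolSketch`, `publishAndApplySketch` and `createMapSketch` are hypothetical names, objects are reduced to plain strings, and the publish step itself is omitted. Elsewhere in this review the maintainers state that, per the spec, they throw when local apply does not happen, so treat this purely as a description of the alternative.

```kotlin
// Hypothetical in-memory stand-in for the objects pool; values are plain strings.
class PoolSketch {
    private val objects = mutableMapOf<String, String>()

    fun get(id: String): String? = objects[id]

    fun put(id: String, value: String) {
        objects[id] = value
    }

    fun createZeroValueIfNotExists(id: String): String =
        objects.getOrPut(id) { "zero-value" }
}

// Returns true only when the operation was applied locally on ACK.
// A missing siteCode or missing serials means application is deferred to the echo.
fun publishAndApplySketch(
    pool: PoolSketch,
    objectId: String,
    siteCode: String?,
    serials: List<String>?,
): Boolean {
    if (siteCode == null || serials == null) return false
    pool.put(objectId, "created")
    return true
}

fun createMapSketch(
    pool: PoolSketch,
    objectId: String,
    siteCode: String?,
    serials: List<String>?,
): String {
    val appliedLocally = publishAndApplySketch(pool, objectId, siteCode, serials)

    // Fallback suggested by the reviewer: return a zero-value object when local apply was skipped.
    if (!appliedLocally) return pool.createZeroValueIfNotExists(objectId)

    // Local apply was reported, so a missing object is a genuine error.
    return pool.get(objectId)
        ?: throw IllegalStateException("MAP_CREATE was not applied as expected; objectId=$objectId")
}
```

With this shape the caller can distinguish "deferred to the echo" from "applied but missing", which the `Unit`-returning version cannot express.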