diff --git a/lib/src/main/java/io/ably/lib/transport/ConnectionManager.java b/lib/src/main/java/io/ably/lib/transport/ConnectionManager.java
index a1987cd27..3fab010cb 100644
--- a/lib/src/main/java/io/ably/lib/transport/ConnectionManager.java
+++ b/lib/src/main/java/io/ably/lib/transport/ConnectionManager.java
@@ -1318,6 +1318,7 @@ private synchronized void onConnected(ProtocolMessage message) {
maxIdleInterval = connectionDetails.maxIdleInterval;
connectionStateTtl = connectionDetails.connectionStateTtl;
maxMessageSize = connectionDetails.maxMessageSize;
+ siteCode = connectionDetails.siteCode; // CD2j
/* set the clientId resolved from token, if any */
String clientId = connectionDetails.clientId;
@@ -2033,6 +2034,7 @@ private boolean isFatalError(ErrorInfo err) {
private CMConnectivityListener connectivityListener;
private long connectionStateTtl = Defaults.connectionStateTtl;
public int maxMessageSize = Defaults.maxMessageSize;
+ public String siteCode; // CD2j
long maxIdleInterval = Defaults.maxIdleInterval;
private int disconnectedRetryAttempt = 0;
diff --git a/lib/src/main/java/io/ably/lib/types/ConnectionDetails.java b/lib/src/main/java/io/ably/lib/types/ConnectionDetails.java
index 0977a2350..587b9241f 100644
--- a/lib/src/main/java/io/ably/lib/types/ConnectionDetails.java
+++ b/lib/src/main/java/io/ably/lib/types/ConnectionDetails.java
@@ -73,6 +73,13 @@ public class ConnectionDetails {
* Spec: CD2f, RTN14e, DF1a
*/
public Long connectionStateTtl;
+ /**
+ * An opaque string identifying the server instance that the client is connected to.
+ * Used as a key in siteTimeserials maps for LiveObjects operations.
+ *
+ * Spec: CD2j
+ */
+ public String siteCode;
ConnectionDetails() {
maxIdleInterval = Defaults.maxIdleInterval;
@@ -114,6 +121,9 @@ ConnectionDetails readMsgpack(MessageUnpacker unpacker) throws IOException {
case "connectionStateTtl":
connectionStateTtl = unpacker.unpackLong();
break;
+ case "siteCode":
+ siteCode = unpacker.unpackString();
+ break;
default:
Log.v(TAG, "Unexpected field: " + fieldName);
unpacker.skipValue();
diff --git a/liveobjects/src/main/kotlin/io/ably/lib/objects/DefaultRealtimeObjects.kt b/liveobjects/src/main/kotlin/io/ably/lib/objects/DefaultRealtimeObjects.kt
index 00401c50e..f3de5d8b4 100644
--- a/liveobjects/src/main/kotlin/io/ably/lib/objects/DefaultRealtimeObjects.kt
+++ b/liveobjects/src/main/kotlin/io/ably/lib/objects/DefaultRealtimeObjects.kt
@@ -12,6 +12,7 @@ import io.ably.lib.objects.type.map.LiveMapValue
import io.ably.lib.realtime.ChannelState
import io.ably.lib.types.AblyException
import io.ably.lib.types.ProtocolMessage
+import io.ably.lib.types.PublishResult
import io.ably.lib.util.Log
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
@@ -31,6 +32,12 @@ internal class DefaultRealtimeObjects(internal val channelName: String, internal
internal var state = ObjectsState.Initialized
+ /**
+ * Set of serials for operations applied locally upon ACK, awaiting deduplication of the server echo.
+ * @spec RTO7b, RTO7b1
+ */
+ internal val appliedOnAckSerials = mutableSetOf()
+
/**
* @spec RTO4 - Used for handling object messages and object sync messages
*/
@@ -125,13 +132,12 @@ internal class DefaultRealtimeObjects(internal val channelName: String, internal
)
)
- // RTO11g - Publish the message
- publish(arrayOf(msg))
+ // RTO11i - publish and apply locally on ACK
+ publishAndApply(arrayOf(msg))
- // RTO11h - Check if object already exists in pool, otherwise create a zero-value object using the sequential scope
- return objectsPool.get(objectId) as? LiveMap ?: withContext(sequentialScope.coroutineContext) {
- objectsPool.createZeroValueObjectIfNotExists(objectId) as LiveMap
- }
+ // RTO11h2 - Return existing object if found after apply
+ return objectsPool.get(objectId) as? LiveMap
+ ?: throw serverError("createMap: MAP_CREATE was not applied as expected; objectId=$objectId") // RTO11h3d
}
private suspend fun createCounterAsync(initialValue: Number): LiveCounter {
@@ -161,13 +167,12 @@ internal class DefaultRealtimeObjects(internal val channelName: String, internal
)
)
- // RTO12g - Publish the message
- publish(arrayOf(msg))
+ // RTO12i - publish and apply locally on ACK
+ publishAndApply(arrayOf(msg))
- // RTO12h - Check if object already exists in pool, otherwise create a zero-value object using the sequential scope
- return objectsPool.get(objectId) as? LiveCounter ?: withContext(sequentialScope.coroutineContext) {
- objectsPool.createZeroValueObjectIfNotExists(objectId) as LiveCounter
- }
+ // RTO12h2 - Return existing object if found after apply
+ return objectsPool.get(objectId) as? LiveCounter
+ ?: throw serverError("createCounter: COUNTER_CREATE was not applied as expected; objectId=$objectId") // RTO12h3d
}
/**
@@ -182,7 +187,7 @@ internal class DefaultRealtimeObjects(internal val channelName: String, internal
/**
* Spec: RTO15
*/
- internal suspend fun publish(objectMessages: Array) {
+ internal suspend fun publish(objectMessages: Array): PublishResult {
// RTO15b, RTL6c - Ensure that the channel is in a valid state for publishing
adapter.throwIfUnpublishableState(channelName)
adapter.ensureMessageSizeWithinLimit(objectMessages)
@@ -190,7 +195,47 @@ internal class DefaultRealtimeObjects(internal val channelName: String, internal
val protocolMessage = ProtocolMessage(ProtocolMessage.Action.`object`, channelName)
protocolMessage.state = objectMessages
// RTO15f, RTO15g - Send the ProtocolMessage using the adapter and capture success/failure
- adapter.sendAsync(protocolMessage)
+ return adapter.sendAsync(protocolMessage) // RTO15h
+ }
+
+ /**
+ * Publishes the given object messages and, upon receiving the ACK, immediately applies them
+ * locally as synthetic inbound messages using the assigned serial and connection's siteCode.
+ *
+ * Spec: RTO20
+ */
+ internal suspend fun publishAndApply(objectMessages: Array) {
+ // RTO20b - publish, propagate failure
+ val publishResult = publish(objectMessages)
+
+ // RTO20c - validate required info
+ val siteCode = adapter.connectionManager.siteCode
+ if (siteCode == null) {
+ Log.e(tag, "RTO20c1: siteCode not available; operations will be applied when echoed")
+ return
+ }
+ val serials = publishResult.serials
+ if (serials == null || serials.size != objectMessages.size) {
+ Log.e(tag, "RTO20c2: PublishResult.serials unavailable or wrong length; operations will be applied when echoed")
+ return
+ }
+
+ // RTO20d - create synthetic inbound ObjectMessages
+ val syntheticMessages = mutableListOf()
+ objectMessages.forEachIndexed { i, msg ->
+ val serial = serials[i]
+ if (serial == null) {
+ Log.d(tag, "RTO20d1: serial null at index $i (conflated), skipping")
+ return@forEachIndexed
+ }
+ syntheticMessages.add(msg.copy(serial = serial, siteCode = siteCode)) // RTO20d2a, RTO20d2b, RTO20d3
+ }
+ if (syntheticMessages.isEmpty()) return
+
+ // RTO20e, RTO20f - dispatch to sequential scope for ordering
+ withContext(sequentialScope.coroutineContext) {
+ objectsManager.applyAckResult(syntheticMessages) // suspends if SYNCING (RTO20e), applies on SYNCED (RTO20f)
+ }
}
/**
@@ -268,16 +313,30 @@ internal class DefaultRealtimeObjects(internal val channelName: String, internal
objectsManager.clearBufferedObjectOperations() // RTO4b5
// defer the state change event until the next tick if we started a new sequence just now due to being in initialized state.
// this allows any event listeners to process the start of the new sequence event that was emitted earlier during this event loop.
- objectsManager.endSync(fromInitializedState) // RTO4b4
+ objectsManager.endSync() // RTO4b4
}
}
ChannelState.detached,
+ ChannelState.suspended,
ChannelState.failed -> {
- // do not emit data update events as the actual current state of Objects data is unknown when we're in these channel states
- objectsPool.clearObjectsData(false)
- objectsManager.clearSyncObjectsDataPool()
+ val errorReason = try {
+ adapter.getChannel(channelName).reason
+ } catch (e: Exception) {
+ null
+ }
+ val error = ablyException(
+ "publishAndApply could not be applied locally: channel entered $state whilst waiting for objects sync",
+ ErrorCode.PublishAndApplyFailedDueToChannelState,
+ HttpStatusCode.BadRequest,
+ cause = errorReason?.let { AblyException.fromErrorInfo(it) }
+ )
+ objectsManager.failBufferedAcks(error) // RTO20e1
+ if (state != ChannelState.suspended) {
+ // do not emit data update events as the actual current state of Objects data is unknown when we're in these channel states
+ objectsPool.clearObjectsData(false)
+ objectsManager.clearSyncObjectsDataPool()
+ }
}
-
else -> {
// No action needed for other states
}
diff --git a/liveobjects/src/main/kotlin/io/ably/lib/objects/ErrorCodes.kt b/liveobjects/src/main/kotlin/io/ably/lib/objects/ErrorCodes.kt
index 17612b043..1a8d1b8ad 100644
--- a/liveobjects/src/main/kotlin/io/ably/lib/objects/ErrorCodes.kt
+++ b/liveobjects/src/main/kotlin/io/ably/lib/objects/ErrorCodes.kt
@@ -11,6 +11,7 @@ internal enum class ErrorCode(public val code: Int) {
// Channel mode and state validation error codes
ChannelModeRequired(40_024),
ChannelStateError(90_001),
+ PublishAndApplyFailedDueToChannelState(92_008),
}
internal enum class HttpStatusCode(public val code: Int) {
diff --git a/liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt b/liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt
index 2132c84e9..bb85f1681 100644
--- a/liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt
+++ b/liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt
@@ -4,7 +4,9 @@ import io.ably.lib.objects.type.BaseRealtimeObject
import io.ably.lib.objects.type.ObjectUpdate
import io.ably.lib.objects.type.livecounter.DefaultLiveCounter
import io.ably.lib.objects.type.livemap.DefaultLiveMap
+import io.ably.lib.types.AblyException
import io.ably.lib.util.Log
+import kotlinx.coroutines.CompletableDeferred
/**
* @spec RTO5 - Processes OBJECT and OBJECT_SYNC messages during sync sequences
@@ -21,6 +23,7 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
* @spec RTO7 - Buffered object operations during sync
*/
private val bufferedObjectOperations = mutableListOf() // RTO7a
+ private var syncCompletionWaiter: CompletableDeferred? = null
/**
* Handles object messages (non-sync messages).
@@ -39,7 +42,7 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
}
// Apply messages immediately if synced
- applyObjectMessages(objectMessages) // RTO8b
+ applyObjectMessages(objectMessages, ObjectsOperationSource.CHANNEL) // RTO8b
}
/**
@@ -62,7 +65,7 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
if (syncTracker.hasSyncEnded()) {
// defer the state change event until the next tick if this was a new sync sequence
// to allow any event listeners to process the start of the new sequence event that was emitted earlier during this event loop.
- endSync(isNewSync)
+ endSync()
}
}
@@ -78,7 +81,8 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
bufferedObjectOperations.clear() // RTO5a2b
syncObjectsDataPool.clear() // RTO5a2a
currentSyncId = syncId
- stateChange(ObjectsState.Syncing, false)
+ syncCompletionWaiter = CompletableDeferred()
+ stateChange(ObjectsState.Syncing)
}
/**
@@ -86,17 +90,39 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
*
* @spec RTO5c - Applies sync data and buffered operations
*/
- internal fun endSync(deferStateEvent: Boolean) {
+ internal fun endSync() {
Log.v(tag, "Ending sync sequence")
- applySync()
- // should apply buffered object operations after we applied the sync.
- // can use regular non-sync object.operation logic
- applyObjectMessages(bufferedObjectOperations) // RTO5c6
-
- bufferedObjectOperations.clear() // RTO5c5
- syncObjectsDataPool.clear() // RTO5c4
- currentSyncId = null // RTO5c3
- stateChange(ObjectsState.Synced, deferStateEvent)
+ applySync() // RTO5c1/2/7
+ applyObjectMessages(bufferedObjectOperations, ObjectsOperationSource.CHANNEL) // RTO5c6
+ bufferedObjectOperations.clear() // RTO5c5
+ syncObjectsDataPool.clear() // RTO5c4
+ currentSyncId = null // RTO5c3
+ realtimeObjects.appliedOnAckSerials.clear() // RTO5c9
+ stateChange(ObjectsState.Synced) // RTO5c8
+ syncCompletionWaiter?.complete(Unit)
+ syncCompletionWaiter = null
+ }
+
+ /**
+ * Called from publishAndApply (via withContext sequentialScope).
+ * If SYNCED: apply immediately with LOCAL source.
+ * If not SYNCED: suspend until endSync transitions to SYNCED (RTO20e), then apply.
+ */
+ internal suspend fun applyAckResult(messages: List) {
+ if (realtimeObjects.state != ObjectsState.Synced) {
+ if (syncCompletionWaiter == null) syncCompletionWaiter = CompletableDeferred()
+ syncCompletionWaiter?.await() // suspends; resumes after endSync transitions to SYNCED (RTO20e1)
+ }
+ applyObjectMessages(messages, ObjectsOperationSource.LOCAL) // RTO20f
+ }
+
+ /**
+ * Fails all pending apply waiters.
+ * Called when the channel enters DETACHED/SUSPENDED/FAILED (RTO20e1).
+ */
+ internal fun failBufferedAcks(error: AblyException) {
+ syncCompletionWaiter?.completeExceptionally(error)
+ syncCompletionWaiter = null
}
/**
@@ -162,7 +188,10 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
*
* @spec RTO9 - Creates zero-value objects if they don't exist
*/
- private fun applyObjectMessages(objectMessages: List) {
+ private fun applyObjectMessages(
+ objectMessages: List,
+ source: ObjectsOperationSource = ObjectsOperationSource.CHANNEL,
+ ) {
// RTO9a
for (objectMessage in objectMessages) {
if (objectMessage.operation == null) {
@@ -177,6 +206,19 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
Log.w(tag, "Object operation action is unknown, skipping message: ${objectMessage.id}")
continue
}
+
+ // RTO9a3 - skip operations already applied on ACK (discard without taking any further action).
+ // This check comes before zero-value object creation (RTO9a2a1) so that no zero-value object is
+ // created for an objectId not yet in the pool when the echo is being discarded.
+ // Note: siteTimeserials is NOT updated here intentionally — updating it to the echo's serial would
+ // incorrectly reject older-but-unprocessed operations from the same site that arrive after the echo.
+ if (objectMessage.serial != null &&
+ realtimeObjects.appliedOnAckSerials.contains(objectMessage.serial)) {
+ Log.d(tag, "RTO9a3: serial ${objectMessage.serial} already applied on ACK; discarding echo")
+ realtimeObjects.appliedOnAckSerials.remove(objectMessage.serial)
+ continue // discard without taking any further action
+ }
+
// RTO9a2a - we can receive an op for an object id we don't have yet in the pool. instead of buffering such operations,
// we can create a zero-value object for the provided object id and apply the operation to that zero-value object.
// this also means that all objects are capable of applying the corresponding *_CREATE ops on themselves,
@@ -184,7 +226,10 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
// so to simplify operations handling, we always try to create a zero-value object in the pool first,
// and then we can always apply the operation on the existing object in the pool.
val obj = realtimeObjects.objectsPool.createZeroValueObjectIfNotExists(objectOperation.objectId) // RTO9a2a1
- obj.applyObject(objectMessage) // RTO9a2a2, RTO9a2a3
+ val applied = obj.applyObject(objectMessage, source) // RTO9a2a2, RTO9a2a3
+ if (source == ObjectsOperationSource.LOCAL && applied && objectMessage.serial != null) {
+ realtimeObjects.appliedOnAckSerials.add(objectMessage.serial) // RTO9a2a4
+ }
}
}
@@ -228,7 +273,7 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
*
* @spec RTO2 - Emits state change events for syncing and synced states
*/
- private fun stateChange(newState: ObjectsState, deferEvent: Boolean) {
+ private fun stateChange(newState: ObjectsState) {
if (realtimeObjects.state == newState) {
return
}
@@ -240,6 +285,7 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
}
internal fun dispose() {
+ syncCompletionWaiter?.cancel()
syncObjectsDataPool.clear()
bufferedObjectOperations.clear()
disposeObjectsStateListeners()
diff --git a/liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsOperationSource.kt b/liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsOperationSource.kt
new file mode 100644
index 000000000..e850d31b8
--- /dev/null
+++ b/liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsOperationSource.kt
@@ -0,0 +1,7 @@
+package io.ably.lib.objects
+
+/** @spec RTO22 */
+internal enum class ObjectsOperationSource {
+ LOCAL, // RTO22a - applied upon receipt of ACK
+ CHANNEL // RTO22b - received over a Realtime channel
+}
diff --git a/liveobjects/src/main/kotlin/io/ably/lib/objects/type/BaseRealtimeObject.kt b/liveobjects/src/main/kotlin/io/ably/lib/objects/type/BaseRealtimeObject.kt
index fa94e0a59..91bfeb011 100644
--- a/liveobjects/src/main/kotlin/io/ably/lib/objects/type/BaseRealtimeObject.kt
+++ b/liveobjects/src/main/kotlin/io/ably/lib/objects/type/BaseRealtimeObject.kt
@@ -3,6 +3,7 @@ package io.ably.lib.objects.type
import io.ably.lib.objects.ObjectMessage
import io.ably.lib.objects.ObjectOperation
import io.ably.lib.objects.ObjectState
+import io.ably.lib.objects.ObjectsOperationSource
import io.ably.lib.objects.ObjectsPoolDefaults
import io.ably.lib.objects.objectError
import io.ably.lib.objects.type.livecounter.noOpCounterUpdate
@@ -66,11 +67,11 @@ internal abstract class BaseRealtimeObject(
/**
* This is invoked by ObjectMessage having updated data with parent `ProtocolMessageAction` as `object`
- * @return an update describing the changes
+ * @return true if the operation was meaningfully applied, false otherwise
*
* @spec RTLM15/RTLC7 - Applies ObjectMessage with object data operations to LiveMap/LiveCounter
*/
- internal fun applyObject(objectMessage: ObjectMessage) {
+ internal fun applyObject(objectMessage: ObjectMessage, source: ObjectsOperationSource): Boolean {
validateObjectId(objectMessage.operation?.objectId)
val msgTimeSerial = objectMessage.serial
@@ -84,17 +85,18 @@ internal abstract class BaseRealtimeObject(
"Skipping ${objectOperation.action} op: op serial $msgTimeSerial <= site serial ${siteTimeserials[msgSiteCode]}; " +
"objectId=$objectId"
)
- return
+ return false // RTLC7b / RTLM15b
+ }
+ // RTLC7c / RTLM15c - only update siteTimeserials for CHANNEL source
+ if (source == ObjectsOperationSource.CHANNEL) {
+ siteTimeserials[msgSiteCode!!] = msgTimeSerial!! // RTLC7c, RTLM15c
}
- // should update stored site serial immediately. doesn't matter if we successfully apply the op,
- // as it's important to mark that the op was processed by the object
- siteTimeserials[msgSiteCode!!] = msgTimeSerial!! // RTLC7c, RTLM15c
if (isTombstoned) {
// this object is tombstoned so the operation cannot be applied
- return
+ return false // RTLC7e / RTLM15e
}
- applyObjectOperation(objectOperation, objectMessage) // RTLC7d
+ return applyObjectOperation(objectOperation, objectMessage) // RTLC7d
}
/**
@@ -166,9 +168,10 @@ internal abstract class BaseRealtimeObject(
*
* @param operation The operation containing the action and data to apply
* @param message The complete object message containing the operation
+ * @return true if the operation was meaningfully applied, false otherwise
*
*/
- abstract fun applyObjectOperation(operation: ObjectOperation, message: ObjectMessage)
+ abstract fun applyObjectOperation(operation: ObjectOperation, message: ObjectMessage): Boolean
/**
* Clears the object's data and returns an update describing the changes.
diff --git a/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livecounter/DefaultLiveCounter.kt b/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livecounter/DefaultLiveCounter.kt
index b34188b62..164cdb28a 100644
--- a/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livecounter/DefaultLiveCounter.kt
+++ b/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livecounter/DefaultLiveCounter.kt
@@ -85,16 +85,16 @@ internal class DefaultLiveCounter private constructor(
)
)
- // RTLC12f - Publish the message
- realtimeObjects.publish(arrayOf(msg))
+ // RTLC12g - publish and apply locally on ACK
+ realtimeObjects.publishAndApply(arrayOf(msg))
}
override fun applyObjectState(objectState: ObjectState, message: ObjectMessage): LiveCounterUpdate {
return liveCounterManager.applyState(objectState, message.serialTimestamp)
}
- override fun applyObjectOperation(operation: ObjectOperation, message: ObjectMessage) {
- liveCounterManager.applyOperation(operation, message.serialTimestamp)
+ override fun applyObjectOperation(operation: ObjectOperation, message: ObjectMessage): Boolean {
+ return liveCounterManager.applyOperation(operation, message.serialTimestamp)
}
override fun clearData(): LiveCounterUpdate {
diff --git a/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livecounter/LiveCounterManager.kt b/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livecounter/LiveCounterManager.kt
index d96d65b64..943faf4ce 100644
--- a/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livecounter/LiveCounterManager.kt
+++ b/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livecounter/LiveCounterManager.kt
@@ -39,21 +39,32 @@ internal class LiveCounterManager(private val liveCounter: DefaultLiveCounter):
/**
* @spec RTLC7 - Applies operations to LiveCounter
*/
- internal fun applyOperation(operation: ObjectOperation, serialTimestamp: Long?) {
- val update = when (operation.action) {
- ObjectOperationAction.CounterCreate -> applyCounterCreate(operation) // RTLC7d1
+ internal fun applyOperation(operation: ObjectOperation, serialTimestamp: Long?): Boolean {
+ return when (operation.action) {
+ ObjectOperationAction.CounterCreate -> {
+ val update = applyCounterCreate(operation) // RTLC7d1
+ liveCounter.notifyUpdated(update) // RTLC7d1a
+ true // RTLC7d1b
+ }
ObjectOperationAction.CounterInc -> {
if (operation.counterOp != null) {
- applyCounterInc(operation.counterOp) // RTLC7d2
+ val update = applyCounterInc(operation.counterOp) // RTLC7d2
+ liveCounter.notifyUpdated(update) // RTLC7d2a
+ true // RTLC7d2b
} else {
throw objectError("No payload found for ${operation.action} op for LiveCounter objectId=${objectId}")
}
}
- ObjectOperationAction.ObjectDelete -> liveCounter.tombstone(serialTimestamp)
- else -> throw objectError("Invalid ${operation.action} op for LiveCounter objectId=${objectId}") // RTLC7d3
+ ObjectOperationAction.ObjectDelete -> {
+ val update = liveCounter.tombstone(serialTimestamp)
+ liveCounter.notifyUpdated(update)
+ true // RTLC7d4b
+ }
+ else -> {
+ Log.w(tag, "Invalid ${operation.action} op for LiveCounter objectId=${objectId}") // RTLC7d3
+ false
+ }
}
-
- liveCounter.notifyUpdated(update) // RTLC7d1a, RTLC7d2a
}
/**
diff --git a/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livemap/DefaultLiveMap.kt b/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livemap/DefaultLiveMap.kt
index 8c2da8e6a..cd0604dbf 100644
--- a/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livemap/DefaultLiveMap.kt
+++ b/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livemap/DefaultLiveMap.kt
@@ -135,8 +135,8 @@ internal class DefaultLiveMap private constructor(
)
)
- // RTLM20f - Publish the message
- realtimeObjects.publish(arrayOf(msg))
+ // RTLM20g - publish and apply locally on ACK
+ realtimeObjects.publishAndApply(arrayOf(msg))
}
private suspend fun removeAsync(keyName: String) {
@@ -157,16 +157,16 @@ internal class DefaultLiveMap private constructor(
)
)
- // RTLM21f - Publish the message
- realtimeObjects.publish(arrayOf(msg))
+ // RTLM21g - publish and apply locally on ACK
+ realtimeObjects.publishAndApply(arrayOf(msg))
}
override fun applyObjectState(objectState: ObjectState, message: ObjectMessage): LiveMapUpdate {
return liveMapManager.applyState(objectState, message.serialTimestamp)
}
- override fun applyObjectOperation(operation: ObjectOperation, message: ObjectMessage) {
- liveMapManager.applyOperation(operation, message.serial, message.serialTimestamp)
+ override fun applyObjectOperation(operation: ObjectOperation, message: ObjectMessage): Boolean {
+ return liveMapManager.applyOperation(operation, message.serial, message.serialTimestamp)
}
override fun clearData(): LiveMapUpdate {
diff --git a/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livemap/LiveMapManager.kt b/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livemap/LiveMapManager.kt
index 19a6ef592..90c920cf2 100644
--- a/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livemap/LiveMapManager.kt
+++ b/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livemap/LiveMapManager.kt
@@ -51,28 +51,41 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap): LiveMapChang
/**
* @spec RTLM15 - Applies operations to LiveMap
*/
- internal fun applyOperation(operation: ObjectOperation, serial: String?, serialTimestamp: Long?) {
- val update = when (operation.action) {
- ObjectOperationAction.MapCreate -> applyMapCreate(operation) // RTLM15d1
+ internal fun applyOperation(operation: ObjectOperation, serial: String?, serialTimestamp: Long?): Boolean {
+ return when (operation.action) {
+ ObjectOperationAction.MapCreate -> {
+ val update = applyMapCreate(operation) // RTLM15d1
+ liveMap.notifyUpdated(update) // RTLM15d1a
+ true // RTLM15d1b
+ }
ObjectOperationAction.MapSet -> {
if (operation.mapOp != null) {
- applyMapSet(operation.mapOp, serial) // RTLM15d2
+ val update = applyMapSet(operation.mapOp, serial) // RTLM15d2
+ liveMap.notifyUpdated(update) // RTLM15d2a
+ true // RTLM15d2b
} else {
throw objectError("No payload found for ${operation.action} op for LiveMap objectId=${objectId}")
}
}
ObjectOperationAction.MapRemove -> {
if (operation.mapOp != null) {
- applyMapRemove(operation.mapOp, serial, serialTimestamp) // RTLM15d3
+ val update = applyMapRemove(operation.mapOp, serial, serialTimestamp) // RTLM15d3
+ liveMap.notifyUpdated(update) // RTLM15d3a
+ true // RTLM15d3b
} else {
throw objectError("No payload found for ${operation.action} op for LiveMap objectId=${objectId}")
}
}
- ObjectOperationAction.ObjectDelete -> liveMap.tombstone(serialTimestamp)
- else -> throw objectError("Invalid ${operation.action} op for LiveMap objectId=${objectId}") // RTLM15d4
+ ObjectOperationAction.ObjectDelete -> {
+ val update = liveMap.tombstone(serialTimestamp)
+ liveMap.notifyUpdated(update)
+ true // RTLM15d5b
+ }
+ else -> {
+ Log.w(tag, "Invalid ${operation.action} op for LiveMap objectId=${objectId}") // RTLM15d4
+ false
+ }
}
-
- liveMap.notifyUpdated(update) // RTLM15d1a, RTLM15d2a, RTLM15d3a
}
/**
diff --git a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/TestHelpers.kt b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/TestHelpers.kt
index 94354fcf9..3d10f22a9 100644
--- a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/TestHelpers.kt
+++ b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/TestHelpers.kt
@@ -17,6 +17,7 @@ import io.ably.lib.types.ClientOptions
import io.mockk.every
import io.mockk.mockk
import io.mockk.spyk
+import kotlinx.coroutines.CompletableDeferred
internal fun getMockRealtimeChannel(
channelName: String,
@@ -73,6 +74,9 @@ internal val ObjectsManager.SyncObjectsDataPool: Map
internal val ObjectsManager.BufferedObjectOperations: List
get() = this.getPrivateField("bufferedObjectOperations")
+internal val ObjectsManager.SyncCompletionWaiter: CompletableDeferred?
+ get() = this.getPrivateField("syncCompletionWaiter")
+
internal var DefaultRealtimeObjects.ObjectsManager: ObjectsManager
get() = this.getPrivateField("objectsManager")
set(value) = this.setPrivateField("objectsManager", value)
diff --git a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/DefaultRealtimeObjectsTest.kt b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/DefaultRealtimeObjectsTest.kt
index 40565cabe..163a9cf12 100644
--- a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/DefaultRealtimeObjectsTest.kt
+++ b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/DefaultRealtimeObjectsTest.kt
@@ -16,14 +16,19 @@ import io.ably.lib.objects.unit.BufferedObjectOperations
import io.ably.lib.objects.unit.ObjectsManager
import io.ably.lib.objects.unit.SyncObjectsDataPool
import io.ably.lib.objects.unit.getDefaultRealtimeObjectsWithMockedDeps
+import io.ably.lib.objects.unit.getMockRealtimeChannel
import io.ably.lib.objects.unit.size
import io.ably.lib.realtime.ChannelState
+import io.ably.lib.types.AblyException
+import io.ably.lib.types.ErrorInfo
import io.ably.lib.types.ProtocolMessage
+import io.mockk.every
import io.mockk.verify
+import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.test.runTest
import org.junit.Test
import kotlin.test.assertEquals
-import io.mockk.every
+import kotlin.test.assertNotNull
class DefaultRealtimeObjectsTest {
@@ -41,7 +46,7 @@ class DefaultRealtimeObjectsTest {
defaultRealtimeObjects.ObjectsManager.startNewSync(null)
}
verify(exactly = 0) {
- defaultRealtimeObjects.ObjectsManager.endSync(any())
+ defaultRealtimeObjects.ObjectsManager.endSync()
}
}
@@ -65,7 +70,7 @@ class DefaultRealtimeObjectsTest {
defaultRealtimeObjects.objectsPool.resetToInitialPool(true)
}
verify(exactly = 1) {
- defaultRealtimeObjects.ObjectsManager.endSync(any())
+ defaultRealtimeObjects.ObjectsManager.endSync()
}
assertEquals(0, defaultRealtimeObjects.ObjectsManager.SyncObjectsDataPool.size) // RTO4b3
@@ -89,7 +94,7 @@ class DefaultRealtimeObjectsTest {
defaultRealtimeObjects.ObjectsManager.startNewSync(null)
}
verify(exactly = 1) {
- defaultRealtimeObjects.ObjectsManager.endSync(true) // deferStateEvent = true
+ defaultRealtimeObjects.ObjectsManager.endSync()
}
}
@@ -155,6 +160,104 @@ class DefaultRealtimeObjectsTest {
}
}
+ @Test
+ fun `(RTO20e1) handleStateChange(DETACHED) fails pending ACK waiters with error 92008`() = runTest {
+ val defaultRealtimeObjects = getDefaultRealtimeObjectsWithMockedDeps()
+
+ // Capture the error passed to failBufferedAcks via a CompletableDeferred
+ val capturedError = CompletableDeferred()
+ every { defaultRealtimeObjects.ObjectsManager.failBufferedAcks(any()) } answers {
+ capturedError.complete(firstArg())
+ callOriginal()
+ }
+
+ defaultRealtimeObjects.handleStateChange(ChannelState.detached, false)
+
+ val error = capturedError.await()
+ assertEquals(92008, error.errorInfo.code) // PublishAndApplyFailedDueToChannelState
+ }
+
+ @Test
+ fun `(RTO20e1) handleStateChange(SUSPENDED) fails pending ACK waiters with error 92008`() = runTest {
+ val defaultRealtimeObjects = getDefaultRealtimeObjectsWithMockedDeps()
+
+ val capturedError = CompletableDeferred()
+ every { defaultRealtimeObjects.ObjectsManager.failBufferedAcks(any()) } answers {
+ capturedError.complete(firstArg())
+ callOriginal()
+ }
+
+ defaultRealtimeObjects.handleStateChange(ChannelState.suspended, false)
+
+ val error = capturedError.await()
+ assertEquals(92008, error.errorInfo.code) // PublishAndApplyFailedDueToChannelState
+ }
+
+ @Test
+ fun `(RTO20e1) handleStateChange(FAILED) fails pending ACK waiters and propagates channel reason`() = runTest {
+ val defaultRealtimeObjects = getDefaultRealtimeObjectsWithMockedDeps()
+
+ // Override the channel returned by the adapter to carry a non-null reason
+ val channelReason = ErrorInfo("channel failed due to auth error", 40100, 401)
+ val channelWithReason = getMockRealtimeChannel("testChannelName")
+ channelWithReason.reason = channelReason
+ every { defaultRealtimeObjects.adapter.getChannel(any()) } returns channelWithReason
+
+ val capturedError = CompletableDeferred()
+ every { defaultRealtimeObjects.ObjectsManager.failBufferedAcks(any()) } answers {
+ capturedError.complete(firstArg())
+ callOriginal()
+ }
+
+ defaultRealtimeObjects.handleStateChange(ChannelState.failed, false)
+
+ val error = capturedError.await()
+ assertEquals(92008, error.errorInfo.code)
+ val causeException = error.cause as? AblyException
+ assertNotNull(causeException, "Error cause must include the channel's reason")
+ assertEquals(channelReason.code, causeException.errorInfo.code)
+ assertEquals(channelReason.message, causeException.errorInfo.message)
+ }
+
+ @Test
+ fun `(RTO4) handleStateChange(SUSPENDED) does NOT clear objects data`() = runTest {
+ val defaultRealtimeObjects = getDefaultRealtimeObjectsWithMockedDeps()
+
+ // Use the failBufferedAcks call as a signal that the state-change coroutine has run to completion
+ val failCalled = CompletableDeferred()
+ every { defaultRealtimeObjects.ObjectsManager.failBufferedAcks(any()) } answers {
+ callOriginal()
+ failCalled.complete(Unit)
+ }
+
+ defaultRealtimeObjects.handleStateChange(ChannelState.suspended, false)
+
+ // For SUSPENDED, the coroutine ends immediately after failBufferedAcks (no clear calls)
+ failCalled.await()
+
+ verify(exactly = 0) { defaultRealtimeObjects.objectsPool.clearObjectsData(any()) }
+ verify(exactly = 0) { defaultRealtimeObjects.ObjectsManager.clearSyncObjectsDataPool() }
+ }
+
+ @Test
+ fun `(RTO4) handleStateChange(DETACHED) clears objects data and sync pool`() = runTest {
+ val defaultRealtimeObjects = getDefaultRealtimeObjectsWithMockedDeps()
+
+ // Use clearSyncObjectsDataPool (the last operation in the coroutine) as the completion signal
+ val syncPoolCleared = CompletableDeferred()
+ every { defaultRealtimeObjects.ObjectsManager.clearSyncObjectsDataPool() } answers {
+ callOriginal()
+ syncPoolCleared.complete(Unit)
+ }
+
+ defaultRealtimeObjects.handleStateChange(ChannelState.detached, false)
+
+ syncPoolCleared.await()
+
+ verify(exactly = 1) { defaultRealtimeObjects.objectsPool.clearObjectsData(false) }
+ verify(exactly = 1) { defaultRealtimeObjects.ObjectsManager.clearSyncObjectsDataPool() }
+ }
+
@Test
fun `(OM2) Populate objectMessage missing id, timestamp and connectionId from protocolMessage`() = runTest {
val defaultRealtimeObjects = getDefaultRealtimeObjectsWithMockedDeps()
diff --git a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsManagerTest.kt b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsManagerTest.kt
index 3e04d9e06..b7541a724 100644
--- a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsManagerTest.kt
+++ b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsManagerTest.kt
@@ -1,19 +1,33 @@
package io.ably.lib.objects.unit.objects
import io.ably.lib.objects.*
+import io.ably.lib.objects.DefaultRealtimeObjects
import io.ably.lib.objects.ObjectMessage
import io.ably.lib.objects.ObjectState
+import io.ably.lib.objects.ObjectsOperationSource
import io.ably.lib.objects.ObjectsState
import io.ably.lib.objects.type.livecounter.DefaultLiveCounter
import io.ably.lib.objects.type.livemap.DefaultLiveMap
import io.ably.lib.objects.unit.*
import io.ably.lib.objects.unit.getDefaultRealtimeObjectsWithMockedDeps
+import io.ably.lib.types.AblyException
+import io.ably.lib.types.ErrorInfo
import io.mockk.*
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.test.runTest
+import kotlinx.coroutines.yield
import org.junit.Test
import kotlin.test.*
class ObjectsManagerTest {
+ // Track instances created in tests to ensure background coroutines are cancelled at teardown
+ private val testInstances = mutableListOf()
+
+ private fun makeRealtimeObjects(channelName: String = "testChannel"): DefaultRealtimeObjects {
+ return DefaultRealtimeObjects(channelName, getMockObjectsAdapter()).also { testInstances.add(it) }
+ }
+
@Test
fun `(RTO5) ObjectsManager should handle object sync messages`() {
val defaultRealtimeObjects = getDefaultRealtimeObjectsWithMockedDeps()
@@ -63,7 +77,7 @@ class ObjectsManagerTest {
objectsManager.startNewSync("sync-123")
}
verify(exactly = 1) {
- objectsManager.endSync(true) // deferStateEvent = true since new sync was started
+ objectsManager.endSync() //
}
val newlyCreatedObjects = mutableListOf()
verify(exactly = 2) {
@@ -148,17 +162,17 @@ class ObjectsManagerTest {
val testObject1 = objectsPool.get("map:testObject@1")
assertNotNull(testObject1, "map:testObject@1 should exist in pool after sync")
verify(exactly = 1) {
- testObject1.applyObject(objectMessage1)
+ testObject1.applyObject(objectMessage1, any())
}
val testObject2 = objectsPool.get("counter:testObject@2")
assertNotNull(testObject2, "counter:testObject@2 should exist in pool after sync")
verify(exactly = 1) {
- testObject2.applyObject(objectMessage2)
+ testObject2.applyObject(objectMessage2, any())
}
val testObject3 = objectsPool.get("map:testObject@3")
assertNotNull(testObject3, "map:testObject@3 should exist in pool after sync")
verify(exactly = 1) {
- testObject3.applyObject(objectMessage3)
+ testObject3.applyObject(objectMessage3, any())
}
}
@@ -193,16 +207,16 @@ class ObjectsManagerTest {
objectsManager.handleObjectMessages(listOf(objectMessage))
verify(exactly = 0) {
- objectsManager["applyObjectMessages"](any>())
+ objectsManager["applyObjectMessages"](any>(), any())
}
assertEquals(1, objectsManager.BufferedObjectOperations.size)
assertEquals(objectMessage, objectsManager.BufferedObjectOperations[0])
assertEquals(1, objectsPool.size(), "Pool should still contain only root object during sync")
// RTO7 - Apply buffered operations after sync
- objectsManager.endSync(false) // End sync without new sync
+ objectsManager.endSync() // End sync without new sync
verify(exactly = 1) {
- objectsManager["applyObjectMessages"](any>())
+ objectsManager["applyObjectMessages"](any>(), any())
}
assertEquals(0, objectsManager.BufferedObjectOperations.size)
assertEquals(2, objectsPool.size(), "Pool should contain 2 objects after applying buffered operations")
@@ -210,6 +224,437 @@ class ObjectsManagerTest {
assertTrue(objectsPool.get("counter:testObject@1") is DefaultLiveCounter, "Should create a DefaultLiveCounter object")
}
+ @Test
+ fun `(RTO23 COUNTER_INC) applyAckResult applies COUNTER_INC locally and tracks serial in appliedOnAckSerials`() = runTest {
+ val defaultRealtimeObjects = makeRealtimeObjects()
+ defaultRealtimeObjects.state = ObjectsState.Synced
+
+ val counter = DefaultLiveCounter.zeroValue("counter:test@1", defaultRealtimeObjects)
+ defaultRealtimeObjects.objectsPool.set("counter:test@1", counter)
+
+ val msg = ObjectMessage(
+ operation = ObjectOperation(
+ action = ObjectOperationAction.CounterInc,
+ objectId = "counter:test@1",
+ counterOp = ObjectsCounterOp(amount = 5.0)
+ ),
+ serial = "ser-ack-01",
+ siteCode = "site1"
+ )
+
+ val objectsManager = defaultRealtimeObjects.ObjectsManager
+ objectsManager.applyAckResult(listOf(msg))
+
+ // Verify operation applied locally (RTO23)
+ assertEquals(5.0, counter.data.get(), "COUNTER_INC should be applied locally on ACK")
+ // Serial added to appliedOnAckSerials (RTO9a2a4)
+ assertTrue(defaultRealtimeObjects.appliedOnAckSerials.contains("ser-ack-01"),
+ "serial should be in appliedOnAckSerials")
+ // siteTimeserials NOT updated (LOCAL source, RTLC7c)
+ assertFalse(counter.siteTimeserials.containsKey("site1"),
+ "siteTimeserials should NOT be updated for LOCAL source")
+ }
+
+ @Test
+ fun `(RTO23 MAP_SET) applyAckResult applies MAP_SET locally and tracks serial in appliedOnAckSerials`() = runTest {
+ val defaultRealtimeObjects = makeRealtimeObjects()
+ defaultRealtimeObjects.state = ObjectsState.Synced
+
+ val liveMap = DefaultLiveMap.zeroValue("map:testMap@1", defaultRealtimeObjects)
+ defaultRealtimeObjects.objectsPool.set("map:testMap@1", liveMap)
+
+ val msg = ObjectMessage(
+ operation = ObjectOperation(
+ action = ObjectOperationAction.MapSet,
+ objectId = "map:testMap@1",
+ mapOp = ObjectsMapOp(key = "key1", data = ObjectData(value = ObjectValue.String("value1")))
+ ),
+ serial = "ser-map-01",
+ siteCode = "site1"
+ )
+
+ val objectsManager = defaultRealtimeObjects.ObjectsManager
+ objectsManager.applyAckResult(listOf(msg))
+
+ // Verify entry was set (LOCAL source)
+ assertEquals("value1", liveMap.data["key1"]?.data?.value?.value,
+ "MAP_SET should be applied locally on ACK")
+ // Entry timeserial should be updated (within LiveMapManager, regardless of source)
+ assertEquals("ser-map-01", liveMap.data["key1"]?.timeserial,
+ "entry timeserial should be set by MAP_SET")
+ // Serial added to appliedOnAckSerials
+ assertTrue(defaultRealtimeObjects.appliedOnAckSerials.contains("ser-map-01"),
+ "serial should be in appliedOnAckSerials")
+ // Object-level siteTimeserials NOT updated (LOCAL source, RTLM15c)
+ assertFalse(liveMap.siteTimeserials.containsKey("site1"),
+ "siteTimeserials should NOT be updated for LOCAL source")
+ }
+
+ @Test
+ fun `(RTO9a3) echo CHANNEL message is deduplicated - serial removed, data NOT re-applied`() {
+ val defaultRealtimeObjects = makeRealtimeObjects()
+ defaultRealtimeObjects.state = ObjectsState.Synced
+
+ val counter = DefaultLiveCounter.zeroValue("counter:test@1", defaultRealtimeObjects)
+ counter.data.set(10.0)
+ defaultRealtimeObjects.objectsPool.set("counter:test@1", counter)
+
+ // Simulate: serial already applied locally on ACK
+ defaultRealtimeObjects.appliedOnAckSerials.add("ser-echo-01")
+
+ val echoMsg = ObjectMessage(
+ operation = ObjectOperation(
+ action = ObjectOperationAction.CounterInc,
+ objectId = "counter:test@1",
+ counterOp = ObjectsCounterOp(amount = 5.0)
+ ),
+ serial = "ser-echo-01",
+ siteCode = "site1"
+ )
+
+ val objectsManager = defaultRealtimeObjects.ObjectsManager
+ objectsManager.handleObjectMessages(listOf(echoMsg))
+
+ // Data NOT double-applied (RTO9a3)
+ assertEquals(10.0, counter.data.get(), "data should NOT be re-applied on echo dedup")
+ // Serial removed from appliedOnAckSerials (RTO9a3)
+ assertFalse(defaultRealtimeObjects.appliedOnAckSerials.contains("ser-echo-01"),
+ "serial should be removed from appliedOnAckSerials after dedup")
+ // siteTimeserials NOT updated - discarded without further action (RTO9a3)
+ assertNull(counter.siteTimeserials["site1"],
+ "siteTimeserials should NOT be updated by echo dedup (RTO9a3: discard without further action)")
+ }
+
+ @Test
+ fun `(RTO9) non-echo CHANNEL message is applied normally when serial not in appliedOnAckSerials`() {
+ val defaultRealtimeObjects = makeRealtimeObjects()
+ defaultRealtimeObjects.state = ObjectsState.Synced
+
+ val counter = DefaultLiveCounter.zeroValue("counter:test@1", defaultRealtimeObjects)
+ counter.data.set(10.0)
+ defaultRealtimeObjects.objectsPool.set("counter:test@1", counter)
+
+ val msg = ObjectMessage(
+ operation = ObjectOperation(
+ action = ObjectOperationAction.CounterInc,
+ objectId = "counter:test@1",
+ counterOp = ObjectsCounterOp(amount = 3.0)
+ ),
+ serial = "ser-channel-01",
+ siteCode = "site1"
+ )
+
+ // serial NOT in appliedOnAckSerials — this is a regular (non-echo) CHANNEL message
+ assertFalse(defaultRealtimeObjects.appliedOnAckSerials.contains("ser-channel-01"))
+
+ val objectsManager = defaultRealtimeObjects.ObjectsManager
+ objectsManager.handleObjectMessages(listOf(msg))
+
+ // Should be applied normally (CHANNEL source)
+ assertEquals(13.0, counter.data.get(), "counter should be incremented by CHANNEL message")
+ // siteTimeserials IS updated for CHANNEL source (RTLC7c)
+ assertEquals("ser-channel-01", counter.siteTimeserials["site1"],
+ "siteTimeserials should be updated for CHANNEL source")
+ }
+
+ @Test
+ fun `(RTO22) applyAckResult waits for SYNCED state and applies with LOCAL source after endSync`() = runTest {
+ val defaultRealtimeObjects = makeRealtimeObjects()
+ defaultRealtimeObjects.state = ObjectsState.Syncing
+
+ val counter = DefaultLiveCounter.zeroValue("counter:test@1", defaultRealtimeObjects)
+ defaultRealtimeObjects.objectsPool.set("counter:test@1", counter)
+
+ val msg = ObjectMessage(
+ operation = ObjectOperation(
+ action = ObjectOperationAction.CounterInc,
+ objectId = "counter:test@1",
+ counterOp = ObjectsCounterOp(amount = 5.0)
+ ),
+ serial = "ser-ack-01",
+ siteCode = "site1"
+ )
+
+ val objectsManager = defaultRealtimeObjects.ObjectsManager
+
+ // Launch applyAckResult in background — will suspend while SYNCING
+ val ackJob = launch {
+ objectsManager.applyAckResult(listOf(msg))
+ }
+
+ // Allow the coroutine to start and reach deferred.await()
+ yield()
+
+ // During SYNCING — waiter is pending, message NOT yet applied
+ assertNotNull(objectsManager.SyncCompletionWaiter, "sync completion should be pending during SYNCING")
+ assertTrue(defaultRealtimeObjects.appliedOnAckSerials.isEmpty(),
+ "appliedOnAckSerials should be empty while waiting")
+ assertEquals(0.0, counter.data.get(), "data should not be applied while SYNCING")
+
+ // End sync — completes waiters (schedules resume), then transitions to SYNCED
+ objectsManager.endSync()
+ ackJob.join()
+
+ // After endSync — message applied with LOCAL source, serial tracked
+ assertEquals(5.0, counter.data.get(), "counter should be incremented after endSync")
+ assertTrue(defaultRealtimeObjects.appliedOnAckSerials.contains("ser-ack-01"),
+ "serial should be tracked in appliedOnAckSerials after LOCAL apply")
+ assertEquals(ObjectsState.Synced, defaultRealtimeObjects.state)
+ }
+
+ @Test
+ fun `(RTO5c6) endSync applies buffered CHANNEL messages then unblocks pending ACK waiters`() = runTest {
+ val defaultRealtimeObjects = makeRealtimeObjects()
+ defaultRealtimeObjects.state = ObjectsState.Synced
+
+ val counter = DefaultLiveCounter.zeroValue("counter:test@1", defaultRealtimeObjects)
+ counter.data.set(10.0)
+ defaultRealtimeObjects.objectsPool.set("counter:test@1", counter)
+
+ val objectsManager = defaultRealtimeObjects.ObjectsManager
+
+ val incMsg = ObjectMessage(
+ operation = ObjectOperation(
+ action = ObjectOperationAction.CounterInc,
+ objectId = "counter:test@1",
+ counterOp = ObjectsCounterOp(amount = 5.0)
+ ),
+ serial = "ser-01",
+ siteCode = "site1"
+ )
+
+ // Start a new sync (state → SYNCING)
+ objectsManager.startNewSync(null)
+ assertEquals(ObjectsState.Syncing, defaultRealtimeObjects.state)
+
+ // Suspend the ACK waiter (SYNCING)
+ val ackJob = launch {
+ objectsManager.applyAckResult(listOf(incMsg))
+ }
+ yield()
+ assertNotNull(objectsManager.SyncCompletionWaiter)
+
+ // Buffer the echo OBJECT message (also buffered since SYNCING)
+ objectsManager.handleObjectMessages(listOf(incMsg))
+ assertEquals(1, objectsManager.BufferedObjectOperations.size)
+
+ // End sync — applies CHANNEL buffered messages first, clears appliedOnAckSerials, then unblocks waiters
+ objectsManager.endSync()
+ ackJob.join()
+
+ // After endSync:
+ // 1. CHANNEL echo applied: counter = 10 + 5 = 15; siteTimeserials["site1"] = "ser-01"
+ // 2. appliedOnAckSerials cleared (was empty since no LOCAL applied during sync)
+ // 3. Waiter resumes → LOCAL apply → canApplyOperation rejects (serial not newer) → applied=false
+ assertEquals(15.0, counter.data.get(), "counter should be incremented exactly once")
+ assertEquals("ser-01", counter.siteTimeserials["site1"],
+ "siteTimeserials should be updated by CHANNEL echo")
+ assertTrue(defaultRealtimeObjects.appliedOnAckSerials.isEmpty(),
+ "appliedOnAckSerials should be empty (LOCAL apply was rejected by canApplyOperation)")
+ assertEquals(ObjectsState.Synced, defaultRealtimeObjects.state)
+ }
+
+ @Test
+ fun `(RTO5c9) endSync applies buffered CHANNEL messages then clears appliedOnAckSerials`() {
+ val defaultRealtimeObjects = makeRealtimeObjects()
+
+ val counter = DefaultLiveCounter.zeroValue("counter:test@1", defaultRealtimeObjects)
+ counter.data.set(10.0)
+ defaultRealtimeObjects.objectsPool.set("counter:test@1", counter)
+
+ val objectsManager = defaultRealtimeObjects.ObjectsManager
+
+ // Start a sync
+ objectsManager.startNewSync(null)
+ assertEquals(ObjectsState.Syncing, defaultRealtimeObjects.state)
+
+ // Buffer a CHANNEL message during sync
+ val channelMsg = ObjectMessage(
+ operation = ObjectOperation(
+ action = ObjectOperationAction.CounterInc,
+ objectId = "counter:test@1",
+ counterOp = ObjectsCounterOp(amount = 3.0)
+ ),
+ serial = "ser-channel-01",
+ siteCode = "site1"
+ )
+ objectsManager.handleObjectMessages(listOf(channelMsg))
+ assertEquals(1, objectsManager.BufferedObjectOperations.size)
+
+ // Simulate a serial that was somehow added during sync
+ defaultRealtimeObjects.appliedOnAckSerials.add("ser-during-sync")
+
+ // End sync — CHANNEL messages applied first, then appliedOnAckSerials cleared (RTO5c9)
+ objectsManager.endSync()
+
+ // CHANNEL message was applied (counter incremented)
+ assertEquals(13.0, counter.data.get(),
+ "buffered CHANNEL message should be applied by endSync")
+ // appliedOnAckSerials cleared at sync end (RTO5c9)
+ assertTrue(defaultRealtimeObjects.appliedOnAckSerials.isEmpty(),
+ "appliedOnAckSerials should be cleared at sync end (RTO5c9)")
+ assertEquals(ObjectsState.Synced, defaultRealtimeObjects.state)
+ }
+
+ @Test
+ fun `(RTO20e1) failBufferedAcks fails pending deferreds with error code 92008`() = runTest {
+ val defaultRealtimeObjects = makeRealtimeObjects()
+ defaultRealtimeObjects.state = ObjectsState.Syncing
+
+ val counter = DefaultLiveCounter.zeroValue("counter:test@1", defaultRealtimeObjects)
+ defaultRealtimeObjects.objectsPool.set("counter:test@1", counter)
+
+ val msg = ObjectMessage(
+ operation = ObjectOperation(
+ action = ObjectOperationAction.CounterInc,
+ objectId = "counter:test@1",
+ counterOp = ObjectsCounterOp(amount = 5.0)
+ ),
+ serial = "ser-01",
+ siteCode = "site1"
+ )
+
+ val objectsManager = defaultRealtimeObjects.ObjectsManager
+ val error = AblyException.fromErrorInfo(
+ ErrorInfo("channel failed while waiting for sync", 400, 92008)
+ )
+
+ var caughtException: Exception? = null
+ val ackJob = launch {
+ try {
+ objectsManager.applyAckResult(listOf(msg))
+ } catch (e: Exception) {
+ caughtException = e
+ }
+ }
+
+ // Allow the coroutine to start and suspend on deferred.await()
+ yield()
+
+ // Fail the buffered ACK (RTO20e1)
+ objectsManager.failBufferedAcks(error)
+
+ ackJob.join()
+
+ assertNotNull(caughtException, "buffered ACK should fail with an exception")
+ val ablyEx = caughtException as? AblyException
+ assertNotNull(ablyEx, "exception should be an AblyException")
+ assertEquals(92008, ablyEx.errorInfo.code,
+ "error code should be 92008 (PublishAndApplyFailedDueToChannelState)")
+ assertEquals(400, ablyEx.errorInfo.statusCode, "status code should be 400")
+ }
+
+ @Test
+ fun `Echo arrives before ACK - operation applied exactly once via canApplyOperation`() = runTest {
+ val defaultRealtimeObjects = makeRealtimeObjects()
+ defaultRealtimeObjects.state = ObjectsState.Synced
+
+ val counter = DefaultLiveCounter.zeroValue("counter:test@1", defaultRealtimeObjects)
+ counter.data.set(10.0)
+ defaultRealtimeObjects.objectsPool.set("counter:test@1", counter)
+
+ val msg = ObjectMessage(
+ operation = ObjectOperation(
+ action = ObjectOperationAction.CounterInc,
+ objectId = "counter:test@1",
+ counterOp = ObjectsCounterOp(amount = 5.0)
+ ),
+ serial = "ser-01",
+ siteCode = "site1"
+ )
+
+ val objectsManager = defaultRealtimeObjects.ObjectsManager
+
+ // Step 1: echo arrives first as CHANNEL message — applied normally
+ objectsManager.handleObjectMessages(listOf(msg))
+ assertEquals(15.0, counter.data.get(), "echo should be applied as CHANNEL message")
+ assertEquals("ser-01", counter.siteTimeserials["site1"],
+ "siteTimeserials should be updated by CHANNEL echo")
+
+ // Step 2: ACK fires — applyAckResult with same serial (state is SYNCED, no suspend)
+ objectsManager.applyAckResult(listOf(msg))
+
+ // canApplyOperation rejects (serial "ser-01" is not newer than siteTimeserials["site1"] = "ser-01")
+ assertEquals(15.0, counter.data.get(), "counter should NOT be incremented again by late ACK apply")
+ // applied=false → serial NOT added to appliedOnAckSerials
+ assertFalse(defaultRealtimeObjects.appliedOnAckSerials.contains("ser-01"),
+ "serial should NOT be in appliedOnAckSerials when LOCAL apply was rejected")
+ }
+
+ @Test
+ fun `publishAndApply logs error and returns without apply when siteCode is null`() = runTest {
+ val adapter = getMockObjectsAdapter()
+ // Create a ConnectionManager mock with all fields needed for publish() to succeed
+ val cm = mockk(relaxed = true)
+ cm.maxMessageSize = 65536 // direct field assignment bypasses mock interception issues
+ every { cm.isActive } returns true
+ every { cm.send(any(), any(), any()) } answers {
+ @Suppress("UNCHECKED_CAST")
+ val callback = thirdArg>()
+ callback.onSuccess(io.ably.lib.types.PublishResult(null)) // null serials → RTO20c2 path
+ }
+ every { adapter.connectionManager } returns cm
+ // siteCode is null (relaxed mock default) — triggers RTO20c1 graceful degradation path
+
+ val defaultRealtimeObjects = DefaultRealtimeObjects("testChannel", adapter).also { testInstances.add(it) }
+ defaultRealtimeObjects.state = ObjectsState.Synced
+
+ val counter = DefaultLiveCounter.zeroValue("counter:test@1", defaultRealtimeObjects)
+ defaultRealtimeObjects.objectsPool.set("counter:test@1", counter)
+
+ val msg = ObjectMessage(
+ operation = ObjectOperation(
+ action = ObjectOperationAction.CounterInc,
+ objectId = "counter:test@1",
+ counterOp = ObjectsCounterOp(amount = 5.0)
+ )
+ )
+
+ // Should not throw even when siteCode is null (RTO20c1 graceful degradation)
+ defaultRealtimeObjects.publishAndApply(arrayOf(msg))
+
+ assertEquals(0.0, counter.data.get(), "no local apply should happen when siteCode is null")
+ assertTrue(defaultRealtimeObjects.appliedOnAckSerials.isEmpty(),
+ "appliedOnAckSerials should be empty when siteCode is null")
+ }
+
+ @Test
+ fun `(issue 7b) publishAndApply logs error and returns without apply when serials length mismatches`() = runTest {
+ val adapter = getMockObjectsAdapter()
+ // Create a ConnectionManager mock that returns a PublishResult with wrong-length serials
+ val cm = mockk(relaxed = true)
+ cm.maxMessageSize = 65536 // direct field assignment bypasses mock interception issues
+ every { cm.isActive } returns true
+ cm.siteCode = "site1" // direct field assignment (siteCode is a Java public field)
+ every { cm.send(any(), any(), any()) } answers {
+ @Suppress("UNCHECKED_CAST")
+ val callback = thirdArg>()
+ callback.onSuccess(io.ably.lib.types.PublishResult(arrayOfNulls(0))) // wrong length (0 instead of 1)
+ }
+ every { adapter.connectionManager } returns cm
+
+ val defaultRealtimeObjects = DefaultRealtimeObjects("testChannel", adapter).also { testInstances.add(it) }
+ defaultRealtimeObjects.state = ObjectsState.Synced
+
+ val counter = DefaultLiveCounter.zeroValue("counter:test@1", defaultRealtimeObjects)
+ defaultRealtimeObjects.objectsPool.set("counter:test@1", counter)
+
+ val msg = ObjectMessage(
+ operation = ObjectOperation(
+ action = ObjectOperationAction.CounterInc,
+ objectId = "counter:test@1",
+ counterOp = ObjectsCounterOp(amount = 5.0)
+ )
+ )
+
+ // Should not throw even when serials length mismatches (RTO20c2 graceful degradation)
+ defaultRealtimeObjects.publishAndApply(arrayOf(msg))
+
+ assertEquals(0.0, counter.data.get(), "no local apply should happen when serials length mismatches")
+ assertTrue(defaultRealtimeObjects.appliedOnAckSerials.isEmpty(),
+ "appliedOnAckSerials should be empty when serials length mismatches")
+ }
+
private fun mockZeroValuedObjects() {
mockkObject(DefaultLiveMap.Companion)
every {
@@ -227,6 +672,9 @@ class ObjectsManagerTest {
@AfterTest
fun tearDown() {
+ val cleanupError = AblyException.fromErrorInfo(ErrorInfo("test cleanup", 500))
+ testInstances.forEach { it.dispose(cleanupError) }
+ testInstances.clear()
unmockkAll() // Clean up all mockk objects after each test
}
}
diff --git a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livecounter/DefaultLiveCounterTest.kt b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livecounter/DefaultLiveCounterTest.kt
index 77576a907..9c6bca377 100644
--- a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livecounter/DefaultLiveCounterTest.kt
+++ b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livecounter/DefaultLiveCounterTest.kt
@@ -1,16 +1,20 @@
package io.ably.lib.objects.unit.type.livecounter
import io.ably.lib.objects.ObjectsCounter
+import io.ably.lib.objects.ObjectsCounterOp
import io.ably.lib.objects.ObjectMessage
import io.ably.lib.objects.ObjectOperation
import io.ably.lib.objects.ObjectOperationAction
+import io.ably.lib.objects.ObjectsOperationSource
import io.ably.lib.objects.ObjectState
import io.ably.lib.objects.unit.getDefaultLiveCounterWithMockedDeps
import io.ably.lib.types.AblyException
import org.junit.Test
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
+import kotlin.test.assertFalse
import kotlin.test.assertNotNull
+import kotlin.test.assertTrue
class DefaultLiveCounterTest {
@Test
@@ -57,7 +61,7 @@ class DefaultLiveCounterTest {
// RTLC7a - Should throw error when objectId doesn't match
val exception = assertFailsWith {
- liveCounter.applyObject(message)
+ liveCounter.applyObject(message, ObjectsOperationSource.CHANNEL)
}
val errorInfo = exception.errorInfo
assertNotNull(errorInfo)
@@ -88,7 +92,7 @@ class DefaultLiveCounterTest {
)
// RTLC7b - Should skip operation when serial is not newer
- liveCounter.applyObject(message)
+ liveCounter.applyObject(message, ObjectsOperationSource.CHANNEL)
// Verify that the site serial was not updated (operation was skipped)
assertEquals("serial2", liveCounter.siteTimeserials["site1"])
@@ -115,9 +119,144 @@ class DefaultLiveCounterTest {
)
// RTLC7c - Should update site serial when operation is valid
- liveCounter.applyObject(message)
+ liveCounter.applyObject(message, ObjectsOperationSource.CHANNEL)
// Verify that the site serial was updated
assertEquals("serial2", liveCounter.siteTimeserials["site1"])
}
+
+ @Test
+ fun `(RTLC7c LOCAL) applyObject with LOCAL source updates data but does NOT update siteTimeserials`() {
+ val liveCounter = getDefaultLiveCounterWithMockedDeps("counter:testCounter@1")
+ assertTrue(liveCounter.siteTimeserials.isEmpty(), "siteTimeserials should start empty")
+
+ val message = ObjectMessage(
+ id = "testId",
+ operation = ObjectOperation(
+ action = ObjectOperationAction.CounterInc,
+ objectId = "counter:testCounter@1",
+ counterOp = ObjectsCounterOp(amount = 5.0)
+ ),
+ serial = "serial1",
+ siteCode = "site1"
+ )
+
+ // RTLC7c - LOCAL source: data IS updated, siteTimeserials is NOT updated
+ val result = liveCounter.applyObject(message, ObjectsOperationSource.LOCAL)
+
+ assertTrue(result, "applyObject should return true for successful COUNTER_INC")
+ assertEquals(5.0, liveCounter.data.get(), "data should be updated for LOCAL source")
+ assertFalse(liveCounter.siteTimeserials.containsKey("site1"),
+ "siteTimeserials should NOT be updated for LOCAL source")
+ }
+
+ @Test
+ fun `(RTLC7b return) applyObject returns false when incoming serial is not newer than existing`() {
+ val liveCounter = getDefaultLiveCounterWithMockedDeps("counter:testCounter@1")
+ liveCounter.siteTimeserials["site1"] = "serial5" // Newer than incoming "serial1"
+
+ val message = ObjectMessage(
+ id = "testId",
+ operation = ObjectOperation(
+ action = ObjectOperationAction.CounterInc,
+ objectId = "counter:testCounter@1",
+ counterOp = ObjectsCounterOp(amount = 5.0)
+ ),
+ serial = "serial1", // Older than "serial5"
+ siteCode = "site1"
+ )
+
+ // RTLC7b - Should return false when canApplyOperation fails
+ val result = liveCounter.applyObject(message, ObjectsOperationSource.CHANNEL)
+
+ assertFalse(result, "applyObject should return false when serial is not newer")
+ assertEquals(0.0, liveCounter.data.get(), "data should not be changed")
+ assertEquals("serial5", liveCounter.siteTimeserials["site1"], "siteTimeserials should not change")
+ }
+
+ @Test
+ fun `(RTLC7e return) applyObject returns false when object is tombstoned`() {
+ val liveCounter = getDefaultLiveCounterWithMockedDeps("counter:testCounter@1")
+ liveCounter.tombstone(null) // Tombstone the object
+
+ val message = ObjectMessage(
+ id = "testId",
+ operation = ObjectOperation(
+ action = ObjectOperationAction.CounterInc,
+ objectId = "counter:testCounter@1",
+ counterOp = ObjectsCounterOp(amount = 5.0)
+ ),
+ serial = "serial1",
+ siteCode = "site1"
+ )
+
+ // RTLC7e - Should return false when object is tombstoned
+ val result = liveCounter.applyObject(message, ObjectsOperationSource.CHANNEL)
+
+ assertFalse(result, "applyObject should return false when object is tombstoned")
+ }
+
+ @Test
+ fun `(RTLC7d2b) applyObject returns true for successful COUNTER_INC`() {
+ val liveCounter = getDefaultLiveCounterWithMockedDeps("counter:testCounter@1")
+
+ val message = ObjectMessage(
+ id = "testId",
+ operation = ObjectOperation(
+ action = ObjectOperationAction.CounterInc,
+ objectId = "counter:testCounter@1",
+ counterOp = ObjectsCounterOp(amount = 5.0)
+ ),
+ serial = "serial1",
+ siteCode = "site1"
+ )
+
+ // RTLC7d2b - Should return true for successful COUNTER_INC
+ val result = liveCounter.applyObject(message, ObjectsOperationSource.CHANNEL)
+
+ assertTrue(result, "applyObject should return true for successful COUNTER_INC")
+ assertEquals(5.0, liveCounter.data.get())
+ }
+
+ @Test
+ fun `(RTLC7d1b) applyObject returns true for successful COUNTER_CREATE`() {
+ val liveCounter = getDefaultLiveCounterWithMockedDeps("counter:testCounter@1")
+
+ val message = ObjectMessage(
+ id = "testId",
+ operation = ObjectOperation(
+ action = ObjectOperationAction.CounterCreate,
+ objectId = "counter:testCounter@1",
+ counter = ObjectsCounter(count = 20.0)
+ ),
+ serial = "serial1",
+ siteCode = "site1"
+ )
+
+ // RTLC7d1b - Should return true for successful COUNTER_CREATE
+ val result = liveCounter.applyObject(message, ObjectsOperationSource.CHANNEL)
+
+ assertTrue(result, "applyObject should return true for successful COUNTER_CREATE")
+ }
+
+ @Test
+ fun `(RTLC7d4b) applyObject returns true for OBJECT_DELETE (tombstone)`() {
+ val liveCounter = getDefaultLiveCounterWithMockedDeps("counter:testCounter@1")
+
+ val message = ObjectMessage(
+ id = "testId",
+ operation = ObjectOperation(
+ action = ObjectOperationAction.ObjectDelete,
+ objectId = "counter:testCounter@1",
+ ),
+ serial = "serial1",
+ siteCode = "site1"
+ )
+
+ // RTLC7d4b - Should return true for OBJECT_DELETE (tombstone applied)
+ val result = liveCounter.applyObject(message, ObjectsOperationSource.CHANNEL)
+
+ assertTrue(result, "applyObject should return true for OBJECT_DELETE")
+ assertTrue(liveCounter.isTombstoned, "object should be tombstoned")
+ }
}
diff --git a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livecounter/LiveCounterManagerTest.kt b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livecounter/LiveCounterManagerTest.kt
index 6c1e49748..813f44dc5 100644
--- a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livecounter/LiveCounterManagerTest.kt
+++ b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livecounter/LiveCounterManagerTest.kt
@@ -64,7 +64,55 @@ class DefaultLiveCounterManagerTest {
@Test
- fun `(RTLC7, RTLC7d3) LiveCounterManager should throw error for unsupported action`() {
+ fun `(RTLC7d1b) LiveCounterManager applyOperation returns true for COUNTER_CREATE`() {
+ val liveCounter = getDefaultLiveCounterWithMockedDeps()
+ val liveCounterManager = liveCounter.LiveCounterManager
+
+ val operation = ObjectOperation(
+ action = ObjectOperationAction.CounterCreate,
+ objectId = "testCounterId",
+ counter = ObjectsCounter(count = 10.0)
+ )
+
+ // RTLC7d1b - Should return true for successful COUNTER_CREATE
+ val result = liveCounterManager.applyOperation(operation, null)
+ assertTrue(result, "applyOperation should return true for COUNTER_CREATE")
+ }
+
+ @Test
+ fun `(RTLC7d2b) LiveCounterManager applyOperation returns true for COUNTER_INC`() {
+ val liveCounter = getDefaultLiveCounterWithMockedDeps()
+ val liveCounterManager = liveCounter.LiveCounterManager
+
+ val operation = ObjectOperation(
+ action = ObjectOperationAction.CounterInc,
+ objectId = "testCounterId",
+ counterOp = ObjectsCounterOp(amount = 5.0)
+ )
+
+ // RTLC7d2b - Should return true for successful COUNTER_INC
+ val result = liveCounterManager.applyOperation(operation, null)
+ assertTrue(result, "applyOperation should return true for COUNTER_INC")
+ }
+
+ @Test
+ fun `(RTLC7d4b) LiveCounterManager applyOperation returns true for OBJECT_DELETE`() {
+ val liveCounter = getDefaultLiveCounterWithMockedDeps()
+ val liveCounterManager = liveCounter.LiveCounterManager
+
+ val operation = ObjectOperation(
+ action = ObjectOperationAction.ObjectDelete,
+ objectId = "testCounterId",
+ )
+
+ // RTLC7d4b - Should return true for OBJECT_DELETE (tombstone)
+ val result = liveCounterManager.applyOperation(operation, null)
+ assertTrue(result, "applyOperation should return true for OBJECT_DELETE")
+ assertTrue(liveCounter.isTombstoned, "counter should be tombstoned after ObjectDelete")
+ }
+
+ @Test
+ fun `(RTLC7, RTLC7d3) LiveCounterManager should return false for unsupported action`() {
val liveCounter = getDefaultLiveCounterWithMockedDeps()
val liveCounterManager = liveCounter.LiveCounterManager
@@ -74,15 +122,9 @@ class DefaultLiveCounterManagerTest {
map = ObjectsMap(semantics = ObjectsMapSemantics.LWW, entries = emptyMap())
)
- // RTLC7d3 - Should throw error for unsupported action
- val exception = assertFailsWith {
- liveCounterManager.applyOperation(operation, null)
- }
-
- val errorInfo = exception.errorInfo
- assertNotNull(errorInfo)
- assertEquals(92000, errorInfo.code) // InvalidObject error code
- assertEquals(500, errorInfo.statusCode) // InternalServerError status code
+ // RTLC7d3 - Should return false for unsupported action (no longer throws)
+ val result = liveCounterManager.applyOperation(operation, null)
+ assertFalse(result, "Should return false for unsupported action")
}
@Test
diff --git a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livemap/DefaultLiveMapTest.kt b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livemap/DefaultLiveMapTest.kt
index 783cfe928..4746b91ee 100644
--- a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livemap/DefaultLiveMapTest.kt
+++ b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livemap/DefaultLiveMapTest.kt
@@ -2,6 +2,8 @@ package io.ably.lib.objects.unit.type.livemap
import io.ably.lib.objects.ObjectsMapSemantics
import io.ably.lib.objects.ObjectsMap
+import io.ably.lib.objects.ObjectsMapOp
+import io.ably.lib.objects.ObjectsOperationSource
import io.ably.lib.objects.ObjectState
import io.ably.lib.objects.ObjectMessage
import io.ably.lib.objects.ObjectOperation
@@ -11,7 +13,9 @@ import io.ably.lib.types.AblyException
import org.junit.Test
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
+import kotlin.test.assertFalse
import kotlin.test.assertNotNull
+import kotlin.test.assertTrue
class DefaultLiveMapTest {
@Test
@@ -64,7 +68,7 @@ class DefaultLiveMapTest {
// RTLM15a - Should throw error when objectId doesn't match
val exception = assertFailsWith {
- liveMap.applyObject(message)
+ liveMap.applyObject(message, ObjectsOperationSource.CHANNEL)
}
val errorInfo = exception.errorInfo
assertNotNull(errorInfo)
@@ -98,7 +102,7 @@ class DefaultLiveMapTest {
)
// RTLM15b - Should skip operation when serial is not newer
- liveMap.applyObject(message)
+ liveMap.applyObject(message, ObjectsOperationSource.CHANNEL)
// Verify that the site serial was not updated (operation was skipped)
assertEquals("serial2", liveMap.siteTimeserials["site1"])
@@ -128,9 +132,143 @@ class DefaultLiveMapTest {
)
// RTLM15c - Should update site serial when operation is valid
- liveMap.applyObject(message)
+ liveMap.applyObject(message, ObjectsOperationSource.CHANNEL)
// Verify that the site serial was updated
assertEquals("serial2", liveMap.siteTimeserials["site1"])
}
+
+ @Test
+ fun `(RTLM15c LOCAL) applyObject with LOCAL source updates data but does NOT update siteTimeserials`() {
+ val liveMap = getDefaultLiveMapWithMockedDeps("map:testMap@1")
+ assertTrue(liveMap.siteTimeserials.isEmpty(), "siteTimeserials should start empty")
+
+ val message = ObjectMessage(
+ id = "testId",
+ operation = ObjectOperation(
+ action = ObjectOperationAction.MapSet,
+ objectId = "map:testMap@1",
+ mapOp = ObjectsMapOp(key = "key1", data = io.ably.lib.objects.ObjectData(value = io.ably.lib.objects.ObjectValue.String("value1")))
+ ),
+ serial = "serial1",
+ siteCode = "site1"
+ )
+
+ // RTLM15c - LOCAL source: data IS updated (entry set), siteTimeserials is NOT updated
+ val result = liveMap.applyObject(message, ObjectsOperationSource.LOCAL)
+
+ assertTrue(result, "applyObject should return true for successful MAP_SET")
+ assertEquals("value1", liveMap.data["key1"]?.data?.value?.value, "map entry should be updated for LOCAL source")
+ assertFalse(liveMap.siteTimeserials.containsKey("site1"),
+ "siteTimeserials should NOT be updated for LOCAL source")
+ }
+
+ @Test
+ fun `(RTLM15b return) applyObject returns false when incoming serial is not newer than existing`() {
+ val liveMap = getDefaultLiveMapWithMockedDeps("map:testMap@1")
+ liveMap.siteTimeserials["site1"] = "serial5" // Newer than incoming "serial1"
+
+ val message = ObjectMessage(
+ id = "testId",
+ operation = ObjectOperation(
+ action = ObjectOperationAction.MapSet,
+ objectId = "map:testMap@1",
+ mapOp = ObjectsMapOp(key = "key1", data = io.ably.lib.objects.ObjectData(value = io.ably.lib.objects.ObjectValue.String("value1")))
+ ),
+ serial = "serial1", // Older than "serial5"
+ siteCode = "site1"
+ )
+
+ // RTLM15b - Should return false when canApplyOperation fails
+ val result = liveMap.applyObject(message, ObjectsOperationSource.CHANNEL)
+
+ assertFalse(result, "applyObject should return false when serial is not newer")
+ assertEquals("serial5", liveMap.siteTimeserials["site1"], "siteTimeserials should not change")
+ }
+
+ @Test
+ fun `(RTLM15e return) applyObject returns false when object is tombstoned`() {
+ val liveMap = getDefaultLiveMapWithMockedDeps("map:testMap@1")
+ liveMap.tombstone(null) // Tombstone the object
+
+ val message = ObjectMessage(
+ id = "testId",
+ operation = ObjectOperation(
+ action = ObjectOperationAction.MapSet,
+ objectId = "map:testMap@1",
+ mapOp = ObjectsMapOp(key = "key1", data = io.ably.lib.objects.ObjectData(value = io.ably.lib.objects.ObjectValue.String("value1")))
+ ),
+ serial = "serial1",
+ siteCode = "site1"
+ )
+
+ // RTLM15e - Should return false when object is tombstoned
+ val result = liveMap.applyObject(message, ObjectsOperationSource.CHANNEL)
+
+ assertFalse(result, "applyObject should return false when object is tombstoned")
+ }
+
+ @Test
+ fun `(RTLM15d2b) applyObject returns true for successful MAP_SET`() {
+ val liveMap = getDefaultLiveMapWithMockedDeps("map:testMap@1")
+
+ val message = ObjectMessage(
+ id = "testId",
+ operation = ObjectOperation(
+ action = ObjectOperationAction.MapSet,
+ objectId = "map:testMap@1",
+ mapOp = ObjectsMapOp(key = "key1", data = io.ably.lib.objects.ObjectData(value = io.ably.lib.objects.ObjectValue.String("value1")))
+ ),
+ serial = "serial1",
+ siteCode = "site1"
+ )
+
+ // RTLM15d2b - Should return true for successful MAP_SET
+ val result = liveMap.applyObject(message, ObjectsOperationSource.CHANNEL)
+
+ assertTrue(result, "applyObject should return true for successful MAP_SET")
+ assertEquals("value1", liveMap.data["key1"]?.data?.value?.value)
+ }
+
+ @Test
+ fun `(RTLM15d3b) applyObject returns true for successful MAP_REMOVE`() {
+ val liveMap = getDefaultLiveMapWithMockedDeps("map:testMap@1")
+
+ val message = ObjectMessage(
+ id = "testId",
+ operation = ObjectOperation(
+ action = ObjectOperationAction.MapRemove,
+ objectId = "map:testMap@1",
+ mapOp = ObjectsMapOp(key = "key1")
+ ),
+ serial = "serial1",
+ siteCode = "site1"
+ )
+
+ // RTLM15d3b - Should return true for successful MAP_REMOVE
+ val result = liveMap.applyObject(message, ObjectsOperationSource.CHANNEL)
+
+ assertTrue(result, "applyObject should return true for successful MAP_REMOVE")
+ }
+
+ @Test
+ fun `(RTLM15d5b) applyObject returns true for OBJECT_DELETE (tombstone)`() {
+ val liveMap = getDefaultLiveMapWithMockedDeps("map:testMap@1")
+
+ val message = ObjectMessage(
+ id = "testId",
+ operation = ObjectOperation(
+ action = ObjectOperationAction.ObjectDelete,
+ objectId = "map:testMap@1",
+ ),
+ serial = "serial1",
+ siteCode = "site1"
+ )
+
+ // RTLM15d5b - Should return true for OBJECT_DELETE (tombstone applied)
+ val result = liveMap.applyObject(message, ObjectsOperationSource.CHANNEL)
+
+ assertTrue(result, "applyObject should return true for OBJECT_DELETE")
+ assertTrue(liveMap.isTombstoned, "object should be tombstoned")
+ }
}
diff --git a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livemap/LiveMapManagerTest.kt b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livemap/LiveMapManagerTest.kt
index 8f5e37bbd..a1da570fe 100644
--- a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livemap/LiveMapManagerTest.kt
+++ b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livemap/LiveMapManagerTest.kt
@@ -459,7 +459,71 @@ class LiveMapManagerTest {
}
@Test
- fun `(RTLM15, RTLM15d4) LiveMapManager should throw error for unsupported action`() {
+ fun `(RTLM15d1b) LiveMapManager applyOperation returns true for MAP_CREATE`() {
+ val liveMap = getDefaultLiveMapWithMockedDeps()
+ val liveMapManager = liveMap.LiveMapManager
+
+ val operation = ObjectOperation(
+ action = ObjectOperationAction.MapCreate,
+ objectId = "map:testMap@1",
+ map = ObjectsMap(semantics = ObjectsMapSemantics.LWW, entries = emptyMap())
+ )
+
+ // RTLM15d1b - Should return true for successful MAP_CREATE
+ val result = liveMapManager.applyOperation(operation, "serial1", null)
+ assertTrue(result, "applyOperation should return true for MAP_CREATE")
+ }
+
+ @Test
+ fun `(RTLM15d2b) LiveMapManager applyOperation returns true for MAP_SET`() {
+ val liveMap = getDefaultLiveMapWithMockedDeps()
+ val liveMapManager = liveMap.LiveMapManager
+
+ val operation = ObjectOperation(
+ action = ObjectOperationAction.MapSet,
+ objectId = "map:testMap@1",
+ mapOp = ObjectsMapOp(key = "key1", data = ObjectData(value = ObjectValue.String("value1")))
+ )
+
+ // RTLM15d2b - Should return true for successful MAP_SET
+ val result = liveMapManager.applyOperation(operation, "serial1", null)
+ assertTrue(result, "applyOperation should return true for MAP_SET")
+ }
+
+ @Test
+ fun `(RTLM15d3b) LiveMapManager applyOperation returns true for MAP_REMOVE`() {
+ val liveMap = getDefaultLiveMapWithMockedDeps()
+ val liveMapManager = liveMap.LiveMapManager
+
+ val operation = ObjectOperation(
+ action = ObjectOperationAction.MapRemove,
+ objectId = "map:testMap@1",
+ mapOp = ObjectsMapOp(key = "key1")
+ )
+
+ // RTLM15d3b - Should return true for successful MAP_REMOVE
+ val result = liveMapManager.applyOperation(operation, "serial1", null)
+ assertTrue(result, "applyOperation should return true for MAP_REMOVE")
+ }
+
+ @Test
+ fun `(RTLM15d5b) LiveMapManager applyOperation returns true for OBJECT_DELETE`() {
+ val liveMap = getDefaultLiveMapWithMockedDeps()
+ val liveMapManager = liveMap.LiveMapManager
+
+ val operation = ObjectOperation(
+ action = ObjectOperationAction.ObjectDelete,
+ objectId = "map:testMap@1",
+ )
+
+ // RTLM15d5b - Should return true for OBJECT_DELETE (tombstone)
+ val result = liveMapManager.applyOperation(operation, "serial1", null)
+ assertTrue(result, "applyOperation should return true for OBJECT_DELETE")
+ assertTrue(liveMap.isTombstoned, "map should be tombstoned after ObjectDelete")
+ }
+
+ @Test
+ fun `(RTLM15, RTLM15d4) LiveMapManager should return false for unsupported action`() {
val liveMap = getDefaultLiveMapWithMockedDeps()
val liveMapManager = liveMap.LiveMapManager
@@ -469,15 +533,9 @@ class LiveMapManagerTest {
counter = ObjectsCounter(count = 20.0)
)
- // RTLM15d4 - Should throw error for unsupported action
- val exception = assertFailsWith {
- liveMapManager.applyOperation(operation, "serial1", null)
- }
-
- val errorInfo = exception.errorInfo
- assertNotNull(errorInfo, "Error info should not be null")
- assertEquals(92000, errorInfo?.code) // InvalidObject error code
- assertEquals(500, errorInfo?.statusCode) // InternalServerError status code
+ // RTLM15d4 - Should return false for unsupported action (no longer throws)
+ val result = liveMapManager.applyOperation(operation, "serial1", null)
+ assertFalse(result, "Should return false for unsupported action")
}
@Test