diff --git a/common/src/main/java/com/ably/tracking/common/Ably.kt b/common/src/main/java/com/ably/tracking/common/Ably.kt index c9947bc5b..6976473e0 100644 --- a/common/src/main/java/com/ably/tracking/common/Ably.kt +++ b/common/src/main/java/com/ably/tracking/common/Ably.kt @@ -80,12 +80,26 @@ interface Ably { /** * A suspending version of [subscribeForPresenceMessages] + * + * @param emitCurrentMessages If set to true it emits messages for each client that's currently in the presence. * */ suspend fun subscribeForPresenceMessages( trackableId: String, - listener: (PresenceMessage) -> Unit + listener: (PresenceMessage) -> Unit, + emitCurrentMessages: Boolean = true, ): Result + /** + * Returns current messages from the trackable channel's presence. + * Should be called only when there's an existing channel for the [trackableId]. + * If a channel for the [trackableId] doesn't exist then this method returns a Result with an empty list. + * + * @param trackableId The ID of the trackable channel. + * + * @return Result containing the current presence messages from the channel's presence. If something goes wrong it will contain a [ConnectionException]. + * */ + suspend fun getCurrentPresence(trackableId: String): Result> + /** * Sends an enhanced location update to the channel. * Should be called only when there's an existing channel for the [trackableId]. @@ -632,11 +646,14 @@ constructor( override suspend fun subscribeForPresenceMessages( trackableId: String, - listener: (PresenceMessage) -> Unit + listener: (PresenceMessage) -> Unit, + emitCurrentMessages: Boolean, ): Result { val channel = channels[trackableId] ?: return Result.success(Unit) return try { - emitAllCurrentMessagesFromPresence(channel, listener) + if (emitCurrentMessages) { + emitAllCurrentMessagesFromPresence(channel, listener) + } channel.presence.subscribe { val parsedMessage = it.toTracking(gson) if (parsedMessage != null) { @@ -652,24 +669,36 @@ constructor( } } + private fun emitAllCurrentMessagesFromPresence(channel: Channel, listener: (PresenceMessage) -> Unit) { + getAllCurrentMessagesFromPresence(channel).forEach { presenceMessage -> + // Each message is launched in a fire-and-forget manner to not block this method on the listener() call + scope.launch { listener(presenceMessage) } + } + } + + override suspend fun getCurrentPresence(trackableId: String): Result> { + val channel = channels[trackableId] ?: return Result.success(emptyList()) + return suspendCancellableCoroutine { continuation -> + try { + val currentPresenceMessages = getAllCurrentMessagesFromPresence(channel) + continuation.resume(Result.success(currentPresenceMessages)) + } catch (ablyException: AblyException) { + continuation.resume(Result.failure(ablyException.errorInfo.toTrackingException())) + } + } + } + /** * Warning: This method might block the current thread due to the presence.get(true) call. */ - private fun emitAllCurrentMessagesFromPresence(channel: Channel, listener: (PresenceMessage) -> Unit) { - channel.presence.get(true).let { messages -> - messages.forEach { presenceMessage -> - // Each message is launched in a fire-and-forget manner to not block this method on the listener() call - scope.launch { - val parsedMessage = presenceMessage.toTracking(gson) - if (parsedMessage != null) { - listener(parsedMessage) - } else { - logHandler?.w("Presence message in unexpected format: $presenceMessage") - } + private fun getAllCurrentMessagesFromPresence(channel: Channel): List = + channel.presence.get(true).mapNotNull { presenceMessage -> + presenceMessage.toTracking(gson).also { + if (it == null) { + logHandler?.w("Presence message in unexpected format: $presenceMessage") } } } - } override fun updatePresenceData(trackableId: String, presenceData: PresenceData, callback: (Result) -> Unit) { scope.launch { diff --git a/integration-testing-app/src/androidTest/java/com/ably/tracking/tests/PublisherAndSubscriberTests.kt b/integration-testing-app/src/androidTest/java/com/ably/tracking/tests/PublisherAndSubscriberTests.kt index eb2bc60eb..ebe7dcf30 100644 --- a/integration-testing-app/src/androidTest/java/com/ably/tracking/tests/PublisherAndSubscriberTests.kt +++ b/integration-testing-app/src/androidTest/java/com/ably/tracking/tests/PublisherAndSubscriberTests.kt @@ -4,6 +4,7 @@ import androidx.test.ext.junit.runners.AndroidJUnit4 import androidx.test.platform.app.InstrumentationRegistry import com.ably.tracking.LocationUpdate import com.ably.tracking.TrackableState +import com.ably.tracking.annotations.Experimental import com.ably.tracking.publisher.Trackable import com.ably.tracking.subscriber.Subscriber import com.ably.tracking.test.android.common.BooleanExpectation @@ -239,4 +240,51 @@ class PublisherAndSubscriberTests { // then subscriberFailedExpectation.assertFulfilled() } + + @OptIn(Experimental::class) + @Test + fun shouldNotEmitPublisherPresenceFalseIfPublisherIsPresentFromTheStart() { + // given + val subscriberEmittedPublisherPresentExpectation = UnitExpectation("subscriber emitted publisher present") + val context = InstrumentationRegistry.getInstrumentation().targetContext + val trackableId = UUID.randomUUID().toString() + val scope = CoroutineScope(Dispatchers.Default) + val publisherPresentValues = mutableListOf() + + // when + // create publisher and start tracking + val publisher = createAndStartPublisher(context, sendResolution = true) + runBlocking { + publisher.track(Trackable(trackableId)) + } + + // create subscriber and listen for publisher presence + var subscriber: Subscriber + runBlocking { + subscriber = createAndStartSubscriber(trackableId) + } + subscriber.publisherPresence + .onEach { isPublisherPresent -> + publisherPresentValues.add(isPublisherPresent) + if (isPublisherPresent) { + subscriberEmittedPublisherPresentExpectation.fulfill() + } + } + .launchIn(scope) + + // await for publisher present event + subscriberEmittedPublisherPresentExpectation.await() + + // cleanup + runBlocking { + coroutineScope { + launch { publisher.stop() } + launch { subscriber.stop() } + } + } + + // then + subscriberEmittedPublisherPresentExpectation.assertFulfilled() + Assert.assertTrue("first publisherPresence value should be true", publisherPresentValues.first()) + } } diff --git a/publishing-sdk/src/test/java/com/ably/tracking/publisher/workerqueue/workers/RetrySubscribeToPresenceWorkerTest.kt b/publishing-sdk/src/test/java/com/ably/tracking/publisher/workerqueue/workers/RetrySubscribeToPresenceWorkerTest.kt index 3afd1a84a..650bf31fd 100644 --- a/publishing-sdk/src/test/java/com/ably/tracking/publisher/workerqueue/workers/RetrySubscribeToPresenceWorkerTest.kt +++ b/publishing-sdk/src/test/java/com/ably/tracking/publisher/workerqueue/workers/RetrySubscribeToPresenceWorkerTest.kt @@ -62,7 +62,7 @@ internal class RetrySubscribeToPresenceWorkerTest { // then verify(exactly = 0) { - ably.subscribeForPresenceMessages(trackable.id, any(), any()) + ably.subscribeForPresenceMessages(trackable.id, any(), any<(Result) -> Unit>()) } } @@ -91,7 +91,7 @@ internal class RetrySubscribeToPresenceWorkerTest { // then verify(exactly = 0) { - ably.subscribeForPresenceMessages(trackable.id, any(), any()) + ably.subscribeForPresenceMessages(trackable.id, any(), any<(Result) -> Unit>()) } } } diff --git a/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/PresenceMessageProcessor.kt b/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/PresenceMessageProcessor.kt new file mode 100644 index 000000000..d6e4bb65a --- /dev/null +++ b/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/PresenceMessageProcessor.kt @@ -0,0 +1,32 @@ +package com.ably.tracking.subscriber + +import com.ably.tracking.common.ClientTypes +import com.ably.tracking.common.PresenceAction +import com.ably.tracking.common.PresenceMessage + +internal fun processPresenceMessage( + presenceMessage: PresenceMessage, + properties: Properties, + subscriberInteractor: SubscriberInteractor, +) { + when (presenceMessage.action) { + PresenceAction.PRESENT_OR_ENTER -> { + if (presenceMessage.data.type == ClientTypes.PUBLISHER) { + subscriberInteractor.updatePublisherPresence(properties, true) + subscriberInteractor.updateTrackableState(properties) + subscriberInteractor.updatePublisherResolutionInformation(presenceMessage.data) + } + } + PresenceAction.LEAVE_OR_ABSENT -> { + if (presenceMessage.data.type == ClientTypes.PUBLISHER) { + subscriberInteractor.updatePublisherPresence(properties, false) + subscriberInteractor.updateTrackableState(properties) + } + } + PresenceAction.UPDATE -> { + if (presenceMessage.data.type == ClientTypes.PUBLISHER) { + subscriberInteractor.updatePublisherResolutionInformation(presenceMessage.data) + } + } + } +} diff --git a/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/WorkerFactory.kt b/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/WorkerFactory.kt index 2b2d3efb8..0066e3eaa 100644 --- a/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/WorkerFactory.kt +++ b/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/WorkerFactory.kt @@ -8,6 +8,7 @@ import com.ably.tracking.common.ResultCallbackFunction import com.ably.tracking.subscriber.SubscriberInteractor import com.ably.tracking.subscriber.workerqueue.workers.ChangeResolutionWorker import com.ably.tracking.subscriber.workerqueue.workers.DisconnectWorker +import com.ably.tracking.subscriber.workerqueue.workers.ProcessInitialPresenceMessagesWorker import com.ably.tracking.subscriber.workerqueue.workers.StartConnectionWorker import com.ably.tracking.subscriber.workerqueue.workers.StopConnectionWorker import com.ably.tracking.subscriber.workerqueue.workers.SubscribeForPresenceMessagesWorker @@ -71,6 +72,11 @@ internal class WorkerFactory( subscriberInteractor, params.callbackFunction ) + is WorkerSpecification.ProcessInitialPresenceMessages -> ProcessInitialPresenceMessagesWorker( + params.presenceMessages, + subscriberInteractor, + params.callbackFunction, + ) } } @@ -112,4 +118,9 @@ internal sealed class WorkerSpecification { data class StopConnection( val callbackFunction: ResultCallbackFunction ) : WorkerSpecification() + + data class ProcessInitialPresenceMessages( + val presenceMessages: List, + val callbackFunction: ResultCallbackFunction, + ) : WorkerSpecification() } diff --git a/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/workers/ProcessInitialPresenceMessagesWorker.kt b/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/workers/ProcessInitialPresenceMessagesWorker.kt new file mode 100644 index 000000000..e75e24047 --- /dev/null +++ b/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/workers/ProcessInitialPresenceMessagesWorker.kt @@ -0,0 +1,27 @@ +package com.ably.tracking.subscriber.workerqueue.workers + +import com.ably.tracking.common.PresenceMessage +import com.ably.tracking.common.ResultCallbackFunction +import com.ably.tracking.subscriber.Properties +import com.ably.tracking.subscriber.SubscriberInteractor +import com.ably.tracking.subscriber.processPresenceMessage +import com.ably.tracking.subscriber.workerqueue.CallbackWorker +import com.ably.tracking.subscriber.workerqueue.WorkerSpecification + +internal class ProcessInitialPresenceMessagesWorker( + private val presenceMessages: List, + private val subscriberInteractor: SubscriberInteractor, + callbackFunction: ResultCallbackFunction, +) : CallbackWorker(callbackFunction) { + override fun doWork( + properties: Properties, + doAsyncWork: (suspend () -> Unit) -> Unit, + postWork: (WorkerSpecification) -> Unit + ): Properties { + presenceMessages.forEach { presenceMessage -> + processPresenceMessage(presenceMessage, properties, subscriberInteractor) + } + postWork(WorkerSpecification.SubscribeToChannel(callbackFunction)) + return properties + } +} diff --git a/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/workers/SubscribeForPresenceMessagesWorker.kt b/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/workers/SubscribeForPresenceMessagesWorker.kt index eee9a2dcd..e09d844f8 100644 --- a/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/workers/SubscribeForPresenceMessagesWorker.kt +++ b/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/workers/SubscribeForPresenceMessagesWorker.kt @@ -1,6 +1,7 @@ package com.ably.tracking.subscriber.workerqueue.workers import com.ably.tracking.common.Ably +import com.ably.tracking.common.PresenceMessage import com.ably.tracking.common.ResultCallbackFunction import com.ably.tracking.subscriber.Properties import com.ably.tracking.subscriber.workerqueue.CallbackWorker @@ -17,21 +18,35 @@ internal class SubscribeForPresenceMessagesWorker( postWork: (WorkerSpecification) -> Unit ): Properties { doAsyncWork { - val result = ably.subscribeForPresenceMessages( + val currentPresenceMessagesResult = ably.getCurrentPresence(trackableId) + val initialPresenceMessages: List = + try { + currentPresenceMessagesResult.getOrThrow() + } catch (error: Throwable) { + postDisconnectWork(postWork, Result.failure(error)) + return@doAsyncWork + } + + val subscribeForPresenceMessagesResult = ably.subscribeForPresenceMessages( trackableId = trackableId, + emitCurrentMessages = false, listener = { postWork(WorkerSpecification.UpdatePublisherPresence(it)) } ) - - if (result.isSuccess) { - postWork(WorkerSpecification.SubscribeToChannel(callbackFunction)) - } else { - postWork( - WorkerSpecification.Disconnect(trackableId) { - callbackFunction(result) - } - ) + if (subscribeForPresenceMessagesResult.isFailure) { + postDisconnectWork(postWork, subscribeForPresenceMessagesResult) + return@doAsyncWork } + + postWork(WorkerSpecification.ProcessInitialPresenceMessages(initialPresenceMessages, callbackFunction)) } return properties } + + private fun postDisconnectWork(postWork: (WorkerSpecification) -> Unit, result: Result) { + postWork( + WorkerSpecification.Disconnect(trackableId) { + callbackFunction(result) + } + ) + } } diff --git a/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/workers/UpdatePublisherPresenceWorker.kt b/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/workers/UpdatePublisherPresenceWorker.kt index 5c2150a09..b2640fb7e 100644 --- a/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/workers/UpdatePublisherPresenceWorker.kt +++ b/subscribing-sdk/src/main/java/com/ably/tracking/subscriber/workerqueue/workers/UpdatePublisherPresenceWorker.kt @@ -1,10 +1,9 @@ package com.ably.tracking.subscriber.workerqueue.workers -import com.ably.tracking.common.ClientTypes -import com.ably.tracking.common.PresenceAction import com.ably.tracking.common.PresenceMessage import com.ably.tracking.subscriber.SubscriberInteractor import com.ably.tracking.subscriber.Properties +import com.ably.tracking.subscriber.processPresenceMessage import com.ably.tracking.subscriber.workerqueue.Worker import com.ably.tracking.subscriber.workerqueue.WorkerSpecification @@ -17,26 +16,7 @@ internal class UpdatePublisherPresenceWorker( doAsyncWork: (suspend () -> Unit) -> Unit, postWork: (WorkerSpecification) -> Unit ): Properties { - when (presenceMessage.action) { - PresenceAction.PRESENT_OR_ENTER -> { - if (presenceMessage.data.type == ClientTypes.PUBLISHER) { - subscriberInteractor.updatePublisherPresence(properties, true) - subscriberInteractor.updateTrackableState(properties) - subscriberInteractor.updatePublisherResolutionInformation(presenceMessage.data) - } - } - PresenceAction.LEAVE_OR_ABSENT -> { - if (presenceMessage.data.type == ClientTypes.PUBLISHER) { - subscriberInteractor.updatePublisherPresence(properties, false) - subscriberInteractor.updateTrackableState(properties) - } - } - PresenceAction.UPDATE -> { - if (presenceMessage.data.type == ClientTypes.PUBLISHER) { - subscriberInteractor.updatePublisherResolutionInformation(presenceMessage.data) - } - } - } + processPresenceMessage(presenceMessage, properties, subscriberInteractor) return properties } diff --git a/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/DefaultSubscriberTest.kt b/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/DefaultSubscriberTest.kt index 76b84db7e..e1d9d2f66 100644 --- a/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/DefaultSubscriberTest.kt +++ b/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/DefaultSubscriberTest.kt @@ -4,6 +4,7 @@ import com.ably.tracking.ConnectionException import com.ably.tracking.common.Ably import com.ably.tracking.test.common.mockCreateConnectionSuccess import com.ably.tracking.test.common.mockDisconnectSuccess +import com.ably.tracking.test.common.mockGetCurrentPresenceSuccess import com.ably.tracking.test.common.mockSubscribeToPresenceError import io.mockk.coVerify import io.mockk.mockk @@ -21,6 +22,7 @@ class DefaultSubscriberTest { // given ably.mockCreateConnectionSuccess(trackableId) ably.mockDisconnectSuccess(trackableId) + ably.mockGetCurrentPresenceSuccess(trackableId) ably.mockSubscribeToPresenceError(trackableId) // when @@ -36,6 +38,7 @@ class DefaultSubscriberTest { // given ably.mockCreateConnectionSuccess(trackableId) ably.mockDisconnectSuccess(trackableId) + ably.mockGetCurrentPresenceSuccess(trackableId) ably.mockSubscribeToPresenceError(trackableId) // when diff --git a/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/ProcessInitialPresenceMessagesWorkerTest.kt b/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/ProcessInitialPresenceMessagesWorkerTest.kt new file mode 100644 index 000000000..3eb45a1cd --- /dev/null +++ b/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/ProcessInitialPresenceMessagesWorkerTest.kt @@ -0,0 +1,90 @@ +package com.ably.tracking.subscriber.workerqueue.workers + +import com.ably.tracking.Accuracy +import com.ably.tracking.Resolution +import com.ably.tracking.common.ClientTypes +import com.ably.tracking.common.PresenceAction +import com.ably.tracking.common.PresenceData +import com.ably.tracking.common.PresenceMessage +import com.ably.tracking.common.ResultCallbackFunction +import com.ably.tracking.subscriber.Properties +import com.ably.tracking.subscriber.SubscriberInteractor +import com.ably.tracking.subscriber.processPresenceMessage +import com.ably.tracking.subscriber.workerqueue.WorkerSpecification +import io.mockk.every +import io.mockk.just +import io.mockk.mockk +import io.mockk.mockkStatic +import io.mockk.runs +import io.mockk.unmockkStatic +import io.mockk.verify +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.test.runBlockingTest +import org.junit.After +import org.junit.Assert +import org.junit.Before +import org.junit.Test + +@ExperimentalCoroutinesApi +internal class ProcessInitialPresenceMessagesWorkerTest { + + private val subscriberInteractor: SubscriberInteractor = mockk() + private val asyncWorks = mutableListOf Unit>() + private val postedWorks = mutableListOf() + + @Before + fun setup() { + mockkStatic("com.ably.tracking.subscriber.PresenceMessageProcessorKt") + every { processPresenceMessage(any(), any(), any()) } just runs + } + + @After + fun cleanup() { + unmockkStatic("com.ably.tracking.subscriber.PresenceMessageProcessorKt") + } + + @Test + fun `should process all presence messages`() = runBlockingTest { + // given + val initialProperties = anyProperties() + val presenceMessages = listOf(anyPresenceMessage(), anyPresenceMessage(), anyPresenceMessage()) + val worker = ProcessInitialPresenceMessagesWorker(presenceMessages, subscriberInteractor) {} + + // when + worker.doWork( + initialProperties, + asyncWorks.appendWork(), + postedWorks.appendSpecification() + ) + + // then + verify(exactly = presenceMessages.size) { + processPresenceMessage(any(), initialProperties, subscriberInteractor) + } + } + + @Test + fun `should post subscribe to channel work after processing presence messages`() = runBlockingTest { + // given + val callbackFunction: ResultCallbackFunction = {} + val worker = ProcessInitialPresenceMessagesWorker(emptyList(), subscriberInteractor, callbackFunction) + + // when + worker.doWork( + anyProperties(), + asyncWorks.appendWork(), + postedWorks.appendSpecification() + ) + + // then + Assert.assertEquals(WorkerSpecification.SubscribeToChannel(callbackFunction), postedWorks[0]) + } + + private fun anyPresenceMessage() = PresenceMessage( + PresenceAction.PRESENT_OR_ENTER, + PresenceData(ClientTypes.PUBLISHER, null, null), + clientId = "" + ) + + private fun anyProperties() = Properties(Resolution(Accuracy.BALANCED, 100, 100.0)) +} diff --git a/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/SubscribeForPresenceMessagesWorkerTest.kt b/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/SubscribeForPresenceMessagesWorkerTest.kt index 39262ae5b..b160410fd 100644 --- a/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/SubscribeForPresenceMessagesWorkerTest.kt +++ b/subscribing-sdk/src/test/java/com/ably/tracking/subscriber/workerqueue/workers/SubscribeForPresenceMessagesWorkerTest.kt @@ -10,6 +10,8 @@ import com.ably.tracking.common.PresenceMessage import com.ably.tracking.common.ResultCallbackFunction import com.ably.tracking.subscriber.Properties import com.ably.tracking.subscriber.workerqueue.WorkerSpecification +import com.ably.tracking.test.common.mockGetCurrentPresenceError +import com.ably.tracking.test.common.mockGetCurrentPresenceSuccess import com.ably.tracking.test.common.mockSubscribeToPresenceError import com.ably.tracking.test.common.mockSubscribeToPresenceSuccess import io.mockk.CapturingSlot @@ -38,6 +40,7 @@ internal class SubscribeForPresenceMessagesWorkerTest { val initialProperties = Properties(Resolution(Accuracy.BALANCED, 100, 100.0)) val presenceListenerSlot: CapturingSlot<(PresenceMessage) -> Unit> = slot() val presenceMessage = createPresenceMessage() + ably.mockGetCurrentPresenceSuccess(trackableId) ably.mockSubscribeToPresenceSuccess(trackableId, presenceListenerSlot) // when @@ -60,9 +63,11 @@ internal class SubscribeForPresenceMessagesWorkerTest { ) @Test - fun `should post subscribe to channel work when subscribe to presence returns success`() = runBlockingTest { + fun `should post process initial presence messages work when both get current presence and subscribe to presence return success`() = runBlockingTest { // given val initialProperties = Properties(Resolution(Accuracy.BALANCED, 100, 100.0)) + val initialPresenceMessages = listOf(anyPresenceMessage()) + ably.mockGetCurrentPresenceSuccess(trackableId, initialPresenceMessages) ably.mockSubscribeToPresenceSuccess(trackableId) // when @@ -74,13 +79,14 @@ internal class SubscribeForPresenceMessagesWorkerTest { asyncWorks.executeAll() // then - Assert.assertEquals(WorkerSpecification.SubscribeToChannel(callbackFunction), postedWorks[0]) + Assert.assertEquals(WorkerSpecification.ProcessInitialPresenceMessages(initialPresenceMessages, callbackFunction), postedWorks[0]) } @Test fun `should post disconnect work when subscribe to presence returns failure`() = runBlockingTest { // given val initialProperties = Properties(Resolution(Accuracy.BALANCED, 100, 100.0)) + ably.mockGetCurrentPresenceSuccess(trackableId) ably.mockSubscribeToPresenceError(trackableId) // when @@ -94,4 +100,26 @@ internal class SubscribeForPresenceMessagesWorkerTest { // then Assert.assertTrue(postedWorks[0] is WorkerSpecification.Disconnect) } + + @Test + fun `should post disconnect work when get current presence returns failure`() = runBlockingTest { + // given + val initialProperties = Properties(Resolution(Accuracy.BALANCED, 100, 100.0)) + ably.mockGetCurrentPresenceError(trackableId) + ably.mockSubscribeToPresenceSuccess(trackableId) + + // when + subscribeForPresenceMessagesWorker.doWork( + initialProperties, + asyncWorks.appendWork(), + postedWorks.appendSpecification() + ) + asyncWorks.executeAll() + + // then + Assert.assertTrue(postedWorks[0] is WorkerSpecification.Disconnect) + } + + private fun anyPresenceMessage() = + PresenceMessage(PresenceAction.PRESENT_OR_ENTER, PresenceData(ClientTypes.PUBLISHER), "any-client-id") } diff --git a/test-common/src/main/java/com/ably/tracking/test/common/AblyTestExtensions.kt b/test-common/src/main/java/com/ably/tracking/test/common/AblyTestExtensions.kt index 64f9828e7..6d99eacfa 100644 --- a/test-common/src/main/java/com/ably/tracking/test/common/AblyTestExtensions.kt +++ b/test-common/src/main/java/com/ably/tracking/test/common/AblyTestExtensions.kt @@ -72,7 +72,7 @@ fun Ably.mockSubscribeToPresenceSuccess( } answers { callbackSlot.captured(Result.success(Unit)) } - coEvery { subscribeForPresenceMessages(trackableId, capture(listenerSlot)) } returns Result.success(Unit) + coEvery { subscribeForPresenceMessages(trackableId, capture(listenerSlot), any()) } returns Result.success(Unit) } fun Ably.mockSubscribeToPresenceError(trackableId: String) { @@ -82,7 +82,18 @@ fun Ably.mockSubscribeToPresenceError(trackableId: String) { } answers { callbackSlot.captured(Result.failure(anyConnectionException())) } - coEvery { subscribeForPresenceMessages(trackableId, any()) } returns Result.failure(anyConnectionException()) + coEvery { subscribeForPresenceMessages(trackableId, any(), any()) } returns Result.failure(anyConnectionException()) +} + +fun Ably.mockGetCurrentPresenceSuccess( + trackableId: String, + currentPresenceMessage: List = emptyList(), +) { + coEvery { getCurrentPresence(trackableId) } returns Result.success(currentPresenceMessage) +} + +fun Ably.mockGetCurrentPresenceError(trackableId: String) { + coEvery { getCurrentPresence(trackableId) } returns Result.failure(anyConnectionException()) } fun Ably.mockDisconnect(trackableId: String, result: Result) {