Skip to content

Commit

Permalink
Merge pull request #794 from ably/feature/wait-for-presence-sync
Browse files Browse the repository at this point in the history
Process initial presence messages before returning a Subscriber instance
  • Loading branch information
QuintinWillison authored Nov 7, 2022
2 parents 8016b0f + 764c24e commit e98854b
Show file tree
Hide file tree
Showing 12 changed files with 327 additions and 53 deletions.
59 changes: 44 additions & 15 deletions common/src/main/java/com/ably/tracking/common/Ably.kt
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,26 @@ interface Ably {

/**
* A suspending version of [subscribeForPresenceMessages]
*
* @param emitCurrentMessages If set to true it emits messages for each client that's currently in the presence.
* */
suspend fun subscribeForPresenceMessages(
trackableId: String,
listener: (PresenceMessage) -> Unit
listener: (PresenceMessage) -> Unit,
emitCurrentMessages: Boolean = true,
): Result<Unit>

/**
* Returns current messages from the trackable channel's presence.
* Should be called only when there's an existing channel for the [trackableId].
* If a channel for the [trackableId] doesn't exist then this method returns a Result with an empty list.
*
* @param trackableId The ID of the trackable channel.
*
* @return Result containing the current presence messages from the channel's presence. If something goes wrong it will contain a [ConnectionException].
* */
suspend fun getCurrentPresence(trackableId: String): Result<List<PresenceMessage>>

/**
* Sends an enhanced location update to the channel.
* Should be called only when there's an existing channel for the [trackableId].
Expand Down Expand Up @@ -632,11 +646,14 @@ constructor(

override suspend fun subscribeForPresenceMessages(
trackableId: String,
listener: (PresenceMessage) -> Unit
listener: (PresenceMessage) -> Unit,
emitCurrentMessages: Boolean,
): Result<Unit> {
val channel = channels[trackableId] ?: return Result.success(Unit)
return try {
emitAllCurrentMessagesFromPresence(channel, listener)
if (emitCurrentMessages) {
emitAllCurrentMessagesFromPresence(channel, listener)
}
channel.presence.subscribe {
val parsedMessage = it.toTracking(gson)
if (parsedMessage != null) {
Expand All @@ -652,24 +669,36 @@ constructor(
}
}

private fun emitAllCurrentMessagesFromPresence(channel: Channel, listener: (PresenceMessage) -> Unit) {
getAllCurrentMessagesFromPresence(channel).forEach { presenceMessage ->
// Each message is launched in a fire-and-forget manner to not block this method on the listener() call
scope.launch { listener(presenceMessage) }
}
}

override suspend fun getCurrentPresence(trackableId: String): Result<List<PresenceMessage>> {
val channel = channels[trackableId] ?: return Result.success(emptyList())
return suspendCancellableCoroutine { continuation ->
try {
val currentPresenceMessages = getAllCurrentMessagesFromPresence(channel)
continuation.resume(Result.success(currentPresenceMessages))
} catch (ablyException: AblyException) {
continuation.resume(Result.failure(ablyException.errorInfo.toTrackingException()))
}
}
}

/**
* Warning: This method might block the current thread due to the presence.get(true) call.
*/
private fun emitAllCurrentMessagesFromPresence(channel: Channel, listener: (PresenceMessage) -> Unit) {
channel.presence.get(true).let { messages ->
messages.forEach { presenceMessage ->
// Each message is launched in a fire-and-forget manner to not block this method on the listener() call
scope.launch {
val parsedMessage = presenceMessage.toTracking(gson)
if (parsedMessage != null) {
listener(parsedMessage)
} else {
logHandler?.w("Presence message in unexpected format: $presenceMessage")
}
private fun getAllCurrentMessagesFromPresence(channel: Channel): List<PresenceMessage> =
channel.presence.get(true).mapNotNull { presenceMessage ->
presenceMessage.toTracking(gson).also {
if (it == null) {
logHandler?.w("Presence message in unexpected format: $presenceMessage")
}
}
}
}

override fun updatePresenceData(trackableId: String, presenceData: PresenceData, callback: (Result<Unit>) -> Unit) {
scope.launch {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import androidx.test.ext.junit.runners.AndroidJUnit4
import androidx.test.platform.app.InstrumentationRegistry
import com.ably.tracking.LocationUpdate
import com.ably.tracking.TrackableState
import com.ably.tracking.annotations.Experimental
import com.ably.tracking.publisher.Trackable
import com.ably.tracking.subscriber.Subscriber
import com.ably.tracking.test.android.common.BooleanExpectation
Expand Down Expand Up @@ -239,4 +240,51 @@ class PublisherAndSubscriberTests {
// then
subscriberFailedExpectation.assertFulfilled()
}

@OptIn(Experimental::class)
@Test
fun shouldNotEmitPublisherPresenceFalseIfPublisherIsPresentFromTheStart() {
// given
val subscriberEmittedPublisherPresentExpectation = UnitExpectation("subscriber emitted publisher present")
val context = InstrumentationRegistry.getInstrumentation().targetContext
val trackableId = UUID.randomUUID().toString()
val scope = CoroutineScope(Dispatchers.Default)
val publisherPresentValues = mutableListOf<Boolean>()

// when
// create publisher and start tracking
val publisher = createAndStartPublisher(context, sendResolution = true)
runBlocking {
publisher.track(Trackable(trackableId))
}

// create subscriber and listen for publisher presence
var subscriber: Subscriber
runBlocking {
subscriber = createAndStartSubscriber(trackableId)
}
subscriber.publisherPresence
.onEach { isPublisherPresent ->
publisherPresentValues.add(isPublisherPresent)
if (isPublisherPresent) {
subscriberEmittedPublisherPresentExpectation.fulfill()
}
}
.launchIn(scope)

// await for publisher present event
subscriberEmittedPublisherPresentExpectation.await()

// cleanup
runBlocking {
coroutineScope {
launch { publisher.stop() }
launch { subscriber.stop() }
}
}

// then
subscriberEmittedPublisherPresentExpectation.assertFulfilled()
Assert.assertTrue("first publisherPresence value should be true", publisherPresentValues.first())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ internal class RetrySubscribeToPresenceWorkerTest {

// then
verify(exactly = 0) {
ably.subscribeForPresenceMessages(trackable.id, any(), any())
ably.subscribeForPresenceMessages(trackable.id, any(), any<(Result<Unit>) -> Unit>())
}
}

Expand Down Expand Up @@ -91,7 +91,7 @@ internal class RetrySubscribeToPresenceWorkerTest {

// then
verify(exactly = 0) {
ably.subscribeForPresenceMessages(trackable.id, any(), any())
ably.subscribeForPresenceMessages(trackable.id, any(), any<(Result<Unit>) -> Unit>())
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package com.ably.tracking.subscriber

import com.ably.tracking.common.ClientTypes
import com.ably.tracking.common.PresenceAction
import com.ably.tracking.common.PresenceMessage

internal fun processPresenceMessage(
presenceMessage: PresenceMessage,
properties: Properties,
subscriberInteractor: SubscriberInteractor,
) {
when (presenceMessage.action) {
PresenceAction.PRESENT_OR_ENTER -> {
if (presenceMessage.data.type == ClientTypes.PUBLISHER) {
subscriberInteractor.updatePublisherPresence(properties, true)
subscriberInteractor.updateTrackableState(properties)
subscriberInteractor.updatePublisherResolutionInformation(presenceMessage.data)
}
}
PresenceAction.LEAVE_OR_ABSENT -> {
if (presenceMessage.data.type == ClientTypes.PUBLISHER) {
subscriberInteractor.updatePublisherPresence(properties, false)
subscriberInteractor.updateTrackableState(properties)
}
}
PresenceAction.UPDATE -> {
if (presenceMessage.data.type == ClientTypes.PUBLISHER) {
subscriberInteractor.updatePublisherResolutionInformation(presenceMessage.data)
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import com.ably.tracking.common.ResultCallbackFunction
import com.ably.tracking.subscriber.SubscriberInteractor
import com.ably.tracking.subscriber.workerqueue.workers.ChangeResolutionWorker
import com.ably.tracking.subscriber.workerqueue.workers.DisconnectWorker
import com.ably.tracking.subscriber.workerqueue.workers.ProcessInitialPresenceMessagesWorker
import com.ably.tracking.subscriber.workerqueue.workers.StartConnectionWorker
import com.ably.tracking.subscriber.workerqueue.workers.StopConnectionWorker
import com.ably.tracking.subscriber.workerqueue.workers.SubscribeForPresenceMessagesWorker
Expand Down Expand Up @@ -71,6 +72,11 @@ internal class WorkerFactory(
subscriberInteractor,
params.callbackFunction
)
is WorkerSpecification.ProcessInitialPresenceMessages -> ProcessInitialPresenceMessagesWorker(
params.presenceMessages,
subscriberInteractor,
params.callbackFunction,
)
}
}

Expand Down Expand Up @@ -112,4 +118,9 @@ internal sealed class WorkerSpecification {
data class StopConnection(
val callbackFunction: ResultCallbackFunction<Unit>
) : WorkerSpecification()

data class ProcessInitialPresenceMessages(
val presenceMessages: List<PresenceMessage>,
val callbackFunction: ResultCallbackFunction<Unit>,
) : WorkerSpecification()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.ably.tracking.subscriber.workerqueue.workers

import com.ably.tracking.common.PresenceMessage
import com.ably.tracking.common.ResultCallbackFunction
import com.ably.tracking.subscriber.Properties
import com.ably.tracking.subscriber.SubscriberInteractor
import com.ably.tracking.subscriber.processPresenceMessage
import com.ably.tracking.subscriber.workerqueue.CallbackWorker
import com.ably.tracking.subscriber.workerqueue.WorkerSpecification

internal class ProcessInitialPresenceMessagesWorker(
private val presenceMessages: List<PresenceMessage>,
private val subscriberInteractor: SubscriberInteractor,
callbackFunction: ResultCallbackFunction<Unit>,
) : CallbackWorker(callbackFunction) {
override fun doWork(
properties: Properties,
doAsyncWork: (suspend () -> Unit) -> Unit,
postWork: (WorkerSpecification) -> Unit
): Properties {
presenceMessages.forEach { presenceMessage ->
processPresenceMessage(presenceMessage, properties, subscriberInteractor)
}
postWork(WorkerSpecification.SubscribeToChannel(callbackFunction))
return properties
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.ably.tracking.subscriber.workerqueue.workers

import com.ably.tracking.common.Ably
import com.ably.tracking.common.PresenceMessage
import com.ably.tracking.common.ResultCallbackFunction
import com.ably.tracking.subscriber.Properties
import com.ably.tracking.subscriber.workerqueue.CallbackWorker
Expand All @@ -17,21 +18,35 @@ internal class SubscribeForPresenceMessagesWorker(
postWork: (WorkerSpecification) -> Unit
): Properties {
doAsyncWork {
val result = ably.subscribeForPresenceMessages(
val currentPresenceMessagesResult = ably.getCurrentPresence(trackableId)
val initialPresenceMessages: List<PresenceMessage> =
try {
currentPresenceMessagesResult.getOrThrow()
} catch (error: Throwable) {
postDisconnectWork(postWork, Result.failure(error))
return@doAsyncWork
}

val subscribeForPresenceMessagesResult = ably.subscribeForPresenceMessages(
trackableId = trackableId,
emitCurrentMessages = false,
listener = { postWork(WorkerSpecification.UpdatePublisherPresence(it)) }
)

if (result.isSuccess) {
postWork(WorkerSpecification.SubscribeToChannel(callbackFunction))
} else {
postWork(
WorkerSpecification.Disconnect(trackableId) {
callbackFunction(result)
}
)
if (subscribeForPresenceMessagesResult.isFailure) {
postDisconnectWork(postWork, subscribeForPresenceMessagesResult)
return@doAsyncWork
}

postWork(WorkerSpecification.ProcessInitialPresenceMessages(initialPresenceMessages, callbackFunction))
}
return properties
}

private fun postDisconnectWork(postWork: (WorkerSpecification) -> Unit, result: Result<Unit>) {
postWork(
WorkerSpecification.Disconnect(trackableId) {
callbackFunction(result)
}
)
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
package com.ably.tracking.subscriber.workerqueue.workers

import com.ably.tracking.common.ClientTypes
import com.ably.tracking.common.PresenceAction
import com.ably.tracking.common.PresenceMessage
import com.ably.tracking.subscriber.SubscriberInteractor
import com.ably.tracking.subscriber.Properties
import com.ably.tracking.subscriber.processPresenceMessage
import com.ably.tracking.subscriber.workerqueue.Worker
import com.ably.tracking.subscriber.workerqueue.WorkerSpecification

Expand All @@ -17,26 +16,7 @@ internal class UpdatePublisherPresenceWorker(
doAsyncWork: (suspend () -> Unit) -> Unit,
postWork: (WorkerSpecification) -> Unit
): Properties {
when (presenceMessage.action) {
PresenceAction.PRESENT_OR_ENTER -> {
if (presenceMessage.data.type == ClientTypes.PUBLISHER) {
subscriberInteractor.updatePublisherPresence(properties, true)
subscriberInteractor.updateTrackableState(properties)
subscriberInteractor.updatePublisherResolutionInformation(presenceMessage.data)
}
}
PresenceAction.LEAVE_OR_ABSENT -> {
if (presenceMessage.data.type == ClientTypes.PUBLISHER) {
subscriberInteractor.updatePublisherPresence(properties, false)
subscriberInteractor.updateTrackableState(properties)
}
}
PresenceAction.UPDATE -> {
if (presenceMessage.data.type == ClientTypes.PUBLISHER) {
subscriberInteractor.updatePublisherResolutionInformation(presenceMessage.data)
}
}
}
processPresenceMessage(presenceMessage, properties, subscriberInteractor)
return properties
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import com.ably.tracking.ConnectionException
import com.ably.tracking.common.Ably
import com.ably.tracking.test.common.mockCreateConnectionSuccess
import com.ably.tracking.test.common.mockDisconnectSuccess
import com.ably.tracking.test.common.mockGetCurrentPresenceSuccess
import com.ably.tracking.test.common.mockSubscribeToPresenceError
import io.mockk.coVerify
import io.mockk.mockk
Expand All @@ -21,6 +22,7 @@ class DefaultSubscriberTest {
// given
ably.mockCreateConnectionSuccess(trackableId)
ably.mockDisconnectSuccess(trackableId)
ably.mockGetCurrentPresenceSuccess(trackableId)
ably.mockSubscribeToPresenceError(trackableId)

// when
Expand All @@ -36,6 +38,7 @@ class DefaultSubscriberTest {
// given
ably.mockCreateConnectionSuccess(trackableId)
ably.mockDisconnectSuccess(trackableId)
ably.mockGetCurrentPresenceSuccess(trackableId)
ably.mockSubscribeToPresenceError(trackableId)

// when
Expand Down
Loading

0 comments on commit e98854b

Please sign in to comment.