Skip to content

Commit

Permalink
Merge pull request #778 from ably/feature/start-and-stop-ably-connection
Browse files Browse the repository at this point in the history
Start and stop the Ably connection manually depending on the number of added trackables
  • Loading branch information
KacperKluka authored Oct 20, 2022
2 parents d6e8396 + 0db89da commit 3b0c239
Show file tree
Hide file tree
Showing 28 changed files with 404 additions and 38 deletions.
86 changes: 80 additions & 6 deletions common/src/main/java/com/ably/tracking/common/Ably.kt
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,16 @@ interface Ably {
* @throws ConnectionException if something goes wrong.
*/
suspend fun close(presenceData: PresenceData)

/**
* Starts the Ably connection. Must be called before any other method.
*/
suspend fun startConnection(): Result<Unit>

/**
* Stops the Ably connection.
*/
suspend fun stopConnection(): Result<Unit>
}

private const val CHANNEL_NAME_PREFIX = "tracking:"
Expand Down Expand Up @@ -237,6 +247,7 @@ constructor(
this.logLevel = Log.VERBOSE
this.logHandler = Log.LogHandler { severity, tag, msg, tr -> logMessage(severity, tag, msg, tr) }
this.environment = connectionConfiguration.environment
this.autoConnect = false
}
ably = AblyRealtime(clientOptions)
} catch (exception: AblyException) {
Expand Down Expand Up @@ -707,23 +718,30 @@ constructor(
launch { disconnect(trackableId, presenceData) }
}
}
closeConnection()
stopConnection()
}

/**
* Closes [AblyRealtime] and waits until it's either closed or failed.
* If the connection is already closed it returns immediately.
* If the connection is already failed it returns immediately as closing a failed connection should be a no-op
* according to the Ably features spec (https://sdk.ably.com/builds/ably/specification/main/features/#state-conditions-and-operations).
*
* @throws ConnectionException if the [AblyRealtime] state changes to [ConnectionState.failed].
*/
private suspend fun closeConnection() {
private suspend fun AblyRealtime.closeSuspending() {
if (connection.state.isClosed() || connection.state.isFailed()) {
return
}
suspendCancellableCoroutine<Unit> { continuation ->
ably.connection.on {
if (it.current == ConnectionState.closed) {
connection.on {
if (it.current.isClosed()) {
continuation.resume(Unit)
} else if (it.current == ConnectionState.failed) {
} else if (it.current.isFailed()) {
continuation.resumeWithException(it.reason.toTrackingException())
}
}
ably.close()
close()
}
}

Expand Down Expand Up @@ -791,6 +809,12 @@ constructor(

private fun ChannelState.isConnected(): Boolean = this == ChannelState.attached

private fun ConnectionState.isConnected(): Boolean = this == ConnectionState.connected

private fun ConnectionState.isClosed(): Boolean = this == ConnectionState.closed

private fun ConnectionState.isFailed(): Boolean = this == ConnectionState.failed

private fun ConnectionException.isConnectionResumeException(): Boolean =
errorInformation.let { it.message == "Connection resume failed" && it.code == 50000 && it.statusCode == 500 }

Expand Down Expand Up @@ -845,4 +869,54 @@ constructor(
state == ChannelState.detached || state == ChannelState.failed

private fun createMalformedMessageErrorInfo(): ErrorInfo = ErrorInfo("Received a malformed message", 100_001, 400)

override suspend fun startConnection(): Result<Unit> {
return try {
ably.connectSuspending()
Result.success(Unit)
} catch (connectionException: ConnectionException) {
Result.failure(connectionException)
}
}

override suspend fun stopConnection(): Result<Unit> {
return try {
ably.closeSuspending()
Result.success(Unit)
} catch (connectionException: ConnectionException) {
Result.failure(connectionException)
}
}

/**
* A suspending version of the [AblyRealtime.connect] method. It will begin connecting and wait until it's connected.
* If the connection enters the "failed" state it will throw a [ConnectionException].
* If the operation doesn't complete in [timeoutInMilliseconds] it will throw a [ConnectionException].
* If the instance is already connected it will finish immediately.
* If the connection is already failed it throws a [ConnectionException].
*
* @throws ConnectionException if something goes wrong.
*/
private suspend fun AblyRealtime.connectSuspending(timeoutInMilliseconds: Long = 10_000L) {
if (connection.state.isConnected()) {
return
} else if (connection.state.isFailed()) {
throw connection.reason.toTrackingException()
}
try {
withTimeout(timeoutInMilliseconds) {
suspendCancellableCoroutine<Unit> { continuation ->
connection.on { channelStateChange ->
when {
channelStateChange.current.isConnected() -> continuation.resume(Unit)
channelStateChange.current.isFailed() -> continuation.resumeWithException(channelStateChange.reason.toTrackingException())
}
}
connect()
}
}
} catch (exception: TimeoutCancellationException) {
throw ConnectionException(ErrorInformation("Timeout was thrown when waiting for Ably to connect"))
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -683,7 +683,6 @@ constructor(
areRawLocationsEnabled: Boolean?,
) : PublisherProperties {
private var isDisposed: Boolean = false
override var isStopped: Boolean = false
override var locationEngineResolution: Resolution = locationEngineResolution
get() = if (isDisposed) throw PublisherPropertiesDisposedException() else field
override val isLocationEngineResolutionConstant: Boolean = isLocationEngineResolutionConstant
Expand Down Expand Up @@ -752,6 +751,16 @@ constructor(
override val trackableRemovalGuard: TrackableRemovalGuard = DefaultTrackableRemovalGuard()
get() = if (isDisposed) throw PublisherPropertiesDisposedException() else field
override val areRawLocationsEnabled: Boolean = areRawLocationsEnabled ?: false
override var state: PublisherState = PublisherState.IDLE
set(value) {
// Once we stop publisher it should never change its state
if (field == PublisherState.STOPPED) {
throw PublisherStoppedException()
}
field = value
}
override val hasNoTrackablesAddingOrAdded: Boolean
get() = trackables.isEmpty() && !duplicateTrackableGuard.isCurrentlyAddingAnyTrackable()

override fun dispose() {
trackables.clear()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ internal interface PublisherProperties {
val requests: MutableMap<String, MutableMap<Subscriber, Resolution>>
val resolutions: MutableMap<String, Resolution>
var active: Trackable?
var isStopped: Boolean
var locationEngineResolution: Resolution
val isLocationEngineResolutionConstant: Boolean
var isTracking: Boolean
Expand All @@ -44,5 +43,7 @@ internal interface PublisherProperties {
val enhancedLocationsPublishingState: LocationsPublishingState<EnhancedLocationUpdate>
val rawLocationsPublishingState: LocationsPublishingState<LocationUpdate>
val areRawLocationsEnabled: Boolean
val hasNoTrackablesAddingOrAdded: Boolean
var state: PublisherState
fun dispose()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package com.ably.tracking.publisher

enum class PublisherState {
/**
* The publisher is created but has no connection to Ably.
*/
IDLE,

/**
* The publisher is trying to connect to Ably.
*/
CONNECTING,

/**
* The publisher is connected to Ably.
*/
CONNECTED,

/**
* The publisher is trying to disconnect from Ably.
*/
DISCONNECTING,

/**
* The publisher is stopped and should not be used anymore.
*/
STOPPED
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ internal interface DuplicateTrackableGuard {
fun startAddingTrackable(trackable: Trackable)
fun finishAddingTrackable(trackable: Trackable, result: Result<AddTrackableResult>)
fun isCurrentlyAddingTrackable(trackable: Trackable): Boolean
fun isCurrentlyAddingAnyTrackable(): Boolean
fun saveDuplicateAddHandler(trackable: Trackable, callbackFunction: AddTrackableCallbackFunction)
fun clear(trackable: Trackable)
fun clearAll()
Expand Down Expand Up @@ -63,6 +64,15 @@ internal class DefaultDuplicateTrackableGuard : DuplicateTrackableGuard {
return trackablesCurrentlyBeingAdded.contains(trackable)
}

/**
* Checks if the adding process for any trackable is already ongoing.
*
* @return True if any trackable is currently being added, false otherwise.
*/
override fun isCurrentlyAddingAnyTrackable(): Boolean {
return trackablesCurrentlyBeingAdded.isNotEmpty()
}

/**
* Saves the handler from a duplicate add call for a specified trackable.
* This handler will be called when the original trackable adding process will finish in [finishAddingTrackable].
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.ably.tracking.publisher.workerqueue

import com.ably.tracking.publisher.DefaultCorePublisher
import com.ably.tracking.publisher.PublisherState
import com.ably.tracking.publisher.PublisherStoppedException
import com.ably.tracking.publisher.workerqueue.resulthandlers.getWorkResultHandler
import com.ably.tracking.publisher.workerqueue.results.SyncAsyncResult
Expand All @@ -27,7 +28,7 @@ internal class DefaultWorkerQueue(

private suspend fun executeWorkers() {
for (worker in workerChannel) {
if (publisherProperties.isStopped) {
if (publisherProperties.state == PublisherState.STOPPED) {
worker.doWhenStopped(PublisherStoppedException())
} else {
execute(worker)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import com.ably.tracking.publisher.workerqueue.workers.AddTrackableWorker
import com.ably.tracking.publisher.workerqueue.workers.ChangeLocationEngineResolutionWorker
import com.ably.tracking.publisher.workerqueue.workers.ChangeRoutingProfileWorker
import com.ably.tracking.publisher.workerqueue.workers.ChannelConnectionStateChangeWorker
import com.ably.tracking.publisher.workerqueue.workers.StoppingConnectionFinishedWorker
import com.ably.tracking.publisher.workerqueue.workers.ConnectionCreatedWorker
import com.ably.tracking.publisher.workerqueue.workers.ConnectionReadyWorker
import com.ably.tracking.publisher.workerqueue.workers.DestinationSetWorker
Expand Down Expand Up @@ -78,6 +79,8 @@ internal class DefaultWorkerFactory(
params.trackable,
params.callbackFunction,
params.exception,
params.isConnectedToAbly,
ably,
)
is WorkerParams.ConnectionCreated -> ConnectionCreatedWorker(
params.trackable,
Expand Down Expand Up @@ -112,10 +115,12 @@ internal class DefaultWorkerFactory(
params.callbackFunction,
corePublisher,
params.shouldRecalculateResolutionCallback,
ably,
)
is WorkerParams.TrackableRemovalRequested -> TrackableRemovalRequestedWorker(
params.trackable,
params.callbackFunction,
ably,
params.result,
)
is WorkerParams.AblyConnectionStateChange -> AblyConnectionStateChangeWorker(
Expand Down Expand Up @@ -204,6 +209,7 @@ internal class DefaultWorkerFactory(
corePublisher,
params.timeoutInMilliseconds,
)
WorkerParams.StoppingConnectionFinished -> StoppingConnectionFinishedWorker()
}
}
}
Expand All @@ -224,6 +230,7 @@ internal sealed class WorkerParams {
val trackable: Trackable,
val callbackFunction: ResultCallbackFunction<StateFlow<TrackableState>>,
val exception: Exception,
val isConnectedToAbly: Boolean,
) : WorkerParams()

object ChangeLocationEngineResolution : WorkerParams()
Expand Down Expand Up @@ -330,4 +337,6 @@ internal sealed class WorkerParams {
val callbackFunction: ResultCallbackFunction<StateFlow<TrackableState>>,
val result: Result<Unit>,
) : WorkerParams()

object StoppingConnectionFinished : WorkerParams()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.ably.tracking.publisher.workerqueue.resulthandlers

import com.ably.tracking.publisher.workerqueue.WorkerFactory
import com.ably.tracking.publisher.workerqueue.WorkerParams
import com.ably.tracking.publisher.workerqueue.results.AddTrackableFailedWorkResult
import com.ably.tracking.publisher.workerqueue.workers.Worker

internal class AddTrackableFailedResultHandler(
private val workerFactory: WorkerFactory
) : WorkResultHandler<AddTrackableFailedWorkResult> {
override fun handle(workResult: AddTrackableFailedWorkResult): Worker? =
when (workResult) {
AddTrackableFailedWorkResult.StopConnectionCompleted ->
workerFactory.createWorker(WorkerParams.StoppingConnectionFinished)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ internal class AddTrackableResultHandler(
is AddTrackableWorkResult.Fail ->
return workerFactory.createWorker(
WorkerParams.AddTrackableFailed(
workResult.trackable, workResult.callbackFunction, workResult.exception as Exception
workResult.trackable, workResult.callbackFunction, workResult.exception as Exception, workResult.isConnectedToAbly
)
)

Expand All @@ -31,6 +31,16 @@ internal class AddTrackableResultHandler(
workResult.channelStateChangeListener
)
)

is AddTrackableWorkResult.WorkDelayed ->
return workerFactory.createWorker(
WorkerParams.AddTrackable(
workResult.trackable,
workResult.callbackFunction,
workResult.presenceUpdateListener,
workResult.channelStateChangeListener,
)
)
}
return null
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.ably.tracking.publisher.workerqueue.resulthandlers

import com.ably.tracking.publisher.workerqueue.WorkerFactory
import com.ably.tracking.publisher.workerqueue.WorkerParams
import com.ably.tracking.publisher.workerqueue.results.DisconnectSuccessWorkResult
import com.ably.tracking.publisher.workerqueue.workers.Worker

internal class DisconnectSuccessResultHandler(
private val workerFactory: WorkerFactory
) : WorkResultHandler<DisconnectSuccessWorkResult> {
override fun handle(workResult: DisconnectSuccessWorkResult): Worker? =
when (workResult) {
DisconnectSuccessWorkResult.StopConnectionCompleted ->
workerFactory.createWorker(WorkerParams.StoppingConnectionFinished)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.ably.tracking.publisher.workerqueue.resulthandlers

import com.ably.tracking.publisher.workerqueue.WorkerFactory
import com.ably.tracking.publisher.workerqueue.WorkerParams
import com.ably.tracking.publisher.workerqueue.results.TrackableRemovalRequestedWorkResult
import com.ably.tracking.publisher.workerqueue.workers.Worker

internal class TrackableRemovalRequestedResultHandler(
private val workerFactory: WorkerFactory
) : WorkResultHandler<TrackableRemovalRequestedWorkResult> {
override fun handle(workResult: TrackableRemovalRequestedWorkResult): Worker? =
when (workResult) {
TrackableRemovalRequestedWorkResult.StopConnectionCompleted ->
workerFactory.createWorker(WorkerParams.StoppingConnectionFinished)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@ package com.ably.tracking.publisher.workerqueue.resulthandlers

import com.ably.tracking.publisher.workerqueue.WorkerFactory
import com.ably.tracking.publisher.workerqueue.WorkerQueue
import com.ably.tracking.publisher.workerqueue.results.AddTrackableFailedWorkResult
import com.ably.tracking.publisher.workerqueue.results.AddTrackableWorkResult
import com.ably.tracking.publisher.workerqueue.results.ConnectionCreatedWorkResult
import com.ably.tracking.publisher.workerqueue.results.ConnectionReadyWorkResult
import com.ably.tracking.publisher.workerqueue.results.DisconnectSuccessWorkResult
import com.ably.tracking.publisher.workerqueue.results.RemoveTrackableWorkResult
import com.ably.tracking.publisher.workerqueue.results.RetrySubscribeToPresenceWorkResult
import com.ably.tracking.publisher.workerqueue.results.TrackableRemovalRequestedWorkResult
import com.ably.tracking.publisher.workerqueue.results.WorkResult

@Suppress("UNCHECKED_CAST")
Expand All @@ -21,5 +24,8 @@ internal fun getWorkResultHandler(
is ConnectionReadyWorkResult -> ConnectionReadyResultHandler(workerFactory) as WorkResultHandler<WorkResult>
is RemoveTrackableWorkResult -> RemoveTrackableResultHandler(workerFactory, workerQueue) as WorkResultHandler<WorkResult>
is RetrySubscribeToPresenceWorkResult -> RetrySubscribeToPresenceResultHandler(workerFactory) as WorkResultHandler<WorkResult>
is TrackableRemovalRequestedWorkResult -> TrackableRemovalRequestedResultHandler(workerFactory) as WorkResultHandler<WorkResult>
is AddTrackableFailedWorkResult -> AddTrackableFailedResultHandler(workerFactory) as WorkResultHandler<WorkResult>
is DisconnectSuccessWorkResult -> DisconnectSuccessResultHandler(workerFactory) as WorkResultHandler<WorkResult>
else -> throw IllegalArgumentException("Invalid workResult provided")
}
Loading

0 comments on commit 3b0c239

Please sign in to comment.