Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BaseGateway abstraction #798

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
47 changes: 40 additions & 7 deletions gateway/api/gateway.api
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,43 @@ public final class dev/kord/gateway/AutoModerationRuleUpdate : dev/kord/gateway/
public fun toString ()Ljava/lang/String;
}

public abstract class dev/kord/gateway/BaseGateway : dev/kord/gateway/Gateway {
public fun <init> ()V
public fun detach (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public fun getCoroutineContext ()Lkotlin/coroutines/CoroutineContext;
protected abstract fun getDispatcher ()Lkotlinx/coroutines/CoroutineDispatcher;
public fun getEvents ()Lkotlinx/coroutines/flow/MutableSharedFlow;
public synthetic fun getEvents ()Lkotlinx/coroutines/flow/SharedFlow;
protected final fun getLog ()Lmu/KLogger;
protected final fun getState ()Ldev/kord/gateway/BaseGateway$State;
protected abstract fun onDetach (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
protected abstract fun onSend (Ldev/kord/gateway/Command;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
protected abstract fun onStart (Ldev/kord/gateway/GatewayConfiguration;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
protected abstract fun onStop (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public fun send (Ldev/kord/gateway/Command;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
protected final fun setState (Ldev/kord/gateway/BaseGateway$State;)V
public fun start (Ldev/kord/gateway/GatewayConfiguration;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public fun stop (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
protected final fun throwStateError ()Ljava/lang/Void;
}

protected abstract class dev/kord/gateway/BaseGateway$State {
public synthetic fun <init> (ZLkotlin/jvm/internal/DefaultConstructorMarker;)V
public final fun getRetry ()Z
}

public final class dev/kord/gateway/BaseGateway$State$Detached : dev/kord/gateway/BaseGateway$State {
public static final field INSTANCE Ldev/kord/gateway/BaseGateway$State$Detached;
}

public final class dev/kord/gateway/BaseGateway$State$Running : dev/kord/gateway/BaseGateway$State {
public fun <init> (Z)V
}

public final class dev/kord/gateway/BaseGateway$State$Stopped : dev/kord/gateway/BaseGateway$State {
public static final field INSTANCE Ldev/kord/gateway/BaseGateway$State$Stopped;
}

public final class dev/kord/gateway/ChannelCreate : dev/kord/gateway/DispatchEvent {
public fun <init> (Ldev/kord/common/entity/DiscordChannel;Ljava/lang/Integer;)V
public final fun component1 ()Ldev/kord/common/entity/DiscordChannel;
Expand Down Expand Up @@ -219,16 +256,12 @@ public final class dev/kord/gateway/Command$SerializationStrategy : kotlinx/seri
public synthetic fun serialize (Lkotlinx/serialization/encoding/Encoder;Ljava/lang/Object;)V
}

public final class dev/kord/gateway/DefaultGateway : dev/kord/gateway/Gateway {
public final class dev/kord/gateway/DefaultGateway : dev/kord/gateway/BaseGateway {
public static final field Companion Ldev/kord/gateway/DefaultGateway$Companion;
public fun <init> (Ldev/kord/gateway/DefaultGatewayData;)V
public fun detach (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public fun getCoroutineContext ()Lkotlin/coroutines/CoroutineContext;
public fun getEvents ()Lkotlinx/coroutines/flow/SharedFlow;
public fun getEvents ()Lkotlinx/coroutines/flow/MutableSharedFlow;
public synthetic fun getEvents ()Lkotlinx/coroutines/flow/SharedFlow;
public fun getPing ()Lkotlinx/coroutines/flow/StateFlow;
public fun send (Ldev/kord/gateway/Command;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public fun start (Ldev/kord/gateway/GatewayConfiguration;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public fun stop (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public final class dev/kord/gateway/DefaultGateway$Companion {
Expand Down
142 changes: 142 additions & 0 deletions gateway/src/main/kotlin/BaseGateway.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
package dev.kord.gateway

import kotlinx.atomicfu.AtomicRef
import kotlinx.atomicfu.atomic
import kotlinx.atomicfu.update
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.MutableSharedFlow
import mu.KLogger
import mu.KotlinLogging
import kotlin.coroutines.CoroutineContext

/**
* Base abstraction for a gateway implementation.
*/
public abstract class BaseGateway : Gateway {

/**
* The logger for this gateway.
*/
protected val log: KLogger = KotlinLogging.logger { }

/**
* The current state of this gateway as atomic reference.
* @see State
*/
private val atomicState: AtomicRef<State> = atomic(State.Stopped)

/**
* The current state of this gateway.
* To change it use the [atomicState] reference.
* @see State
*/
protected var state: State by atomicState

/**
* The dispatcher used to run the gateway.
* It will be used to assemble the [coroutineContext] of this gateway.
* By default, there is [SupervisorJob] before the dispatcher in [coroutineContext].
*/
protected abstract val dispatcher: CoroutineDispatcher

override val coroutineContext: CoroutineContext get() = SupervisorJob() + dispatcher

override val events: MutableSharedFlow<Event> = MutableSharedFlow()

override suspend fun start(configuration: GatewayConfiguration) {
requireState<State.Stopped>()
atomicState.update { State.Running(true) }
onStart(configuration)
}

/**
* This method is called just after the [start] method,
* once the state is updated to [State.Running].
* The state is ensured to be valid before this method is called.
*/
protected abstract suspend fun onStart(configuration: GatewayConfiguration)

override suspend fun stop() {
requireStateIsNot<State.Detached>()
events.emit(Close.UserClose)
atomicState.update { State.Stopped }
onStop()
}

/**
* This method is called just after the [stop] method,
* once the [Close.UserClose] event is emitted,
* and the state is updated to [State.Stopped].
* The state is ensured to be valid before this method is called.
*/
protected abstract suspend fun onStop()

override suspend fun detach() {
(this as CoroutineScope).cancel()
if (state is State.Detached) return
atomicState.update { State.Detached }
events.emit(Close.Detach)
onDetach()
}

/**
* This method is called just after the [detach] method,
* once the state is updated to [State.Detached],
* and the [Close.Detach] event is emitted.
* The state is ensured to be valid before this method is called.
*/
protected abstract suspend fun onDetach()

override suspend fun send(command: Command) {
requireStateIsNot<State.Detached>()
onSend(command)
}

/**
* This method is called just after the [send] method.
* The state is ensured to be valid before this method is called.
*/
protected abstract suspend fun onSend(command: Command)

/**
* Checks whether the current [state] is not of type [T].
* If it is, an [IllegalStateException] is thrown with a describing message.
*/
protected inline fun <reified T : State> requireStateIsNot() {
if (state !is T) return
throwStateError()
}

/**
* Checks whether the current [state] is of type [T].
* If it isn't, an [IllegalStateException] is thrown with a describing message.
*/
protected inline fun <reified T : State> requireState() {
if (state is T) return
throwStateError()
}

/**
* Throws an [IllegalStateException] with a describing message based on the current [state].
*/
protected fun throwStateError(): Nothing {
when (state) {
is State.Stopped -> error("The gateway is already stopped.")
is State.Running -> error("The gateway is already running, call stop() first.")
is State.Detached -> error("The Gateway has been detached and can no longer be used, create a new instance instead.")
}
}

/**
* Represents the current state of the gateway.
* @param retry whether the gateway should attempt to reconnect when it stops.
*/
protected sealed class State(public val retry: Boolean) {
public object Stopped : State(false)
public class Running(retry: Boolean) : State(retry)
public object Detached : State(false)
}
}
Loading