From 1c72d4fa767c9bf66473c2c4c44efa4d26c2bd16 Mon Sep 17 00:00:00 2001 From: Ido Flax Date: Sun, 13 Oct 2024 21:48:25 +0200 Subject: [PATCH 1/4] feat(kotlin-v2-ktor-v3): upgraded to - kotlin 2.0.20 - ktor 3.0.0 - other libs upgraded accordingly - removed usage of context receivers Changes to main and test code accordingly --- .editorconfig | 6 +- .../io/github/flaxoos/ktor/Conventions.kt | 28 +-- .../github/flaxoos/ktor/extensions/Targets.kt | 28 +-- common/build.gradle.kts | 11 +- gradle.properties | 3 +- gradle/libs.versions.toml | 23 ++- .../plugins/circuitbreaker/CircuitBreaker.kt | 2 +- .../CircuitBreakerInitializers.kt | 8 +- .../circuitbreaker/CircuitBreakerPlugin.kt | 33 +-- .../circuitbreaker/CircuitBreakerTest.kt | 90 ++++---- ktor-server-kafka/build.gradle.kts | 2 +- .../ktor/server/plugins/kafka/Commons.kt | 4 +- .../ktor/server/plugins/kafka/KafkaPlugin.kt | 90 ++++---- .../server/plugins/kafka/KafkaPluginConfig.kt | 162 +++++++++------ .../plugins/kafka/components/AdminClient.kt | 2 +- .../server/plugins/kafka/components/Avro.kt | 6 +- .../plugins/kafka/components/Consumer.kt | 4 +- .../plugins/kafka/BaseKafkaIntegrationTest.kt | 6 +- .../plugins/kafka/KtorKafkaIntegrationTest.kt | 8 +- .../server/plugins/ratelimiter/RateLimiter.kt | 4 +- .../ratelimiter/RateLimitingConfiguration.kt | 97 ++++----- .../plugins/ratelimiter/RateLimitingPlugin.kt | 67 +++--- .../ratelimiter/implementations/Bucket.kt | 10 +- .../implementations/LeakyBucket.kt | 6 +- .../implementations/SlidingWindow.kt | 6 +- .../implementations/TokenBucket.kt | 4 +- .../ratelimiter/RateLimitingPluginTest.kt | 194 +++++++++--------- .../taskscheduling/TaskSchedulerPlugin.kt | 8 +- .../TaskSchedulingConfiguration.kt | 6 +- .../taskscheduling/managers/TaskManager.kt | 2 +- .../managers/lock/TaskLockManager.kt | 2 +- .../lock/database/DatabaseTaskLockManager.kt | 6 +- .../plugins/taskscheduling/tasks/Tasks.kt | 2 +- .../test/build.gradle.kts | 2 +- .../TaskSchedulingPluginTest.kt | 115 ++++++----- .../build.gradle.kts | 2 +- .../managers/lock/database/JdbcLockManager.kt | 26 +-- .../taskscheduling/JdbcLockManagerTest.kt | 2 +- .../build.gradle.kts | 2 +- .../lock/database/MongoDBLockManager.kt | 34 +-- .../build.gradle.kts | 2 +- .../managers/lock/redis/RedisLockManager.kt | 14 +- 42 files changed, 599 insertions(+), 530 deletions(-) rename ktor-server-task-scheduling/ktor-server-task-scheduling-core/test/src/jvmMain/kotlin/{ => io/github/flaxoos/ktor/server/plugins}/taskscheduling/TaskSchedulingPluginTest.kt (51%) diff --git a/.editorconfig b/.editorconfig index 6755098c..512bdeb8 100644 --- a/.editorconfig +++ b/.editorconfig @@ -1,2 +1,4 @@ -[*.{kt,kts}] -ktlint_code_style = ktlint_official \ No newline at end of file +[*.{kt,kts,gradle}] +ktlint_code_style = ktlint_official +ij_kotlin_allow_trailing_comma_on_call_site = true +ij_kotlin_allow_trailing_comma = true \ No newline at end of file diff --git a/buildSrc/src/main/kotlin/io/github/flaxoos/ktor/Conventions.kt b/buildSrc/src/main/kotlin/io/github/flaxoos/ktor/Conventions.kt index 1cb56c8f..d15f502a 100644 --- a/buildSrc/src/main/kotlin/io/github/flaxoos/ktor/Conventions.kt +++ b/buildSrc/src/main/kotlin/io/github/flaxoos/ktor/Conventions.kt @@ -28,13 +28,12 @@ import org.gradle.kotlin.dsl.withGroovyBuilder import org.gradle.kotlin.dsl.withType import org.gradle.plugins.ide.idea.model.IdeaModel import org.jetbrains.kotlin.gradle.dsl.KotlinMultiplatformExtension -import org.jetbrains.kotlin.gradle.kpm.external.ExternalVariantApi -import org.jetbrains.kotlin.gradle.kpm.external.project import org.jetbrains.kotlin.gradle.plugin.KotlinDependencyHandler import org.jetbrains.kotlin.gradle.plugin.KotlinSourceSet open class Conventions : Plugin { - open fun KotlinMultiplatformExtension.conventionSpecifics() {} + open fun KotlinMultiplatformExtension.conventionSpecifics(project: Project) {} + override fun apply(project: Project) { with(project) { with(plugins) { @@ -61,7 +60,7 @@ open class Conventions : Plugin { enableContextReceivers() extensions.findByType(KotlinMultiplatformExtension::class)?.apply { - targetJvm() + targetJvm(project) this.sourceSets.apply { commonMainDependencies { implementation(library("kotlinx-datetime")) @@ -69,7 +68,6 @@ open class Conventions : Plugin { implementation(library("arrow-core")) implementation(library("arrow-fx-coroutines")) implementation(library("kotlin-logging")) - } commonTestDependencies { @@ -90,7 +88,7 @@ open class Conventions : Plugin { implementation(library("mockk-agent-jvm")) } } - this.conventionSpecifics() + this.conventionSpecifics(project) } setLanguageAndApiVersions() @@ -149,7 +147,7 @@ open class Conventions : Plugin { } extensions.findByType(AtomicFUPluginExtension::class)?.apply { - dependenciesVersion = versionOf("atomicFu") + dependenciesVersion = versionOf("atomicfu") transformJvm = true jvmVariant = "FU" } @@ -164,14 +162,10 @@ open class Conventions : Plugin { configurePublishing() } } - - } class KtorServerPluginConventions : Conventions() { - - @OptIn(ExternalVariantApi::class) - override fun KotlinMultiplatformExtension.conventionSpecifics() { + override fun KotlinMultiplatformExtension.conventionSpecifics(project: Project) { sourceSets.apply { commonMainDependencies { implementation(project.library("ktor-server-core")) @@ -187,10 +181,8 @@ class KtorServerPluginConventions : Conventions() { } class KtorClientPluginConventions : Conventions() { - - @OptIn(ExternalVariantApi::class) - override fun KotlinMultiplatformExtension.conventionSpecifics() { - with(this.project) { + override fun KotlinMultiplatformExtension.conventionSpecifics(project: Project) { + with(project) { sourceSets.apply { commonMainDependencies { implementation(library("ktor-client-core")) @@ -247,9 +239,7 @@ fun NamedDomainObjectCollection.nativeTestDependencies(configur private fun Project.ktorVersion() = versionOf("ktor") - -fun Project.projectDependencies(configuration: DependencyHandlerScope.() -> Unit) = - DependencyHandlerScope.of(dependencies).configuration() +fun Project.projectDependencies(configuration: DependencyHandlerScope.() -> Unit) = DependencyHandlerScope.of(dependencies).configuration() @Target(AnnotationTarget.CLASS, AnnotationTarget.FUNCTION, AnnotationTarget.PROPERTY_GETTER) @Retention(AnnotationRetention.RUNTIME) diff --git a/buildSrc/src/main/kotlin/io/github/flaxoos/ktor/extensions/Targets.kt b/buildSrc/src/main/kotlin/io/github/flaxoos/ktor/extensions/Targets.kt index d2e32967..7e463107 100644 --- a/buildSrc/src/main/kotlin/io/github/flaxoos/ktor/extensions/Targets.kt +++ b/buildSrc/src/main/kotlin/io/github/flaxoos/ktor/extensions/Targets.kt @@ -1,5 +1,6 @@ package io.github.flaxoos.ktor.extensions +import org.gradle.api.Project import org.gradle.api.file.DuplicatesStrategy import org.gradle.jvm.tasks.Jar import org.gradle.kotlin.dsl.get @@ -7,19 +8,7 @@ import org.gradle.kotlin.dsl.named import org.jetbrains.kotlin.gradle.dsl.KotlinMultiplatformExtension import org.jetbrains.kotlin.gradle.plugin.mpp.KotlinNativeTargetWithHostTests -fun KotlinMultiplatformExtension.targetNative( - configure: KotlinNativeTargetWithHostTests.() -> Unit = {}, -// hardTarget: (KotlinMultiplatformExtension.() -> KotlinNativeTargetWithHostTests)? = null -) { -// val hostOs = System.getProperty("os.name") -// val arch = System.getProperty("os.arch") -// val nativeTarget = hardTarget?.let { this.hardTarget() } ?: when { -// hostOs == "Mac OS X" && arch == "x86_64" -> macosX64("native") -// hostOs == "Mac OS X" && arch == "aarch64" -> macosArm64("native") -// hostOs == "Linux" -> linuxX64("native") -// // Other supported targets are listed here: https://ktor.io/docs/native-server.html#targets -// else -> throw GradleException("Host OS is not supported in Kotlin/Native.") -// } +fun KotlinMultiplatformExtension.targetNative(configure: KotlinNativeTargetWithHostTests.() -> Unit = {}) { val nativeTarget = linuxX64("native") nativeTarget.apply { binaries.sharedLib() @@ -27,10 +16,9 @@ fun KotlinMultiplatformExtension.targetNative( } } - -fun KotlinMultiplatformExtension.targetJvm() { +fun KotlinMultiplatformExtension.targetJvm(project: Project) { + jvmToolchain(project.versionOf("java").toInt()) jvm { - jvmToolchain(project.versionOf("java").toInt()) project.tasks.named("jvmJar", Jar::class).configure { duplicatesStrategy = DuplicatesStrategy.EXCLUDE from( @@ -39,12 +27,14 @@ fun KotlinMultiplatformExtension.targetJvm() { project.configurations["jvmRuntimeClasspath"], ).map { config -> config.map { - if (it.isDirectory) it - else project.zipTree(it) + if (it.isDirectory) { + it + } else { + project.zipTree(it) + } } }, ) } } } - diff --git a/common/build.gradle.kts b/common/build.gradle.kts index f312e523..bc20b7ad 100644 --- a/common/build.gradle.kts +++ b/common/build.gradle.kts @@ -1,7 +1,4 @@ -import io.github.flaxoos.ktor.extensions.configureMavenPublications import io.github.flaxoos.ktor.extensions.configurePublishing -import io.github.flaxoos.ktor.extensions.configureSigning -import io.github.flaxoos.ktor.extensions.registerDokkaJarTask import io.github.flaxoos.ktor.extensions.targetJvm import io.github.flaxoos.ktor.extensions.targetNative @@ -9,11 +6,15 @@ plugins { kotlin("multiplatform") `maven-publish` id("signing") - id(libs.plugins.dokka.get().pluginId) + id( + libs.plugins.dokka + .get() + .pluginId, + ) } kotlin { - targetJvm() + targetJvm(project) targetNative() macosArm64("native-macos") { binaries { diff --git a/gradle.properties b/gradle.properties index 1f141628..2b7b5a7a 100644 --- a/gradle.properties +++ b/gradle.properties @@ -5,4 +5,5 @@ kotlin.native.ignoreDisabledTargets=true gradle.publish.enable.module-metadata=true version=1.2.10 gpr.user=flaxoos -org.gradle.jvmargs=-Xmx2g -XX:MaxMetaspaceSize=2g \ No newline at end of file +org.gradle.jvmargs=-Xmx2g -XX:MaxMetaspaceSize=2g +kotlin.mpp.applyDefaultHierarchyTemplate=false \ No newline at end of file diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 7f189697..7d435a2d 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -1,14 +1,14 @@ [versions] # Core technologies -kotlin = "1.9.10" +kotlin = "2.0.20" java = "11" shadow = "8.1.1" -kotlinx-serialization = "1.6.0" +kotlinx-serialization = "1.7.3" kotlinx-io = "0.3.0" ksp = "1.9.10-1.0.13" # Web Framework -ktor = "2.3.2" +ktor = "3.0.0" # Android android = "8.1.0" @@ -25,13 +25,14 @@ kotlin-logging = "5.1.0" logging_capabilities = "0.11.1" # Asynchronous and Concurrency -atomicFu = "0.21.0" -kotlinx-coroutines = "1.7.1" +atomicfu = "0.25.0" +kotlinx-coroutines = "1.9.0" # Testing -kotest = "5.6.2" +kotest = "6.0.0.M1" +kotest-stable = "5.9.1" kotest-test-containers = "2.0.2" -mockk = "1.11.0" +mockk = "1.13.4" mockative = "2.0.1" kmock = "0.3.0-rc08" testcontainers = "1.19.8" @@ -122,8 +123,8 @@ mongodb-bson-kotlinx = { module = "org.mongodb:bson-kotlinx", version.ref = "mon # Testing libraries kotest-runner-junit5 = { module = "io.kotest:kotest-runner-junit5", version.ref = "kotest" } kotest-property = { module = "io.kotest:kotest-property", version.ref = "kotest" } -kotest-framework-datatest = { module = "io.kotest:kotest-framework-datatest", version.ref = "kotest" } -kotest-assertions-core = { module = "io.kotest:kotest-assertions-core", version.ref = "kotest" } +kotest-framework-datatest = { module = "io.kotest:kotest-framework-datatest", version.ref = "kotest-stable" } +kotest-assertions-core = { module = "io.kotest:kotest-assertions-core", version.ref = "kotest-stable" } kotest-framework-engine = { module = "io.kotest:kotest-framework-engine", version.ref = "kotest" } mockk = { module = "io.mockk:mockk", version.ref = "mockk" } mockk-agent-jvm = { module = "io.mockk:mockk-agent-jvm", version.ref = "mockk" } @@ -153,7 +154,7 @@ kover-badge-gradlePlugin = { module = "io.github.flaxoos:kover-badge", version.r detekt-gradlePlugin = { module = "io.gitlab.arturbosch.detekt:detekt-gradle-plugin", version.ref = "detekt" } # Asynchronous and Concurrency plugins -atomicfu-gradlePlugin = { module = "org.jetbrains.kotlinx:atomicfu-gradle-plugin", version.ref = "atomicFu" } +atomicfu-gradlePlugin = { module = "org.jetbrains.kotlinx:atomicfu-gradle-plugin", version.ref = "atomicfu" } # Logging logback-core = { module = "ch.qos.logback:logback-core", version.ref = "logback" } @@ -207,7 +208,7 @@ ktlint = { id = "org.jlleitschuh.gradle.ktlint", version.ref = "ktlint" } dokka = { id = "org.jetbrains.dokka", version.ref = "dokka" } # Asynchronous and Concurrency plugins -atomicfu = { id = "kotlinx-atomicfu", version.ref = "atomicFu" } +atomicfu = { id = "org.jetbrains.kotlinx.atomicfu", version.ref = "atomicfu" } # Testing kotest = { id = "io.kotest.multiplatform", version.ref = "kotest" } diff --git a/ktor-client-circuit-breaker/src/commonMain/kotlin/io/github/flaxoos/ktor/client/plugins/circuitbreaker/CircuitBreaker.kt b/ktor-client-circuit-breaker/src/commonMain/kotlin/io/github/flaxoos/ktor/client/plugins/circuitbreaker/CircuitBreaker.kt index 569d848a..65605ba8 100644 --- a/ktor-client-circuit-breaker/src/commonMain/kotlin/io/github/flaxoos/ktor/client/plugins/circuitbreaker/CircuitBreaker.kt +++ b/ktor-client-circuit-breaker/src/commonMain/kotlin/io/github/flaxoos/ktor/client/plugins/circuitbreaker/CircuitBreaker.kt @@ -20,7 +20,7 @@ private val logger = KtorSimpleLogger("io.github.flaxoos.ktor.client.plugins.Cir internal class CircuitBreaker( private val name: CircuitBreakerName, - config: CircuitBreakerConfig.CircuitBreakerBuilder + config: CircuitBreakerConfig.CircuitBreakerBuilder, ) { private val failureThreshold: Int = config.failureThreshold private val halfOpenFailureThreshold: Int = config.halfOpenFailureThreshold diff --git a/ktor-client-circuit-breaker/src/commonMain/kotlin/io/github/flaxoos/ktor/client/plugins/circuitbreaker/CircuitBreakerInitializers.kt b/ktor-client-circuit-breaker/src/commonMain/kotlin/io/github/flaxoos/ktor/client/plugins/circuitbreaker/CircuitBreakerInitializers.kt index 43b2e717..6638ab2b 100644 --- a/ktor-client-circuit-breaker/src/commonMain/kotlin/io/github/flaxoos/ktor/client/plugins/circuitbreaker/CircuitBreakerInitializers.kt +++ b/ktor-client-circuit-breaker/src/commonMain/kotlin/io/github/flaxoos/ktor/client/plugins/circuitbreaker/CircuitBreakerInitializers.kt @@ -12,7 +12,7 @@ import io.ktor.util.collections.ConcurrentMap @CircuitBreakerDsl fun CircuitBreakerConfig.register( name: CircuitBreakerName, - config: CircuitBreakerConfig.CircuitBreakerBuilder.() -> Unit + config: CircuitBreakerConfig.CircuitBreakerBuilder.() -> Unit, ) { circuitBreakers.addCircuitBreaker(name, config) } @@ -24,7 +24,7 @@ fun CircuitBreakerConfig.register( fun CircuitBreakerConfig.global(config: CircuitBreakerConfig.CircuitBreakerBuilder.() -> Unit) { global = CircuitBreaker( CIRCUIT_BREAKER_NAME_GLOBAL, - CircuitBreakerConfig.CircuitBreakerBuilder().apply(config) + CircuitBreakerConfig.CircuitBreakerBuilder().apply(config), ) } @@ -42,7 +42,7 @@ fun HttpRequestBuilder.withCircuitBreaker(name: CircuitBreakerName = CIRCUIT_BRE */ suspend fun HttpClient.requestWithCircuitBreaker( name: CircuitBreakerName = CIRCUIT_BREAKER_NAME_GLOBAL, - block: HttpRequestBuilder.() -> Unit + block: HttpRequestBuilder.() -> Unit, ): HttpResponse { return request { withCircuitBreaker(name) @@ -55,7 +55,7 @@ suspend fun HttpClient.requestWithCircuitBreaker( */ internal fun ConcurrentMap.addCircuitBreaker( name: CircuitBreakerName, - config: CircuitBreakerConfig.CircuitBreakerBuilder.() -> Unit + config: CircuitBreakerConfig.CircuitBreakerBuilder.() -> Unit, ) { require(!containsKey(name)) { "Circuit Breaker with name $name is already registered" diff --git a/ktor-client-circuit-breaker/src/commonMain/kotlin/io/github/flaxoos/ktor/client/plugins/circuitbreaker/CircuitBreakerPlugin.kt b/ktor-client-circuit-breaker/src/commonMain/kotlin/io/github/flaxoos/ktor/client/plugins/circuitbreaker/CircuitBreakerPlugin.kt index 7698befd..6d7d27f7 100644 --- a/ktor-client-circuit-breaker/src/commonMain/kotlin/io/github/flaxoos/ktor/client/plugins/circuitbreaker/CircuitBreakerPlugin.kt +++ b/ktor-client-circuit-breaker/src/commonMain/kotlin/io/github/flaxoos/ktor/client/plugins/circuitbreaker/CircuitBreakerPlugin.kt @@ -14,24 +14,25 @@ internal val CircuitBreakerInstancesRegistryKey = internal val CircuitBreakerNameKey = AttributeKey("CircuitBreakerInstancesRegistryKey") -val CircuitBreaking = createClientPlugin("CircuitBreaker", ::CircuitBreakerConfig) { - val global = pluginConfig.global - val instances = pluginConfig.circuitBreakers.apply { - if (global != null) { - put(CIRCUIT_BREAKER_NAME_GLOBAL, global) - } +val CircuitBreaking = + createClientPlugin("CircuitBreaker", ::CircuitBreakerConfig) { + val global = pluginConfig.global + val instances = + pluginConfig.circuitBreakers.apply { + if (global != null) { + put(CIRCUIT_BREAKER_NAME_GLOBAL, global) + } + } + require(instances.isNotEmpty()) { "At least one circuit breaker must be specified" } + client.circuitBreakerRegistry().putAll( + instances.mapValues { entry -> + entry.value.apply { initialize(client.engine.dispatcher) } + }, + ) + circuitBreakerPluginBuilder() } - require(instances.isNotEmpty()) { "At least one circuit breaker must be specified" } - client.circuitBreakerRegistry().putAll( - instances.mapValues { entry -> - entry.value.apply { initialize(client.engine.dispatcher) } - } - ) - circuitBreakerPluginBuilder() -} -internal fun HttpClient.circuitBreakerRegistry() = - this.attributes.computeIfAbsent(CircuitBreakerInstancesRegistryKey) { ConcurrentMap() } +internal fun HttpClient.circuitBreakerRegistry() = this.attributes.computeIfAbsent(CircuitBreakerInstancesRegistryKey) { ConcurrentMap() } internal fun ClientPluginBuilder.circuitBreakerPluginBuilder() { val instanceRegistry = client.circuitBreakerRegistry() diff --git a/ktor-client-circuit-breaker/src/commonTest/kotlin/io/github/flaxoos/ktor/client/plugins/circuitbreaker/CircuitBreakerTest.kt b/ktor-client-circuit-breaker/src/commonTest/kotlin/io/github/flaxoos/ktor/client/plugins/circuitbreaker/CircuitBreakerTest.kt index 4a55ec2a..aad80944 100644 --- a/ktor-client-circuit-breaker/src/commonTest/kotlin/io/github/flaxoos/ktor/client/plugins/circuitbreaker/CircuitBreakerTest.kt +++ b/ktor-client-circuit-breaker/src/commonTest/kotlin/io/github/flaxoos/ktor/client/plugins/circuitbreaker/CircuitBreakerTest.kt @@ -33,26 +33,25 @@ private const val CONCURRENCY_COUNT = 2 @OptIn(ExperimentalCoroutinesApi::class, ExperimentalStdlibApi::class, ExperimentalKotest::class) class CircuitBreakerTest : FunSpec() { - sealed class CircuitBreakerTestCase( val name: CircuitBreakerName, val failureThreshold: Int, val halfOpenFailureThreshold: Int, - val resetInterval: Duration + val resetInterval: Duration, ) data object Global : CircuitBreakerTestCase( name = CIRCUIT_BREAKER_NAME_GLOBAL, failureThreshold = 6, halfOpenFailureThreshold = 4, - resetInterval = 2.seconds + resetInterval = 2.seconds, ) data object Specific : CircuitBreakerTestCase( name = TestCircuitBreakerName, failureThreshold = 3, halfOpenFailureThreshold = 2, - resetInterval = 1.seconds + resetInterval = 1.seconds, ) private var client by Delegates.notNull() @@ -157,22 +156,26 @@ class CircuitBreakerTest : FunSpec() { repeat(case.failureThreshold + 1) { getWithCircuitBreaker(case.name) } - fun call(wait: Duration = 0.milliseconds) = async { - try { - delay(wait) - getWithCircuitBreaker(case.name) - } catch (e: CircuitBreakerException) { - return@async e + + fun call(wait: Duration = 0.milliseconds) = + async { + try { + delay(wait) + getWithCircuitBreaker(case.name) + } catch (e: CircuitBreakerException) { + return@async e + } + null } - null - } - val requests = List(CONCURRENCY_COUNT) { - call() - } - val delayedRequests = List(CONCURRENCY_COUNT) { - call(case.resetInterval) - } + val requests = + List(CONCURRENCY_COUNT) { + call() + } + val delayedRequests = + List(CONCURRENCY_COUNT) { + call(case.resetInterval) + } requests.awaitAll().count { it != null } shouldBe CONCURRENCY_COUNT delayedRequests.awaitAll().count { it == null } shouldBe CONCURRENCY_COUNT @@ -181,34 +184,36 @@ class CircuitBreakerTest : FunSpec() { } private fun TestScope.initClient(useTestCoroutineScheduler: Boolean = true) { - val mockEngine = MockEngine.create { - if (useTestCoroutineScheduler) { - dispatcher = StandardTestDispatcher(testCoroutineScheduler) - } - requestHandlers.add { _ -> - mockResponse() + val mockEngine = + MockEngine.create { + if (useTestCoroutineScheduler) { + dispatcher = StandardTestDispatcher(testCoroutineScheduler) + } + requestHandlers.add { _ -> + mockResponse() + } } - } - client = HttpClient(mockEngine) { - install(CircuitBreaking) { - global { - failureThreshold = Global.failureThreshold - halfOpenFailureThreshold = Global.halfOpenFailureThreshold - resetInterval = Global.resetInterval - failureTrigger = { - status.value >= 400 + client = + HttpClient(mockEngine) { + install(CircuitBreaking) { + global { + failureThreshold = Global.failureThreshold + halfOpenFailureThreshold = Global.halfOpenFailureThreshold + resetInterval = Global.resetInterval + failureTrigger = { + status.value >= 400 + } } - } - register(TestCircuitBreakerName) { - failureThreshold = Specific.failureThreshold - halfOpenFailureThreshold = Specific.halfOpenFailureThreshold - resetInterval = Specific.resetInterval - failureTrigger = { - status.value >= 400 + register(TestCircuitBreakerName) { + failureThreshold = Specific.failureThreshold + halfOpenFailureThreshold = Specific.halfOpenFailureThreshold + resetInterval = Specific.resetInterval + failureTrigger = { + status.value >= 400 + } } } } - } } private fun givenErrorResponse() { @@ -225,6 +230,5 @@ class CircuitBreakerTest : FunSpec() { private val customOk = HttpStatusCode(300, "It's ok!") - private suspend fun getWithCircuitBreaker(name: CircuitBreakerName) = - client.requestWithCircuitBreaker(name = name) {}.status + private suspend fun getWithCircuitBreaker(name: CircuitBreakerName) = client.requestWithCircuitBreaker(name = name) {}.status } diff --git a/ktor-server-kafka/build.gradle.kts b/ktor-server-kafka/build.gradle.kts index ae61591a..979c1fa2 100644 --- a/ktor-server-kafka/build.gradle.kts +++ b/ktor-server-kafka/build.gradle.kts @@ -20,7 +20,7 @@ tasks.matching { it.name.contains("native", ignoreCase = true) }.configureEach { } kotlin { - targetJvm() + targetJvm(project) sourceSets { val jvmMain by getting { dependencies { diff --git a/ktor-server-kafka/src/jvmMain/kotlin/io/github/flaxoos/ktor/server/plugins/kafka/Commons.kt b/ktor-server-kafka/src/jvmMain/kotlin/io/github/flaxoos/ktor/server/plugins/kafka/Commons.kt index 3bbe8f8e..dc4a36b3 100644 --- a/ktor-server-kafka/src/jvmMain/kotlin/io/github/flaxoos/ktor/server/plugins/kafka/Commons.kt +++ b/ktor-server-kafka/src/jvmMain/kotlin/io/github/flaxoos/ktor/server/plugins/kafka/Commons.kt @@ -42,14 +42,14 @@ object Attributes { * Attribute key for [Producer] */ val ProducerAttributeKey = AttributeKey>( - KTOR_KAFKA_PRODUCER_ATTRIBUTE_KEY + KTOR_KAFKA_PRODUCER_ATTRIBUTE_KEY, ) /** * Attribute key for [Consumer] */ val ConsumerAttributeKey = AttributeKey>( - KTOR_KAFKA_CONSUMER_ATTRIBUTE_KEY + KTOR_KAFKA_CONSUMER_ATTRIBUTE_KEY, ) /** diff --git a/ktor-server-kafka/src/jvmMain/kotlin/io/github/flaxoos/ktor/server/plugins/kafka/KafkaPlugin.kt b/ktor-server-kafka/src/jvmMain/kotlin/io/github/flaxoos/ktor/server/plugins/kafka/KafkaPlugin.kt index cf1c4254..9d0eb5e3 100644 --- a/ktor-server-kafka/src/jvmMain/kotlin/io/github/flaxoos/ktor/server/plugins/kafka/KafkaPlugin.kt +++ b/ktor-server-kafka/src/jvmMain/kotlin/io/github/flaxoos/ktor/server/plugins/kafka/KafkaPlugin.kt @@ -22,8 +22,8 @@ import io.ktor.server.application.hooks.MonitoringEvent import io.ktor.server.application.install import io.ktor.server.application.log import io.ktor.util.AttributeKey -import io.ktor.util.KtorDsl import io.ktor.utils.io.CancellationException +import io.ktor.utils.io.KtorDsl import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Job import kotlinx.coroutines.cancel @@ -46,23 +46,25 @@ object FileConfig { * @param configurationPath The path to the configuration in the application configuration file * @param config Configuration block for the plugin, see [KafkaConsumerConfig] */ - val Kafka = createApplicationPlugin( - name = "Kafka", - configurationPath = DEFAULT_CONFIG_PATH, - createConfiguration = ::KafkaFileConfig - ) { - setupKafka(pluginConfig) - } + val Kafka = + createApplicationPlugin( + name = "Kafka", + configurationPath = DEFAULT_CONFIG_PATH, + createConfiguration = ::KafkaFileConfig, + ) { + setupKafka(pluginConfig) + } @Suppress("FunctionName") @KtorDsl - fun Kafka(configurationPath: String) = createApplicationPlugin( - name = "Kafka", - configurationPath = configurationPath, - createConfiguration = ::KafkaFileConfig - ) { - setupKafka(pluginConfig) - } + fun Kafka(configurationPath: String) = + createApplicationPlugin( + name = "Kafka", + configurationPath = configurationPath, + createConfiguration = ::KafkaFileConfig, + ) { + setupKafka(pluginConfig) + } /** * Installs the [Kafka] plugin with the given [KafkaFileConfig] block @@ -70,7 +72,7 @@ object FileConfig { @KtorDsl fun Application.kafka( configurationPath: String = DEFAULT_CONFIG_PATH, - config: KafkaFileConfig.() -> Unit + config: KafkaFileConfig.() -> Unit, ) { install(Kafka(configurationPath)) { config() } } @@ -102,12 +104,13 @@ object FileConfig { * } * ``` */ -val Kafka = createApplicationPlugin( - name = "Kafka", - createConfiguration = ::KafkaConfig -) { - setupKafka(pluginConfig) -} +val Kafka = + createApplicationPlugin( + name = "Kafka", + createConfiguration = ::KafkaConfig, + ) { + setupKafka(pluginConfig) + } /** * Installs the [Kafka] plugin with the given [KafkaConfig] block @@ -123,11 +126,12 @@ private fun PluginBuilder.setupKafka(pluginConfig: application.log.info("Setting up kafka clients") pluginConfig.schemaRegistryUrl?.let { if (pluginConfig.schemas.isNotEmpty()) { - val schemaRegistryClient = createSchemaRegistryClient( - it, - pluginConfig.schemaRegistrationTimeoutMs, - pluginConfig.schemaRegistryClientProvider - ) + val schemaRegistryClient = + createSchemaRegistryClient( + it, + pluginConfig.schemaRegistrationTimeoutMs, + pluginConfig.schemaRegistryClientProvider, + ) with(application) { schemaRegistryClient.registerSchemas(pluginConfig.schemas) }.also { application.attributes.put(SchemaRegistryClientKey, schemaRegistryClient) } @@ -152,11 +156,10 @@ private fun PluginBuilder.setupKafka(pluginConfig: } catch (e: Exception) { failCreatingClient("producer", pluginConfig.producerProperties!!) return + }?.also { + application.attributes.put(ProducerAttributeKey, it) + application.log.info("Kafka producer setup finished") } - ?.also { - application.attributes.put(ProducerAttributeKey, it) - application.log.info("Kafka producer setup finished") - } pluginConfig.consumerConfig?.let { try { @@ -177,13 +180,13 @@ private fun PluginBuilder.setupKafka(pluginConfig: private fun PluginBuilder.failCreatingClient( clientName: String, - clientProperties: KafkaProperties + clientProperties: KafkaProperties, ) { application.coroutineContext.cancel( CancellationException( "Failed creating kafka $clientName using properties: " + - clientProperties.entries.joinToString { "${it.key}: ${it.value}\n" } - ) + clientProperties.entries.joinToString { "${it.key}: ${it.value}\n" }, + ), ) } @@ -223,7 +226,7 @@ private fun PluginBuilder.closeConsumer() { with( checkNotNull(pluginConfig.consumerConfig) { "Consumer config changed to null during application start, this shouldn't happen" - } + }, ) { // Let it finish one round to avoid race condition delay(consumerPollFrequency) @@ -241,26 +244,27 @@ private fun PluginBuilder.onStart() { if (pluginConfig.consumerConfig == null) { application.log.warn( "Consumer defined but no consumer configuration defined, " + - "make sure to provide one during plugin installation" + "make sure to provide one during plugin installation", ) } else { with( checkNotNull(pluginConfig.consumerConfig) { "Consumer config changed to null during application start, this shouldn't happen" - } + }, ) { if (consumerRecordHandlers.isEmpty()) { application.log.debug( "No consumer record handlers defined, " + - "consumer job will not start automatically" + "consumer job will not start automatically", ) } runCatching { - application.startConsumer( - consumer = consumer, - pollFrequency = consumerPollFrequency, - consumerRecordHandlers = consumerRecordHandlers - ) { closeConsumer() } + application + .startConsumer( + consumer = consumer, + pollFrequency = consumerPollFrequency, + consumerRecordHandlers = consumerRecordHandlers, + ) { closeConsumer() } .also { application.attributes.put(ConsumerJob, it) application.log.info("Started kafka consumer") diff --git a/ktor-server-kafka/src/jvmMain/kotlin/io/github/flaxoos/ktor/server/plugins/kafka/KafkaPluginConfig.kt b/ktor-server-kafka/src/jvmMain/kotlin/io/github/flaxoos/ktor/server/plugins/kafka/KafkaPluginConfig.kt index 3bf87699..3bfa54da 100644 --- a/ktor-server-kafka/src/jvmMain/kotlin/io/github/flaxoos/ktor/server/plugins/kafka/KafkaPluginConfig.kt +++ b/ktor-server-kafka/src/jvmMain/kotlin/io/github/flaxoos/ktor/server/plugins/kafka/KafkaPluginConfig.kt @@ -11,6 +11,7 @@ import io.github.flaxoos.ktor.server.plugins.kafka.Defaults.DEFAULT_GROUP_ID import io.github.flaxoos.ktor.server.plugins.kafka.Defaults.DEFAULT_SCHEMA_REGISTRY_CLIENT_TIMEOUT_MS import io.github.flaxoos.ktor.server.plugins.kafka.Defaults.DEFAULT_TOPIC_PARTITIONS import io.github.flaxoos.ktor.server.plugins.kafka.Defaults.DEFAULT_TOPIC_REPLICAS +import io.github.flaxoos.ktor.server.plugins.kafka.KafkaConfigPropertiesContext.Companion.propertiesContext import io.ktor.client.HttpClient import io.ktor.server.config.ApplicationConfig import org.apache.kafka.clients.CommonClientConfigs @@ -78,18 +79,24 @@ class KafkaConfig : AbstractKafkaConfig() { commonPropertiesBuilder?.build() } override val adminProperties: KafkaProperties? by lazy { - adminPropertiesBuilder?.build() + adminPropertiesBuilder + ?.build() + ?.propertiesContext(this@KafkaConfig) ?.withDefaultAdminConfig() ?.delegatingToCommon() } override val producerProperties: KafkaProperties? by lazy { - producerPropertiesBuilder?.build() + producerPropertiesBuilder + ?.build() + ?.propertiesContext(this@KafkaConfig) ?.withSchemaRegistryUrl() ?.withDefaultProducerConfig() ?.delegatingToCommon() } override val consumerProperties: KafkaProperties? by lazy { - consumerPropertiesBuilder?.build() + consumerPropertiesBuilder + ?.build() + ?.propertiesContext(this@KafkaConfig) ?.withSchemaRegistryUrl() ?.withDefaultConsumerConfig() ?.delegatingToCommon() @@ -105,23 +112,34 @@ class KafkaConfig : AbstractKafkaConfig() { /** * Configuration for the Kafka plugin */ -class KafkaFileConfig(config: ApplicationConfig) : AbstractKafkaConfig() { +class KafkaFileConfig( + config: ApplicationConfig, +) : AbstractKafkaConfig() { override var schemaRegistryUrl: String? = config.propertyOrNull("schema.registry.url")?.getString() override val commonProperties: KafkaProperties? = config.configOrNull("common")?.toMutableMap() override val adminProperties: KafkaProperties? = - config.configOrNull("admin")?.toMutableMap() + config + .configOrNull("admin") + ?.toMutableMap() + ?.propertiesContext(this@KafkaFileConfig) ?.withDefaultAdminConfig() ?.delegatingToCommon() override val producerProperties: KafkaProperties? = - config.configOrNull("producer")?.toMutableMap() + config + .configOrNull("producer") + ?.toMutableMap() + ?.propertiesContext(this@KafkaFileConfig) ?.withSchemaRegistryUrl() ?.withDefaultProducerConfig() ?.delegatingToCommon() override val consumerProperties: KafkaProperties? = - config.configOrNull("consumer")?.toMutableMap() + config + .configOrNull("consumer") + ?.toMutableMap() + ?.propertiesContext(this@KafkaFileConfig) ?.withSchemaRegistryUrl() ?.withDefaultConsumerConfig() ?.delegatingToCommon() @@ -130,41 +148,60 @@ class KafkaFileConfig(config: ApplicationConfig) : AbstractKafkaConfig() { } private fun ApplicationConfig.configOrNull(name: String) = - this.runCatching { config(name) }.getOrNull()?.toMap()?.toMutableMap() - -context (AbstractKafkaConfig) -internal fun KafkaProperties.delegatingToCommon(): KafkaProperties { - val joined = commonProperties - joined?.putAll(this) - return joined ?: this + this + .runCatching { config(name) } + .getOrNull() + ?.toMap() + ?.toMutableMap() + +class KafkaConfigPropertiesContext( + val kafkaConfig: AbstractKafkaConfig, + val kafkaProperties: KafkaProperties, +) { + companion object { + fun KafkaProperties.propertiesContext(kafkaConfig: AbstractKafkaConfig) = + KafkaConfigPropertiesContext( + kafkaConfig = kafkaConfig, + kafkaProperties = this, + ) + } } -context (AbstractKafkaConfig) -internal fun KafkaProperties.withDefaultAdminConfig() = apply { - getOrPut(CommonClientConfigs.CLIENT_ID_CONFIG) { DEFAULT_CLIENT_ID } +internal fun KafkaConfigPropertiesContext.delegatingToCommon(): KafkaProperties { + val joined = this.kafkaConfig.commonProperties + joined?.putAll(this.kafkaProperties) + return joined ?: this.kafkaProperties } -context (AbstractKafkaConfig) -internal fun KafkaProperties.withDefaultProducerConfig() = apply { - getOrPut(ProducerConfig.CLIENT_ID_CONFIG) { DEFAULT_CLIENT_ID } - getOrPut(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG) { StringSerializer::class.java.name } - getOrPut(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG) { KafkaAvroSerializer::class.java.name } -} +internal fun KafkaConfigPropertiesContext.withDefaultAdminConfig() = + apply { + kafkaProperties.getOrPut(CommonClientConfigs.CLIENT_ID_CONFIG) { DEFAULT_CLIENT_ID } + } -context (AbstractKafkaConfig) -internal fun KafkaProperties.withDefaultConsumerConfig() = apply { - getOrPut(ConsumerConfig.GROUP_ID_CONFIG) { DEFAULT_GROUP_ID } - getOrPut(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG) { StringDeserializer::class.java.name } - getOrPut(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG) { KafkaAvroDeserializer::class.java.name } -} +internal fun KafkaConfigPropertiesContext.withDefaultProducerConfig() = + apply { + kafkaProperties.getOrPut(ProducerConfig.CLIENT_ID_CONFIG) { DEFAULT_CLIENT_ID } + kafkaProperties.getOrPut(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG) { StringSerializer::class.java.name } + kafkaProperties.getOrPut(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG) { KafkaAvroSerializer::class.java.name } + } -context (AbstractKafkaConfig) -internal fun KafkaProperties.withSchemaRegistryUrl() = apply { - put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl) -} +internal fun KafkaConfigPropertiesContext.withDefaultConsumerConfig() = + apply { + kafkaProperties.getOrPut(ConsumerConfig.GROUP_ID_CONFIG) { DEFAULT_GROUP_ID } + kafkaProperties.getOrPut(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG) { StringDeserializer::class.java.name } + kafkaProperties.getOrPut(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG) { KafkaAvroDeserializer::class.java.name } + } + +internal fun KafkaConfigPropertiesContext.withSchemaRegistryUrl() = + apply { + kafkaProperties.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this.kafkaConfig.schemaRegistryUrl) + } @KafkaDsl -fun KafkaConsumerConfig.consumerRecordHandler(topicName: TopicName, handler: ConsumerRecordHandler) { +fun KafkaConsumerConfig.consumerRecordHandler( + topicName: TopicName, + handler: ConsumerRecordHandler, +) { consumerRecordHandlers[topicName] = handler } @@ -188,40 +225,37 @@ fun AbstractKafkaConfig.registerSchemas(configuration: SchemaRegistrationBuilder } @KafkaDsl -fun KafkaConfig.topic(name: TopicName, block: TopicBuilder.() -> Unit) { +fun KafkaConfig.topic( + name: TopicName, + block: TopicBuilder.() -> Unit, +) { topicBuilders.add(TopicBuilder(name).apply(block)) } @KafkaDsl -fun KafkaConfig.producer( - configuration: ProducerPropertiesBuilder.() -> Unit = { ProducerPropertiesBuilder(schemaRegistryUrl) } -) { +fun KafkaConfig.producer(configuration: ProducerPropertiesBuilder.() -> Unit = { ProducerPropertiesBuilder(schemaRegistryUrl) }) { producerPropertiesBuilder = ProducerPropertiesBuilder( // assuming only avro is used, support custom serializers later with(checkNotNull(schemaRegistryUrl) { "Consumer schema registry url is not set" }) { this - } + }, ).apply(configuration) } @KafkaDsl -fun KafkaConfig.consumer( - configuration: ConsumerPropertiesBuilder.() -> Unit = { ConsumerPropertiesBuilder(schemaRegistryUrl) } -) { +fun KafkaConfig.consumer(configuration: ConsumerPropertiesBuilder.() -> Unit = { ConsumerPropertiesBuilder(schemaRegistryUrl) }) { consumerPropertiesBuilder = ConsumerPropertiesBuilder( // assuming only avro is used, support custom serializers later with(checkNotNull(schemaRegistryUrl) { "Consumer schema registry url is not set" }) { this - } + }, ).apply(configuration) } @KafkaDsl -fun AbstractKafkaConfig.consumerConfig( - configuration: KafkaConsumerConfig.() -> Unit = { } -) { +fun AbstractKafkaConfig.consumerConfig(configuration: KafkaConsumerConfig.() -> Unit = { }) { consumerConfig = KafkaConsumerConfig().apply(configuration) } @@ -249,7 +283,9 @@ class SchemaRegistrationBuilder { @KafkaDsl @Suppress("MemberVisibilityCanBePrivate") -class TopicBuilder(internal val name: TopicName) { +class TopicBuilder( + internal val name: TopicName, +) { var partitions: Int = DEFAULT_TOPIC_PARTITIONS var replicas: Short = DEFAULT_TOPIC_REPLICAS var replicasAssignments: Map>? = null @@ -261,24 +297,24 @@ class TopicBuilder(internal val name: TopicName) { } internal fun build(): NewTopic { - val topic = if (replicasAssignments == null) { - NewTopic(name.value, partitions, replicas) - } else { - NewTopic(name.value, replicasAssignments) - } + val topic = + if (replicasAssignments == null) { + NewTopic(name.value, partitions, replicas) + } else { + NewTopic(name.value, replicasAssignments) + } return topic.configs(configs?.filterValues { it != null }?.mapValues { it.value.toString() }) } @Suppress("UNCHECKED_CAST") companion object { - fun froMap(map: Map): TopicBuilder { - return TopicBuilder(TopicName.named(map["name"] as String)).apply { + fun froMap(map: Map): TopicBuilder = + TopicBuilder(TopicName.named(map["name"] as String)).apply { partitions = map["partitions"] as Int replicas = (map["replicas"] as Int).toShort() replicasAssignments = map["replicasAssignments"] as Map>? configs = map["configs"] as Map? } - } } } @@ -407,8 +443,7 @@ sealed class ClientPropertiesBuilder : KafkaPropertiesBuilder { */ data object CommonClientPropertiesBuilder : ClientPropertiesBuilder() -class AdminPropertiesBuilder : - ClientPropertiesBuilder() +class AdminPropertiesBuilder : ClientPropertiesBuilder() /** * Used to constraint consumer and producer builders to provide a schema registry url @@ -422,8 +457,9 @@ internal interface SchemaRegistryProvider { */ @Suppress("MemberVisibilityCanBePrivate", "SpellCheckingInspection", "CyclomaticComplexMethod") class ProducerPropertiesBuilder( - override var schemaRegistryUrl: String? = null -) : ClientPropertiesBuilder(), SchemaRegistryProvider { + override var schemaRegistryUrl: String? = null, +) : ClientPropertiesBuilder(), + SchemaRegistryProvider { var batchSize: Any? = null var acks: Any? = null var lingerMs: Any? = null @@ -471,8 +507,9 @@ class ProducerPropertiesBuilder( */ @Suppress("MemberVisibilityCanBePrivate", "CyclomaticComplexMethod") class ConsumerPropertiesBuilder( - override var schemaRegistryUrl: String? = null -) : ClientPropertiesBuilder(), SchemaRegistryProvider { + override var schemaRegistryUrl: String? = null, +) : ClientPropertiesBuilder(), + SchemaRegistryProvider { var groupId: Any? = null var groupInstanceId: Any? = null var maxPollRecords: Any? = null @@ -529,7 +566,8 @@ typealias KafkaProperties = MutableMap @Suppress("unused") enum class MessageTimestampType { - CreateTime, LogAppendTime + CreateTime, + LogAppendTime, } typealias CompressionType = org.apache.kafka.common.record.CompressionType diff --git a/ktor-server-kafka/src/jvmMain/kotlin/io/github/flaxoos/ktor/server/plugins/kafka/components/AdminClient.kt b/ktor-server-kafka/src/jvmMain/kotlin/io/github/flaxoos/ktor/server/plugins/kafka/components/AdminClient.kt index 4c7c70a8..563be4f5 100644 --- a/ktor-server-kafka/src/jvmMain/kotlin/io/github/flaxoos/ktor/server/plugins/kafka/components/AdminClient.kt +++ b/ktor-server-kafka/src/jvmMain/kotlin/io/github/flaxoos/ktor/server/plugins/kafka/components/AdminClient.kt @@ -15,7 +15,7 @@ context (CoroutineScope) internal suspend fun AdminClient.createKafkaTopics( topicBuilders: List, existingTopicHandler: (NewTopic) -> Unit = {}, - topicCreationHandler: Pair>.() -> Unit + topicCreationHandler: Pair>.() -> Unit, ) { val existingTopics = listTopics().listings().get().map { it.name() } val createTopicsResult = diff --git a/ktor-server-kafka/src/jvmMain/kotlin/io/github/flaxoos/ktor/server/plugins/kafka/components/Avro.kt b/ktor-server-kafka/src/jvmMain/kotlin/io/github/flaxoos/ktor/server/plugins/kafka/components/Avro.kt index 7d820f57..fabbcc9d 100644 --- a/ktor-server-kafka/src/jvmMain/kotlin/io/github/flaxoos/ktor/server/plugins/kafka/components/Avro.kt +++ b/ktor-server-kafka/src/jvmMain/kotlin/io/github/flaxoos/ktor/server/plugins/kafka/components/Avro.kt @@ -43,7 +43,7 @@ class SchemaRegistryClient(providedClient: HttpClient, schemaRegistryUrl: String context (Application) inline fun registerSchemas( - schemas: MutableMap, TopicName> + schemas: MutableMap, TopicName>, ) { schemas.forEach { registerSchema(it.key, it.value) @@ -62,7 +62,7 @@ class SchemaRegistryClient(providedClient: HttpClient, schemaRegistryUrl: String inline fun registerSchema( klass: KClass, topicName: TopicName, - noinline onConflict: () -> Unit = {} + noinline onConflict: () -> Unit = {}, ) { val schema = Avro.default.schema(klass.serializer()) val payload = mapOf("schema" to schema.toString()) // Creating a map to form the payload @@ -77,7 +77,7 @@ class SchemaRegistryClient(providedClient: HttpClient, schemaRegistryUrl: String if (!it.status.isSuccess()) { log.error( "Failed registering schema to schema registry at ${it.call.request.url}:\n${it.status} " + - "${it.bodyAsText()}:\nschema: $payload" + "${it.bodyAsText()}:\nschema: $payload", ) } } diff --git a/ktor-server-kafka/src/jvmMain/kotlin/io/github/flaxoos/ktor/server/plugins/kafka/components/Consumer.kt b/ktor-server-kafka/src/jvmMain/kotlin/io/github/flaxoos/ktor/server/plugins/kafka/components/Consumer.kt index 2df02386..01c4e443 100644 --- a/ktor-server-kafka/src/jvmMain/kotlin/io/github/flaxoos/ktor/server/plugins/kafka/components/Consumer.kt +++ b/ktor-server-kafka/src/jvmMain/kotlin/io/github/flaxoos/ktor/server/plugins/kafka/components/Consumer.kt @@ -27,7 +27,7 @@ internal fun Application.startConsumer( consumer: Consumer, pollFrequency: Duration, consumerRecordHandlers: Map, - cleanUp: () -> Unit + cleanUp: () -> Unit, ): Job { val consumerFlow = subscribe(consumer, pollFrequency, consumerRecordHandlers.keys.toList()) return launch(Dispatchers.IO) { @@ -57,7 +57,7 @@ internal fun Application.startConsumer( fun Application.subscribe( consumer: Consumer, pollFrequency: Duration, - topics: List + topics: List, ) = flow { consumer.subscribe(topics.map { it.value }) while (consumerShouldRun) { diff --git a/ktor-server-kafka/src/jvmTest/kotlin/io/github/flaxoos/ktor/server/plugins/kafka/BaseKafkaIntegrationTest.kt b/ktor-server-kafka/src/jvmTest/kotlin/io/github/flaxoos/ktor/server/plugins/kafka/BaseKafkaIntegrationTest.kt index e7860687..47fa5d83 100644 --- a/ktor-server-kafka/src/jvmTest/kotlin/io/github/flaxoos/ktor/server/plugins/kafka/BaseKafkaIntegrationTest.kt +++ b/ktor-server-kafka/src/jvmTest/kotlin/io/github/flaxoos/ktor/server/plugins/kafka/BaseKafkaIntegrationTest.kt @@ -59,7 +59,7 @@ abstract class BaseKafkaIntegrationTest : FunSpec() { .replace(BOOTSTRAP_SERVERS_PLACEHOLDER, kafka.bootstrapServers) .replace(SCHEMA_REGISTRY_URL_PLACEHOLDER, schemaRegistryUrl) .replace(GROUP_ID_PLACEHOLDER, this.testCase.name.testName.plus("-group")) - .replace(CLIENT_ID_PLACEHOLDER, this.testCase.name.testName.plus("-client")) + .replace(CLIENT_ID_PLACEHOLDER, this.testCase.name.testName.plus("-client")), ) } @@ -104,7 +104,7 @@ abstract class BaseKafkaIntegrationTest : FunSpec() { } catch (e: Exception) { logger.info( "Attempt $i to connect to Kafka broker at bootstrap.servers: " + - "$kafka.bootstrapServers failed, retrying" + "$kafka.bootstrapServers failed, retrying", ) delay(delay) } @@ -115,7 +115,7 @@ abstract class BaseKafkaIntegrationTest : FunSpec() { if (!isConnected) { throw AssertionError( "Unable to connect to Kafka broker at bootstrap.servers: " + - "$kafka.bootstrapServers" + "$kafka.bootstrapServers", ) } logger.info("Connected to Kafka broker at bootstrap.servers: $kafka.bootstrapServers") diff --git a/ktor-server-kafka/src/jvmTest/kotlin/io/github/flaxoos/ktor/server/plugins/kafka/KtorKafkaIntegrationTest.kt b/ktor-server-kafka/src/jvmTest/kotlin/io/github/flaxoos/ktor/server/plugins/kafka/KtorKafkaIntegrationTest.kt index b20394d7..03128bd8 100644 --- a/ktor-server-kafka/src/jvmTest/kotlin/io/github/flaxoos/ktor/server/plugins/kafka/KtorKafkaIntegrationTest.kt +++ b/ktor-server-kafka/src/jvmTest/kotlin/io/github/flaxoos/ktor/server/plugins/kafka/KtorKafkaIntegrationTest.kt @@ -97,7 +97,7 @@ class KtorKafkaIntegrationTest : BaseKafkaIntegrationTest() { consumerRecordHandler(topicName) { record -> logger.debug("Consumed record: {} on topic: {}", record, topicName) recordChannel.send( - fromRecord(record.value()) + fromRecord(record.value()), ) } } @@ -129,7 +129,7 @@ class KtorKafkaIntegrationTest : BaseKafkaIntegrationTest() { private fun testKafkaApplication( extraAssertions: Application.() -> Unit = {}, - pluginInstallation: Application.() -> Unit + pluginInstallation: Application.() -> Unit, ) { testApplication { val client = setupClient() @@ -177,7 +177,7 @@ class KtorKafkaIntegrationTest : BaseKafkaIntegrationTest() { private fun ApplicationTestBuilder.setupApplication( extraAssertions: Application.() -> Unit = {}, - pluginInstallation: Application.() -> Unit + pluginInstallation: Application.() -> Unit, ) { application { install(ContentNegotiation) { @@ -230,5 +230,5 @@ class KtorKafkaIntegrationTest : BaseKafkaIntegrationTest() { @AvroNamespace("io.github.flaxoos") data class TestRecord( val id: Int, - val topic: String + val topic: String, ) diff --git a/ktor-server-rate-limiting/src/commonMain/kotlin/io/github/flaxoos/ktor/server/plugins/ratelimiter/RateLimiter.kt b/ktor-server-rate-limiting/src/commonMain/kotlin/io/github/flaxoos/ktor/server/plugins/ratelimiter/RateLimiter.kt index 329ce22a..00bbe5b6 100644 --- a/ktor-server-rate-limiting/src/commonMain/kotlin/io/github/flaxoos/ktor/server/plugins/ratelimiter/RateLimiter.kt +++ b/ktor-server-rate-limiting/src/commonMain/kotlin/io/github/flaxoos/ktor/server/plugins/ratelimiter/RateLimiter.kt @@ -45,14 +45,14 @@ sealed interface RateLimiterResponse { data class NotLimited( override val rateLimiter: RateLimiter, - val remaining: Number? = null + val remaining: Number? = null, ) : RateLimiterResponse data class LimitedBy( override val rateLimiter: RateLimiter, val exceededBy: Number, val resetIn: Duration, - val message: String + val message: String, ) : RateLimiterResponse } diff --git a/ktor-server-rate-limiting/src/commonMain/kotlin/io/github/flaxoos/ktor/server/plugins/ratelimiter/RateLimitingConfiguration.kt b/ktor-server-rate-limiting/src/commonMain/kotlin/io/github/flaxoos/ktor/server/plugins/ratelimiter/RateLimitingConfiguration.kt index bf276b20..45447e16 100644 --- a/ktor-server-rate-limiting/src/commonMain/kotlin/io/github/flaxoos/ktor/server/plugins/ratelimiter/RateLimitingConfiguration.kt +++ b/ktor-server-rate-limiting/src/commonMain/kotlin/io/github/flaxoos/ktor/server/plugins/ratelimiter/RateLimitingConfiguration.kt @@ -7,7 +7,6 @@ import io.ktor.http.HttpStatusCode import io.ktor.server.application.Application import io.ktor.server.application.ApplicationCall import io.ktor.server.application.log -import io.ktor.server.auth.Principal import io.ktor.server.response.respond import kotlinx.datetime.Clock.System.now import kotlin.reflect.KClass @@ -24,7 +23,7 @@ annotation class RateLimitingDsl * Rate limit plugin configuration. * * Be careful using whitelisting, as the caller can abuse it by overriding the host or - * user-agent by manipulating the headers, it is safest to use [Principal] whitelisting, + * user-agent by manipulating the headers, it is safest to use Principal whitelisting, * as it relies on authentication. */ @RateLimitingDsl @@ -42,7 +41,7 @@ class RateLimitingConfiguration { /** * Any [Principal]s that are whitelisted, i.e. will be allowed through without rate limiting */ - var whiteListedPrincipals: Set = emptySet() + var whiteListedPrincipals: Set = emptySet() /** * Any user-agents that are whitelisted, i.e. will be allowed through without rate limiting @@ -57,7 +56,7 @@ class RateLimitingConfiguration { /** * Any [Principal]s that are blacklisted, i.e. will not be allowed through in any case, handled by [blackListedCallerCallHandler] */ - var blackListedPrincipals: Set = emptySet() + var blackListedPrincipals: Set = emptySet() /** * Any user-agents that are blacklisted, i.e. will not be allowed through in any case, handled by [blackListedCallerCallHandler] @@ -88,7 +87,7 @@ class RateLimitingConfiguration { this.response.headers.append("$X_RATE_LIMIT-Limit", "${rateLimiterResponse.rateLimiter.capacity}") this.response.headers.append( "$X_RATE_LIMIT-Measured-by", - rateLimiterResponse.rateLimiter.callVolumeUnit.name + rateLimiterResponse.rateLimiter.callVolumeUnit.name, ) this.response.headers.append("$X_RATE_LIMIT-Reset", "${rateLimiterResponse.resetIn.inWholeMilliseconds}") } @@ -124,7 +123,7 @@ class RateLimitingConfiguration { /** * The unit by which the rate limiter capacity is measured, not applicable for [LeakyBucket] */ - var callVolumeUnit: CallVolumeUnit = CallVolumeUnit.Calls() + var callVolumeUnit: CallVolumeUnit = CallVolumeUnit.Calls(), ) { init { require(capacity > 0) { @@ -132,58 +131,60 @@ class RateLimitingConfiguration { } } - internal fun provideRateLimiter(application: Application): () -> RateLimiter = when (type) { - LeakyBucket::class -> { - when (callVolumeUnit) { - is CallVolumeUnit.Bytes -> { - application.log.warn( - "LeakyBucket does not support CallVolumeUnit.Bytes, " + - "will use CallVolumeUnit.Calls" - ) + internal fun provideRateLimiter(application: Application): () -> RateLimiter = + when (type) { + LeakyBucket::class -> { + when (callVolumeUnit) { + is CallVolumeUnit.Bytes -> { + application.log.warn( + "LeakyBucket does not support CallVolumeUnit.Bytes, " + + "will use CallVolumeUnit.Calls", + ) + } + + is CallVolumeUnit.Calls -> + if (callVolumeUnit.size.compareTo(1) != 0) { + application.log.warn( + "LeakyBucket does not support CallVolumeUnit.Calls with size " + + "!= 1, 1 will be effectively used", + ) + } } - is CallVolumeUnit.Calls -> if (callVolumeUnit.size.compareTo(1) != 0) { - application.log.warn( - "LeakyBucket does not support CallVolumeUnit.Calls with size " + - "!= 1, 1 will be effectively used" + { + LeakyBucket( + rate = rate, + capacity = capacity, + clock = clock, ) } } - { - LeakyBucket( - rate = rate, - capacity = capacity, - clock = clock + SlidingWindow::class -> ( + { + SlidingWindow( + rate = rate, + capacity = capacity, + callVolumeUnit = callVolumeUnit, + clock = clock, + ) + } ) - } - } - SlidingWindow::class -> ( - { - SlidingWindow( - rate = rate, - capacity = capacity, - callVolumeUnit = callVolumeUnit, - clock = clock - ) - } - ) - - TokenBucket::class -> ( - { - TokenBucket( - rate = rate, - capacity = capacity, - callVolumeUnit = callVolumeUnit, - clock = clock + TokenBucket::class -> ( + { + TokenBucket( + rate = rate, + capacity = capacity, + callVolumeUnit = callVolumeUnit, + clock = clock, + ) + } ) - } - ) - else -> { - error("Unsupported provider type: $type") + else -> { + error("Unsupported provider type: $type") + } } - } } } diff --git a/ktor-server-rate-limiting/src/commonMain/kotlin/io/github/flaxoos/ktor/server/plugins/ratelimiter/RateLimitingPlugin.kt b/ktor-server-rate-limiting/src/commonMain/kotlin/io/github/flaxoos/ktor/server/plugins/ratelimiter/RateLimitingPlugin.kt index 9ba64e84..f04a2020 100644 --- a/ktor-server-rate-limiting/src/commonMain/kotlin/io/github/flaxoos/ktor/server/plugins/ratelimiter/RateLimitingPlugin.kt +++ b/ktor-server-rate-limiting/src/commonMain/kotlin/io/github/flaxoos/ktor/server/plugins/ratelimiter/RateLimitingPlugin.kt @@ -6,7 +6,6 @@ import io.ktor.server.application.createRouteScopedPlugin import io.ktor.server.application.install import io.ktor.server.application.log import io.ktor.server.auth.AuthenticationChecked -import io.ktor.server.auth.Principal import io.ktor.server.auth.principal import io.ktor.server.plugins.doublereceive.DoubleReceive import io.ktor.server.plugins.origin @@ -19,10 +18,11 @@ import kotlinx.coroutines.sync.withLock * Rate limiting plugin, apply to route to provide route scoped rate limiting, * see [RateLimitingConfiguration] for details on how to configure */ -val RateLimiting = createRouteScopedPlugin( - name = "RateLimiting", - createConfiguration = ::RateLimitingConfiguration -) { applyNewRateLimiter() } +val RateLimiting = + createRouteScopedPlugin( + name = "RateLimiting", + createConfiguration = ::RateLimitingConfiguration, + ) { applyNewRateLimiter() } private fun PluginBuilder.applyNewRateLimiter() { val rateLimiters = mutableMapOf() @@ -37,45 +37,58 @@ private fun PluginBuilder.applyNewRateLimiter() { on(AuthenticationChecked) { call -> with(pluginConfig) { - if (excludePaths.any { call.request.path().trimStart { c -> c == '/' }.matches(it) }) { + if (excludePaths.any { + call.request + .path() + .trimStart { c -> c == '/' } + .matches(it) + } + ) { return@with } val caller = call.extractCaller() application.log.debug("Handling call by ${caller.toIdentifier()}") - if (caller.remoteHost in blackListedHosts || caller.principal in blackListedPrincipals || caller.userAgent in blackListedAgents) { + if (caller.remoteHost in blackListedHosts || + caller.principal in blackListedPrincipals || + caller.userAgent in blackListedAgents + ) { application.log.debug( "User ${caller.toIdentifier()} is blacklisted by ${ listOfNotNull( if (caller.remoteHost in blackListedHosts) "host" else null, if (caller.principal in blackListedPrincipals) "principal" else null, - if (caller.userAgent in blackListedAgents) "user agent" else null + if (caller.userAgent in blackListedAgents) "user agent" else null, ).joinToString(",") - }" + }", ) blackListedCallerCallHandler(call) return@with } - if (caller.remoteHost in whiteListedHosts || caller.principal in whiteListedPrincipals || caller.userAgent in whiteListedAgents) { + if (caller.remoteHost in whiteListedHosts || + caller.principal in whiteListedPrincipals || + caller.userAgent in whiteListedAgents + ) { application.log.debug( "User ${caller.toIdentifier()} is whitelisted by ${ listOfNotNull( if (caller.remoteHost in whiteListedHosts) "host" else null, if (caller.principal in whiteListedPrincipals) "principal" else null, - if (caller.userAgent in whiteListedAgents) "user agent" else null + if (caller.userAgent in whiteListedAgents) "user agent" else null, ).joinToString(",") - }" + }", ) return@with } - val provider = rateLimitersLock.withLock { - rateLimiters.getOrPut(call.extractCaller()) { - application.log.debug("Putting new rate limiter for ${caller.toIdentifier()}") - rateLimiterConfiguration.provideRateLimiter(application = application).invoke() + val provider = + rateLimitersLock.withLock { + rateLimiters.getOrPut(call.extractCaller()) { + application.log.debug("Putting new rate limiter for ${caller.toIdentifier()}") + rateLimiterConfiguration.provideRateLimiter(application = application).invoke() + } } - } with(provider.tryAccept(call)) { application.log.debug(debugDetails(caller = caller)) @@ -95,9 +108,7 @@ private fun PluginBuilder.applyNewRateLimiter() { } } -private fun RateLimiterResponse.debugDetails( - caller: Caller -) = +private fun RateLimiterResponse.debugDetails(caller: Caller) = "call from $caller ${if (this is RateLimiterResponse.LimitedBy) "" else "not"} limited ${ if (this is RateLimiterResponse.LimitedBy) { this.message @@ -109,21 +120,23 @@ private fun RateLimiterResponse.debugDetails( private fun ApplicationCall.extractCaller(): Caller { val remoteHost = this.request.origin.remoteHost val userAgent = this.request.userAgent() - val principal = this.principal().also { - if (it == null) { - application.log.debug( - "No authenticated principal found in call, identification is based on http headers X-Forwarded-For and User-Agent" - ) + val principal = + this.principal().also { + if (it == null) { + application.log.debug( + "No authenticated principal found in call, identification is based on http headers X-Forwarded-For and User-Agent", + ) + } } - } return Caller(remoteHost, userAgent, principal) } private data class Caller( val remoteHost: String, val userAgent: String?, - val principal: Principal? + val principal: Any?, ) { fun toIdentifier() = "$remoteHost|${userAgent ?: ""}|${principal ?: ""}" + override fun toString() = toIdentifier() } diff --git a/ktor-server-rate-limiting/src/commonMain/kotlin/io/github/flaxoos/ktor/server/plugins/ratelimiter/implementations/Bucket.kt b/ktor-server-rate-limiting/src/commonMain/kotlin/io/github/flaxoos/ktor/server/plugins/ratelimiter/implementations/Bucket.kt index 04d2260c..9ceb9f2b 100644 --- a/ktor-server-rate-limiting/src/commonMain/kotlin/io/github/flaxoos/ktor/server/plugins/ratelimiter/implementations/Bucket.kt +++ b/ktor-server-rate-limiting/src/commonMain/kotlin/io/github/flaxoos/ktor/server/plugins/ratelimiter/implementations/Bucket.kt @@ -35,7 +35,7 @@ sealed class Bucket(initialVolume: Int, final override val clock: () -> Long, fi call: ApplicationCall, by: Double, shouldUpdateTime: Boolean = false, - consideringLastUpdateTime: suspend Double.(Long) -> Double = { this } + consideringLastUpdateTime: suspend Double.(Long) -> Double = { this }, ): Double? { by.checkNotNegative() return reduceVolume(call, by, shouldUpdateTime, consideringLastUpdateTime) @@ -51,7 +51,7 @@ sealed class Bucket(initialVolume: Int, final override val clock: () -> Long, fi call: ApplicationCall, by: Double, shouldUpdateTime: Boolean = false, - consideringLastUpdateTime: suspend Double.(Long) -> Double = { this } + consideringLastUpdateTime: suspend Double.(Long) -> Double = { this }, ): Double? { by.checkNotNegative() return tryUpdateVolume(call, shouldUpdateTime) { volume, timeSinceLastUpdate -> @@ -69,7 +69,7 @@ sealed class Bucket(initialVolume: Int, final override val clock: () -> Long, fi call: ApplicationCall, by: Double, shouldUpdateTime: Boolean = false, - consideringLastUpdateTime: suspend Double.(Long) -> Double = { this } + consideringLastUpdateTime: suspend Double.(Long) -> Double = { this }, ): Double? { by.checkNotNegative() return increaseVolume(call, by, shouldUpdateTime, consideringLastUpdateTime) @@ -85,7 +85,7 @@ sealed class Bucket(initialVolume: Int, final override val clock: () -> Long, fi call: ApplicationCall, by: Double, shouldUpdateTime: Boolean = false, - consideringLastUpdateTime: suspend Double.(Long) -> Double = { this } + consideringLastUpdateTime: suspend Double.(Long) -> Double = { this }, ): Double? { by.checkNotNegative() return tryUpdateVolume(call, shouldUpdateTime) { volume, timeSinceLastUpdate -> @@ -104,7 +104,7 @@ sealed class Bucket(initialVolume: Int, final override val clock: () -> Long, fi private suspend fun tryUpdateVolume( call: ApplicationCall, shouldUpdateTime: Boolean = false, - update: suspend (Double, Long) -> Double + update: suspend (Double, Long) -> Double, ): Double? { return currentVolumeMutex.withLock { val now = clock() diff --git a/ktor-server-rate-limiting/src/commonMain/kotlin/io/github/flaxoos/ktor/server/plugins/ratelimiter/implementations/LeakyBucket.kt b/ktor-server-rate-limiting/src/commonMain/kotlin/io/github/flaxoos/ktor/server/plugins/ratelimiter/implementations/LeakyBucket.kt index 130f6e59..46db10a2 100644 --- a/ktor-server-rate-limiting/src/commonMain/kotlin/io/github/flaxoos/ktor/server/plugins/ratelimiter/implementations/LeakyBucket.kt +++ b/ktor-server-rate-limiting/src/commonMain/kotlin/io/github/flaxoos/ktor/server/plugins/ratelimiter/implementations/LeakyBucket.kt @@ -40,7 +40,7 @@ class LeakyBucket( /** * A time provider in milliseconds */ - clock: () -> Long = { Clock.System.now().toEpochMilliseconds() } + clock: () -> Long = { Clock.System.now().toEpochMilliseconds() }, ) : Bucket(0, clock, capacity) { private val leakHole = Mutex() private var lastLeak: Long = Instant.DISTANT_PAST.toEpochMilliseconds() @@ -53,7 +53,7 @@ class LeakyBucket( leak(call) RateLimiterResponse.NotLimited( this@LeakyBucket, - remaining = null + remaining = null, ) } ?: run { log.debug { "${call.id()}: Rejected due to bucket overflow" } @@ -61,7 +61,7 @@ class LeakyBucket( this, resetIn = rate, exceededBy = 1, - message = "Bucket of size $capacity is full, call rejected" + message = "Bucket of size $capacity is full, call rejected", ) } } diff --git a/ktor-server-rate-limiting/src/commonMain/kotlin/io/github/flaxoos/ktor/server/plugins/ratelimiter/implementations/SlidingWindow.kt b/ktor-server-rate-limiting/src/commonMain/kotlin/io/github/flaxoos/ktor/server/plugins/ratelimiter/implementations/SlidingWindow.kt index a01d4df2..c052a531 100644 --- a/ktor-server-rate-limiting/src/commonMain/kotlin/io/github/flaxoos/ktor/server/plugins/ratelimiter/implementations/SlidingWindow.kt +++ b/ktor-server-rate-limiting/src/commonMain/kotlin/io/github/flaxoos/ktor/server/plugins/ratelimiter/implementations/SlidingWindow.kt @@ -27,7 +27,7 @@ data class SlidingWindow( /** * A time provider */ - override val clock: () -> Long = { now().toEpochMilliseconds() } + override val clock: () -> Long = { now().toEpochMilliseconds() }, ) : RateLimiter() { private val timeWindowMs = rate.inWholeMilliseconds private var timestamps = ConcurrentFixedSizeWeightedQueue(capacity * callVolumeUnit.size) @@ -53,7 +53,7 @@ data class SlidingWindow( this, resetIn = resetIn, exceededBy = callSize, - message = "$capacity calls were already made during $rate" + message = "$capacity calls were already made during $rate", ) } @@ -73,7 +73,7 @@ class ConcurrentFixedSizeWeightedQueue( /** * the maximum weight, must be greater than 0 */ - private val maxWeight: Int + private val maxWeight: Int, ) { private val list = queueList>() private val size: Int diff --git a/ktor-server-rate-limiting/src/commonMain/kotlin/io/github/flaxoos/ktor/server/plugins/ratelimiter/implementations/TokenBucket.kt b/ktor-server-rate-limiting/src/commonMain/kotlin/io/github/flaxoos/ktor/server/plugins/ratelimiter/implementations/TokenBucket.kt index f1907cbf..e5fda28e 100644 --- a/ktor-server-rate-limiting/src/commonMain/kotlin/io/github/flaxoos/ktor/server/plugins/ratelimiter/implementations/TokenBucket.kt +++ b/ktor-server-rate-limiting/src/commonMain/kotlin/io/github/flaxoos/ktor/server/plugins/ratelimiter/implementations/TokenBucket.kt @@ -40,7 +40,7 @@ class TokenBucket( /** * A time provider in milliseconds */ - clock: () -> Long = { Clock.System.now().toEpochMilliseconds() } + clock: () -> Long = { Clock.System.now().toEpochMilliseconds() }, ) : Bucket(capacity * callVolumeUnit.size, clock, capacity * callVolumeUnit.size) { init { @@ -63,7 +63,7 @@ class TokenBucket( exceededBy = callSize, message = "Insufficient tokens to accept call. tokens: $currentVolume, " + "measured in ${callVolumeUnit::class.simpleName?.lowercase()} of size ${callVolumeUnit.size}. " + - "call size: $callSize" + "call size: $callSize", ) } } diff --git a/ktor-server-rate-limiting/src/jvmTest/kotlin/io/github/flaxoos/ktor/server/plugins/ratelimiter/RateLimitingPluginTest.kt b/ktor-server-rate-limiting/src/jvmTest/kotlin/io/github/flaxoos/ktor/server/plugins/ratelimiter/RateLimitingPluginTest.kt index 18913436..1734d48f 100644 --- a/ktor-server-rate-limiting/src/jvmTest/kotlin/io/github/flaxoos/ktor/server/plugins/ratelimiter/RateLimitingPluginTest.kt +++ b/ktor-server-rate-limiting/src/jvmTest/kotlin/io/github/flaxoos/ktor/server/plugins/ratelimiter/RateLimitingPluginTest.kt @@ -22,6 +22,7 @@ import io.ktor.http.HttpStatusCode.Companion.Forbidden import io.ktor.http.HttpStatusCode.Companion.InternalServerError import io.ktor.http.HttpStatusCode.Companion.OK import io.ktor.http.HttpStatusCode.Companion.TooManyRequests +import io.ktor.server.application.Application import io.ktor.server.application.call import io.ktor.server.application.install import io.ktor.server.auth.Authentication @@ -38,8 +39,7 @@ import io.ktor.server.response.respondText import io.ktor.server.routing.get import io.ktor.server.routing.route import io.ktor.server.routing.routing -import io.ktor.server.testing.TestApplicationEngine -import io.ktor.server.testing.createTestEnvironment +import io.ktor.server.testing.testApplication import io.ktor.utils.io.core.toByteArray import kotlinx.coroutines.async import kotlinx.coroutines.awaitAll @@ -69,7 +69,6 @@ private fun encodeBasicAuth(name: String) = Base64.encode("$name:$BASIC_AUTH_PAS private val logger = KotlinLogging.logger { } class RateLimitingPluginTest : FunSpec() { - init { context("Rate Limiting Plugin Tests") { context("Installation") { @@ -77,10 +76,10 @@ class RateLimitingPluginTest : FunSpec() { "If call volume unit is bytes, should install double receive plugin", modifyConfiguration = { rateLimiter { callVolumeUnit = Bytes(1) } - } + }, ) { it.testCalls( - times = 2 + times = 2, ) { shouldBeOk() } @@ -89,16 +88,16 @@ class RateLimitingPluginTest : FunSpec() { context("Basic Functionality") { testRateLimiting("Exceeding rate limit on applied route should return Too Many Requests status") { it.testCalls( - times = LIMIT + EXCEED + times = LIMIT + EXCEED, ) { shouldBeLimited() } } testRateLimiting( - "Other routes should not be affected" + "Other routes should not be affected", ) { it.testCalls( times = LIMIT + EXCEED, - path = UNLIMITED_PATH + path = UNLIMITED_PATH, ) { shouldBeOk() } } @@ -106,57 +105,61 @@ class RateLimitingPluginTest : FunSpec() { "Excluded routes should not be affected", modifyConfiguration = { excludePaths = setOf(Regex("$LIMITED_PATH/$UNLIMITED_PATH/.*")) - } + }, ) { it.testCalls( times = LIMIT + EXCEED, - path = "$LIMITED_PATH/$UNLIMITED_PATH/$UNLIMITED_PATH" + path = "$LIMITED_PATH/$UNLIMITED_PATH/$UNLIMITED_PATH", ) { shouldBeOk() } } testRateLimiting("Should distinguish between callers") { it.testCalls( times = LIMIT, - callers = listOf(CALLER1, CALLER2) + callers = listOf(CALLER1, CALLER2), ) { shouldBeOk() } } testRateLimiting("Following requests should pass") { it.testCalls( - times = LIMIT + EXCEED + times = LIMIT + EXCEED, ) { shouldBeLimited() } logger.info { "waiting..." } delay(RATE_LIMITER_RATE_MS.milliseconds * 1.1) it.testCalls( - times = 1 + times = 1, ) { shouldBeOk() } } } context("Bursts") { testRateLimiting( - "Should handle bursts" + "Should handle bursts", ) { it.testCalls( - times = LIMIT + times = LIMIT, ) { shouldBeOk() - val earliestRequestTime = minOf { response -> - response.requestTime.timestamp - } + val earliestRequestTime = + minOf { response -> + response.requestTime.timestamp + } when (it.implementation) { TokenBucket::class, SlidingWindow::class -> { forAll { response -> - (response.responseTime.timestamp - earliestRequestTime).milliseconds shouldBeLessThan RATE_LIMITER_RATE_MS.milliseconds + (response.responseTime.timestamp - earliestRequestTime).milliseconds shouldBeLessThan + RATE_LIMITER_RATE_MS.milliseconds } } LeakyBucket::class -> { // Check they are spaced by the rate - asSequence().sortedBy { response -> response.responseTime.timestamp } - .windowed(2).map { responses -> + asSequence() + .sortedBy { response -> response.responseTime.timestamp } + .windowed(2) + .map { responses -> (responses[1].responseTime.timestamp - responses[0].responseTime.timestamp).milliseconds }.onEach { duration -> // Grace period of 20ms for inaccuracies @@ -177,27 +180,27 @@ class RateLimitingPluginTest : FunSpec() { context("Whitelisting") { testRateLimiting( "Should let whitelisted users pass", - { whiteListedPrincipals = setOf(UserIdPrincipal(CALLER1)) } + { whiteListedPrincipals = setOf(UserIdPrincipal(CALLER1)) }, ) { it.testCalls( - times = LIMIT + EXCEED + times = LIMIT + EXCEED, ) { shouldBeOk() } } testRateLimiting("Should let whitelisted hosts pass", { whiteListedHosts = setOf(LOCALHOST) - }) { + },) { it.testCalls( - times = LIMIT + EXCEED + times = LIMIT + EXCEED, ) { shouldBeOk() } } testRateLimiting("Should let whitelisted user agents pass", { whiteListedAgents = setOf(USER_AGENT) - }) { + },) { it.testCalls( times = LIMIT + EXCEED, - userAgent = USER_AGENT + userAgent = USER_AGENT, ) { shouldBeOk() } } } @@ -205,26 +208,26 @@ class RateLimitingPluginTest : FunSpec() { context("Blacklisting") { testRateLimiting("Should not let blacklisted users pass", { blackListedPrincipals = setOf(UserIdPrincipal(CALLER1)) - }) { + },) { it.testCalls( - times = 1 + times = 1, ) { shouldBeForbidden() } } testRateLimiting("Should not let blacklisted hosts pass", { blackListedHosts = setOf(LOCALHOST) - }) { + },) { it.testCalls( - times = 1 + times = 1, ) { shouldBeForbidden() } } testRateLimiting("Should not let blacklisted user agents pass", { blackListedAgents = setOf(USER_AGENT) - }) { + },) { it.testCalls( times = 1, - userAgent = USER_AGENT + userAgent = USER_AGENT, ) { shouldBeForbidden() } } } @@ -234,80 +237,76 @@ class RateLimitingPluginTest : FunSpec() { private suspend fun FunSpecContainerScope.testRateLimiting( testName: String, modifyConfiguration: RateLimitingConfiguration.() -> Unit = {}, - block: suspend (RateLimiterTestScope) -> Unit + block: suspend (RateLimiterTestScope) -> Unit, ) { withData( nameFn = { "${it.simpleName}: $testName" }, - listOf(TokenBucket::class, LeakyBucket::class, SlidingWindow::class) + listOf(TokenBucket::class, LeakyBucket::class, SlidingWindow::class), ) { implementation -> logger.info { "--------------------------" } logger.info { "Starting test: $testName" } - val engine = createAppEngine(implementation, modifyConfiguration) - try { - engine.start() - block(RateLimiterTestScope(engine.client, implementation)) - } finally { - engine.stop() + + testApplication { + application { + configure(implementation, modifyConfiguration) + } + block(RateLimiterTestScope(client, implementation)) } } } - private fun createAppEngine( + private fun Application.configure( implementation: KClass, - modifyConfiguration: RateLimitingConfiguration.() -> Unit - ) = TestApplicationEngine( - createTestEnvironment { - module { - install(Authentication) { - basic("auth-basic") { - validate { credentials -> - UserIdPrincipal(credentials.name) - } - } + modifyConfiguration: RateLimitingConfiguration.() -> Unit, + ) { + install(Authentication) { + basic("auth-basic") { + validate { credentials -> + UserIdPrincipal(credentials.name) } - install(StatusPages) { - exception { call, cause -> - call.respondText(text = "500: ${cause.stackTraceToString()}", status = InternalServerError) + } + } + install(StatusPages) { + exception { call, cause -> + call.respondText(text = "500: ${cause.stackTraceToString()}", status = InternalServerError) + } + } + install(CallId) { + retrieveFromHeader(HttpHeaders.XRequestId) + } + routing { + authenticate("auth-basic", strategy = AuthenticationStrategy.Required) { + route(LIMITED_PATH) { + install(RateLimiting) { + config(implementation, modifyConfiguration) } - } - install(CallId) { - retrieveFromHeader(HttpHeaders.XRequestId) - } - routing { - authenticate("auth-basic", strategy = AuthenticationStrategy.Required) { - route(LIMITED_PATH) { - install(RateLimiting) { - config(implementation, modifyConfiguration) - } - get { - // Invoke double receive - call.receive(ByteArray::class) + get { + // Invoke double receive + call.receive(ByteArray::class) - call.principal()?.name ?: error("no principal") - call.respond(OK) - } - route(UNLIMITED_PATH) { - route(UNLIMITED_PATH) { - get { - call.respond(OK) - } - } - } - } + call.principal()?.name ?: error("no principal") + call.respond(OK) } route(UNLIMITED_PATH) { - get { - call.respond(OK) + route(UNLIMITED_PATH) { + get { + call.respond(OK) + } } } } } + route(UNLIMITED_PATH) { + get { + call.respond(OK) + } + } } - ) + } private fun RateLimitingConfiguration.config( implementation: KClass, - configuration: RateLimitingConfiguration.() -> Unit + configuration: RateLimitingConfiguration.() -> Unit, ) { rateLimiter { this.type = implementation @@ -323,24 +322,25 @@ class RateLimitingPluginTest : FunSpec() { callDelay: Duration = REQUEST_DELAY_MS.milliseconds, callers: List = listOf(CALLER1), userAgent: String? = null, - checkResponses: suspend List.() -> Unit + checkResponses: suspend List.() -> Unit, ) { coroutineScope { - (1..times).flatMap { index -> - callers.map { caller -> - async { - delay(callDelay) - client.get(path) { - val auth = encodeBasicAuth(caller) - headers.append("Authorization", "Basic $auth") - headers.append(HttpHeaders.XRequestId, "${this.url}, caller: $caller index: $index") - userAgent?.let { headers.append(HttpHeaders.UserAgent, it) } + (1..times) + .flatMap { index -> + callers.map { caller -> + async { + delay(callDelay) + client.get(path) { + val auth = encodeBasicAuth(caller) + headers.append("Authorization", "Basic $auth") + headers.append(HttpHeaders.XRequestId, "${this.url}, caller: $caller index: $index") + userAgent?.let { headers.append(HttpHeaders.UserAgent, it) } + } } } + }.let { + checkResponses(it.awaitAll()) } - }.let { - checkResponses(it.awaitAll()) - } } } @@ -374,5 +374,5 @@ class RateLimitingPluginTest : FunSpec() { data class RateLimiterTestScope( val client: HttpClient, - val implementation: KClass + val implementation: KClass, ) diff --git a/ktor-server-task-scheduling/ktor-server-task-scheduling-core/src/commonMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/TaskSchedulerPlugin.kt b/ktor-server-task-scheduling/ktor-server-task-scheduling-core/src/commonMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/TaskSchedulerPlugin.kt index eda925cc..4b413332 100644 --- a/ktor-server-task-scheduling/ktor-server-task-scheduling-core/src/commonMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/TaskSchedulerPlugin.kt +++ b/ktor-server-task-scheduling/ktor-server-task-scheduling-core/src/commonMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/TaskSchedulerPlugin.kt @@ -22,7 +22,7 @@ import kotlinx.coroutines.launch */ public val TaskScheduling: ApplicationPlugin = createApplicationPlugin( name = "TaskScheduling", - createConfiguration = ::TaskSchedulingConfiguration + createConfiguration = ::TaskSchedulingConfiguration, ) { application.log.debug("Configuring TaskScheduler") @@ -48,7 +48,7 @@ public val TaskScheduling: ApplicationPlugin = crea } private fun PluginBuilder.initializeTaskManagers( - taskManagers: List> + taskManagers: List>, ) = taskManagers.mapNotNull { manager -> pluginConfig.tasks[manager.name]?.let { tasks -> manager to application.async { @@ -59,7 +59,7 @@ private fun PluginBuilder.initializeTaskManagers( }.toMap() private fun PluginBuilder.checkTaskMangerAssignments( - taskManagers: List> + taskManagers: List>, ) { with(pluginConfig.tasks.filter { it.key !in taskManagers.map { it.name } }) { require(isEmpty()) { @@ -90,7 +90,7 @@ private fun TaskManager<*>.startTask(task: Task) { application.launch( context = application.coroutineContext.apply { task.dispatcher?.let { this + it } ?: this - }.apply { this + CoroutineName(task.name) } + }.apply { this + CoroutineName(task.name) }, ) { task.kronSchedule.doInfinity { executionTime -> execute(task, executionTime) diff --git a/ktor-server-task-scheduling/ktor-server-task-scheduling-core/src/commonMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/TaskSchedulingConfiguration.kt b/ktor-server-task-scheduling/ktor-server-task-scheduling-core/src/commonMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/TaskSchedulingConfiguration.kt index 5114c0d0..f5734988 100644 --- a/ktor-server-task-scheduling/ktor-server-task-scheduling-core/src/commonMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/TaskSchedulingConfiguration.kt +++ b/ktor-server-task-scheduling/ktor-server-task-scheduling-core/src/commonMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/TaskSchedulingConfiguration.kt @@ -36,7 +36,7 @@ public open class TaskSchedulingConfiguration { /** * The configuration of the task */ - taskConfiguration: TaskConfiguration.() -> Unit + taskConfiguration: TaskConfiguration.() -> Unit, ) { tasks.getOrPut(taskManagerName.toTaskManagerName()) { mutableListOf() }.add( TaskConfiguration().apply(taskConfiguration).also { checkNotNull(it.kronSchedule) }.let { @@ -45,9 +45,9 @@ public open class TaskSchedulingConfiguration { dispatcher = it.dispatcher, concurrency = it.concurrency, kronSchedule = buildSchedule(it.kronSchedule ?: error("No kron schedule configured")), - task = it.task + task = it.task, ) - } + }, ) } diff --git a/ktor-server-task-scheduling/ktor-server-task-scheduling-core/src/commonMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/managers/TaskManager.kt b/ktor-server-task-scheduling/ktor-server-task-scheduling-core/src/commonMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/managers/TaskManager.kt index d954e472..ed14c3d4 100644 --- a/ktor-server-task-scheduling/ktor-server-task-scheduling-core/src/commonMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/managers/TaskManager.kt +++ b/ktor-server-task-scheduling/ktor-server-task-scheduling-core/src/commonMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/managers/TaskManager.kt @@ -51,7 +51,7 @@ public abstract class TaskManager : C public abstract suspend fun attemptExecute( task: Task, executionTime: DateTime, - concurrencyIndex: Int + concurrencyIndex: Int, ): TASK_EXECUTION_TOKEN? /** diff --git a/ktor-server-task-scheduling/ktor-server-task-scheduling-core/src/commonMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/managers/lock/TaskLockManager.kt b/ktor-server-task-scheduling/ktor-server-task-scheduling-core/src/commonMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/managers/lock/TaskLockManager.kt index 7551aba5..79e916ff 100644 --- a/ktor-server-task-scheduling/ktor-server-task-scheduling-core/src/commonMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/managers/lock/TaskLockManager.kt +++ b/ktor-server-task-scheduling/ktor-server-task-scheduling-core/src/commonMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/managers/lock/TaskLockManager.kt @@ -12,7 +12,7 @@ public abstract class TaskLockManager : TaskManager Unit + public val task: suspend Application.(DateTime) -> Unit, ) { public fun concurrencyRange(): IntRange = 1..concurrency } diff --git a/ktor-server-task-scheduling/ktor-server-task-scheduling-core/test/build.gradle.kts b/ktor-server-task-scheduling/ktor-server-task-scheduling-core/test/build.gradle.kts index 4d8bde44..4d9c213e 100644 --- a/ktor-server-task-scheduling/ktor-server-task-scheduling-core/test/build.gradle.kts +++ b/ktor-server-task-scheduling/ktor-server-task-scheduling-core/test/build.gradle.kts @@ -6,7 +6,7 @@ plugins { } kotlin { - targetJvm() + targetJvm(project) sourceSets { jvmMainDependencies { api(projects.ktorServerTaskScheduling.ktorServerTaskSchedulingCore) diff --git a/ktor-server-task-scheduling/ktor-server-task-scheduling-core/test/src/jvmMain/kotlin/taskscheduling/TaskSchedulingPluginTest.kt b/ktor-server-task-scheduling/ktor-server-task-scheduling-core/test/src/jvmMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/TaskSchedulingPluginTest.kt similarity index 51% rename from ktor-server-task-scheduling/ktor-server-task-scheduling-core/test/src/jvmMain/kotlin/taskscheduling/TaskSchedulingPluginTest.kt rename to ktor-server-task-scheduling/ktor-server-task-scheduling-core/test/src/jvmMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/TaskSchedulingPluginTest.kt index 61b8dcd4..210a7284 100644 --- a/ktor-server-task-scheduling/ktor-server-task-scheduling-core/test/src/jvmMain/kotlin/taskscheduling/TaskSchedulingPluginTest.kt +++ b/ktor-server-task-scheduling/ktor-server-task-scheduling-core/test/src/jvmMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/TaskSchedulingPluginTest.kt @@ -8,11 +8,11 @@ import io.kotest.core.spec.style.FunSpec import io.kotest.core.spec.style.scopes.ContainerScope import io.kotest.datatest.withData import io.kotest.matchers.ints.shouldBeGreaterThan -import io.ktor.server.application.install +import io.ktor.server.application.log import io.ktor.server.config.MapApplicationConfig import io.ktor.server.config.mergeWith -import io.ktor.server.testing.TestApplicationEngine -import io.ktor.server.testing.createTestEnvironment +import io.ktor.server.testing.TestApplication +import io.ktor.server.testing.TestApplicationBuilder import korlibs.time.DateTime import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.delay @@ -21,6 +21,7 @@ import kotlinx.coroutines.launch import kotlin.math.pow import kotlin.time.Duration.Companion.milliseconds +@Suppress("UNUSED") val logger = KotlinLogging.logger { } private const val ENGINE_COUNT = 5 @@ -39,41 +40,52 @@ abstract class TaskSchedulingPluginTest : FunSpec() { executionBufferMs: Int = DEFAULT_EXECUTION_BUFFER_MS, frequenciesExponentialSeriesInitialMs: Short = FREQUENCIES_EXPONENTIAL_SERIES_INITIAL_MS, frequenciesExponentialSeriesN: Short = FREQUENCIES_EXPONENTIAL_SERIES_N, - taskSchedulingConfiguration: TaskSchedulingConfiguration.(TaskFreqMs) -> Unit + taskSchedulingConfiguration: TaskSchedulingConfiguration.(TaskFreqMs) -> Unit, ) { - fun kronTaskSchedule(taskFrequencyMs: Int): SchedulerBuilder.() -> Unit = { - milliseconds { - from(0) every taskFrequencyMs - this.last + fun kronTaskSchedule(taskFrequencyMs: Int): SchedulerBuilder.() -> Unit = + { + milliseconds { + from(0) every taskFrequencyMs + this.last + } } - } - val frequencies = exponentialScheduleGenerator( - initial = frequenciesExponentialSeriesInitialMs, - n = frequenciesExponentialSeriesN - ) + val frequencies = + exponentialScheduleGenerator( + initial = frequenciesExponentialSeriesInitialMs, + n = frequenciesExponentialSeriesN, + ) withData(nameFn = { "Freq: $it ms" }, frequencies) { freqMs -> withData(nameFn = { "Concurrency = $it" }, concurrencyValues) { concurrency -> coroutineScope { - val taskLogsAndEngines = setupApplicationEngines( - taskSchedulingConfiguration, - engineCount, - freqMs.toLong(), - concurrency.toShort(), - kronTaskSchedule(freqMs) - ).map { it to launch { it.second.start() } } - .also { it.map { engineAndJob -> engineAndJob.second }.joinAll() } - .map { it.first } + val taskLogsAndApplications = + setupApplicationEngines( + taskSchedulingConfiguration, + engineCount, + freqMs.toLong(), + concurrency.toShort(), + kronTaskSchedule(freqMs), + ).map { it to launch { it.second.start() } } + .also { it.map { engineAndJob -> engineAndJob.second }.joinAll() } + .map { it.first } delay((freqMs + executionBufferMs).milliseconds * executions) - taskLogsAndEngines.forEach { launch { it.second.stop(gracePeriodMillis = freqMs * 10L) } } + taskLogsAndApplications.forEach { launch { it.second.stop() } } try { - with(taskLogsAndEngines.map { it.first }.flatten()) { + with(taskLogsAndApplications.map { it.first }.flatten()) { size shouldBeGreaterThan executions - 2 with(groupingBy { it }.eachCount()) { val errors = - this.mapNotNull { if (it.value > concurrency) "${it.key.format2()} was executed ${it.value} times, expected no more than $concurrency" else null } + this.mapNotNull { + if (it.value > + concurrency + ) { + "${it.key.format2()} was executed ${it.value} times, expected no more than $concurrency" + } else { + null + } + } if (errors.isNotEmpty()) { fail(errors.joinToString("\n")) } @@ -93,38 +105,49 @@ abstract class TaskSchedulingPluginTest : FunSpec() { count: Int, freqMs: Long, concurrency: Short = 1, - kronTaskSchedule: SchedulerBuilder.() -> Unit + kronTaskSchedule: SchedulerBuilder.() -> Unit, ) = (1..count).map { ktorHost -> val executionRecords = mutableListOf() - executionRecords to TestApplicationEngine( - createTestEnvironment { + val block: TestApplicationBuilder.() -> Unit = { + environment { config = config.mergeWith(MapApplicationConfig("ktor.deployment.host" to ktorHost.toString())) - module { - install(TaskScheduling) { - taskSchedulingConfiguration(TaskFreqMs(freqMs)) + } + install(TaskScheduling) { + taskSchedulingConfiguration(TaskFreqMs(freqMs)) - task { - name = "Test Kron Task" - task = { taskExecutionTime -> - executionRecords.add(taskExecutionTime) - log.debug("Host: $ktorHost executing task at ${taskExecutionTime.format2()}") - } - kronSchedule = kronTaskSchedule - this.concurrency = concurrency.toInt() - } + task { + name = "Test Kron Task" + task = { taskExecutionTime -> + executionRecords.add(taskExecutionTime) + log.debug("Host: $ktorHost executing task at ${taskExecutionTime.format2()}") } + kronSchedule = kronTaskSchedule + this.concurrency = concurrency.toInt() } } - ) + } + executionRecords to + TestApplication { + engine { + shutdownGracePeriod = freqMs * 10 + } + block() + } } - private fun exponentialScheduleGenerator(initial: Short, n: Short): List { - val frequencies = (0.until(n)).map { - (initial.toDouble().times(if (it == 0) 1.0 else 2.0.pow(it.toDouble()))).toInt() - } + private fun exponentialScheduleGenerator( + initial: Short, + n: Short, + ): List { + val frequencies = + (0.until(n)).map { + (initial.toDouble().times(if (it == 0) 1.0 else 2.0.pow(it.toDouble()))).toInt() + } return frequencies } } @JvmInline -value class TaskFreqMs(val value: Long) +value class TaskFreqMs( + val value: Long, +) diff --git a/ktor-server-task-scheduling/ktor-server-task-scheduling-jdbc/build.gradle.kts b/ktor-server-task-scheduling/ktor-server-task-scheduling-jdbc/build.gradle.kts index 5f73e5f6..2212f926 100644 --- a/ktor-server-task-scheduling/ktor-server-task-scheduling-jdbc/build.gradle.kts +++ b/ktor-server-task-scheduling/ktor-server-task-scheduling-jdbc/build.gradle.kts @@ -8,7 +8,7 @@ plugins { kotlin { explicitApi() - targetJvm() + targetJvm(project) sourceSets { jvmMainDependencies { api(projects.ktorServerTaskScheduling.ktorServerTaskSchedulingCore) diff --git a/ktor-server-task-scheduling/ktor-server-task-scheduling-jdbc/src/jvmMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/managers/lock/database/JdbcLockManager.kt b/ktor-server-task-scheduling/ktor-server-task-scheduling-jdbc/src/jvmMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/managers/lock/database/JdbcLockManager.kt index f524995b..0e9d029b 100644 --- a/ktor-server-task-scheduling/ktor-server-task-scheduling-jdbc/src/jvmMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/managers/lock/database/JdbcLockManager.kt +++ b/ktor-server-task-scheduling/ktor-server-task-scheduling-jdbc/src/jvmMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/managers/lock/database/JdbcLockManager.kt @@ -44,7 +44,7 @@ public class JdbcLockManager( /** * The task lock table to use, if not provided, the [DefaultTaskLockTable] will be used */ - private val taskLockTable: ExposedTaskLockTable = DefaultTaskLockTable + private val taskLockTable: ExposedTaskLockTable = DefaultTaskLockTable, ) : DatabaseTaskLockManager() { override suspend fun initTaskLockTable() { @@ -53,12 +53,12 @@ public class JdbcLockManager( override suspend fun insertTaskLock( task: Task, - taskConcurrencyIndex: Int + taskConcurrencyIndex: Int, ): Boolean = newSuspendedTransaction( application.coroutineContext, database, - transactionIsolation = Connection.TRANSACTION_READ_COMMITTED + transactionIsolation = Connection.TRANSACTION_READ_COMMITTED, ) { repetitionAttempts = 0 debug = true @@ -72,11 +72,11 @@ public class JdbcLockManager( override suspend fun updateTaskLock( task: Task, concurrencyIndex: Int, - executionTime: DateTime + executionTime: DateTime, ): JdbcTaskLock? = newSuspendedTransaction( application.coroutineContext, db = database, - transactionIsolation = Connection.TRANSACTION_READ_COMMITTED + transactionIsolation = Connection.TRANSACTION_READ_COMMITTED, ) { val taskExecutionInstant = Instant.fromEpochMilliseconds(executionTime.unixMillisLong) taskLockTable.update( @@ -84,9 +84,9 @@ public class JdbcLockManager( selectClause( task, concurrencyIndex, - taskExecutionInstant + taskExecutionInstant, ) - } + }, ) { it[lockedAt] = taskExecutionInstant it[taskLockTable.concurrencyIndex] = concurrencyIndex @@ -96,7 +96,7 @@ public class JdbcLockManager( JdbcTaskLock( name = task.name, concurrencyIndex = concurrencyIndex, - lockedAt = executionTime + lockedAt = executionTime, ) } else { null @@ -110,7 +110,7 @@ public class JdbcLockManager( private fun selectClause( task: Task, concurrencyIndex: Int, - taskExecutionInstant: Instant + taskExecutionInstant: Instant, ) = (taskLockTable.name eq task.name and taskLockTable.concurrencyIndex.eq(concurrencyIndex)) and lockedAt.neq(LiteralOp(KotlinInstantColumnType(), taskExecutionInstant)) @@ -119,7 +119,7 @@ public class JdbcLockManager( public class JdbcTaskLock( override val name: String, override val concurrencyIndex: Int, - override val lockedAt: DateTime + override val lockedAt: DateTime, ) : DatabaseTaskLock { override fun toString(): String = "name=$name, concurrencyIndex=$concurrencyIndex, lockedAt=${lockedAt.format2()}}" @@ -146,7 +146,7 @@ public class JdbcJobLockManagerConfiguration : DatabaseTaskLockManagerConfigurat JdbcLockManager( name = name.toTaskManagerName(), application = application, - database = database + database = database, ) } @@ -161,12 +161,12 @@ public fun TaskSchedulingConfiguration.jdbc( * if none is provided, it will be considered the default one. only one default task manager is allowed. */ name: String? = null, - config: JdbcJobLockManagerConfiguration.() -> Unit + config: JdbcJobLockManagerConfiguration.() -> Unit, ) { this.addTaskManager( JdbcJobLockManagerConfiguration().apply { config() this.name = name - } + }, ) } diff --git a/ktor-server-task-scheduling/ktor-server-task-scheduling-jdbc/src/jvmTest/kotlin/taskscheduling/JdbcLockManagerTest.kt b/ktor-server-task-scheduling/ktor-server-task-scheduling-jdbc/src/jvmTest/kotlin/taskscheduling/JdbcLockManagerTest.kt index 34d87287..f49af507 100644 --- a/ktor-server-task-scheduling/ktor-server-task-scheduling-jdbc/src/jvmTest/kotlin/taskscheduling/JdbcLockManagerTest.kt +++ b/ktor-server-task-scheduling/ktor-server-task-scheduling-jdbc/src/jvmTest/kotlin/taskscheduling/JdbcLockManagerTest.kt @@ -29,7 +29,7 @@ class JdbcLockManagerTest : TaskSchedulingPluginTest() { url = postgresContainer.getJdbcUrl(), driver = "org.postgresql.Driver", user = postgresContainer.username, - password = postgresContainer.password + password = postgresContainer.password, ).also { transaction { SchemaUtils.create(DefaultTaskLockTable) } } diff --git a/ktor-server-task-scheduling/ktor-server-task-scheduling-mongodb/build.gradle.kts b/ktor-server-task-scheduling/ktor-server-task-scheduling-mongodb/build.gradle.kts index 051f7616..94dcc69f 100644 --- a/ktor-server-task-scheduling/ktor-server-task-scheduling-mongodb/build.gradle.kts +++ b/ktor-server-task-scheduling/ktor-server-task-scheduling-mongodb/build.gradle.kts @@ -8,7 +8,7 @@ plugins { kotlin { explicitApi() - targetJvm() + targetJvm(project) sourceSets { jvmMainDependencies { api(projects.ktorServerTaskScheduling.ktorServerTaskSchedulingCore) diff --git a/ktor-server-task-scheduling/ktor-server-task-scheduling-mongodb/src/jvmMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/managers/lock/database/MongoDBLockManager.kt b/ktor-server-task-scheduling/ktor-server-task-scheduling-mongodb/src/jvmMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/managers/lock/database/MongoDBLockManager.kt index 6e8e39a7..74ce976b 100644 --- a/ktor-server-task-scheduling/ktor-server-task-scheduling-mongodb/src/jvmMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/managers/lock/database/MongoDBLockManager.kt +++ b/ktor-server-task-scheduling/ktor-server-task-scheduling-mongodb/src/jvmMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/managers/lock/database/MongoDBLockManager.kt @@ -43,7 +43,7 @@ public class MongoDBLockManager( /** * The name of the database */ - databaseName: String + databaseName: String, ) : DatabaseTaskLockManager() { private val collection = client.getDatabase(databaseName) @@ -53,17 +53,17 @@ public class MongoDBLockManager( override suspend fun updateTaskLock( task: Task, concurrencyIndex: Int, - executionTime: DateTime + executionTime: DateTime, ): MongoDbTaskLock? { val query = Filters.and( Filters.and( Filters.eq(MongoDbTaskLock::name.name, task.name), - Filters.eq(MongoDbTaskLock::concurrencyIndex.name, concurrencyIndex) + Filters.eq(MongoDbTaskLock::concurrencyIndex.name, concurrencyIndex), ), - Filters.ne(MongoDbTaskLock::lockedAt.name, executionTime) + Filters.ne(MongoDbTaskLock::lockedAt.name, executionTime), ) val updates = Updates.combine( - Updates.set(MongoDbTaskLock::lockedAt.name, executionTime) + Updates.set(MongoDbTaskLock::lockedAt.name, executionTime), ) val options = FindOneAndUpdateOptions().upsert(false) client.startSession().use { session -> @@ -82,15 +82,15 @@ public class MongoDBLockManager( collection.createIndex( Indexes.compoundIndex( Indexes.text(MongoDbTaskLock::name.name), - Indexes.ascending(MongoDbTaskLock::concurrencyIndex.name) + Indexes.ascending(MongoDbTaskLock::concurrencyIndex.name), ), - IndexOptions().unique(true) + IndexOptions().unique(true), ) } override suspend fun insertTaskLock( task: Task, - taskConcurrencyIndex: Int + taskConcurrencyIndex: Int, ): Boolean { client.startSession().use { session -> session.startTransaction(transactionOptions = majorityJTransaction()) @@ -98,18 +98,18 @@ public class MongoDBLockManager( return collection.find( Filters.and( Filters.eq(MongoDbTaskLock::name.name, task.name), - Filters.eq(MongoDbTaskLock::concurrencyIndex.name, taskConcurrencyIndex) - ) + Filters.eq(MongoDbTaskLock::concurrencyIndex.name, taskConcurrencyIndex), + ), ).firstOrNull()?.let { false } ?: runCatching { collection.insertOne( MongoDbTaskLock( task.name, taskConcurrencyIndex, - DateTime.EPOCH + DateTime.EPOCH, ), options = InsertOneOptions().apply { comment("Initial task insertion") - } + }, ) true }.onFailure { @@ -138,7 +138,7 @@ public class MongoDBLockManager( public data class MongoDbTaskLock( override val name: String, override val concurrencyIndex: Int, - override var lockedAt: DateTime + override var lockedAt: DateTime, ) : DatabaseTaskLock { override fun toString(): String { return "MongoDbTaskLockKey(name=$name, concurrencyIndex=$concurrencyIndex, lockedAt=${lockedAt.format2()})" @@ -161,7 +161,7 @@ internal class DateTimeCodec : Codec { internal val codecRegistry = CodecRegistries.fromRegistries( CodecRegistries.fromCodecs(DateTimeCodec()), - MongoClientSettings.getDefaultCodecRegistry() + MongoClientSettings.getDefaultCodecRegistry(), ) @TaskSchedulingDsl @@ -173,7 +173,7 @@ public class MongoDBJobLockManagerConfiguration : DatabaseTaskLockManagerConfigu name = name.toTaskManagerName(), application = application, client = client, - databaseName = databaseName + databaseName = databaseName, ) } @@ -188,12 +188,12 @@ public fun TaskSchedulingConfiguration.mongoDb( * if none is provided, it will be considered the default one. only one default task manager is allowed. */ name: String? = null, - config: MongoDBJobLockManagerConfiguration.() -> Unit + config: MongoDBJobLockManagerConfiguration.() -> Unit, ) { this.addTaskManager( MongoDBJobLockManagerConfiguration().apply { config() this.name = name - } + }, ) } diff --git a/ktor-server-task-scheduling/ktor-server-task-scheduling-redis/build.gradle.kts b/ktor-server-task-scheduling/ktor-server-task-scheduling-redis/build.gradle.kts index 9ae2ce82..94daa597 100644 --- a/ktor-server-task-scheduling/ktor-server-task-scheduling-redis/build.gradle.kts +++ b/ktor-server-task-scheduling/ktor-server-task-scheduling-redis/build.gradle.kts @@ -10,7 +10,7 @@ plugins { kotlin { explicitApi() - targetJvm() + targetJvm(project) targetNative() sourceSets { commonMainDependencies { diff --git a/ktor-server-task-scheduling/ktor-server-task-scheduling-redis/src/commonMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/managers/lock/redis/RedisLockManager.kt b/ktor-server-task-scheduling/ktor-server-task-scheduling-redis/src/commonMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/managers/lock/redis/RedisLockManager.kt index da23fe20..f34c8157 100644 --- a/ktor-server-task-scheduling/ktor-server-task-scheduling-redis/src/commonMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/managers/lock/redis/RedisLockManager.kt +++ b/ktor-server-task-scheduling/ktor-server-task-scheduling-redis/src/commonMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/managers/lock/redis/RedisLockManager.kt @@ -26,7 +26,7 @@ public class RedisLockManager( override val application: Application, private val connectionPool: RedisConnectionPool, private val lockExpirationMs: Long, - private val connectionAcquisitionTimeoutMs: Long + private val connectionAcquisitionTimeoutMs: Long, ) : TaskLockManager() { override suspend fun init(tasks: List) {} @@ -62,7 +62,7 @@ public value class RedisTaskLock internal constructor( /** * must be unique to a task execution, i.e `$name_$concurrencyIndex at $executionTime` */ - public val value: String + public val value: String, ) : TaskLock { public companion object { private const val DELIMITER = "-" @@ -115,7 +115,7 @@ public class RedisTaskLockManagerConfiguration( /** * The timeout for trying to get a connection to from the pool */ - public var connectionAcquisitionTimeoutMs: Long = 100 + public var connectionAcquisitionTimeoutMs: Long = 100, ) : TaskLockManagerConfiguration() { override fun createTaskManager(application: Application): RedisLockManager = RedisLockManager( @@ -126,10 +126,10 @@ public class RedisTaskLockManagerConfiguration( host = host, port = port, username = username, - password = password + password = password, ), lockExpirationMs = lockExpirationMs, - connectionAcquisitionTimeoutMs = connectionAcquisitionTimeoutMs + connectionAcquisitionTimeoutMs = connectionAcquisitionTimeoutMs, ) } @@ -144,12 +144,12 @@ public fun TaskSchedulingConfiguration.redis( * if none is provided, it will be considered the default one. only one default task manager is allowed. */ name: String? = null, - config: RedisTaskLockManagerConfiguration.() -> Unit + config: RedisTaskLockManagerConfiguration.() -> Unit, ) { this.addTaskManager( RedisTaskLockManagerConfiguration().apply { config() this.name = name - } + }, ) } From a1c907c4868c564074436ac60aabcfd28c762372 Mon Sep 17 00:00:00 2001 From: Ido Flax Date: Sun, 13 Oct 2024 21:52:03 +0200 Subject: [PATCH 2/4] feat(kotlin-v2-ktor-v3): upgraded to - kotlin 2.0.20 - ktor 3.0.0 - other libs upgraded accordingly - removed usage of context receivers Changes to main and test code accordingly --- .../server/plugins/kafka/KafkaPluginConfig.kt | 2 +- .../taskscheduling/TaskSchedulerPlugin.kt | 79 ++++++++++--------- 2 files changed, 44 insertions(+), 37 deletions(-) diff --git a/ktor-server-kafka/src/jvmMain/kotlin/io/github/flaxoos/ktor/server/plugins/kafka/KafkaPluginConfig.kt b/ktor-server-kafka/src/jvmMain/kotlin/io/github/flaxoos/ktor/server/plugins/kafka/KafkaPluginConfig.kt index 3bfa54da..77d518f7 100644 --- a/ktor-server-kafka/src/jvmMain/kotlin/io/github/flaxoos/ktor/server/plugins/kafka/KafkaPluginConfig.kt +++ b/ktor-server-kafka/src/jvmMain/kotlin/io/github/flaxoos/ktor/server/plugins/kafka/KafkaPluginConfig.kt @@ -194,7 +194,7 @@ internal fun KafkaConfigPropertiesContext.withDefaultConsumerConfig() = internal fun KafkaConfigPropertiesContext.withSchemaRegistryUrl() = apply { - kafkaProperties.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, this.kafkaConfig.schemaRegistryUrl) + kafkaProperties[AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG] = this.kafkaConfig.schemaRegistryUrl } @KafkaDsl diff --git a/ktor-server-task-scheduling/ktor-server-task-scheduling-core/src/commonMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/TaskSchedulerPlugin.kt b/ktor-server-task-scheduling/ktor-server-task-scheduling-core/src/commonMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/TaskSchedulerPlugin.kt index 4b413332..7979624e 100644 --- a/ktor-server-task-scheduling/ktor-server-task-scheduling-core/src/commonMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/TaskSchedulerPlugin.kt +++ b/ktor-server-task-scheduling/ktor-server-task-scheduling-core/src/commonMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/TaskSchedulerPlugin.kt @@ -20,53 +20,53 @@ import kotlinx.coroutines.launch * The tasks are managed by some implementation of [TaskManager], that is responsible for coordinating the execution * of the tasks across the different instances of the application. */ -public val TaskScheduling: ApplicationPlugin = createApplicationPlugin( - name = "TaskScheduling", - createConfiguration = ::TaskSchedulingConfiguration, -) { - application.log.debug("Configuring TaskScheduler") +public val TaskScheduling: ApplicationPlugin = + createApplicationPlugin( + name = "TaskScheduling", + createConfiguration = ::TaskSchedulingConfiguration, + ) { + application.log.debug("Configuring TaskScheduler") - checkUniqueTaskNames() + checkUniqueTaskNames() - val taskManagers = createTaskManagers() + val taskManagers = createTaskManagers() - checkTaskMangerAssignments(taskManagers) + checkTaskMangerAssignments(taskManagers) - val taskManagerInits = initializeTaskManagers(taskManagers) + val taskManagerInits = initializeTaskManagers(taskManagers) - on(MonitoringEvent(ApplicationStarted)) { application -> - taskManagers.forEach { manager -> - taskManagerInits[manager]?.let { tasks -> - application.launch { - tasks.await().forEach { task -> - manager.startTask(task) + on(MonitoringEvent(ApplicationStarted)) { application -> + taskManagers.forEach { manager -> + taskManagerInits[manager]?.let { tasks -> + application.launch { + tasks.await().forEach { task -> + manager.startTask(task) + } } } } } } -} -private fun PluginBuilder.initializeTaskManagers( - taskManagers: List>, -) = taskManagers.mapNotNull { manager -> - pluginConfig.tasks[manager.name]?.let { tasks -> - manager to application.async { - manager.init(tasks) - tasks.toList() - } - } ?: error("Configuration verification did not check for missing task managers assigned to tasks, this is a bug") -}.toMap() +private fun PluginBuilder.initializeTaskManagers(taskManagers: List>) = + taskManagers + .mapNotNull { manager -> + pluginConfig.tasks[manager.name]?.let { tasks -> + manager to + application.async { + manager.init(tasks) + tasks.toList() + } + } ?: error("Configuration verification did not check for missing task managers assigned to tasks, this is a bug") + }.toMap() -private fun PluginBuilder.checkTaskMangerAssignments( - taskManagers: List>, -) { +private fun PluginBuilder.checkTaskMangerAssignments(taskManagers: List>) { with(pluginConfig.tasks.filter { it.key !in taskManagers.map { it.name } }) { require(isEmpty()) { "Bad configuration: The following tasks manager names were assigned tasks but were not created: ${ - this.toList().joinToString { - "${it.first}: ${it.second.joinToString() { task -> task.name }}}" - } + this.toList().joinToString { + "${it.first}: ${it.second.joinToString { task -> task.name }}}" + } }" } } @@ -79,7 +79,12 @@ private fun PluginBuilder.createTaskManagers(): Lis } private fun PluginBuilder.checkUniqueTaskNames() { - with(pluginConfig.tasks.values.flatten().groupingBy { it.name }.eachCount()) { + with( + pluginConfig.tasks.values + .flatten() + .groupingBy { it.name } + .eachCount(), + ) { require(all { it.value == 1 }) { "Bad configuration: Task names must be unique, but the following tasks names are repeated: ${this.keys.joinToString()}" } @@ -88,9 +93,11 @@ private fun PluginBuilder.checkUniqueTaskNames() { private fun TaskManager<*>.startTask(task: Task) { application.launch( - context = application.coroutineContext.apply { - task.dispatcher?.let { this + it } ?: this - }.apply { this + CoroutineName(task.name) }, + context = + application.coroutineContext + .apply { + task.dispatcher?.let { this + it } ?: this + }.apply { this + CoroutineName(task.name) }, ) { task.kronSchedule.doInfinity { executionTime -> execute(task, executionTime) From 931f6ab4ed56196c64ff0e1a150d9e5943ee6912 Mon Sep 17 00:00:00 2001 From: Ido Flax Date: Wed, 16 Oct 2024 13:08:15 +0200 Subject: [PATCH 3/4] feat(kotlin-v2-ktor-v3): Removed bundled dependencies (no more fat jars) --- buildSrc/build.gradle.kts | 7 +- .../io/github/flaxoos/ktor/Conventions.kt | 2 + .../flaxoos/ktor/extensions/Publishing.kt | 81 ++++++++++--------- .../github/flaxoos/ktor/extensions/Targets.kt | 24 +----- gradle.properties | 2 +- gradle/libs.versions.toml | 9 +-- ktor-server-kafka/build.gradle.kts | 2 +- .../ktor/server/plugins/kafka/KafkaPlugin.kt | 5 +- .../plugins/kafka/components/AdminClient.kt | 37 ++++++--- .../server/plugins/kafka/components/Avro.kt | 54 +++++++------ ktor-server-rate-limiting/build.gradle.kts | 6 -- .../taskscheduling/TaskSchedulerPlugin.kt | 7 +- .../build.gradle.kts | 9 +-- .../build.gradle.kts | 4 +- 14 files changed, 127 insertions(+), 122 deletions(-) diff --git a/buildSrc/build.gradle.kts b/buildSrc/build.gradle.kts index a910ccf9..53bff2f3 100644 --- a/buildSrc/build.gradle.kts +++ b/buildSrc/build.gradle.kts @@ -6,7 +6,11 @@ plugins { } kotlin { - jvmToolchain(libs.versions.java.get().toInt()) + jvmToolchain( + libs.versions.java + .get() + .toInt(), + ) } repositories { @@ -34,7 +38,6 @@ dependencies { implementation(libs.kover.badge.gradlePlugin) implementation(libs.dokka.gradlePlugin) implementation(libs.detekt.gradlePlugin) - implementation(libs.shadow.gradlePlugin) implementation(libs.gradle.release.gradlePlugin) testImplementation(libs.kotest.runner.junit5) diff --git a/buildSrc/src/main/kotlin/io/github/flaxoos/ktor/Conventions.kt b/buildSrc/src/main/kotlin/io/github/flaxoos/ktor/Conventions.kt index d15f502a..22dd820b 100644 --- a/buildSrc/src/main/kotlin/io/github/flaxoos/ktor/Conventions.kt +++ b/buildSrc/src/main/kotlin/io/github/flaxoos/ktor/Conventions.kt @@ -30,6 +30,7 @@ import org.gradle.plugins.ide.idea.model.IdeaModel import org.jetbrains.kotlin.gradle.dsl.KotlinMultiplatformExtension import org.jetbrains.kotlin.gradle.plugin.KotlinDependencyHandler import org.jetbrains.kotlin.gradle.plugin.KotlinSourceSet +import org.jetbrains.kotlin.gradle.utils.named open class Conventions : Plugin { open fun KotlinMultiplatformExtension.conventionSpecifics(project: Project) {} @@ -51,6 +52,7 @@ open class Conventions : Plugin { apply(project.plugin("ktlint")) } repositories { + mavenLocal() mavenCentral() maven { url = uri("https://maven.pkg.github.com/flaxoos/flax-gradle-plugins") diff --git a/buildSrc/src/main/kotlin/io/github/flaxoos/ktor/extensions/Publishing.kt b/buildSrc/src/main/kotlin/io/github/flaxoos/ktor/extensions/Publishing.kt index af0c333e..1dfdcac0 100644 --- a/buildSrc/src/main/kotlin/io/github/flaxoos/ktor/extensions/Publishing.kt +++ b/buildSrc/src/main/kotlin/io/github/flaxoos/ktor/extensions/Publishing.kt @@ -4,6 +4,7 @@ import org.gradle.api.Project import org.gradle.api.publish.PublishingExtension import org.gradle.api.publish.maven.MavenPublication import org.gradle.api.publish.maven.tasks.AbstractPublishToMaven +import org.gradle.api.tasks.TaskContainer import org.gradle.api.tasks.TaskProvider import org.gradle.jvm.tasks.Jar import org.gradle.kotlin.dsl.named @@ -15,19 +16,21 @@ import org.gradle.plugins.signing.SigningExtension import org.jetbrains.dokka.gradle.AbstractDokkaTask import java.util.Base64 +private const val DOKKA = "dokka" +private const val DOKKA_JAR_TASK_NAME = "${DOKKA}Jar" fun Project.configurePublishing() { - val dokkaJarTask = registerDokkaJarTask() + registerDokkaJarTask() configureSigning() - configureMavenPublications(dokkaJarTask) - + configureMavenPublications() } -fun Project.registerDokkaJarTask() = tasks.register("dokkaJar") { - val dokkaHtml = tasks.named("dokkaHtml") - archiveClassifier.set("javadoc") - from(dokkaHtml.get().outputDirectory) -} +fun Project.registerDokkaJarTask() = + tasks.register(DOKKA_JAR_TASK_NAME) { + val dokkaHtml = tasks.named("${DOKKA}Html") + archiveClassifier.set("javadoc") + from(dokkaHtml.get().outputDirectory) + } fun Project.configureSigning() { tasks.withType().configureEach { @@ -40,40 +43,46 @@ fun Project.configureSigning() { } } -fun Project.configureMavenPublications(dokkaJar: TaskProvider) { - the().apply { - publications.withType { - artifact(dokkaJar) - pom { - name.set("${rootProject.name}: ${project.name}") - groupId = project.group.toString() - version = project.version.toString() - url.set("https://github.com/Flaxoos/extra-ktor-plugins") - inceptionYear.set("2023") - description.set( - "This project provides a suite of feature-rich, efficient, and highly customizable " + "plugins for your Ktor Server or Client, crafted in Kotlin, available for multiplatform." - ) - scm { - connection.set("scm:git:https://github.com/Flaxoos/extra-ktor-plugins.git") - developerConnection.set("scm:git:https://github.com/Flaxoos/extra-ktor-plugins.git") +fun Project.configureMavenPublications() { + afterEvaluate { + the().apply { + publications.withType { + artifact(tasks.dokkaJar) + pom { + name.set("${rootProject.name}: ${project.name}") + groupId = project.group.toString() + version = project.version.toString() url.set("https://github.com/Flaxoos/extra-ktor-plugins") - } + inceptionYear.set("2023") + description.set( + "This project provides a suite of feature-rich, efficient, and highly customizable " + + "plugins for your Ktor Server or Client, crafted in Kotlin, available for multiplatform.", + ) + scm { + connection.set("scm:git:https://github.com/Flaxoos/extra-ktor-plugins.git") + developerConnection.set("scm:git:https://github.com/Flaxoos/extra-ktor-plugins.git") + url.set("https://github.com/Flaxoos/extra-ktor-plugins") + } - licenses { - license { - name.set("The Apache License, Version 2.0") - url.set("https://opensource.org/license/mit/") + licenses { + license { + name.set("The Apache License, Version 2.0") + url.set("https://opensource.org/license/mit/") + } } - } - developers { - developer { - id.set("flaxoos") - name.set("Ido Flax") - email.set("idoflax@gmail.com") + developers { + developer { + id.set("flaxoos") + name.set("Ido Flax") + email.set("idoflax@gmail.com") + } } } } } } -} \ No newline at end of file +} + +val TaskContainer.dokkaJar: TaskProvider + get() = named(DOKKA_JAR_TASK_NAME, Jar::class) diff --git a/buildSrc/src/main/kotlin/io/github/flaxoos/ktor/extensions/Targets.kt b/buildSrc/src/main/kotlin/io/github/flaxoos/ktor/extensions/Targets.kt index 7e463107..d712d0d9 100644 --- a/buildSrc/src/main/kotlin/io/github/flaxoos/ktor/extensions/Targets.kt +++ b/buildSrc/src/main/kotlin/io/github/flaxoos/ktor/extensions/Targets.kt @@ -1,10 +1,6 @@ package io.github.flaxoos.ktor.extensions import org.gradle.api.Project -import org.gradle.api.file.DuplicatesStrategy -import org.gradle.jvm.tasks.Jar -import org.gradle.kotlin.dsl.get -import org.gradle.kotlin.dsl.named import org.jetbrains.kotlin.gradle.dsl.KotlinMultiplatformExtension import org.jetbrains.kotlin.gradle.plugin.mpp.KotlinNativeTargetWithHostTests @@ -18,23 +14,5 @@ fun KotlinMultiplatformExtension.targetNative(configure: KotlinNativeTargetWithH fun KotlinMultiplatformExtension.targetJvm(project: Project) { jvmToolchain(project.versionOf("java").toInt()) - jvm { - project.tasks.named("jvmJar", Jar::class).configure { - duplicatesStrategy = DuplicatesStrategy.EXCLUDE - from( - listOf( - project.configurations["jvmCompileClasspath"], - project.configurations["jvmRuntimeClasspath"], - ).map { config -> - config.map { - if (it.isDirectory) { - it - } else { - project.zipTree(it) - } - } - }, - ) - } - } + jvm() } diff --git a/gradle.properties b/gradle.properties index 2b7b5a7a..94cabc30 100644 --- a/gradle.properties +++ b/gradle.properties @@ -3,7 +3,7 @@ github.repository.name=extra-ktor-plugins kotlin.native.cacheKind.linuxX64=none kotlin.native.ignoreDisabledTargets=true gradle.publish.enable.module-metadata=true -version=1.2.10 +version=2.0.0 gpr.user=flaxoos org.gradle.jvmargs=-Xmx2g -XX:MaxMetaspaceSize=2g kotlin.mpp.applyDefaultHierarchyTemplate=false \ No newline at end of file diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 7d435a2d..3cbff731 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -2,7 +2,6 @@ # Core technologies kotlin = "2.0.20" java = "11" -shadow = "8.1.1" kotlinx-serialization = "1.7.3" kotlinx-io = "0.3.0" ksp = "1.9.10-1.0.13" @@ -58,7 +57,7 @@ avro = "1.11.3" # Redis kreds = "0.9.0" -redis-mp-client = "0.0.1" +redis-mp-client = "0.0.2" # Documentation dokka = "1.9.10" @@ -78,7 +77,6 @@ uuid = "0.8.1" kotlin-gradlePlugin = { module = "org.jetbrains.kotlin:kotlin-gradle-plugin", version.ref = "kotlin" } arrow-core = { module = "io.arrow-kt:arrow-core", version.ref = "arrow" } arrow-fx-coroutines = { module = "io.arrow-kt:arrow-fx-coroutines", version.ref = "arrow" } -shadow-gradlePlugin = { module = "com.github.johnrengelman:shadow", version.ref = "shadow" } kotlinx-serialization-core = { module = "org.jetbrains.kotlinx:kotlinx-serialization-core", version.ref = "kotlinx-serialization" } kotlinx-serialization-json = { module = "org.jetbrains.kotlinx:kotlinx-serialization-json", version.ref = "kotlinx-serialization" } kotlinx-coroutines-core = { module = "org.jetbrains.kotlinx:kotlinx-coroutines-core", version.ref = "kotlinx-coroutines" } @@ -108,6 +106,7 @@ ktor-client-mock = { module = "io.ktor:ktor-client-mock", version.ref = "ktor" } ktor-client-contentNegotiation = { module = "io.ktor:ktor-client-content-negotiation", version.ref = "ktor" } ktor-serialization-kotlinx-json = { module = "io.ktor:ktor-serialization-kotlinx-json", version.ref = "ktor" } +ktor-gradlePlugin = { module = "io.ktor.plugin:plugin", version.ref = "ktor" } # Database h2 = { module = "com.h2database:h2", version.ref = "h2" } @@ -189,8 +188,8 @@ kreds = { module = "io.github.crackthecodeabhi:kreds", version.ref = "kreds" } redis-mp-client = { module = "io.github.flaxoos:redis-client-multiplatform", version.ref = "redis-mp-client" } [plugins] -# Build -shadow = { id = "com.github.johnrengelman.shadow", version.ref = "shadow" } +# Ktor +ktor = { id = "io.ktor.plugin", version.ref = "ktor" } # Android android-library = { id = "com.android.library", version.ref = "android" } diff --git a/ktor-server-kafka/build.gradle.kts b/ktor-server-kafka/build.gradle.kts index 979c1fa2..aae41b4d 100644 --- a/ktor-server-kafka/build.gradle.kts +++ b/ktor-server-kafka/build.gradle.kts @@ -35,7 +35,7 @@ kotlin { } val jvmTest by getting { dependencies { - implementation(platform(libs.testcontainers.bom.get())) + implementation(dependencies.platform(libs.testcontainers.bom.get())) implementation(libs.kotest.extensions.testcontainers) implementation(libs.kotest.extensions.testcontainers.kafka) implementation(libs.testcontainers.kafka) diff --git a/ktor-server-kafka/src/jvmMain/kotlin/io/github/flaxoos/ktor/server/plugins/kafka/KafkaPlugin.kt b/ktor-server-kafka/src/jvmMain/kotlin/io/github/flaxoos/ktor/server/plugins/kafka/KafkaPlugin.kt index 9d0eb5e3..d07603d3 100644 --- a/ktor-server-kafka/src/jvmMain/kotlin/io/github/flaxoos/ktor/server/plugins/kafka/KafkaPlugin.kt +++ b/ktor-server-kafka/src/jvmMain/kotlin/io/github/flaxoos/ktor/server/plugins/kafka/KafkaPlugin.kt @@ -7,6 +7,7 @@ import io.github.flaxoos.ktor.server.plugins.kafka.Attributes.ConsumerAttributeK import io.github.flaxoos.ktor.server.plugins.kafka.Attributes.ProducerAttributeKey import io.github.flaxoos.ktor.server.plugins.kafka.Attributes.SchemaRegistryClientKey import io.github.flaxoos.ktor.server.plugins.kafka.Defaults.DEFAULT_CONFIG_PATH +import io.github.flaxoos.ktor.server.plugins.kafka.components.CoroutineScopedAdminClient.Companion.CoroutineScopedAdminClient import io.github.flaxoos.ktor.server.plugins.kafka.components.createConsumer import io.github.flaxoos.ktor.server.plugins.kafka.components.createKafkaAdminClient import io.github.flaxoos.ktor.server.plugins.kafka.components.createKafkaTopics @@ -132,7 +133,7 @@ private fun PluginBuilder.setupKafka(pluginConfig: pluginConfig.schemaRegistrationTimeoutMs, pluginConfig.schemaRegistryClientProvider, ) - with(application) { schemaRegistryClient.registerSchemas(pluginConfig.schemas) }.also { + with(application) { schemaRegistryClient.registerSchemas(this, pluginConfig.schemas) }.also { application.attributes.put(SchemaRegistryClientKey, schemaRegistryClient) } } @@ -146,7 +147,7 @@ private fun PluginBuilder.setupKafka(pluginConfig: application.attributes.put(AdminClientAttributeKey, it) application.log.info("Kafka admin setup finished") runBlocking(Dispatchers.IO) { - it.createKafkaTopics(topicBuilders = pluginConfig.topics) { + CoroutineScopedAdminClient(it).createKafkaTopics(topicBuilders = pluginConfig.topics) { application.log.info("Created Topics: $first") } } diff --git a/ktor-server-kafka/src/jvmMain/kotlin/io/github/flaxoos/ktor/server/plugins/kafka/components/AdminClient.kt b/ktor-server-kafka/src/jvmMain/kotlin/io/github/flaxoos/ktor/server/plugins/kafka/components/AdminClient.kt index 563be4f5..4e2ad0e6 100644 --- a/ktor-server-kafka/src/jvmMain/kotlin/io/github/flaxoos/ktor/server/plugins/kafka/components/AdminClient.kt +++ b/ktor-server-kafka/src/jvmMain/kotlin/io/github/flaxoos/ktor/server/plugins/kafka/components/AdminClient.kt @@ -10,26 +10,41 @@ import org.apache.kafka.common.KafkaFuture internal fun Map.createKafkaAdminClient(): AdminClient = AdminClient.create(this) -context (CoroutineScope) -@Suppress("ForbiddenVoid", "RedundantSuspendModifier") -internal suspend fun AdminClient.createKafkaTopics( +internal class CoroutineScopedAdminClient( + val adminClient: AdminClient, + val scope: CoroutineScope, +) { + companion object { + fun CoroutineScope.CoroutineScopedAdminClient(adminClient: AdminClient) = CoroutineScopedAdminClient(adminClient, this) + } +} + +@Suppress("ForbiddenVoid") +internal suspend fun CoroutineScopedAdminClient.createKafkaTopics( topicBuilders: List, existingTopicHandler: (NewTopic) -> Unit = {}, topicCreationHandler: Pair>.() -> Unit, ) { - val existingTopics = listTopics().listings().get().map { it.name() } + val existingTopics = + adminClient + .listTopics() + .listings() + .get() + .map { it.name() } val createTopicsResult = topicBuilders.partition { it.name() in existingTopics }.let { (existing, new) -> existing.forEach { existingTopicHandler(it) } - createTopics(new) + adminClient.createTopics(new) } - createTopicsResult.values().map { result -> - launch(Dispatchers.IO) { - result.value.get() - topicCreationHandler(result.toPair()) - } - }.joinAll() + createTopicsResult + .values() + .map { result -> + scope.launch(Dispatchers.IO) { + result.value.get() + topicCreationHandler(result.toPair()) + } + }.joinAll() } diff --git a/ktor-server-kafka/src/jvmMain/kotlin/io/github/flaxoos/ktor/server/plugins/kafka/components/Avro.kt b/ktor-server-kafka/src/jvmMain/kotlin/io/github/flaxoos/ktor/server/plugins/kafka/components/Avro.kt index fabbcc9d..f5bf3ef1 100644 --- a/ktor-server-kafka/src/jvmMain/kotlin/io/github/flaxoos/ktor/server/plugins/kafka/components/Avro.kt +++ b/ktor-server-kafka/src/jvmMain/kotlin/io/github/flaxoos/ktor/server/plugins/kafka/components/Avro.kt @@ -26,10 +26,17 @@ import org.apache.avro.generic.GenericRecord import kotlin.reflect.KClass import kotlin.reflect.typeOf -fun createSchemaRegistryClient(schemaRegistryUrl: String, timeoutMs: Long, clientProvider: () -> HttpClient) = - SchemaRegistryClient(clientProvider(), schemaRegistryUrl, timeoutMs) +fun createSchemaRegistryClient( + schemaRegistryUrl: String, + timeoutMs: Long, + clientProvider: () -> HttpClient, +) = SchemaRegistryClient(clientProvider(), schemaRegistryUrl, timeoutMs) -class SchemaRegistryClient(providedClient: HttpClient, schemaRegistryUrl: String, timeoutMs: Long) { +class SchemaRegistryClient( + providedClient: HttpClient, + schemaRegistryUrl: String, + timeoutMs: Long, +) { val client = providedClient.config { install(ContentNegotiation) { json() } @@ -41,12 +48,12 @@ class SchemaRegistryClient(providedClient: HttpClient, schemaRegistryUrl: String } } - context (Application) inline fun registerSchemas( + application: Application, schemas: MutableMap, TopicName>, ) { schemas.forEach { - registerSchema(it.key, it.value) + registerSchema(application, it.key, it.value) } } @@ -57,30 +64,31 @@ class SchemaRegistryClient(providedClient: HttpClient, schemaRegistryUrl: String * @param topicName the topic name to associate the schema with * @param onConflict the function to run if a schema with the same name already exists, defaults to do */ - context (Application) @OptIn(InternalSerializationApi::class) inline fun registerSchema( + application: Application, klass: KClass, topicName: TopicName, noinline onConflict: () -> Unit = {}, ) { val schema = Avro.default.schema(klass.serializer()) val payload = mapOf("schema" to schema.toString()) // Creating a map to form the payload - launch(Dispatchers.IO) { - client.post("subjects/${topicName.value}.${schema.name}/versions") { - contentType(ContentType.Application.Json) - setBody(payload) - }.let { - if (it.status == HttpStatusCode.Conflict) { - onConflict() + application.launch(Dispatchers.IO) { + client + .post("subjects/${topicName.value}.${schema.name}/versions") { + contentType(ContentType.Application.Json) + setBody(payload) + }.let { + if (it.status == HttpStatusCode.Conflict) { + onConflict() + } + if (!it.status.isSuccess()) { + application.log.error( + "Failed registering schema to schema registry at ${it.call.request.url}:\n${it.status} " + + "${it.bodyAsText()}:\nschema: $payload", + ) + } } - if (!it.status.isSuccess()) { - log.error( - "Failed registering schema to schema registry at ${it.call.request.url}:\n${it.status} " + - "${it.bodyAsText()}:\nschema: $payload", - ) - } - } } } } @@ -92,8 +100,7 @@ class SchemaRegistryClient(providedClient: HttpClient, schemaRegistryUrl: String * @return the resulting [T] must be annotated with [Serializable] * @throws [SerializationException] if serialization fails */ -inline fun fromRecord(record: GenericRecord): T = - Avro.default.fromRecord(serializer(typeOf()), record) as T +inline fun fromRecord(record: GenericRecord): T = Avro.default.fromRecord(serializer(typeOf()), record) as T /** * converts a [T] to a [GenericRecord] @@ -102,5 +109,4 @@ inline fun fromRecord(record: GenericRecord): T = * @return the resulting [GenericRecord] * @throws [SerializationException] if serialization fails */ -inline fun T.toRecord(): GenericRecord = - Avro.default.toRecord(serializer(), this) +inline fun T.toRecord(): GenericRecord = Avro.default.toRecord(serializer(), this) diff --git a/ktor-server-rate-limiting/build.gradle.kts b/ktor-server-rate-limiting/build.gradle.kts index 9271024b..cdd68fba 100644 --- a/ktor-server-rate-limiting/build.gradle.kts +++ b/ktor-server-rate-limiting/build.gradle.kts @@ -37,9 +37,3 @@ koverReport { } } } - -tasks.withType().configureEach { - kotlinOptions { - freeCompilerArgs += "-Xuse-ir" - } -} diff --git a/ktor-server-task-scheduling/ktor-server-task-scheduling-core/src/commonMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/TaskSchedulerPlugin.kt b/ktor-server-task-scheduling/ktor-server-task-scheduling-core/src/commonMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/TaskSchedulerPlugin.kt index 7979624e..33f66e50 100644 --- a/ktor-server-task-scheduling/ktor-server-task-scheduling-core/src/commonMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/TaskSchedulerPlugin.kt +++ b/ktor-server-task-scheduling/ktor-server-task-scheduling-core/src/commonMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/TaskSchedulerPlugin.kt @@ -61,13 +61,12 @@ private fun PluginBuilder.initializeTaskManagers(ta }.toMap() private fun PluginBuilder.checkTaskMangerAssignments(taskManagers: List>) { - with(pluginConfig.tasks.filter { it.key !in taskManagers.map { it.name } }) { + with(pluginConfig.tasks.filter { it.key !in taskManagers.map { taskManager -> taskManager.name } }) { require(isEmpty()) { - "Bad configuration: The following tasks manager names were assigned tasks but were not created: ${ - this.toList().joinToString { + "Bad configuration: The following tasks manager names were assigned tasks but were not created: " + + toList().joinToString { "${it.first}: ${it.second.joinToString { task -> task.name }}}" } - }" } } } diff --git a/ktor-server-task-scheduling/ktor-server-task-scheduling-jdbc/build.gradle.kts b/ktor-server-task-scheduling/ktor-server-task-scheduling-jdbc/build.gradle.kts index 2212f926..ef8ea806 100644 --- a/ktor-server-task-scheduling/ktor-server-task-scheduling-jdbc/build.gradle.kts +++ b/ktor-server-task-scheduling/ktor-server-task-scheduling-jdbc/build.gradle.kts @@ -12,11 +12,10 @@ kotlin { sourceSets { jvmMainDependencies { api(projects.ktorServerTaskScheduling.ktorServerTaskSchedulingCore) - implementation(libs.exposed.core) - implementation(libs.exposed.jdbc) - implementation(libs.exposed.dao) - implementation(libs.exposed.kotlin.datetime) - implementation(libs.krontab) + api(libs.exposed.core) + api(libs.exposed.jdbc) + api(libs.exposed.dao) + api(libs.exposed.kotlin.datetime) } jvmTestDependencies { implementation(projects.ktorServerTaskScheduling.ktorServerTaskSchedulingCore.test) diff --git a/ktor-server-task-scheduling/ktor-server-task-scheduling-mongodb/build.gradle.kts b/ktor-server-task-scheduling/ktor-server-task-scheduling-mongodb/build.gradle.kts index 94dcc69f..eac18589 100644 --- a/ktor-server-task-scheduling/ktor-server-task-scheduling-mongodb/build.gradle.kts +++ b/ktor-server-task-scheduling/ktor-server-task-scheduling-mongodb/build.gradle.kts @@ -12,8 +12,8 @@ kotlin { sourceSets { jvmMainDependencies { api(projects.ktorServerTaskScheduling.ktorServerTaskSchedulingCore) - implementation(libs.mongodb.driver.kotlin.coroutine) - implementation(libs.mongodb.bson.kotlinx) + api(libs.mongodb.driver.kotlin.coroutine) + api(libs.mongodb.bson.kotlinx) } jvmTestDependencies { implementation(projects.ktorServerTaskScheduling.ktorServerTaskSchedulingCore.test) From 8088b91dd2a7cfba50962d19733625532e239cc4 Mon Sep 17 00:00:00 2001 From: Ido Flax Date: Wed, 16 Oct 2024 20:08:28 +0200 Subject: [PATCH 4/4] #54 mongo task issue (#63) - Fixed issue preventing multiple tasks on mongo --- gradle/libs.versions.toml | 2 +- .../managers/lock/TaskLockManager.kt | 10 +- .../lock/database/DatabaseTaskLockManager.kt | 28 +++--- .../TaskSchedulingPluginTest.kt | 84 +++++++++------- .../lock/database/MongoDBLockManager.kt | 98 ++++++++++--------- .../managers/lock/redis/RedisLockManager.kt | 40 ++++---- 6 files changed, 144 insertions(+), 118 deletions(-) diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 3cbff731..38cefe7d 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -16,7 +16,7 @@ android = "8.1.0" h2 = "2.2.224" postgres = "42.6.0" exposed = "0.44.0" -mongodb = "4.11.0" +mongodb = "5.2.0" # Logging logback = "1.4.11" diff --git a/ktor-server-task-scheduling/ktor-server-task-scheduling-core/src/commonMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/managers/lock/TaskLockManager.kt b/ktor-server-task-scheduling/ktor-server-task-scheduling-core/src/commonMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/managers/lock/TaskLockManager.kt index 79e916ff..f6838e91 100644 --- a/ktor-server-task-scheduling/ktor-server-task-scheduling-core/src/commonMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/managers/lock/TaskLockManager.kt +++ b/ktor-server-task-scheduling/ktor-server-task-scheduling-core/src/commonMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/managers/lock/TaskLockManager.kt @@ -8,13 +8,11 @@ import io.github.flaxoos.ktor.server.plugins.taskscheduling.tasks.TaskLock import korlibs.time.DateTime public abstract class TaskLockManager : TaskManager() { - public override suspend fun attemptExecute( task: Task, executionTime: DateTime, concurrencyIndex: Int, - ): TASK_LOCK? = - acquireLockKey(task, executionTime, concurrencyIndex) + ): TASK_LOCK? = acquireLockKey(task, executionTime, concurrencyIndex) override suspend fun markExecuted(key: TASK_LOCK) { releaseLockKey(key) @@ -23,7 +21,11 @@ public abstract class TaskLockManager : TaskManager : - TaskLockManager() { +public abstract class DatabaseTaskLockManager : TaskLockManager() { final override suspend fun init(tasks: List) { logger.debug { "Initializing ${this::class.simpleName} for ${tasks.size} tasks" } initTaskLockTable() @@ -24,11 +23,12 @@ public abstract class DatabaseTaskLockManager : TaskLockManagerConfiguration() +public abstract class DatabaseTaskLockManagerConfiguration : + TaskLockManagerConfiguration() diff --git a/ktor-server-task-scheduling/ktor-server-task-scheduling-core/test/src/jvmMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/TaskSchedulingPluginTest.kt b/ktor-server-task-scheduling/ktor-server-task-scheduling-core/test/src/jvmMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/TaskSchedulingPluginTest.kt index 210a7284..f0cc0090 100644 --- a/ktor-server-task-scheduling/ktor-server-task-scheduling-core/test/src/jvmMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/TaskSchedulingPluginTest.kt +++ b/ktor-server-task-scheduling/ktor-server-task-scheduling-core/test/src/jvmMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/TaskSchedulingPluginTest.kt @@ -30,6 +30,7 @@ private const val DEFAULT_EXECUTION_BUFFER_MS = 100 private const val FREQUENCIES_EXPONENTIAL_SERIES_INITIAL_MS = 200.toShort() private const val FREQUENCIES_EXPONENTIAL_SERIES_N = 3.toShort() private val concurrencyValues = listOf(1, 3, 6) +private val taskCounts = listOf(2, 4) abstract class TaskSchedulingPluginTest : FunSpec() { protected abstract suspend fun clean() @@ -56,44 +57,46 @@ abstract class TaskSchedulingPluginTest : FunSpec() { n = frequenciesExponentialSeriesN, ) withData(nameFn = { "Freq: $it ms" }, frequencies) { freqMs -> - withData(nameFn = { "Concurrency = $it" }, concurrencyValues) { concurrency -> - coroutineScope { - val taskLogsAndApplications = - setupApplicationEngines( - taskSchedulingConfiguration, - engineCount, - freqMs.toLong(), - concurrency.toShort(), - kronTaskSchedule(freqMs), - ).map { it to launch { it.second.start() } } - .also { it.map { engineAndJob -> engineAndJob.second }.joinAll() } - .map { it.first } + withData(nameFn = { "Task count = $it" }, taskCounts) { taskCount -> + withData(nameFn = { "Concurrency = $it" }, concurrencyValues) { concurrency -> + coroutineScope { + val taskLogsAndApplications = + setupApplicationEngines( + taskSchedulingConfiguration = taskSchedulingConfiguration, + count = engineCount, + freqMs = freqMs.toLong(), + taskCount = taskCount.toShort(), + concurrency = concurrency.toShort(), + kronTaskSchedule = kronTaskSchedule(freqMs), + ).map { it to launch { it.second.start() } } + .also { it.map { engineAndJob -> engineAndJob.second }.joinAll() } + .map { it.first } - delay((freqMs + executionBufferMs).milliseconds * executions) - taskLogsAndApplications.forEach { launch { it.second.stop() } } + delay((freqMs + executionBufferMs).milliseconds * executions) + taskLogsAndApplications.forEach { launch { it.second.stop() } } - try { - with(taskLogsAndApplications.map { it.first }.flatten()) { - size shouldBeGreaterThan executions - 2 - with(groupingBy { it }.eachCount()) { - val errors = - this.mapNotNull { - if (it.value > - concurrency - ) { - "${it.key.format2()} was executed ${it.value} times, expected no more than $concurrency" - } else { - null + try { + with(taskLogsAndApplications.map { it.first }.flatten()) { + size shouldBeGreaterThan executions - 2 + with(groupingBy { it }.eachCount()) { + val errors = + this.mapNotNull { + val expectedExecutions = concurrency * taskCount + if (it.value > expectedExecutions) { + "${it.key.format2()} was executed ${it.value} times, expected no more than $expectedExecutions times" + } else { + null + } } + if (errors.isNotEmpty()) { + fail(errors.joinToString("\n")) } - if (errors.isNotEmpty()) { - fail(errors.joinToString("\n")) } } + } finally { + delay(1000) + clean() } - } finally { - delay(1000) - clean() } } } @@ -104,6 +107,7 @@ abstract class TaskSchedulingPluginTest : FunSpec() { taskSchedulingConfiguration: TaskSchedulingConfiguration.(TaskFreqMs) -> Unit, count: Int, freqMs: Long, + taskCount: Short = 1, concurrency: Short = 1, kronTaskSchedule: SchedulerBuilder.() -> Unit, ) = (1..count).map { ktorHost -> @@ -115,14 +119,18 @@ abstract class TaskSchedulingPluginTest : FunSpec() { install(TaskScheduling) { taskSchedulingConfiguration(TaskFreqMs(freqMs)) - task { - name = "Test Kron Task" - task = { taskExecutionTime -> - executionRecords.add(taskExecutionTime) - log.debug("Host: $ktorHost executing task at ${taskExecutionTime.format2()}") + for (i in 1 until taskCount + 1) { + val taskName = "Test Kron Task: $i" + logger.info { "Adding task: $taskName" } + task { + name = taskName + task = { taskExecutionTime -> + executionRecords.add(taskExecutionTime) + log.info("Host: $ktorHost executing task $taskName at ${taskExecutionTime.format2()}") + } + kronSchedule = kronTaskSchedule + this.concurrency = concurrency.toInt() } - kronSchedule = kronTaskSchedule - this.concurrency = concurrency.toInt() } } } diff --git a/ktor-server-task-scheduling/ktor-server-task-scheduling-mongodb/src/jvmMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/managers/lock/database/MongoDBLockManager.kt b/ktor-server-task-scheduling/ktor-server-task-scheduling-mongodb/src/jvmMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/managers/lock/database/MongoDBLockManager.kt index 74ce976b..6abd215c 100644 --- a/ktor-server-task-scheduling/ktor-server-task-scheduling-mongodb/src/jvmMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/managers/lock/database/MongoDBLockManager.kt +++ b/ktor-server-task-scheduling/ktor-server-task-scheduling-mongodb/src/jvmMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/managers/lock/database/MongoDBLockManager.kt @@ -45,26 +45,29 @@ public class MongoDBLockManager( */ databaseName: String, ) : DatabaseTaskLockManager() { - - private val collection = client.getDatabase(databaseName) - .getCollection("TASK_LOCKS") - .withCodecRegistry(codecRegistry) + private val collection = + client + .getDatabase(databaseName) + .getCollection("TASK_LOCKS") + .withCodecRegistry(codecRegistry) override suspend fun updateTaskLock( task: Task, concurrencyIndex: Int, executionTime: DateTime, ): MongoDbTaskLock? { - val query = Filters.and( + val query = Filters.and( - Filters.eq(MongoDbTaskLock::name.name, task.name), - Filters.eq(MongoDbTaskLock::concurrencyIndex.name, concurrencyIndex), - ), - Filters.ne(MongoDbTaskLock::lockedAt.name, executionTime), - ) - val updates = Updates.combine( - Updates.set(MongoDbTaskLock::lockedAt.name, executionTime), - ) + Filters.and( + Filters.eq(MongoDbTaskLock::name.name, task.name), + Filters.eq(MongoDbTaskLock::concurrencyIndex.name, concurrencyIndex), + ), + Filters.ne(MongoDbTaskLock::lockedAt.name, executionTime), + ) + val updates = + Updates.combine( + Updates.set(MongoDbTaskLock::lockedAt.name, executionTime), + ) val options = FindOneAndUpdateOptions().upsert(false) client.startSession().use { session -> session.startTransaction(transactionOptions = majorityJTransaction()) @@ -81,7 +84,7 @@ public class MongoDBLockManager( override suspend fun initTaskLockTable() { collection.createIndex( Indexes.compoundIndex( - Indexes.text(MongoDbTaskLock::name.name), + Indexes.ascending(MongoDbTaskLock::name.name), Indexes.ascending(MongoDbTaskLock::concurrencyIndex.name), ), IndexOptions().unique(true), @@ -95,21 +98,24 @@ public class MongoDBLockManager( client.startSession().use { session -> session.startTransaction(transactionOptions = majorityJTransaction()) - return collection.find( - Filters.and( - Filters.eq(MongoDbTaskLock::name.name, task.name), - Filters.eq(MongoDbTaskLock::concurrencyIndex.name, taskConcurrencyIndex), - ), - ).firstOrNull()?.let { false } ?: runCatching { + return collection + .find( + Filters.and( + Filters.eq(MongoDbTaskLock::name.name, task.name), + Filters.eq(MongoDbTaskLock::concurrencyIndex.name, taskConcurrencyIndex), + ), + ).firstOrNull() + ?.let { false } ?: runCatching { collection.insertOne( MongoDbTaskLock( task.name, taskConcurrencyIndex, DateTime.EPOCH, ), - options = InsertOneOptions().apply { - comment("Initial task insertion") - }, + options = + InsertOneOptions().apply { + comment("Initial task insertion") + }, ) true }.onFailure { @@ -127,12 +133,14 @@ public class MongoDBLockManager( client.close() } - private fun majorityJTransaction(): TransactionOptions = TransactionOptions.builder() - .readConcern(ReadConcern.MAJORITY) - .readConcern(ReadConcern.LINEARIZABLE) - .writeConcern(WriteConcern.MAJORITY) - .writeConcern(WriteConcern.JOURNALED) - .build() + private fun majorityJTransaction(): TransactionOptions = + TransactionOptions + .builder() + .readConcern(ReadConcern.MAJORITY) + .readConcern(ReadConcern.LINEARIZABLE) + .writeConcern(WriteConcern.MAJORITY) + .writeConcern(WriteConcern.JOURNALED) + .build() } public data class MongoDbTaskLock( @@ -140,34 +148,37 @@ public data class MongoDbTaskLock( override val concurrencyIndex: Int, override var lockedAt: DateTime, ) : DatabaseTaskLock { - override fun toString(): String { - return "MongoDbTaskLockKey(name=$name, concurrencyIndex=$concurrencyIndex, lockedAt=${lockedAt.format2()})" - } + override fun toString(): String = "MongoDbTaskLockKey(name=$name, concurrencyIndex=$concurrencyIndex, lockedAt=${lockedAt.format2()})" } internal class DateTimeCodec : Codec { - override fun encode(writer: BsonWriter, value: DateTime?, encoderContext: EncoderContext) { + override fun encode( + writer: BsonWriter, + value: DateTime?, + encoderContext: EncoderContext, + ) { writer.writeString(value?.format2()) } - override fun getEncoderClass(): Class { - return DateTime::class.java - } + override fun getEncoderClass(): Class = DateTime::class.java - override fun decode(reader: BsonReader, decoderContext: DecoderContext): DateTime? { - return reader.readString()?.format2ToDateTime() - } + override fun decode( + reader: BsonReader, + decoderContext: DecoderContext, + ): DateTime? = reader.readString()?.format2ToDateTime() } -internal val codecRegistry = CodecRegistries.fromRegistries( - CodecRegistries.fromCodecs(DateTimeCodec()), - MongoClientSettings.getDefaultCodecRegistry(), -) +internal val codecRegistry = + CodecRegistries.fromRegistries( + CodecRegistries.fromCodecs(DateTimeCodec()), + MongoClientSettings.getDefaultCodecRegistry(), + ) @TaskSchedulingDsl public class MongoDBJobLockManagerConfiguration : DatabaseTaskLockManagerConfiguration() { public var client: MongoClient by Delegates.notNull() public var databaseName: String by Delegates.notNull() + override fun createTaskManager(application: Application): MongoDBLockManager = MongoDBLockManager( name = name.toTaskManagerName(), @@ -182,7 +193,6 @@ public class MongoDBJobLockManagerConfiguration : DatabaseTaskLockManagerConfigu */ @TaskSchedulingDsl public fun TaskSchedulingConfiguration.mongoDb( - /** * The name of the task manager, will be used to identify the task manager when assigning tasks to it * if none is provided, it will be considered the default one. only one default task manager is allowed. diff --git a/ktor-server-task-scheduling/ktor-server-task-scheduling-redis/src/commonMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/managers/lock/redis/RedisLockManager.kt b/ktor-server-task-scheduling/ktor-server-task-scheduling-redis/src/commonMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/managers/lock/redis/RedisLockManager.kt index f34c8157..ae7ed2a2 100644 --- a/ktor-server-task-scheduling/ktor-server-task-scheduling-redis/src/commonMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/managers/lock/redis/RedisLockManager.kt +++ b/ktor-server-task-scheduling/ktor-server-task-scheduling-redis/src/commonMain/kotlin/io/github/flaxoos/ktor/server/plugins/taskscheduling/managers/lock/redis/RedisLockManager.kt @@ -28,10 +28,13 @@ public class RedisLockManager( private val lockExpirationMs: Long, private val connectionAcquisitionTimeoutMs: Long, ) : TaskLockManager() { - override suspend fun init(tasks: List) {} - override suspend fun acquireLockKey(task: Task, executionTime: DateTime, concurrencyIndex: Int): RedisTaskLock? = + override suspend fun acquireLockKey( + task: Task, + executionTime: DateTime, + concurrencyIndex: Int, + ): RedisTaskLock? = connectionPool.withConnection(connectionAcquisitionTimeoutMs) { redisConnection -> logger.debug { "${application.host()}: ${executionTime.format2()}: Acquiring lock for ${task.name} - $concurrencyIndex" } val key = task.toRedisLockKey(executionTime, concurrencyIndex) @@ -41,7 +44,9 @@ public class RedisLockManager( } null } ?: run { - logger.debug { "${application.host()}: ${executionTime.format2()}: Failed to acquire lock for ${task.name} - $concurrencyIndex" } + logger.debug { + "${application.host()}: ${executionTime.format2()}: Failed to acquire lock for ${task.name} - $concurrencyIndex" + } null } @@ -66,8 +71,11 @@ public value class RedisTaskLock internal constructor( ) : TaskLock { public companion object { private const val DELIMITER = "-" - public fun Task.toRedisLockKey(executionTime: DateTime, concurrencyIndex: Int): RedisTaskLock = - RedisTaskLock("${name.replace(DELIMITER, "_")}-$concurrencyIndex at ${executionTime.format2()}") + + public fun Task.toRedisLockKey( + executionTime: DateTime, + concurrencyIndex: Int, + ): RedisTaskLock = RedisTaskLock("${name.replace(DELIMITER, "_")}-$concurrencyIndex at ${executionTime.format2()}") } override val name: String @@ -82,32 +90,26 @@ public class RedisTaskLockManagerConfiguration( * The redis host */ public var host: String = "undefined", - /** * The redis port */ public var port: Int = 0, - /** * The redis username */ public var username: String? = null, - /** * The redis password */ public var password: String? = null, - /** * For how long the lock should be valid, effectively, the pxMilliseconds for the setNx command */ public var lockExpirationMs: Long = 100, - /** * How many connections should the pool have initially */ public var connectionPoolInitialSize: Int = 10, - /** * The maximum number of connections in the pool */ @@ -121,13 +123,14 @@ public class RedisTaskLockManagerConfiguration( RedisLockManager( name = name.toTaskManagerName(), application = application, - connectionPool = RedisConnectionPool( - initialConnectionCount = connectionPoolInitialSize, - host = host, - port = port, - username = username, - password = password, - ), + connectionPool = + RedisConnectionPool( + initialConnectionCount = connectionPoolInitialSize, + host = host, + port = port, + username = username, + password = password, + ), lockExpirationMs = lockExpirationMs, connectionAcquisitionTimeoutMs = connectionAcquisitionTimeoutMs, ) @@ -138,7 +141,6 @@ public class RedisTaskLockManagerConfiguration( */ @TaskSchedulingDsl public fun TaskSchedulingConfiguration.redis( - /** * The name of the task manager, will be used to identify the task manager when assigning tasks to it * if none is provided, it will be considered the default one. only one default task manager is allowed.