Skip to content

Commit

Permalink
add docs for kafka and arrange tests
Browse files Browse the repository at this point in the history
  • Loading branch information
osoykan committed May 1, 2024
1 parent 8dba795 commit d3d615c
Show file tree
Hide file tree
Showing 15 changed files with 153 additions and 41 deletions.
69 changes: 69 additions & 0 deletions docs/how-to-write-tests/1.Application-Aware/2.Ktor/index.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,72 @@
# 2. Ktor

[Here](https://github.com/Trendyol/stove/tree/main/examples/ktor-example) you can jump immediately to the Ktor example application.

## Deps

=== "Gradle"

```kotlin
dependencies {
testImplementation("com.trendyol:stove-ktor-testing-e2e:$version")
}
```

=== "Maven"

```xml
<dependency>
<groupId>com.trendyol</groupId>
<artifactId>stove-ktor-testing-e2e</artifactId>
<version>${stove-version}</version>
</dependency>
```

## Example Setup

```kotlin
TestSystem(baseUrl = "http://localhost:8080")
.with {
httpClient()
bridge()
postgresql {
PostgresqlOptions(configureExposedConfiguration = { cfg ->
listOf(
"database.jdbcUrl=${cfg.jdbcUrl}",
"database.host=${cfg.host}",
"database.port=${cfg.port}",
"database.name=${cfg.database}",
"database.username=${cfg.username}",
"database.password=${cfg.password}"
)
})
}
kafka {
stoveKafkaObjectMapperRef = objectMapperRef
KafkaSystemOptions {
listOf(
"kafka.bootstrapServers=${it.bootstrapServers}"
)
}
}
wiremock {
WireMockSystemOptions(
port = 9090,
removeStubAfterRequestMatched = true,
afterRequest = { e, _ ->
logger.info(e.request.toString())
}
)
}
ktor(
withParameters = listOf(
"port=8080"
),
runner = { parameters ->
stove.ktor.example.run(parameters) {
addTestSystemDependencies()
}
}
)
}.run()
```
73 changes: 73 additions & 0 deletions docs/how-to-write-tests/3.Components/02-kafka.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
# Kafka

## Prerequisites

### 1. Docker Image

```shell
docker buildx imagetools create confluentinc/cp-kafka:latest --tag YOUR_REGISTRY/confluentinc/cp-kafka:latest
```

### 2. Library

=== "Gradle"

``` kotlin
dependencies {
testImplementation("com.trendyol:stove-testing-e2e-kafka:$version")
}
```

=== "Maven"

```xml
<dependency>
<groupId>com.trendyol</groupId>
<artifactId>stove-testing-e2e-kafka</artifactId>
<version>${stove-version}</version>
</dependency>
```

## Configure

```kotlin
TestSystem(baseUrl = "http://localhost:8080")
.with {
// ... other deps ...
bridge()
kafka {
stoveKafkaObjectMapperRef = objectMapperRef
KafkaSystemOptions {
listOf(
"kafka.bootstrapServers=${it.bootstrapServers}",
"kafka.interceptorClasses=com.trendyol.stove.testing.e2e.standalone.kafka.intercepting.StoveKafkaBridge"
)
}
}
}.run()

```

### Configuring Object Mapper

Like every `SystemOptions` object, `KafkaSystemOptions` has a `stoveKafkaObjectMapperRef` field. You can set your own
object mapper to this field. If you don't set it, Stove will use its default object mapper.

```kotlin
var stoveKafkaObjectMapperRef: ObjectMapper = StoveObjectMapper.Default
```

### Kafka Bridge With Your Application

Stove Kafka bridge is a **MUST** to work with Kafka. Otherwise you can't assert any messages from your application.

As you can see in the example above, you need to add a support to your application to work with interceptor that Stove provides.

```kotlin
"kafka.interceptorClasses=com.trendyol.stove.testing.e2e.standalone.kafka.intercepting.StoveKafkaBridge"
```

!!! Important
`kafka.` prefix is an assumption that you can change it with your own prefix.

Make sure that `StoveKafkaBridge` is in your classpath.
31 changes: 0 additions & 31 deletions docs/how-to-write-tests/3.Dependencies/02-kafka.md

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import org.koin.core.module.Module
import org.koin.dsl.module
import stove.ktor.example.application.ExampleAppConsumer
import java.util.*
import kotlin.time.Duration.Companion.milliseconds

fun kafka(): Module = module {
single { createReceiver<Any>(get()) }
Expand All @@ -25,14 +24,13 @@ private fun <V : Any> createReceiver(config: AppConfiguration): KafkaReceiver<St
ExampleAppKafkaValueDeserializer<V>(),
config.kafka.groupId,
autoOffsetReset = AutoOffsetReset.Earliest,
commitStrategy = CommitStrategy.ByTime(300.milliseconds),
properties = Properties().apply {
put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, config.kafka.interceptorClasses)
put(ConsumerConfig.CLIENT_ID_CONFIG, config.kafka.clientId)
put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true)
put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000")
put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "500")
put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, true)
put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "2000")
put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000")
}
)
return KafkaReceiver(settings)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import kotlin.time.Duration
import kotlin.time.Duration.Companion.seconds

var stoveKafkaObjectMapperRef: ObjectMapper = StoveObjectMapper.Default
var stoveKafkaBridgePortDefault = "50051"
const val STOVE_KAFKA_BRIDGE_PORT = "STOVE_KAFKA_BRIDGE_PORT"
const val STOVE_KAFKA_BRIDGE_PORT_DEFAULT = "50051"

@StoveDsl
class KafkaSystem(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import com.trendyol.stove.testing.e2e.system.abstractions.*

class KafkaSystemOptions(
val topicSuffixes: TopicSuffixes = TopicSuffixes(error = listOf(".error"), retry = listOf(".retry")),
val bridgeGrpcServerPort: Int = STOVE_KAFKA_BRIDGE_PORT_DEFAULT.toInt(),
val bridgeGrpcServerPort: Int = stoveKafkaBridgePortDefault.toInt(),
val objectMapper: ObjectMapper = stoveKafkaObjectMapperRef,
val containerOptions: KafkaContainerOptions = KafkaContainerOptions(),
override val configureExposedConfiguration: (KafkaExposedConfiguration) -> List<String> = { _ -> listOf() }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ class StoveKafkaBridge<K, V> : ConsumerInterceptor<K, V>, ProducerInterceptor<K,
}

private fun startGrpcClient(): StoveKafkaObserverServiceClient {
val onPort = System.getenv(STOVE_KAFKA_BRIDGE_PORT) ?: STOVE_KAFKA_BRIDGE_PORT_DEFAULT
val onPort = System.getenv(STOVE_KAFKA_BRIDGE_PORT) ?: stoveKafkaBridgePortDefault
logger.info("Connecting to Stove Kafka Bridge on port $onPort")
return Try { createClient(onPort) }
.map {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@ class KafkaApplicationUnderTest : ApplicationUnderTest<Unit> {
private suspend fun startConsumers(bootStrapServers: String) {
val consumerSettings = mapOf(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to bootStrapServers,
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to true,
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to "true",
ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG to "500",
ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG to "1000",
ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG to "true",
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StoveKafkaValueDeserializer::class.java,
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest",
Expand Down Expand Up @@ -69,7 +72,7 @@ class KafkaApplicationUnderTest : ApplicationUnderTest<Unit> {
@ExperimentalKotest
class ProjectConfig : AbstractProjectConfig() {
override fun extensions(): List<Extension> = listOf(
SystemEnvironmentProjectListener(STOVE_KAFKA_BRIDGE_PORT, STOVE_KAFKA_BRIDGE_PORT_DEFAULT)
SystemEnvironmentProjectListener(STOVE_KAFKA_BRIDGE_PORT, stoveKafkaBridgePortDefault)
)

override suspend fun beforeProject(): Unit = TestSystem()
Expand Down

0 comments on commit d3d615c

Please sign in to comment.