Support for asserting the Key, Headers, and Topic of a Kafka Message (#215)
* Enrich Kafka assertions with message metadata such as topic, key, and headers (a usage sketch follows the changed-files summary below)

* Modify the test for metadataCondition

* Encapsulate the incoming message in ObservedMessage

* Add a header test

* Fix the test

* Rename the example app classes
osoykan authored Sep 26, 2023
1 parent cee4fff commit b6cb2ae
Showing 10 changed files with 221 additions and 149 deletions.
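
The test-facing effect of this commit is that the Kafka assertion lambdas (shouldBeConsumed, shouldBePublished, shouldBeFailed) now run with an ObservedMessage receiver, so a condition can check the payload (actual) together with the record's metadata. Below is a minimal, hedged sketch of the new style, written as a fragment for a Kotest test like the ExampleTest changed in this commit; metadata.topic and metadata.key are assumed accessor names and the topic string is a placeholder, since the excerpt only demonstrates metadata.headers.

```kotlin
// Sketch only -- fragment for a Stove/Kotest test such as ExampleTest below.
// `metadata.topic` / `metadata.key` are assumed accessors and the topic value
// is a placeholder; the diff itself only shows `metadata.headers`.
TestSystem.validate {
  kafka {
    shouldBePublished<ProductCreatedEvent> {
      actual.id == createProductCommand.id &&
        actual.supplierId == createProductCommand.supplierId &&
        metadata.headers.containsKey("X-UserEmail") &&
        metadata.key == createProductCommand.id.toString() && // assumed accessor
        metadata.topic == "product-created-topic"             // assumed accessor, placeholder topic
    }
  }
}
```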
@@ -10,7 +10,7 @@ import stove.spring.example.infrastructure.Headers
import stove.spring.example.infrastructure.http.SupplierHttpService
import stove.spring.example.infrastructure.messaging.kafka.KafkaOutgoingMessage
import stove.spring.example.infrastructure.messaging.kafka.KafkaProducer
import stove.spring.example.infrastructure.messaging.kafka.consumers.ProductCreateEvent
import stove.spring.example.infrastructure.messaging.kafka.consumers.CreateProductCommand
import java.util.Date

@Component
@@ -22,29 +22,29 @@ class ProductCreator(
) {
@Value("\${kafka.producer.product-created.topic-name}")
lateinit var productCreatedTopic: String
suspend fun createNewProduct(productCreateRequest: ProductCreateRequest): String {
val supplierPermission = supplierHttpService.getSupplierPermission(productCreateRequest.id)
suspend fun create(req: ProductCreateRequest): String {
val supplierPermission = supplierHttpService.getSupplierPermission(req.id)
if (!supplierPermission.isAllowed) {
return "Supplier with the given id(${productCreateRequest.supplierId}) is not allowed for product creation"
return "Supplier with the given id(${req.supplierId}) is not allowed for product creation"
}
val fromJson = JsonObject.fromJson(objectMapper.writeValueAsString(productCreateRequest))
val fromJson = JsonObject.fromJson(objectMapper.writeValueAsString(req))

collection.insert("product:${productCreateRequest.id}", fromJson).awaitFirst()
collection.insert("product:${req.id}", fromJson).awaitFirst()

kafkaProducer.send(
KafkaOutgoingMessage(
topic = productCreatedTopic,
key = productCreateRequest.id.toString(),
key = req.id.toString(),
headers = mapOf(Headers.EVENT_TYPE to ProductCreatedEvent::class.simpleName!!),
partition = 0,
payload = objectMapper.writeValueAsString(productCreateRequest.mapToProductCreatedEvent())
payload = objectMapper.writeValueAsString(req.mapToProductCreatedEvent())
)
)
return "OK"
}
}

fun ProductCreateEvent.mapToCreateRequest(): ProductCreateRequest {
fun CreateProductCommand.mapToCreateRequest(): ProductCreateRequest {
return ProductCreateRequest(this.id, this.name, this.supplierId)
}

@@ -20,6 +20,6 @@ class ProductController(private val productCreator: ProductCreator) {

@PostMapping("/product/create")
suspend fun createProduct(@RequestBody productCreateRequest: ProductCreateRequest): String {
return productCreator.createNewProduct(productCreateRequest)
return productCreator.create(productCreateRequest)
}
}
@@ -31,16 +31,16 @@ class ProductTransferConsumers(
containerFactory = KafkaConsumerConfiguration.RETRY_LISTENER_BEAN_NAME
)
fun listen(cr: ConsumerRecord<String, String>) = runBlocking(MDCContext()) {
logger.info("Received product transfer event ${cr.value()}")
val productCreateEvent = objectMapper.readValue(
logger.info("Received product transfer command ${cr.value()}")
val command = objectMapper.readValue(
cr.value(),
ProductCreateEvent::class.java
CreateProductCommand::class.java
)
productCreator.createNewProduct(productCreateEvent.mapToCreateRequest())
productCreator.create(command.mapToCreateRequest())
}
}

data class ProductCreateEvent(
data class CreateProductCommand(
val id: Long,
val name: String,
val supplierId: Long
5 changes: 4 additions & 1 deletion examples/spring-example/src/main/resources/application.yml
@@ -1,11 +1,14 @@
spring:
application:
name: "stove"
servlet:
multipart:
max-request-size: 10MB

server:
port: 8001
http2:
enabled: true
enabled: false

couchbase:
hosts: localhost
@@ -15,7 +15,7 @@ import stove.spring.example.application.handlers.ProductCreatedEvent
import stove.spring.example.application.services.SupplierPermission
import stove.spring.example.infrastructure.couchbase.CouchbaseProperties
import stove.spring.example.infrastructure.messaging.kafka.consumers.BusinessException
import stove.spring.example.infrastructure.messaging.kafka.consumers.ProductCreateEvent
import stove.spring.example.infrastructure.messaging.kafka.consumers.CreateProductCommand

class ExampleTest : FunSpec({
test("bridge should work") {
@@ -55,13 +55,13 @@ class ExampleTest : FunSpec({
}

http {
postAndExpectJson<String>(uri = "/api/product/create", body = productCreateRequest.some()) { actual ->
actual shouldBe "OK"
postAndExpectBodilessResponse(uri = "/api/product/create", body = productCreateRequest.some()) { actual ->
actual.status shouldBe 200
}
}

kafka {
shouldBePublished<ProductCreatedEvent> { actual ->
shouldBePublished<ProductCreatedEvent> {
actual.id == productCreateRequest.id &&
actual.name == productCreateRequest.name &&
actual.supplierId == productCreateRequest.supplierId
@@ -99,7 +99,7 @@ class ExampleTest : FunSpec({

test("should throw error when send product create event for the not allowed supplier") {
TestSystem.validate {
val productCreateEvent = ProductCreateEvent(3L, name = "product name", 97L)
val productCreateEvent = CreateProductCommand(3L, name = "product name", 97L)
val supplierPermission = SupplierPermission(productCreateEvent.supplierId, isAllowed = false)

wiremock {
@@ -112,7 +112,7 @@

kafka {
publish("trendyol.stove.service.product.create.0", productCreateEvent)
shouldBeConsumed<ProductCreateEvent> { actual ->
shouldBeConsumed<CreateProductCommand> {
actual.id == productCreateEvent.id
}
}
@@ -121,34 +121,32 @@

test("should create new product when send product create event for the allowed supplier") {
TestSystem.validate {
val productCreateEvent = ProductCreateEvent(4L, name = "product name", 96L)
val supplierPermission = SupplierPermission(productCreateEvent.supplierId, isAllowed = true)
val createProductCommand = CreateProductCommand(4L, name = "product name", 96L)
val supplierPermission = SupplierPermission(createProductCommand.supplierId, isAllowed = true)

wiremock {
mockGet(
"/suppliers/${productCreateEvent.id}/allowed",
"/suppliers/${createProductCommand.id}/allowed",
statusCode = 200,
responseBody = supplierPermission.some()
)
}

kafka {
publish("trendyol.stove.service.product.create.0", productCreateEvent)
shouldBeConsumed<ProductCreateEvent> { actual ->
actual.id == productCreateEvent.id
}
shouldBePublished<ProductCreatedEvent> { actual ->
actual.id == productCreateEvent.id &&
actual.name == productCreateEvent.name &&
actual.supplierId == productCreateEvent.supplierId
publish("trendyol.stove.service.product.create.0", createProductCommand)
shouldBePublished<ProductCreatedEvent> {
actual.id == createProductCommand.id &&
actual.name == createProductCommand.name &&
actual.supplierId == createProductCommand.supplierId &&
metadata.headers["X-UserEmail"] == "[email protected]"
}
}

couchbase {
shouldGet<ProductCreateRequest>("product:${productCreateEvent.id}") { actual ->
actual.id shouldBe productCreateEvent.id
actual.name shouldBe productCreateEvent.name
actual.supplierId shouldBe productCreateEvent.supplierId
shouldGet<ProductCreateRequest>("product:${createProductCommand.id}") { actual ->
actual.id shouldBe createProductCommand.id
actual.name shouldBe createProductCommand.name
actual.supplierId shouldBe createProductCommand.supplierId
}
}
}
@@ -159,10 +157,13 @@
TestSystem.validate {
kafka {
publish("trendyol.stove.service.product.failing.0", FailingEvent(5L))
shouldBeFailed<FailingEvent> { actual, exception ->
actual.id == 5L && exception is BusinessException
shouldBeFailed<FailingEvent> {
actual.id == 5L && reason is BusinessException
}

shouldBeFailed<FailingEvent> {
actual == FailingEvent(5L) && reason is BusinessException
}
shouldBeFailed(message = FailingEvent(5L), exception = BusinessException("Failing product create event"))
}
}
}
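
For failed messages the condition is now a single FailedObservedMessage<T> receiver, so payload, failure reason, and metadata can be checked in one lambda, as the two shouldBeFailed variants above illustrate. A hedged sketch combining all three follows; only actual, reason, and metadata.headers appear in this excerpt, so the metadata check is an assumption that FailedObservedMessage exposes metadata the same way ObservedMessage does.

```kotlin
// Sketch: failure assertion that also inspects metadata.
// Assumes FailedObservedMessage exposes `metadata` like ObservedMessage does.
kafka {
  publish("trendyol.stove.service.product.failing.0", FailingEvent(5L))
  shouldBeFailed<FailingEvent> {
    actual.id == 5L &&
      reason is BusinessException &&
      metadata.headers.isNotEmpty() // metadata of the failed record is in scope too
  }
}
```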
@@ -1,6 +1,5 @@
package com.stove.spring.example.e2e

import com.fasterxml.jackson.databind.ObjectMapper
import com.trendyol.stove.testing.e2e.BaseApplicationContextInitializer
import com.trendyol.stove.testing.e2e.kafka.TestSystemKafkaInterceptor
import org.springframework.boot.SpringApplication
@@ -12,5 +11,4 @@ fun SpringApplication.addTestSystemDependencies() {
class TestSystemInitializer :
BaseApplicationContextInitializer({
bean<TestSystemKafkaInterceptor>(isPrimary = true)
bean<ObjectMapper> { ref("objectMapper") }
})
18 changes: 18 additions & 0 deletions examples/spring-example/src/test/resources/logback-test.xml
@@ -0,0 +1,18 @@
<configuration>

<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} %msg%n</pattern>
</encoder>
</appender>

<root level="INFO">
<appender-ref ref="STDOUT" />
</root>
<logger name="org.testcontainers" level="WARN"/>
<logger name="com.couchbase" level="OFF"/>"
<logger name="tc" level="OFF"/>
<logger name="com.github.dockerjava" level="OFF"/>
<logger name="com.github.dockerjava.zerodep.shaded.org.apache.hc.client5.http.wire" level="OFF"/>
<logger name="org.apache" level="OFF"/>
</configuration>
@@ -2,7 +2,6 @@ package com.trendyol.stove.testing.e2e.kafka

import arrow.core.None
import arrow.core.Option
import arrow.core.Some
import arrow.core.getOrElse
import com.trendyol.stove.functional.Try
import com.trendyol.stove.functional.recover
@@ -21,6 +20,15 @@ import org.slf4j.LoggerFactory
import org.springframework.beans.factory.getBean
import org.springframework.context.ApplicationContext
import org.springframework.kafka.core.KafkaTemplate
import kotlin.collections.List
import kotlin.collections.Map
import kotlin.collections.MutableMap
import kotlin.collections.listOf
import kotlin.collections.map
import kotlin.collections.mapOf
import kotlin.collections.plus
import kotlin.collections.set
import kotlin.collections.toMutableMap
import kotlin.reflect.KClass
import kotlin.time.Duration
import kotlin.time.Duration.Companion.seconds
@@ -85,95 +93,60 @@ class KafkaSystem(
return kafkaTemplate.usingCompletableFuture().send(record).await().let { this }
}

suspend fun shouldBeConsumed(
atLeastIn: Duration = 5.seconds,
message: Any
): KafkaSystem = coroutineScope {
shouldBeConsumedInternal(message::class, atLeastIn) { incomingMessage -> incomingMessage == Some(message) }
}.let { this }

suspend inline fun <reified T : Any> shouldBeConsumed(
atLeastIn: Duration = 5.seconds,
crossinline condition: (T) -> Boolean
): KafkaSystem = coroutineScope {
shouldBeConsumedInternal(T::class, atLeastIn) { incomingMessage -> incomingMessage.isSome { o -> condition(o) } }
}.let { this }

@Deprecated("Use shouldBeConsumed instead", ReplaceWith("shouldBeConsumed<T> { actual -> actual == expected }"))
suspend inline fun <reified T : Any> shouldBeConsumedOnCondition(
atLeastIn: Duration = 5.seconds,
crossinline condition: (T) -> Boolean
crossinline condition: ObservedMessage<T>.() -> Boolean
): KafkaSystem = coroutineScope {
shouldBeConsumedInternal(T::class, atLeastIn) { incomingMessage -> incomingMessage.isSome { o -> condition(o) } }
}.let { this }

suspend fun shouldBeFailed(
atLeastIn: Duration = 5.seconds,
message: Any,
exception: Throwable
): KafkaSystem = coroutineScope {
shouldBeFailedInternal(message::class, atLeastIn) { option, throwable -> option == Some(message) && throwable == exception }
shouldBeConsumedInternal(T::class, atLeastIn) { parsed ->
parsed.message.isSome { o -> condition(ObservedMessage(o, parsed.metadata)) }
}
}.let { this }

suspend inline fun <reified T : Any> shouldBeFailed(
atLeastIn: Duration = 5.seconds,
crossinline condition: (T, Throwable) -> Boolean
crossinline condition: FailedObservedMessage<T>.() -> Boolean
): KafkaSystem = coroutineScope {
shouldBeFailedInternal(T::class, atLeastIn) { message, throwable -> message.isSome { m -> condition(m, throwable) } }
}.let { this }

@Deprecated(
"Use shouldBeFailed instead",
ReplaceWith("shouldBeFailed<T> { actual, throwable -> actual == expected && throwable == exception }")
)
suspend inline fun <reified T : Any> shouldBeFailedOnCondition(
atLeastIn: Duration = 5.seconds,
crossinline condition: (T, Throwable) -> Boolean
): KafkaSystem = coroutineScope {
shouldBeFailedInternal(T::class, atLeastIn) { message, throwable -> message.isSome { m -> condition(m, throwable) } }
}.let { this }

suspend fun shouldBePublished(
atLeastIn: Duration = 5.seconds,
message: Any
): KafkaSystem = coroutineScope {
shouldBePublishedInternal(message::class, atLeastIn) { incomingMessage -> incomingMessage == Some(message) }
shouldBeFailedInternal(T::class, atLeastIn) { parsed ->
parsed.message.message.isSome { o ->
condition(
FailedObservedMessage(
o,
parsed.message.metadata,
parsed.reason
)
)
}
}
}.let { this }

suspend inline fun <reified T : Any> shouldBePublished(
atLeastIn: Duration = 5.seconds,
crossinline condition: (T) -> Boolean
crossinline condition: ObservedMessage<T>.() -> Boolean
): KafkaSystem = coroutineScope {
shouldBePublishedInternal(T::class, atLeastIn) { incomingMessage -> incomingMessage.isSome { o -> condition(o) } }
}.let { this }

@Deprecated("Use shouldBePublished instead", ReplaceWith("shouldBePublished<T> { actual -> actual == expected }"))
suspend inline fun <reified T : Any> shouldBePublishedOnCondition(
atLeastIn: Duration = 5.seconds,
crossinline condition: (T) -> Boolean
): KafkaSystem = coroutineScope {
shouldBePublishedInternal(T::class, atLeastIn) { incomingMessage -> incomingMessage.isSome { o -> condition(o) } }
shouldBePublishedInternal(T::class, atLeastIn) { parsed ->
parsed.message.isSome { o -> condition(ObservedMessage(o, parsed.metadata)) }
}
}.let { this }

@PublishedApi
internal suspend fun <T : Any> shouldBeConsumedInternal(
clazz: KClass<T>,
atLeastIn: Duration,
condition: (Option<T>) -> Boolean
condition: (message: ParsedMessage<T>) -> Boolean
): Unit = coroutineScope { getInterceptor().waitUntilConsumed(atLeastIn, clazz, condition) }

@PublishedApi
internal suspend fun <T : Any> shouldBeFailedInternal(
clazz: KClass<T>,
atLeastIn: Duration,
condition: (Option<T>, Throwable) -> Boolean
condition: (message: FailedParsedMessage<T>) -> Boolean
): Unit = coroutineScope { getInterceptor().waitUntilFailed(atLeastIn, clazz, condition) }

@PublishedApi
internal suspend fun <T : Any> shouldBePublishedInternal(
clazz: KClass<T>,
atLeastIn: Duration,
condition: (Option<T>) -> Boolean
condition: (message: ParsedMessage<T>) -> Boolean
): Unit = coroutineScope { getInterceptor().waitUntilPublished(atLeastIn, clazz, condition) }
}
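
The new signatures above refer to ObservedMessage, FailedObservedMessage, ParsedMessage, and FailedParsedMessage, whose definitions are not part of this excerpt. Below is a minimal sketch of the shapes they would need, inferred only from how they are constructed and read in this hunk (ObservedMessage(o, parsed.metadata), FailedObservedMessage(o, parsed.message.metadata, parsed.reason), parsed.message.isSome { ... }); the MessageMetadata name and its exact fields are assumptions.

```kotlin
import arrow.core.Option

// Assumed shapes -- inferred from usages above, not the commit's actual definitions.
data class MessageMetadata(
  val topic: String,            // per the commit title: topic ...
  val key: String,              // ... key ...
  val headers: Map<String, Any> // ... and headers (ExampleTest reads metadata.headers)
)

// Receiver of the public assertion lambdas: payload plus metadata.
data class ObservedMessage<T : Any>(
  val actual: T,
  val metadata: MessageMetadata
)

// Same, plus the exception that made consumption fail (read as `reason` in tests).
data class FailedObservedMessage<T : Any>(
  val actual: T,
  val metadata: MessageMetadata,
  val reason: Throwable
)

// What the interceptor hands to the internal wait* conditions.
data class ParsedMessage<T : Any>(
  val message: Option<T>,
  val metadata: MessageMetadata
)

data class FailedParsedMessage<T : Any>(
  val message: ParsedMessage<T>,
  val reason: Throwable
)
```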
