Skip to content

Commit

Permalink
tests
Browse files Browse the repository at this point in the history
  • Loading branch information
osoykan committed May 1, 2024
1 parent 8093842 commit 903292d
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ internal interface MessageSinkOps : MessageSinkPublishOps, CommonOps {
}

failedFunc.waitUntilCount(atLeastIn, times)
throwIfFailed(clazz, condition)
}

override fun dumpMessages(): String = "Sink so far:\n$store"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ class KafkaApplicationUnderTest : ApplicationUnderTest<Unit> {

@ExperimentalKotest
class ProjectConfig : AbstractProjectConfig() {
init {
stoveKafkaBridgePortDefault = "50052"
}

override fun extensions(): List<Extension> = listOf(
SystemEnvironmentProjectListener(STOVE_KAFKA_BRIDGE_PORT, stoveKafkaBridgePortDefault)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,11 @@ abstract class StoveListener(
publisher.publishScope {
offer(
ProducerRecord(
// topic =
topicDefinition.retryTopic,
// key =
message.partition(),
message.key(),
// value =
message.value()
message.value(),
message.headers()
)
)
}
Expand All @@ -70,12 +69,11 @@ abstract class StoveListener(
} else {
logger.error("CONSUMER GOT an ERROR, retry limit exceeded: $message")
val record = ProducerRecord<String, Any>(
// topic =
topicDefinition.deadLetterTopic,
// key =
message.partition(),
message.key(),
// value =
message.value()
message.value(),
message.headers()
).apply {
headers().add("doNotFail", "true".toByteArray())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,6 @@ class ProductConsumer(
override val topicDefinition: TopicDefinition = TopicDefinition("product", "product.retry", "product.error")

override suspend fun listen(record: ConsumerRecord<String, String>) {
logger.info("Product consumed: ${record.value()}")
logger.info("Product consumed: ${record.value()} from topic: ${record.topic()} with key: ${record.key()}")
}
}

0 comments on commit 903292d

Please sign in to comment.