withCreateConsumer and withCreateProducer methods have been removed #1085
Comments
Have you tried with |
I'm happy to work on this issue if someone can tell me what the design should be. |
Sorry for the slow response to this - good question, I don't think we really thought about this when making the changes you've linked to. I don't have all the context on this currently in my head but it looks like you'd need to override the default instances of |
I'll work on something and then we can discuss in a PR. |
Umm ... the build doesn't work out of the box due to this:

ThisBuild / latestVersion := tlLatestVersion.value.getOrElse(
  throw new IllegalStateException("No tagged version found")
)

Any suggestions? |
That's surprising, I haven't seen that problem - I'll try checking out a fresh copy of the repo later and see if I can reproduce this |
In the meantime you can probably just type a value in to be able to start hacking? |
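For local hacking, typing a value in could look like the change below to the build setting quoted earlier. This is only a sketch: the fallback string is a made-up placeholder, and it assumes tlLatestVersion yields an Option[String], which the quoted snippet does not confirm.

```scala
// build.sbt (local workaround only, not meant to be committed)
// "0.0.0-LOCAL" is a hypothetical placeholder used when no git tag is found.
ThisBuild / latestVersion := tlLatestVersion.value.getOrElse("0.0.0-LOCAL")
```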
👍 I've made a bit of progress. Perhaps you can take a look and tell me if this is in keeping with what you're thinking of: https://github.com/noelwelsh/fs2-kafka/tree/feature/public-mk-consumer

I'm having a lot of trouble testing this code. The basic problem seems to be that the

it("should use the MkConsumer instance in scope") {
  withTopic { topic =>
    createCustomTopic(topic, partitions = 3)
    val mockConsumer = new MockConsumer[Array[Byte], Array[Byte]](OffsetResetStrategy.NONE)
    implicit val mockMkConsumer = MkConsumer.fromKafkaByteConsumer[IO](mockConsumer)
    val consumed =
      KafkaConsumer[IO]
        .stream(consumerSettings[IO])
        .subscribeTo(topic)
        .evalMap(IO.sleep(3.seconds).as(_)) // sleep a bit to trigger potential race condition with _.stream
        .evalTap(
          _ =>
            IO.delay {
              // mockConsumer.assign(java.util.Collections.singleton(new TopicPartition(topic, 0)))
              // val partitionInfo = mockConsumer.partitionsFor(topic).get(0)
              val record = new clients.consumer.ConsumerRecord(
                topic,
                0,
                0L,
                "Hello".getBytes(),
                "Kafka".getBytes()
              )
              mockConsumer.addRecord(record)
            }
        )
        .records
        .map(committable => committable.record.key -> committable.record.value)
        .interruptAfter(10.seconds)
        .compile
        .toVector
        .unsafeRunSync()
    consumed.size shouldEqual 1
    consumed(0) shouldEqual "Hello" -> "Kafka"
  }
} |
I've tried playing around with creating custom The settings don't encapsulate all the settings any more. Previously the
|
@noelwelsh that's a fair point about the ergonomics (though the traits are intended as capability traits rather than type classes so I don't see non-uniqueness as an argument against them in itself). The motivation for introducing the MkX traits was to decouple the effect type of consumer/producer creation from the effect type of the consumer/producer itself - see #588. One other way I can see to do this is reintroduce |
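To make the capability-trait idea concrete, here is a minimal, dependency-free sketch of the pattern. Every type in it (ByteConsumer, Settings, Thunk, and this MkConsumer with its fromConsumer helper) is a hypothetical stand-in written for illustration; none of it is fs2-kafka's or kafka-clients' actual API. It shows only the mechanism under discussion: consumer creation is delegated to an implicit instance, and a local instance (for example one wrapping a mock) takes precedence over the default found in the companion object.

```scala
// Illustrative stand-ins only: none of these types come from fs2-kafka or kafka-clients.
object CapabilityTraitSketch {

  // Hypothetical stand-in for the low-level client that a consumer wraps.
  trait ByteConsumer {
    def poll(): List[(String, String)]
  }

  // Hypothetical settings; deliberately carries no "create consumer" function.
  final case class Settings(bootstrapServers: String)

  // Toy effect type: a suspended computation.
  type Thunk[A] = () => A

  // Capability trait: knows how to create a ByteConsumer in effect F.
  trait MkConsumer[F[_]] {
    def apply(settings: Settings): F[ByteConsumer]
  }

  object MkConsumer {
    // Default instance, found through the companion object when nothing
    // closer is in scope. A real one would connect to a broker.
    implicit val default: MkConsumer[Thunk] = new MkConsumer[Thunk] {
      def apply(settings: Settings): Thunk[ByteConsumer] =
        () => new ByteConsumer { def poll(): List[(String, String)] = Nil }
    }

    // Helper for tests: always hand back the given pre-built consumer.
    def fromConsumer(consumer: ByteConsumer): MkConsumer[Thunk] = new MkConsumer[Thunk] {
      def apply(settings: Settings): Thunk[ByteConsumer] = () => consumer
    }
  }

  // Library-style entry point: creation is delegated to the implicit capability.
  def pollOnce(settings: Settings)(implicit mk: MkConsumer[Thunk]): List[(String, String)] = {
    val create = mk(settings)
    create().poll()
  }

  // With no local instance, the default from the companion object is used.
  def withDefault(): List[(String, String)] =
    pollOnce(Settings("localhost:9092"))

  // A local implicit takes precedence over the companion's default.
  def withMock(): List[(String, String)] = {
    val mock = new ByteConsumer {
      def poll(): List[(String, String)] = List("Hello" -> "Kafka")
    }
    implicit val mockMk: MkConsumer[Thunk] = MkConsumer.fromConsumer(mock)
    pollOnce(Settings("unused"))
  }
}
```

In this sketch the override relies purely on implicit resolution order, which is also why the ergonomics concern raised above applies: nothing in Settings records which instance will be used, so the choice depends on what happens to be in scope at the call site.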
As another option, would it help if there were an option to explicitly pass a |
I'm unable to reproduce this when cloning the repo in the normal way:

git clone git@github.com:fd4s/fs2-kafka.git
cd fs2-kafka
sbt compile

Is there anything unusual about your development environment that might cause the tags to be missing? |
Sorry about the delay. I don't get time to work on this most of the week. Summary: I think the best solution is to add a method to pass a Here's my reasoning:
So the best way forward seems:
I don't think there is much value in keeping |
How does one use Kafka's MockConsumer and MockProducer with fs2-kafka?

The documentation (e.g. for ConsumerSettings) mentions withCreateConsumer and withCreateProducer, but these methods have been removed. This commit removed the withCreateConsumer method from ConsumerSettings, and this commit removed the withCreateProducer method from ProducerSettings.

What is the replacement method? Thanks!