I’ve been working on a migration from Spring 2.X to 3.X and my old code used @EnableBinding(Sink::class) and @StreamListener(Sink.INPUT) in the consumer. With both being deprecated and removed from the consumer the following error occurs when my Kafka Tests using the consumer are run:
org.opentest4j.AssertionFailedError:
expected: 1L
but was: 0L
I tried putting in @KafkaListener(topics=[“kafka-test”], group-id=”test-group”) where StreamListener was: but it resulted in the same errors.
I rewrote the consumer where the StreamListener was from:
fun consume(message: Message<*>) {
to
fun consume(): Consumer<Message<*>> = Consumer { message ->
but that resulted in an error as well:
Too many arguments for public open fun consume(): Consumer<Message<*>>
The arguments passed were:
kafkaConsumer.consume(GenericMessage("""{"aV": 1, "dI": 3, "id": $alertId}""".toByteArray()))
Rewriting it to:
@KafkaListener(topics = ["test-topic"], groupId = "group-test")
fun consume(message: GenericMessage<ByteArray>) {`
Resulted in the same expected 1L but was 0L error.
Prior to the migration, with just the EnableBinding and StreamListener removed but all the same old 2.X dependencies, the same error occurs as well. I was wondering if there is something that would function the same way as those two for the purposes of the kafka consumer?
Current migrated dependencies with kafka are Spring-kafka 3.1.4, spring-cloud-stream 4.1.1, kafka-clients 6.2.1, spring-integration 6.1.6.
user25089203 is a new contributor to this site. Take care in asking for clarification, commenting, and answering.
Check out our Code of Conduct.