I had seen issue – StreamBridge not sending acknowledgment header and corresponding code as below for GH–2563.
if (ObjectUtils.containsElement(consumerBindingNames, destinationName)) { //GH-2563
logger.warn("You seem to be sending data to the input binding. It is not "
+ "recommended, since you are bypassing the binder and this the messaging system exposed by the binder.");
}
My use case is falling in same category but I am not sure what is correct way of doing it. I have below properties in applicaiton.properties
spring.cloud.function.definition=startTransformation;auditLog
spring.cloud.stream.function.bindings.startTransformation-in-0=transformer
spring.cloud.stream.bindings.transformer.group=transformer-group
spring.cloud.stream.bindings.transformer.consumer.concurrency=3
spring.cloud.stream.bindings.transformer.consumer.maxAttempts = 1
spring.cloud.stream.function.bindings.auditLog-in-0=audit
spring.cloud.stream.bindings.audit.group=audit-group
spring.cloud.stream.bindings.audit.consumer.concurrency=1
spring.cloud.stream.bindings.audit.consumer.maxAttempts=1
Here startTransformation is a consumer method which receives message and does some processing. In this processing it needs to do some auditing but I do not want to block the startTransfromation logic for auditing and hence I am sending message to audit queue in between the execution of startTransfromation
pseudo-code
public void startTransformation(Message etlMessage) {
String message = "Request received for transformation " + etlMessage;
streamBridge.send("audit",messageReceivedEvent);
//complex logic.
streamBridge.send("audit",messageCompletedEvent);
}
While sending message I am getting warning and I understand why because the destination and consumer binding name are same.
For resolution, I saw comment “You need an output binding with inbound-stream as its destination.”. I am not able to understand it.
The above scenario works if the consumers are at different JVM. I am able to send message without warnings to input bounding of a consumer running as a separate JVM application (i think as it does not have any information about the binding name of other jvm).