In the following code, I am trying to create a message by calling processOrder()
via REST endpoint. Then, I want to pass the result of processOrder()
to processShipping()
and processPayment
.
However, Whenever I call the rest endpoint http://localhost:8080/processOrder
, just the processOrder()
is called. What’s wrong here?
<code>package com.example.kafkademo.functions;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.function.Consumer;
import java.util.function.Function;
@Configuration
public class MessageFunctions {
@Bean
public Function<String, String> processOrder(){
return orderId -> {
System.out.println("processOrder: " + orderId);
System.out.println(orderId);
return orderId + " : " + System.currentTimeMillis();
};
}
@Bean
public Consumer<String> processShipping(){
return orderId -> {
System.out.println("processShipping: " + orderId);
System.out.println(orderId);
};
}
@Bean
public Consumer<String> processPayment(){
return orderId -> {
System.out.println("processPayment: " + orderId);
System.out.println(orderId);
};
}
}
</code>
<code>package com.example.kafkademo.functions;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.function.Consumer;
import java.util.function.Function;
@Configuration
public class MessageFunctions {
@Bean
public Function<String, String> processOrder(){
return orderId -> {
System.out.println("processOrder: " + orderId);
System.out.println(orderId);
return orderId + " : " + System.currentTimeMillis();
};
}
@Bean
public Consumer<String> processShipping(){
return orderId -> {
System.out.println("processShipping: " + orderId);
System.out.println(orderId);
};
}
@Bean
public Consumer<String> processPayment(){
return orderId -> {
System.out.println("processPayment: " + orderId);
System.out.println(orderId);
};
}
}
</code>
package com.example.kafkademo.functions;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.function.Consumer;
import java.util.function.Function;
@Configuration
public class MessageFunctions {
@Bean
public Function<String, String> processOrder(){
return orderId -> {
System.out.println("processOrder: " + orderId);
System.out.println(orderId);
return orderId + " : " + System.currentTimeMillis();
};
}
@Bean
public Consumer<String> processShipping(){
return orderId -> {
System.out.println("processShipping: " + orderId);
System.out.println(orderId);
};
}
@Bean
public Consumer<String> processPayment(){
return orderId -> {
System.out.println("processPayment: " + orderId);
System.out.println(orderId);
};
}
}
Here is application.yml
:
<code>spring:
application:
name: kafka-demo
cloud:
function:
definition: processOrder;processPayment;processShipping
stream:
bindings:
processOrder-out-0:
destination: order_topic
processPayment-in-0:
destination: order_topic
processShipping-in-0:
destination: order_topic
kafka:
listener:
port: 9094
bootstrap-servers:
- localhost:9094
</code>
<code>spring:
application:
name: kafka-demo
cloud:
function:
definition: processOrder;processPayment;processShipping
stream:
bindings:
processOrder-out-0:
destination: order_topic
processPayment-in-0:
destination: order_topic
processShipping-in-0:
destination: order_topic
kafka:
listener:
port: 9094
bootstrap-servers:
- localhost:9094
</code>
spring:
application:
name: kafka-demo
cloud:
function:
definition: processOrder;processPayment;processShipping
stream:
bindings:
processOrder-out-0:
destination: order_topic
processPayment-in-0:
destination: order_topic
processShipping-in-0:
destination: order_topic
kafka:
listener:
port: 9094
bootstrap-servers:
- localhost:9094
Just in case, here are the dependencies:
<code>dependencies {
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'org.springframework.cloud:spring-cloud-stream'
implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka'
implementation 'org.springframework.kafka:spring-kafka'
implementation 'org.springframework.cloud:spring-cloud-starter-function-web'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'org.springframework.cloud:spring-cloud-stream-test-binder'
testImplementation 'org.springframework.kafka:spring-kafka-test'
testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
}
</code>
<code>dependencies {
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'org.springframework.cloud:spring-cloud-stream'
implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka'
implementation 'org.springframework.kafka:spring-kafka'
implementation 'org.springframework.cloud:spring-cloud-starter-function-web'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'org.springframework.cloud:spring-cloud-stream-test-binder'
testImplementation 'org.springframework.kafka:spring-kafka-test'
testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
}
</code>
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'org.springframework.cloud:spring-cloud-stream'
implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka'
implementation 'org.springframework.kafka:spring-kafka'
implementation 'org.springframework.cloud:spring-cloud-starter-function-web'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'org.springframework.cloud:spring-cloud-stream-test-binder'
testImplementation 'org.springframework.kafka:spring-kafka-test'
testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
}
1