I want to Re-consume messages again which is already consumed previously from Amazon Kinesis based on Timestamp
Using below dependency in my project implementation
‘org.springframework.cloud:spring-cloud-stream-binder-kinesis:4.0.2’
my application.yml file
spring:
cloud:
aws:
credentials:
sts:
web-identity-token-file: <Where i had given the token file path>
role-arn: <Where i had given the assume role arn>
role-session-name: RoleSessionName
region:
static: <where i had given my aws region>
dualstack-enabled: false
stream:
kinesis:
binder:
auto-create-stream: false
min-shard-count: 1
bindings:
input-in-0:
destination: test-test.tst.v1
content-type: text/json
Below is the java class which contain the bean for processing data from Kinesis
@Configuration
public class KinesisConsumerBinder{
@Bean
public Consumer<Message<String>> input(){
return message ->{
System.out.println("Data from Kinesis:"+message.getPayload());
//Process the message got from Kinesis
}
}
}
For Checkpointing and Locking I’m using PostgreSQL(Replaced default Dynamo Db with PostgreSQL as per the details given in below link
Can we use PostgreSQL instead of default dynamo db for checkpointing and locking in case of consuming data from Kinesis Using Binder approach)
Reading, checkpointing and locking works as expected.
But I have a requirement to re-consume messages again Amazon Kinesis based on Timestamp which is already consumed previously.
Could anyone please help me to resolve this.