I’m trying to test a Spring Kafka listener in a Spring Boot test using @EmbeddedKafka
. However, I keep encountering the following exception:
<code>No transaction is in process; possible solutions: run the template operation within the scope of a template.executeInTransaction() operation, start a transaction with @Transactional before invoking the template method, run in a transaction started by a listener container when consuming a record
java.lang.IllegalStateException: No transaction is in process; possible solutions: run the template operation within the scope of a template.executeInTransaction() operation, start a transaction with @Transactional before invoking the template method, run in a transaction started by a listener container when consuming a record
</code>
<code>No transaction is in process; possible solutions: run the template operation within the scope of a template.executeInTransaction() operation, start a transaction with @Transactional before invoking the template method, run in a transaction started by a listener container when consuming a record
java.lang.IllegalStateException: No transaction is in process; possible solutions: run the template operation within the scope of a template.executeInTransaction() operation, start a transaction with @Transactional before invoking the template method, run in a transaction started by a listener container when consuming a record
</code>
No transaction is in process; possible solutions: run the template operation within the scope of a template.executeInTransaction() operation, start a transaction with @Transactional before invoking the template method, run in a transaction started by a listener container when consuming a record
java.lang.IllegalStateException: No transaction is in process; possible solutions: run the template operation within the scope of a template.executeInTransaction() operation, start a transaction with @Transactional before invoking the template method, run in a transaction started by a listener container when consuming a record
My Setup:
- Spring Boot Version: 3.2.4
Listener:
<code>@Component
@Slf4j
public class CancelAuthorizationLinkageListener {
private final CancelAuthorizationLinkageProcessor cancelAuthorizationLinkageProcessor;
private final CancelAuthorizationLinkageServiceInterface cancelAuthorizationLinkageService;
private final KafkaTemplate<String, CancelAuthorizationLinkageResource> kafkaTemplate;
private final String retryTopic;
public CancelAuthorizationLinkageListener(CancelAuthorizationLinkageProcessor cancelAuthorizationLinkageProcessor,
CancelAuthorizationLinkageServiceInterface cancelAuthorizationLinkageService,
KafkaTemplate<String, CancelAuthorizationLinkageResource> kafkaTemplate,
@Value("${spring.kafka.producer.retry-topic}") String retryTopic) {
this.cancelAuthorizationLinkageProcessor = cancelAuthorizationLinkageProcessor;
this.cancelAuthorizationLinkageService = cancelAuthorizationLinkageService;
this.kafkaTemplate = kafkaTemplate;
this.retryTopic = retryTopic;
}
@Bean
public RecordMessageConverter converter() {
return new JsonMessageConverter();
}
@Bean
public BatchMessagingMessageConverter batchConverter() {
return new BatchMessagingMessageConverter(converter());
}
@KafkaListener(id = "${spring.kafka.consumer.properties.cancel-authorization-linkage-listener-id}",
topics = "${spring.kafka.consumer.linkage-topic}", autoStartup = "false",
batch = "true",
groupId = "group1", concurrency = "2")
public void listen(List<CancelAuthorizationLinkageResource> cancelAuthorizationLinkageResources) {
for (CancelAuthorizationLinkageResource cancelAuthorizationLinkageResource : cancelAuthorizationLinkageResources) {
try {
CancelAuthorizationLinkageWriterResource cancelAuthorizationLinkageWriterResource =
cancelAuthorizationLinkageProcessor.process(cancelAuthorizationLinkageResource);
if (cancelAuthorizationLinkageWriterResource != null) {
cancelAuthorizationLinkageService.linkageAuthorization(
cancelAuthorizationLinkageWriterResource.getApiResource());
}
} catch (Exception e) {
log.error("listener error: {}", e.getMessage());
kafkaTemplate.send(retryTopic, cancelAuthorizationLinkageResource.getAuthorizationId(),
cancelAuthorizationLinkageResource);
}
}
}
</code>
<code>@Component
@Slf4j
public class CancelAuthorizationLinkageListener {
private final CancelAuthorizationLinkageProcessor cancelAuthorizationLinkageProcessor;
private final CancelAuthorizationLinkageServiceInterface cancelAuthorizationLinkageService;
private final KafkaTemplate<String, CancelAuthorizationLinkageResource> kafkaTemplate;
private final String retryTopic;
public CancelAuthorizationLinkageListener(CancelAuthorizationLinkageProcessor cancelAuthorizationLinkageProcessor,
CancelAuthorizationLinkageServiceInterface cancelAuthorizationLinkageService,
KafkaTemplate<String, CancelAuthorizationLinkageResource> kafkaTemplate,
@Value("${spring.kafka.producer.retry-topic}") String retryTopic) {
this.cancelAuthorizationLinkageProcessor = cancelAuthorizationLinkageProcessor;
this.cancelAuthorizationLinkageService = cancelAuthorizationLinkageService;
this.kafkaTemplate = kafkaTemplate;
this.retryTopic = retryTopic;
}
@Bean
public RecordMessageConverter converter() {
return new JsonMessageConverter();
}
@Bean
public BatchMessagingMessageConverter batchConverter() {
return new BatchMessagingMessageConverter(converter());
}
@KafkaListener(id = "${spring.kafka.consumer.properties.cancel-authorization-linkage-listener-id}",
topics = "${spring.kafka.consumer.linkage-topic}", autoStartup = "false",
batch = "true",
groupId = "group1", concurrency = "2")
public void listen(List<CancelAuthorizationLinkageResource> cancelAuthorizationLinkageResources) {
for (CancelAuthorizationLinkageResource cancelAuthorizationLinkageResource : cancelAuthorizationLinkageResources) {
try {
CancelAuthorizationLinkageWriterResource cancelAuthorizationLinkageWriterResource =
cancelAuthorizationLinkageProcessor.process(cancelAuthorizationLinkageResource);
if (cancelAuthorizationLinkageWriterResource != null) {
cancelAuthorizationLinkageService.linkageAuthorization(
cancelAuthorizationLinkageWriterResource.getApiResource());
}
} catch (Exception e) {
log.error("listener error: {}", e.getMessage());
kafkaTemplate.send(retryTopic, cancelAuthorizationLinkageResource.getAuthorizationId(),
cancelAuthorizationLinkageResource);
}
}
}
</code>
@Component
@Slf4j
public class CancelAuthorizationLinkageListener {
private final CancelAuthorizationLinkageProcessor cancelAuthorizationLinkageProcessor;
private final CancelAuthorizationLinkageServiceInterface cancelAuthorizationLinkageService;
private final KafkaTemplate<String, CancelAuthorizationLinkageResource> kafkaTemplate;
private final String retryTopic;
public CancelAuthorizationLinkageListener(CancelAuthorizationLinkageProcessor cancelAuthorizationLinkageProcessor,
CancelAuthorizationLinkageServiceInterface cancelAuthorizationLinkageService,
KafkaTemplate<String, CancelAuthorizationLinkageResource> kafkaTemplate,
@Value("${spring.kafka.producer.retry-topic}") String retryTopic) {
this.cancelAuthorizationLinkageProcessor = cancelAuthorizationLinkageProcessor;
this.cancelAuthorizationLinkageService = cancelAuthorizationLinkageService;
this.kafkaTemplate = kafkaTemplate;
this.retryTopic = retryTopic;
}
@Bean
public RecordMessageConverter converter() {
return new JsonMessageConverter();
}
@Bean
public BatchMessagingMessageConverter batchConverter() {
return new BatchMessagingMessageConverter(converter());
}
@KafkaListener(id = "${spring.kafka.consumer.properties.cancel-authorization-linkage-listener-id}",
topics = "${spring.kafka.consumer.linkage-topic}", autoStartup = "false",
batch = "true",
groupId = "group1", concurrency = "2")
public void listen(List<CancelAuthorizationLinkageResource> cancelAuthorizationLinkageResources) {
for (CancelAuthorizationLinkageResource cancelAuthorizationLinkageResource : cancelAuthorizationLinkageResources) {
try {
CancelAuthorizationLinkageWriterResource cancelAuthorizationLinkageWriterResource =
cancelAuthorizationLinkageProcessor.process(cancelAuthorizationLinkageResource);
if (cancelAuthorizationLinkageWriterResource != null) {
cancelAuthorizationLinkageService.linkageAuthorization(
cancelAuthorizationLinkageWriterResource.getApiResource());
}
} catch (Exception e) {
log.error("listener error: {}", e.getMessage());
kafkaTemplate.send(retryTopic, cancelAuthorizationLinkageResource.getAuthorizationId(),
cancelAuthorizationLinkageResource);
}
}
}
Test:
<code>@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT,
properties = {"spring.batch.job.name=cancelAuthorizationLinkageJob",
"bootstrap-servers: ${spring.embedded.kafka.brokers}"})
@DirtiesContext
@EmbeddedKafka(
partitions = 5, topics = {"${spring.kafka.consumer.linkage-topic}", "ppcd.cushion.cancel.auth.retry"},
count = 3)
class CancelAuthorizationLinkageListenerTest {
@Autowired
private CancelAuthorizationLinkageListener cancelAuthorizationLinkageListener;
@Mock
private CancelAuthorizationLinkageProcessor cancelAuthorizationLinkageProcessor;
@Autowired
private EmbeddedKafkaBroker embeddedKafka;
@Autowired
private ConsumerFactory<String, CancelAuthorizationLinkageResource> consumerFactory;
@Value("${spring.kafka.consumer.linkage-topic}")
private String linkageTopic;
@Value("${spring.kafka.producer.retry-topic}")
private String retryTopic;
private Consumer<String, CancelAuthorizationLinkageResource> consumer;
@BeforeEach
public void setUp() {
consumer = consumerFactory.createConsumer();
embeddedKafka.consumeFromAnEmbeddedTopic(consumer, retryTopic);
}
@Test
@DisplayName("OK-取消オーソリ処理中エラーが起きた場合、retryトピックへ送信する")
void of_ok_1() throws Exception {
// init
int ngNumber = 1;
AtomicInteger atomicInteger = new AtomicInteger(0);
// mock
doThrow(new InvalidValueException("test")).when(cancelAuthorizationLinkageProcessor).process(any());
// verify
cancelAuthorizationLinkageListener.listen(List.of(createCancelAuthorizationLinkageResource(true)));
await()
.atMost(2, SECONDS)
.pollInterval(1, SECONDS)
.untilAsserted(() -> {
KafkaTestUtils.getRecords(consumer).records(retryTopic)
.forEach(x -> atomicInteger.incrementAndGet());
assertEquals(ngNumber, atomicInteger.get());
});
}
</code>
<code>@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT,
properties = {"spring.batch.job.name=cancelAuthorizationLinkageJob",
"bootstrap-servers: ${spring.embedded.kafka.brokers}"})
@DirtiesContext
@EmbeddedKafka(
partitions = 5, topics = {"${spring.kafka.consumer.linkage-topic}", "ppcd.cushion.cancel.auth.retry"},
count = 3)
class CancelAuthorizationLinkageListenerTest {
@Autowired
private CancelAuthorizationLinkageListener cancelAuthorizationLinkageListener;
@Mock
private CancelAuthorizationLinkageProcessor cancelAuthorizationLinkageProcessor;
@Autowired
private EmbeddedKafkaBroker embeddedKafka;
@Autowired
private ConsumerFactory<String, CancelAuthorizationLinkageResource> consumerFactory;
@Value("${spring.kafka.consumer.linkage-topic}")
private String linkageTopic;
@Value("${spring.kafka.producer.retry-topic}")
private String retryTopic;
private Consumer<String, CancelAuthorizationLinkageResource> consumer;
@BeforeEach
public void setUp() {
consumer = consumerFactory.createConsumer();
embeddedKafka.consumeFromAnEmbeddedTopic(consumer, retryTopic);
}
@Test
@DisplayName("OK-取消オーソリ処理中エラーが起きた場合、retryトピックへ送信する")
void of_ok_1() throws Exception {
// init
int ngNumber = 1;
AtomicInteger atomicInteger = new AtomicInteger(0);
// mock
doThrow(new InvalidValueException("test")).when(cancelAuthorizationLinkageProcessor).process(any());
// verify
cancelAuthorizationLinkageListener.listen(List.of(createCancelAuthorizationLinkageResource(true)));
await()
.atMost(2, SECONDS)
.pollInterval(1, SECONDS)
.untilAsserted(() -> {
KafkaTestUtils.getRecords(consumer).records(retryTopic)
.forEach(x -> atomicInteger.incrementAndGet());
assertEquals(ngNumber, atomicInteger.get());
});
}
</code>
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT,
properties = {"spring.batch.job.name=cancelAuthorizationLinkageJob",
"bootstrap-servers: ${spring.embedded.kafka.brokers}"})
@DirtiesContext
@EmbeddedKafka(
partitions = 5, topics = {"${spring.kafka.consumer.linkage-topic}", "ppcd.cushion.cancel.auth.retry"},
count = 3)
class CancelAuthorizationLinkageListenerTest {
@Autowired
private CancelAuthorizationLinkageListener cancelAuthorizationLinkageListener;
@Mock
private CancelAuthorizationLinkageProcessor cancelAuthorizationLinkageProcessor;
@Autowired
private EmbeddedKafkaBroker embeddedKafka;
@Autowired
private ConsumerFactory<String, CancelAuthorizationLinkageResource> consumerFactory;
@Value("${spring.kafka.consumer.linkage-topic}")
private String linkageTopic;
@Value("${spring.kafka.producer.retry-topic}")
private String retryTopic;
private Consumer<String, CancelAuthorizationLinkageResource> consumer;
@BeforeEach
public void setUp() {
consumer = consumerFactory.createConsumer();
embeddedKafka.consumeFromAnEmbeddedTopic(consumer, retryTopic);
}
@Test
@DisplayName("OK-取消オーソリ処理中エラーが起きた場合、retryトピックへ送信する")
void of_ok_1() throws Exception {
// init
int ngNumber = 1;
AtomicInteger atomicInteger = new AtomicInteger(0);
// mock
doThrow(new InvalidValueException("test")).when(cancelAuthorizationLinkageProcessor).process(any());
// verify
cancelAuthorizationLinkageListener.listen(List.of(createCancelAuthorizationLinkageResource(true)));
await()
.atMost(2, SECONDS)
.pollInterval(1, SECONDS)
.untilAsserted(() -> {
KafkaTestUtils.getRecords(consumer).records(retryTopic)
.forEach(x -> atomicInteger.incrementAndGet());
assertEquals(ngNumber, atomicInteger.get());
});
}
application.yml:
<code>spring:
profiles:
active: "local"
application:
name:
batch:
initialize-schema: ALWAYS
job:
names:
#enable: false
kafka:
bootstrap-servers: localhost:9092
producer:
acks: -1
transaction-id-prefix: cushion-kafka-tx-${random.uuid}
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
retry-topic: ppcd.cushion.cancel.auth.retry
# retries: 5
consumer:
group-id: groupid-Dev
auto-offset-reset: earliest
max-poll-records: 20
value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
properties:
cancel-authorization-linkage-listener-id: cancel-authorization-linkage-listener
test-cancel-authorization-linkage-listener-id: test-cancel-authorization-linkage-listener
spring.json.trusted.packages: '*'
isolation.level: read_committed
linkage-topic: ppcd.matching.credit.auth.cancel.auto.matched.result.cushion
</code>
<code>spring:
profiles:
active: "local"
application:
name:
batch:
initialize-schema: ALWAYS
job:
names:
#enable: false
kafka:
bootstrap-servers: localhost:9092
producer:
acks: -1
transaction-id-prefix: cushion-kafka-tx-${random.uuid}
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
retry-topic: ppcd.cushion.cancel.auth.retry
# retries: 5
consumer:
group-id: groupid-Dev
auto-offset-reset: earliest
max-poll-records: 20
value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
properties:
cancel-authorization-linkage-listener-id: cancel-authorization-linkage-listener
test-cancel-authorization-linkage-listener-id: test-cancel-authorization-linkage-listener
spring.json.trusted.packages: '*'
isolation.level: read_committed
linkage-topic: ppcd.matching.credit.auth.cancel.auto.matched.result.cushion
</code>
spring:
profiles:
active: "local"
application:
name:
batch:
initialize-schema: ALWAYS
job:
names:
#enable: false
kafka:
bootstrap-servers: localhost:9092
producer:
acks: -1
transaction-id-prefix: cushion-kafka-tx-${random.uuid}
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
retry-topic: ppcd.cushion.cancel.auth.retry
# retries: 5
consumer:
group-id: groupid-Dev
auto-offset-reset: earliest
max-poll-records: 20
value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
properties:
cancel-authorization-linkage-listener-id: cancel-authorization-linkage-listener
test-cancel-authorization-linkage-listener-id: test-cancel-authorization-linkage-listener
spring.json.trusted.packages: '*'
isolation.level: read_committed
linkage-topic: ppcd.matching.credit.auth.cancel.auto.matched.result.cushion