I am using spanner with R2DBC in my application. On making concurrent calls to application, for saving and fetch the entity from database, I am getting following error :
2024-09-18 16:16:04.592 | [parallel-4] | ERROR | traceId: f92b7be3186fff9d91a864236da7cafd | o.s.t.i.TransactionInterceptor - Application exception overridden by rollback exception
com.google.cloud.spanner.SpannerException: FAILED_PRECONDITION: io.grpc.StatusRuntimeException: FAILED_PRECONDITION: This transaction has been invalidated by a later transaction in the same session.
at com.google.cloud.spanner.SpannerExceptionFactory.newSpannerExceptionPreformatted(SpannerExceptionFactory.java:291)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
*__checkpoint ⇢ SQL "INSERT INTO PAYMENT_IDENTIFIER_ENTITY (ID, IDENTIFIER, PAYMENT_FK) VALUES (@val0ID, @val1IDENTIFIER, @val2PAYMENT_FK)" [DatabaseClient]
Original Stack Trace:
at com.google.cloud.spanner.SpannerExceptionFactory.newSpannerExceptionPreformatted(SpannerExceptionFactory.java:291)
at com.google.cloud.spanner.SpannerExceptionFactory.fromApiException(SpannerExceptionFactory.java:311)
at com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException(SpannerExceptionFactory.java:174)
at com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException(SpannerExceptionFactory.java:110)
at com.google.cloud.spanner.SpannerExceptionFactory.asSpannerException(SpannerExceptionFactory.java:100)
at com.google.cloud.spanner.TransactionRunnerImpl$TransactionContextImpl.lambda$executeUpdateAsync$2(TransactionRunnerImpl.java:784)
at com.google.api.core.ApiFutures$ApiFunctionToGuavaFunction.apply(ApiFutures.java:397)
at com.google.common.util.concurrent.AbstractCatchingFuture$CatchingFuture.doFallback(AbstractCatchingFuture.java:234)
at com.google.common.util.concurrent.AbstractCatchingFuture$CatchingFuture.doFallback(AbstractCatchingFuture.java:222)
at com.google.common.util.concurrent.AbstractCatchingFuture.run(AbstractCatchingFuture.java:133)
at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:31)
at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1270)
at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:1038)
at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:808)
at com.google.common.util.concurrent.AbstractTransformFuture.run(AbstractTransformFuture.java:104)
at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:31)
at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1270)
at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:1038)
at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:808)
at com.google.api.gax.retrying.BasicRetryingFuture.handleAttempt(BasicRetryingFuture.java:200)
at com.google.api.gax.retrying.CallbackChainRetryingFuture$AttemptCompletionListener.handle(CallbackChainRetryingFuture.java:135)
at com.google.api.gax.retrying.CallbackChainRetryingFuture$AttemptCompletionListener.run(CallbackChainRetryingFuture.java:117)
at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:31)
at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1270)
at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:1038)
at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:808)
at com.google.api.core.AbstractApiFuture$InternalSettableFuture.setException(AbstractApiFuture.java:92)
at com.google.api.core.AbstractApiFuture.setException(AbstractApiFuture.java:74)
at com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture.onFailure(GrpcExceptionCallable.java:97)
at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:84)
at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1132)
at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:31)
at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1270)
at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:1038)
at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:808)
at io.grpc.stub.ClientCalls$GrpcFuture.setException(ClientCalls.java:568)
at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:538)
at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
at com.google.api.gax.grpc.ChannelPool$ReleasingClientCall$1.onClose(ChannelPool.java:570)
at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
at com.google.cloud.spanner.spi.v1.SpannerErrorInterceptor$1$1.onClose(SpannerErrorInterceptor.java:100)
at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:564)
at io.grpc.internal.ClientCallImpl.access$100(ClientCallImpl.java:72)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:729)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:710)
at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: io.grpc.StatusRuntimeException: FAILED_PRECONDITION: This transaction has been invalidated by a later transaction in the same session.
at io.grpc.Status.asRuntimeException(Status.java:533)
at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:538)
at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
at com.google.api.gax.grpc.ChannelPool$ReleasingClientCall$1.onClose(ChannelPool.java:570)
at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
at com.google.cloud.spanner.spi.v1.SpannerErrorInterceptor$1$1.onClose(SpannerErrorInterceptor.java:100)
at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:564)
at io.grpc.internal.ClientCallImpl.access$100(ClientCallImpl.java:72)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:729)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:710)
at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)
Initialization of connectionFactory :
@Bean
public ConnectionFactory connectionFactories() {
ConnectionFactoryOptions connectionFactoryOptions = ConnectionFactoryOptions.builder()
.option(DRIVER, "cloudspanner")
.option(PROJECT, "xxx")
.option(INSTANCE, "xxx")
.option(DATABASE, "xxx")
.option(CREDENTIALS, "xxx")
.build();
return ConnectionFactories.get(connectionFactoryOptions);
}
Code to save and fetch entity
@Transactional
public Mono<AuthorizationVO> save(BasePaymentRequest request) {
AuthorizationEntity authorizationEntity = mapToAuthorizationEntity(request);
return authorizationRepository.save(mapToAuthorizationEntity(request))
.doOnError(err -> {
log.error(String.format("PaymentId : %s . Error while saveAuthEntity : %s",
request.getPayment().getPaymentId(), err.getMessage()), err);
})
.flatMap(entity -> {
List<PaymentIdentifierEntity> paymentIdentifierEntityList = mapToPaymentIdentifierEntity(
request, entity.getId());
return paymentIdentifierRepository.saveAll(paymentIdentifierEntityList).collectList();
})
.doOnError(err -> {
log.error(String.format("PaymentId : %s . Error while savePaymentIdentifierEntity : %s",
request.getPayment().getPaymentId(), err.getMessage()), err);
})
.map(piEntityList -> {
List<PaymentIdentifierVO> piVOList = mapToPaymentIdentifierVO(piEntityList);
return mapToAuthorizationVO(authorizationEntity, piVOList);
})
.doOnNext(authorizationVO -> log.info(
String.format("PaymentId : %s . Saved VO : %s", request.getPayment().getPaymentId(),
authorizationVO)));
}
@Transactional
public Mono<AuthorizationVO> fetchByPaymentId(String paymentId) {
AuthorizationVO authorizationVO = AuthorizationVO.builder().build();
return authorizationRepository.findByPaymentId(paymentId)
.switchIfEmpty(Mono.defer(
() -> Mono.error(new RuntimeException(String.format("PaymentId : %s . Not Found.")))))
.doOnError(err -> {
log.error(String.format("PaymentId : %s . Error while fetchAuthEntity : %s",
paymentId, err.getMessage()), err);
})
.flatMap(authEntity -> {
authorizationVO.setPaymentId(authEntity.getPaymentId());
return paymentIdentifierRepository.findByPaymentFk(authEntity.getId())
.collectList();
}).doOnError(err -> {
log.error(String.format("PaymentId : %s . Error while fetchPaymentIdentifierEntity : %s",
paymentId, err.getMessage()), err);
})
.map(piList -> {
authorizationVO.setPaymentIdentifierVOList(mapToPaymentIdentifierVO(piList));
return authorizationVO;
})
.doOnNext(authVO -> log.info(
String.format("PaymentId : %s . fetched VO : %s", paymentId, authVO)));
}
From internet, I came to know that it is because single session can handle only 1 transaction and somehow for my case multiple transactions are getting assigned to single session.
Can someone please help to understand and resolve the issue
Aditya Saha is a new contributor to this site. Take care in asking for clarification, commenting, and answering.
Check out our Code of Conduct.