I'm currently using reactive Redis pub/sub in Spring, and I'd like to unsubscribe from a channel.
Do you know how to cancel a subscription in certain situations, such as a socket disconnection, when using ReactiveRedisTemplate.listenToChannel() or ReactiveRedisMessageListenerContainer?
With non-reactive Redis pub/sub, I understand that you can cancel a subscription by removing the listener from the RedisMessageListenerContainer (RedisMessageListenerContainer.removeMessageListener()).
@Service
class RedisMessageSubscriber(
    private val redisTemplate: ReactiveRedisTemplate<String, Any>,
    private val reactiveRedisMessageListenerContainer: ReactiveRedisMessageListenerContainer
) {
    private val log = logger()

    fun subscribe(channel: String, ctx: ChannelHandlerContext) {
        val subscribe = redisTemplate.listenToChannel(channel)
            .map {
                MapperUtils.readJsonValueOrThrow(it.message.toString(), ChatDto::class)
            }
            .subscribe { it ->
                ctx.channel().writeAndFlush(CustomResponse(ResponseType.RECEIVE_CHAT, it))
                    .addListener {
                        if (it.isSuccess) {
                            log.info("write channel topic: $channel")
                        } else {
                            ctx.channel().close()
                            log.warn("Failed to send the message to the client. roomId:${channel}, message:${it.cause()}, socketOpen: ${ctx.channel().isOpen}")
                            throw IllegalArgumentException("socket close")
                        }
                    }
            }
        log.info("isDispose: ${subscribe.isDisposed}")
    }
}
I expected the Redis subscription to be cancelled as well once the socket channel was closed, but the subscription was still active.
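
One direction that might work, as a rough sketch rather than a confirmed fix: the value returned by subscribe() is a Reactor Disposable, and calling dispose() on it cancels the upstream Flux, which should in turn release the Redis subscription. The exception thrown inside the Netty write listener probably never reaches the Reactor pipeline, since the listener runs outside the subscriber callback. The sketch below ties the Disposable to the Netty channel's closeFuture() instead. The names disposeOnClose and subscribeUntilClosed are hypothetical helpers invented for illustration, and the payload is written as a plain String to keep the example free of the ChatDto and CustomResponse types from the code above.

```kotlin
import io.netty.channel.Channel
import io.netty.channel.ChannelHandlerContext
import org.springframework.data.redis.core.ReactiveRedisTemplate
import reactor.core.Disposable

// Hypothetical helper (not part of Spring, Reactor, or Netty): dispose the
// Reactor subscription as soon as the Netty channel closes, for any reason.
fun Disposable.disposeOnClose(channel: Channel): Disposable {
    // closeFuture() completes when the channel is closed; if the channel is
    // already closed, Netty notifies the listener right away.
    channel.closeFuture().addListener { dispose() }
    return this
}

// Hypothetical variant of the subscribe() method from the question.
fun subscribeUntilClosed(
    redisTemplate: ReactiveRedisTemplate<String, Any>,
    channel: String,
    ctx: ChannelHandlerContext
): Disposable =
    redisTemplate.listenToChannel(channel)
        .map { it.message.toString() }
        .subscribe(
            // Forward each Redis message to the socket.
            { payload -> ctx.channel().writeAndFlush(payload) },
            // On a pipeline error, close the socket; the close listener
            // registered below then disposes the subscription.
            { _ -> ctx.channel().close() }
        )
        .disposeOnClose(ctx.channel())
```

With this shape, a failed write only needs to call ctx.channel().close(); the close listener then takes care of the Redis side. The same Disposable-based cancellation should apply to a Flux obtained from ReactiveRedisMessageListenerContainer.receive(...), though that is not shown here.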