I thought the cost of calling writeAndFlush for every message was too high, so I batch the messages like this.
I also tried flushing the queue in channelReadComplete, but addMessage is probably called from another thread, so a message can sit in the queue until channelReadComplete is called again, possibly indefinitely.
Does anyone have a better solution?
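
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class Sender {
    private final Session session;
    // Thread-safe queue: addMessage may be called from any thread.
    private final Queue<Message> messages = new ConcurrentLinkedQueue<>();
    public ScheduledFuture<?> future;

    public Sender(Session session) {
        this.session = session;
        // Drain the queue on the channel's executor every 50 ms.
        future = session.getCtx().executor()
                .scheduleWithFixedDelay(this::flushMessages, 50, 50, TimeUnit.MILLISECONDS);
    }

    public void addMessage(Message message) {
        messages.add(message);
    }

    public void cancel() {
        future.cancel(false);
    }

    public void flushMessages() {
        Message message = messages.poll();
        if (message != null) {
            // Copy every queued message into one buffer, then write and flush once.
            ByteBuf buffer = PooledByteBufAllocator.DEFAULT.buffer();
            do {
                ByteBuf msgBuffer = message.getBuffer();
                buffer.writeBytes(msgBuffer);
                message.release();
                message = messages.poll();
            } while (message != null);
            session.getCtx().writeAndFlush(buffer);
        }
    }
}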
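
For comparison, here is a minimal JDK-only sketch of an on-demand variant of the same batching idea: instead of polling every 50 ms, a flush task is submitted only when a message arrives and no flush is already pending, so a message added from another thread should not sit in the queue waiting for the next read event. `OnDemandBatcher`, `add`, `flush`, and `sink` are hypothetical names for illustration, not Netty API. The sketch assumes the executor runs tasks one at a time, as a channel's event loop does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical helper (not part of Netty): batches items and flushes on demand.
final class OnDemandBatcher<T> {
    private final Queue<T> pending = new ConcurrentLinkedQueue<>();
    private final AtomicBoolean flushScheduled = new AtomicBoolean(false);
    private final Executor executor;      // assumption: stands in for ctx.executor()
    private final Consumer<List<T>> sink; // assumption: stands in for one writeAndFlush

    OnDemandBatcher(Executor executor, Consumer<List<T>> sink) {
        this.executor = executor;
        this.sink = sink;
    }

    // Safe to call from any thread.
    void add(T item) {
        pending.add(item);
        // Only the caller that flips the flag from false to true submits a flush task.
        if (flushScheduled.compareAndSet(false, true)) {
            executor.execute(this::flush);
        }
    }

    private void flush() {
        // Clear the flag before draining: an item added during the drain is either
        // picked up by this drain or triggers a new flush task.
        flushScheduled.set(false);
        List<T> batch = new ArrayList<>();
        for (T item; (item = pending.poll()) != null; ) {
            batch.add(item);
        }
        if (!batch.isEmpty()) {
            sink.accept(batch);
        }
    }
}
```

In a Netty setting, the `sink` would presumably copy the batch into one ByteBuf and call writeAndFlush once, as flushMessages does above. Messages added in a burst before the flush task runs end up in the same batch, so the number of flushes still stays well below the number of messages under load.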