I have an endpoint that produces avro encoded binaries and return it via http streaming.
The service code is like this:
Mono<ServerResponse> fetchMyDataDataStream(ServerRequest req) {
var flux = requestExtractor.extract(req)
.flatMapMany(myDataFacade::fetchMyDataAvro)
.map(pos -> serialize(pos, MyData.class));
var bodyInserters = BodyInserters.fromDataBuffers(flux);
return ServerResponse.ok()
.contentType(MediaType.APPLICATION_OCTET_STREAM)
.body(bodyInserters);
}
public DataBuffer serialize(MyData avro) {
var datumWriter = new SpecificDatumWriter<>(MyData.class);
datumWriter.write(avro, encoder);
encoder.flush();
var bytes = outputStream.toByteArray();
outputStream.reset();
return defaultDataBufferFactory.wrap(bytes);
}
I have a simply client that calls this endpoint, deserialize the avro message and print them:
public static void main(String[] args) throws InterruptedException {
var url = "http://localhost:8080";
var webClient = WebClient.create(url);
CountDownLatch countDownLatch = new CountDownLatch(1);
Flux<byte[]> avroDataStream = webClient.get()
.uri("/v2/mydata-get")
.accept(MediaType.APPLICATION_OCTET_STREAM)
.retrieve()
.bodyToFlux(byte[].class);
avroDataStream.subscribe(AvroClientSample::processAvroInputStream, error -> {
error.printStackTrace();
countDownLatch.countDown();
}, countDownLatch::countDown);
countDownLatch.await();
}
private static void processAvroInputStream(final byte[] avroData) {
GenericDatumReader<MyData> datumReader = new SpecificDatumReader<>(MyData.class);
try {
var decoder = DecoderFactory.get().binaryDecoder(avroData, null);
var myData = datumReader.read(null, decoder);
System.out.println(myData);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
The service appears to work fine but the client, after successfully printed several messages, failed with:
{"id": 00014611, ...}
{"id": 00014612, ...}
{"id": 00014613, ...}
{"id": 00014614, ...}
{"id": 00014615, ...}
java.lang.RuntimeException: java.io.EOFException
I have a feeling that the http stream somehow terminated without finished transmitting all the data. I am not sure if this is the case. If so, how to fix it? If not, what could possibly be the reason of this?
1