I’am writing a spring reactive implementation for a third-party SPI which is base on traditional servlet. and one method reqiures inputStream from request as return value:
/**
* Returns the request input stream
*
* @param buffered if the input stream should be buffered and support for multiple reads
* @return the request input stream
*/
InputStream getInputStream(boolean buffered);
This is my implements:
private InputStream inputStream;
private ServerHttpRequest request;
@Override
public InputStream getInputStream(boolean buffered) {
if (inputStream != null) {
return inputStream;
}
var inputStreamMono =
request
.getBody()
.collect(
InputStreamCollector::new,
(t, dataBuffer) -> t.collectInputStream(dataBuffer.asInputStream()))
.flatMap(inputStreamCollector -> Mono.just(inputStreamCollector.getInputStream()));
if (buffered) {
return inputStream = new BufferedInputStream(Objects.requireNonNull(inputStreamMono.block()));
}
return inputStreamMono.block();
}
public class InputStreamCollector {
private InputStream is;
public InputStream getInputStream() {
return this.is;
}
public void collectInputStream(InputStream is) {
if (this.is == null) this.is = is;
this.is = new SequenceInputStream(this.is, is);
}
}
But it receives bellowd exception
java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread parallel-2
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:87) ~[reactor-core-3.6.4.jar:3.6.4]
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
*__checkpoint ⇢ org.springframework.cloud.gateway.filter.WeightCalculatorWebFilter [DefaultWebFilterChain]
*__checkpoint ⇢ LogoutWebFilter [DefaultWebFilterChain]
*__checkpoint ⇢ ServerRequestCacheWebFilter [DefaultWebFilterChain]
*__checkpoint ⇢ SecurityContextServerWebExchangeWebFilter [DefaultWebFilterChain]
*__checkpoint ⇢ AuthenticationWebFilter [DefaultWebFilterChain]
*__checkpoint ⇢ ReactorContextWebFilter [DefaultWebFilterChain]
*__checkpoint ⇢ HttpHeaderWriterWebFilter [DefaultWebFilterChain]
*__checkpoint ⇢ ServerWebExchangeReactorContextWebFilter [DefaultWebFilterChain]
*__checkpoint ⇢ org.springframework.security.web.server.WebFilterChainProxy [DefaultWebFilterChain]
*__checkpoint ⇢ HTTP GET "/linkerp/manager" [ExceptionHandlingWebHandler]
Original Stack Trace:
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:87) ~[reactor-core-3.6.4.jar:3.6.4]
at reactor.core.publisher.Mono.block(Mono.java:1779) ~[reactor-core-3.6.4.jar:3.6.4]
at org.umbrella.ReactiveHttpRequest.getInputStream(ReactiveHttpRequest.java:85) ~[classes/:na]
Is there any outstanding I missed? Or is there a proper way to return inputStream from a reactive ServerHttpRequest? Many Thanks!!