I’m calling an OpenAI-style streaming chat interface using httpcomponents-client, but the client always terminates the stream unexpectedly, and I can see "Aborted request" on the chat server side.
I have tried the following options:
- Set Connection: keep-alive in the request headers.
- Set SoTimeout and ConnectionRequestTimeout so the request never times out (see the sketch below).
But neither works.
Am I missing something in the CloseableHttpAsyncClient configuration?
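For reference, here is a minimal sketch of what I mean by "never time out" (my assumption is that a value of 0 disables the socket and connection-request timeouts; the variable names are only for illustration, the actual test code is below):
// Sketch only, not the exact test code.
// Assumption: 0 means "no timeout" for both settings.
IOReactorConfig noSoTimeout = IOReactorConfig.copy(IOReactorConfig.DEFAULT)
        .setSoTimeout(0, TimeUnit.MILLISECONDS)
        .build();
RequestConfig noLeaseTimeout = RequestConfig.copy(RequestConfig.DEFAULT)
        .setConnectionRequestTimeout(0, TimeUnit.MILLISECONDS)
        .build();
// The keep-alive attempt was a plain request header:
// SimpleRequestBuilder.post(chatUrl).setHeader("Connection", "keep-alive")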
Exception
failed: org.apache.hc.core5.http.ConnectionClosedException: Connection is closed
Dependencies
implementation("org.apache.httpcomponents.client5:httpclient5:5.3.1")
implementation("io.projectreactor:reactor-core:3.6.7")
Request code
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hc.client5.http.async.methods.*;
import org.apache.hc.client5.http.config.RequestConfig;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.client5.http.impl.async.HttpAsyncClientBuilder;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.reactor.IOReactorConfig;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import java.io.IOException;
import java.nio.CharBuffer;
import java.util.*;
import java.util.concurrent.TimeUnit;
public class StreamChatTests {
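// Sentinel lines that mark the end of an OpenAI-style SSE stream.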
static final List<String> STREAM_COMPLETION_EVENT = Arrays.asList("data: [DONE]", "data:[DONE]");
static final ObjectMapper objectMapper = new ObjectMapper();
static final ObjectMapper chatObjectMapper = new ObjectMapper()
.enable(DeserializationFeature.FAIL_ON_TRAILING_TOKENS)
.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
static final String chatUrl = "http://0.0.0.0:8002/vllm-fastapi/llm/v1/chat/completions";
static final IOReactorConfig ioReactorConfig = IOReactorConfig.copy(IOReactorConfig.DEFAULT)
.setSoTimeout(3, TimeUnit.SECONDS)
.build();
static final RequestConfig requestConfig = RequestConfig.copy(RequestConfig.DEFAULT)
.setConnectionRequestTimeout(3, TimeUnit.MINUTES)
.build();
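// Minimal request/response records mirroring the OpenAI chat completions schema.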
record ChatCompletionRequest(
List<Map<String, String>> messages, String model, int max_tokens,
boolean stream, Map<String, Boolean> stream_options, float temperature, float top_p
) { }
record ChatCompletionChunk(String id, List<Choice> choices, Map<String, Integer> usage) { }
record Choice(int index, Map<String, String> delta, Map<String, String> message) { }
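// Consumes the streamed response body incrementally and pushes each parsed chunk into a Reactor FluxSink.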
static class ChatResponseConsumer extends AbstractCharResponseConsumer<ChatCompletionChunk> {
FluxSink<ChatCompletionChunk> sink;
public ChatResponseConsumer(FluxSink<ChatCompletionChunk> sink) {
this.sink = sink;
}
@Override
public void releaseResources() {
}
@Override
protected int capacityIncrement() {
return Integer.MAX_VALUE;
}
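// Called as decoded response data arrives; one buffer may carry one or more SSE lines.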
@Override
protected void data(CharBuffer src, boolean endOfStream) throws IOException {
String currentLine = src.toString();
if (currentLine.isEmpty() || endOfStream) {
sink.next(new ChatCompletionChunk(null, null, null));
return;
}
if (currentLine.contains("r") || currentLine.contains("n")) {
String[] split = currentLine.split("\r?\n");
for (String s : split) {
handleCurrentLine(s, sink);
}
} else {
handleCurrentLine(currentLine, sink);
}
}
@Override
protected void start(HttpResponse response, ContentType contentType) {
}
@Override
protected ChatCompletionChunk buildResult() {
return null;
}
@Override
public void failed(final Exception cause) {
System.out.println("nfailed: " + cause);
}
}
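// Parses one SSE line: strips the "data:" prefix, completes the sink on the [DONE] sentinel, otherwise deserializes the JSON chunk and emits it.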
static void handleCurrentLine(String currentLine, FluxSink<ChatCompletionChunk> sink) throws IOException {
if (currentLine.startsWith("data:")) {
String dataValue = currentLine.substring(5);
if (dataValue.startsWith(" ")) {
dataValue = dataValue.substring(1);
}
if (STREAM_COMPLETION_EVENT.contains(currentLine)) {
sink.complete();
return;
}
ChatCompletionChunk chatCompletionChunk = chatObjectMapper.readValue(dataValue, ChatCompletionChunk.class);
sink.next(chatCompletionChunk);
}
}
public static void main(String[] args) throws Exception {
List<Map<String, String>> messages = new ArrayList<>();
messages.add(Map.of("role", "system", "content", "You are a helpful assistant."));
messages.add(Map.of("role", "user", "content", "Please help to write a proposal about the application of AI on the ground, which requires about 800 words."));
ChatCompletionRequest chatCompletionRequest = new ChatCompletionRequest(
messages, "Qwen2-7B-Instruct",
8192, true, Map.of("include_usage", true), 0.7F, 0.8F
);
String chatRequest = objectMapper.writeValueAsString(chatCompletionRequest);
try (CloseableHttpAsyncClient client = HttpAsyncClientBuilder.create()
.setIOReactorConfig(ioReactorConfig)
.setDefaultRequestConfig(requestConfig)
.build()) {
SimpleHttpRequest simpleHttpRequest = SimpleRequestBuilder.post(chatUrl)
.setBody(chatRequest, ContentType.APPLICATION_JSON)
.build();
client.start();
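// Bridge the async exchange into a Flux: the consumer pushes chunks into the sink as the response streams in.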
Flux<ChatCompletionChunk> flux = Flux.create(sink -> client.execute(
SimpleRequestProducer.create(simpleHttpRequest),
new ChatResponseConsumer(sink), null
));
flux.subscribe(chatCompletionChunk -> {
Optional.ofNullable(chatCompletionChunk.choices()).filter(choices -> !choices.isEmpty()).ifPresent(choices ->
System.out.print(choices.get(0).delta().getOrDefault("content", "")));
Optional.ofNullable(chatCompletionChunk.usage()).ifPresent(usage -> System.out.printf(
"nprompt_tokens: %s completion_tokens: %s total_tokens: %s",
usage.get("prompt_tokens"), usage.get("completion_tokens"), usage.get("total_tokens")
));
});
}
}
}