提交 2ba5ded3 编写于 作者: R Rossen Stoyanchev

Polish Jetty reactive HttpClient connector

上级 2eae37dd
......@@ -21,7 +21,7 @@ import java.util.function.Consumer;
import java.util.function.Function;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.reactive.client.ContentChunk;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
......@@ -33,12 +33,7 @@ import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
/**
* Jetty ReactiveStreams HttpClient implementation of {@link ClientHttpConnector}.
*
* Implemented with buffer copy instead of optimized buffer wrapping because the latter
* hangs since {@link Callback#succeeded()} doesn't allow releasing the buffer and
* requesting more data at different times (required for {@code Mono<DataBuffer>} for example).
* See https://github.com/eclipse/jetty.project/issues/2429 for more details.
* {@link ClientHttpConnector} for the Jetty Reactive Streams HttpClient.
*
* @author Sebastien Deleuze
* @since 5.1
......@@ -63,7 +58,9 @@ public class JettyClientHttpConnector implements ClientHttpConnector {
* @param resourceFactory the {@link JettyResourceFactory} to use
* @param customizer the lambda used to customize the {@link HttpClient}
*/
public JettyClientHttpConnector(JettyResourceFactory resourceFactory, @Nullable Consumer<HttpClient> customizer) {
public JettyClientHttpConnector(
JettyResourceFactory resourceFactory, @Nullable Consumer<HttpClient> customizer) {
HttpClient httpClient = new HttpClient();
httpClient.setExecutor(resourceFactory.getExecutor());
httpClient.setByteBufferPool(resourceFactory.getByteBufferPool());
......@@ -107,16 +104,27 @@ public class JettyClientHttpConnector implements ClientHttpConnector {
JettyClientHttpRequest clientHttpRequest = new JettyClientHttpRequest(
this.httpClient.newRequest(uri).method(method.toString()), this.bufferFactory);
return requestCallback.apply(clientHttpRequest).then(Mono.from(
clientHttpRequest.getReactiveRequest().response((reactiveResponse, contentChunks) -> {
Flux<DataBuffer> content = Flux.from(contentChunks).map(chunk -> {
DataBuffer buffer = this.bufferFactory.allocateBuffer(chunk.buffer.capacity());
buffer.write(chunk.buffer);
chunk.callback.succeeded();
return buffer;
});
return Mono.just(new JettyClientHttpResponse(reactiveResponse, content));
clientHttpRequest.getReactiveRequest().response((response, chunks) -> {
Flux<DataBuffer> content = Flux.from(chunks).map(this::toDataBuffer);
return Mono.just(new JettyClientHttpResponse(response, content));
})));
}
private DataBuffer toDataBuffer(ContentChunk chunk) {
// We must copy until this is resolved:
// https://github.com/eclipse/jetty.project/issues/2429
// Use copy instead of buffer wrapping because Callback#succeeded() is
// used not only to release the buffer but also to request more data
// which is a problem for codecs that buffer data.
DataBuffer buffer = this.bufferFactory.allocateBuffer(chunk.buffer.capacity());
buffer.write(chunk.buffer);
chunk.callback.succeeded();
return buffer;
}
}
......@@ -22,7 +22,6 @@ import java.util.Collection;
import java.util.function.Function;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.reactive.client.ContentChunk;
import org.eclipse.jetty.reactive.client.ReactiveRequest;
import org.eclipse.jetty.util.Callback;
......@@ -86,24 +85,26 @@ class JettyClientHttpRequest extends AbstractClientHttpRequest {
}
@Override
public Mono<Void> writeWith(Publisher<? extends DataBuffer> publisher) {
Flux<ContentChunk> chunks = Flux.from(publisher).map(this::toContentChunk);
MediaType contentType = getHeaders().getContentType();
ReactiveRequest.Content requestContent = ReactiveRequest.Content.fromPublisher(chunks,
(contentType != null ? contentType.toString() : MediaType.APPLICATION_OCTET_STREAM_VALUE));
this.reactiveRequest = ReactiveRequest.newBuilder(this.jettyRequest).content(requestContent).build();
public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
Flux<ContentChunk> chunks = Flux.from(body).map(this::toContentChunk);
ReactiveRequest.Content content = ReactiveRequest.Content.fromPublisher(chunks, getContentType());
this.reactiveRequest = ReactiveRequest.newBuilder(this.jettyRequest).content(content).build();
return doCommit(this::completes);
}
@Override
public Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> body) {
String contentType = this.jettyRequest.getHeaders().getField(HttpHeader.CONTENT_TYPE).getValue();
Flux<ContentChunk> chunks = Flux.from(body).flatMap(Function.identity()).map(this::toContentChunk);
ReactiveRequest.Content content = ReactiveRequest.Content.fromPublisher(chunks, contentType);
ReactiveRequest.Content content = ReactiveRequest.Content.fromPublisher(chunks, getContentType());
this.reactiveRequest = ReactiveRequest.newBuilder(this.jettyRequest).content(content).build();
return doCommit(this::completes);
}
private String getContentType() {
MediaType contentType = getHeaders().getContentType();
return contentType != null ? contentType.toString() : MediaType.APPLICATION_OCTET_STREAM_VALUE;
}
private Mono<Void> completes() {
return Mono.empty();
}
......@@ -146,4 +147,5 @@ class JettyClientHttpRequest extends AbstractClientHttpRequest {
}
return this.reactiveRequest;
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册