提交 4a0adc2b 编写于 作者: S Stephane Maldini

Sync with reactor-netty and reactor-ipc

上级 33a7b91e
......@@ -17,21 +17,23 @@
package org.springframework.http.client.reactive;
import java.net.URI;
import java.time.Duration;
import java.util.Collection;
import java.util.Optional;
import io.netty.handler.codec.http.cookie.Cookie;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.io.buffer.Buffer;
import reactor.io.netty.http.HttpClient;
import reactor.io.netty.http.model.Cookie;
import reactor.io.netty.http.model.Method;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferAllocator;
import org.springframework.http.HttpCookie;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.ResponseCookie;
/**
* {@link ClientHttpRequest} implementation for the Reactor Net HTTP client
......@@ -47,7 +49,7 @@ public class ReactorClientHttpRequest extends AbstractClientHttpRequest {
private final URI uri;
private final HttpClient<Buffer, Buffer> httpClient;
private final HttpClient httpClient;
private Flux<Buffer> body;
......@@ -97,23 +99,23 @@ public class ReactorClientHttpRequest extends AbstractClientHttpRequest {
@Override
public Mono<ClientHttpResponse> execute() {
return this.httpClient.request(new Method(httpMethod.toString()), uri.toString(),
return this.httpClient.request(new io.netty.handler.codec.http.HttpMethod(httpMethod.toString()), uri.toString(),
channel -> {
// see https://github.com/reactor/reactor-io/pull/8
if (body == null) {
channel.headers().removeTransferEncodingChunked();
channel.removeTransferEncodingChunked();
}
return applyBeforeCommit()
.after(() -> {
getHeaders().entrySet().stream().forEach(e ->
channel.headers().set(e.getKey(), e.getValue()));
getCookies().values().stream().flatMap(Collection::stream).forEach(cookie ->
channel.addCookie(cookie.getName(), new ReactorCookie(cookie)));
channel.addCookie(cookie.getName(), new NettyCookie(cookie)));
return Mono.empty();
})
.after(() -> {
if (body != null) {
return channel.writeBufferWith(body);
return channel.send(body);
}
else {
return channel.writeHeaders();
......@@ -123,16 +125,12 @@ public class ReactorClientHttpRequest extends AbstractClientHttpRequest {
.map(httpChannel -> new ReactorClientHttpResponse(httpChannel, allocator));
}
/**
* At present Reactor does not provide a {@link Cookie} implementation.
*/
private final static class ReactorCookie extends Cookie {
private final static class NettyCookie implements Cookie {
private final HttpCookie httpCookie;
public ReactorCookie(HttpCookie httpCookie) {
public NettyCookie(HttpCookie httpCookie) {
this.httpCookie = httpCookie;
}
......@@ -145,6 +143,76 @@ public class ReactorClientHttpRequest extends AbstractClientHttpRequest {
public String value() {
return this.httpCookie.getValue();
}
@Override
public boolean isHttpOnly() {
return true;
}
@Override
public long maxAge() {
return -1;
}
@Override
public String domain() {
return null;
}
@Override
public String path() {
return null;
}
@Override
public void setValue(String value) {
throw new UnsupportedOperationException("Read-Only Cookie");
}
@Override
public boolean wrap() {
return false;
}
@Override
public void setWrap(boolean wrap) {
throw new UnsupportedOperationException("Read-Only Cookie");
}
@Override
public void setDomain(String domain) {
throw new UnsupportedOperationException("Read-Only Cookie");
}
@Override
public void setPath(String path) {
throw new UnsupportedOperationException("Read-Only Cookie");
}
@Override
public void setMaxAge(long maxAge) {
throw new UnsupportedOperationException("Read-Only Cookie");
}
@Override
public void setSecure(boolean secure) {
throw new UnsupportedOperationException("Read-Only Cookie");
}
@Override
public void setHttpOnly(boolean httpOnly) {
throw new UnsupportedOperationException("Read-Only Cookie");
}
@Override
public int compareTo(Cookie o) {
return httpCookie.getName().compareTo(o.name());
}
@Override
public boolean isSecure() {
return false;
}
}
}
......
......@@ -16,11 +16,9 @@
package org.springframework.http.client.reactive;
import java.nio.ByteBuffer;
import java.util.Collection;
import reactor.core.publisher.Flux;
import reactor.io.buffer.Buffer;
import reactor.io.netty.http.HttpChannel;
import org.springframework.core.io.buffer.DataBuffer;
......@@ -42,7 +40,7 @@ public class ReactorClientHttpResponse implements ClientHttpResponse {
private final DataBufferAllocator allocator;
private final HttpChannel<Buffer, ByteBuffer> channel;
private final HttpChannel channel;
public ReactorClientHttpResponse(HttpChannel channel, DataBufferAllocator allocator) {
......@@ -52,7 +50,7 @@ public class ReactorClientHttpResponse implements ClientHttpResponse {
@Override
public Flux<DataBuffer> getBody() {
return channel.input().map(b -> allocator.wrap(b.byteBuffer()));
return channel.receive().map(b -> allocator.wrap(b.byteBuffer()));
}
@Override
......@@ -64,7 +62,7 @@ public class ReactorClientHttpResponse implements ClientHttpResponse {
@Override
public HttpStatus getStatusCode() {
return HttpStatus.valueOf(this.channel.responseStatus().getCode());
return HttpStatus.valueOf(this.channel.responseStatus().code());
}
@Override
......@@ -76,8 +74,8 @@ public class ReactorClientHttpResponse implements ClientHttpResponse {
.domain(cookie.domain())
.path(cookie.path())
.maxAge(cookie.maxAge())
.secure(cookie.secure())
.httpOnly(cookie.httpOnly())
.secure(cookie.isSecure())
.httpOnly(cookie.isHttpOnly())
.build();
result.add(cookie.name(), responseCookie);
});
......@@ -87,7 +85,7 @@ public class ReactorClientHttpResponse implements ClientHttpResponse {
@Override
public String toString() {
return "ReactorClientHttpResponse{" +
"request=" + this.channel.method().getName() + " " + this.channel.uri() + "," +
"request=" + this.channel.method().name() + " " + this.channel.uri() + "," +
"status=" + getStatusCode() +
'}';
}
......
......@@ -90,7 +90,7 @@ public abstract class AbstractServerHttpResponse implements ServerHttpResponse {
@Override
public Mono<Void> setBody(Publisher<DataBuffer> publisher) {
return new WriteWithOperator<>(publisher, writePublisher ->
return new ChannelSendOperator<>(publisher, writePublisher ->
applyBeforeCommit().after(() -> setBodyInternal(writePublisher)));
}
......
......@@ -37,12 +37,12 @@ import org.springframework.util.Assert;
* @author Rossen Stoyanchev
* @author Stephane Maldini
*/
public class WriteWithOperator<T> extends MonoSource<T, Void> {
public class ChannelSendOperator<T> extends MonoSource<T, Void> {
private final Function<Publisher<T>, Publisher<Void>> writeFunction;
public WriteWithOperator(Publisher<? extends T> source,
public ChannelSendOperator(Publisher<? extends T> source,
Function<Publisher<T>, Publisher<Void>> writeFunction) {
super(source);
this.writeFunction = writeFunction;
......
......@@ -28,7 +28,7 @@ import org.springframework.util.Assert;
* @author Stephane Maldini
*/
public class ReactorHttpHandlerAdapter
implements ChannelFluxHandler<Buffer, Buffer, HttpChannel<Buffer, Buffer>> {
implements ChannelFluxHandler<Buffer, Buffer, HttpChannel> {
private final HttpHandler httpHandler;
......@@ -42,7 +42,7 @@ public class ReactorHttpHandlerAdapter
}
@Override
public Mono<Void> apply(HttpChannel<Buffer, Buffer> channel) {
public Mono<Void> apply(HttpChannel channel) {
ReactorServerHttpRequest adaptedRequest =
new ReactorServerHttpRequest(channel, allocator);
ReactorServerHttpResponse adaptedResponse =
......
......@@ -19,10 +19,9 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import io.netty.handler.codec.http.cookie.Cookie;
import reactor.core.publisher.Flux;
import reactor.io.buffer.Buffer;
import reactor.io.netty.http.HttpChannel;
import reactor.io.netty.http.model.Cookie;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferAllocator;
......@@ -39,11 +38,11 @@ import org.springframework.util.MultiValueMap;
*/
public class ReactorServerHttpRequest extends AbstractServerHttpRequest {
private final HttpChannel<Buffer, ?> channel;
private final HttpChannel channel;
private final DataBufferAllocator allocator;
public ReactorServerHttpRequest(HttpChannel<Buffer, ?> request,
public ReactorServerHttpRequest(HttpChannel request,
DataBufferAllocator allocator) {
Assert.notNull("'request' must not be null");
Assert.notNull(allocator, "'allocator' must not be null");
......@@ -52,13 +51,13 @@ public class ReactorServerHttpRequest extends AbstractServerHttpRequest {
}
public HttpChannel<Buffer, ?> getReactorChannel() {
public HttpChannel getReactorChannel() {
return this.channel;
}
@Override
public HttpMethod getMethod() {
return HttpMethod.valueOf(this.channel.method().getName());
return HttpMethod.valueOf(this.channel.method().name());
}
@Override
......@@ -75,17 +74,17 @@ public class ReactorServerHttpRequest extends AbstractServerHttpRequest {
@Override
protected void initCookies(MultiValueMap<String, HttpCookie> cookies) {
for (String name : this.channel.cookies().keySet()) {
for (CharSequence name : this.channel.cookies().keySet()) {
for (Cookie cookie : this.channel.cookies().get(name)) {
HttpCookie httpCookie = new HttpCookie(name, cookie.value());
cookies.add(name, httpCookie);
HttpCookie httpCookie = new HttpCookie(name.toString(), cookie.value());
cookies.add(name.toString(), httpCookie);
}
}
}
@Override
public Flux<DataBuffer> getBody() {
return Flux.from(this.channel.input()).map(bytes -> {
return Flux.from(this.channel.receive()).map(bytes -> {
ByteBuffer byteBuffer = bytes.byteBuffer();
return allocator.wrap(byteBuffer);
});
......
......@@ -19,13 +19,13 @@ package org.springframework.http.server.reactive;
import java.time.Duration;
import java.util.Optional;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.cookie.Cookie;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.io.buffer.Buffer;
import reactor.io.netty.http.HttpChannel;
import reactor.io.netty.http.model.Cookie;
import reactor.io.netty.http.model.Status;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferAllocator;
......@@ -41,9 +41,9 @@ import org.springframework.util.Assert;
*/
public class ReactorServerHttpResponse extends AbstractServerHttpResponse {
private final HttpChannel<?, Buffer> channel;
private final HttpChannel channel;
public ReactorServerHttpResponse(HttpChannel<?, Buffer> response,
public ReactorServerHttpResponse(HttpChannel response,
DataBufferAllocator allocator) {
super(allocator);
Assert.notNull("'response' must not be null.");
......@@ -51,18 +51,18 @@ public class ReactorServerHttpResponse extends AbstractServerHttpResponse {
}
public HttpChannel<?, Buffer> getReactorChannel() {
public HttpChannel getReactorChannel() {
return this.channel;
}
@Override
public void setStatusCode(HttpStatus status) {
getReactorChannel().responseStatus(Status.valueOf(status.value()));
getReactorChannel().responseStatus(HttpResponseStatus.valueOf(status.value()));
}
@Override
protected Mono<Void> setBodyInternal(Publisher<DataBuffer> publisher) {
return Mono.from(this.channel.writeWith(
return Mono.from(this.channel.send(
Flux.from(publisher).map(buffer -> new Buffer(buffer.asByteBuffer()))));
}
......@@ -79,22 +79,18 @@ public class ReactorServerHttpResponse extends AbstractServerHttpResponse {
protected void writeCookies() {
for (String name : getCookies().keySet()) {
for (ResponseCookie httpCookie : getCookies().get(name)) {
Cookie cookie = new ReactorCookie(httpCookie);
Cookie cookie = new NettyCookie(httpCookie);
this.channel.addResponseCookie(name, cookie);
}
}
}
/**
* At present Reactor does not provide a {@link Cookie} implementation.
*/
private final static class ReactorCookie extends Cookie {
private final static class NettyCookie implements Cookie {
private final ResponseCookie httpCookie;
public ReactorCookie(ResponseCookie httpCookie) {
public NettyCookie(ResponseCookie httpCookie) {
this.httpCookie = httpCookie;
}
......@@ -109,7 +105,7 @@ public class ReactorServerHttpResponse extends AbstractServerHttpResponse {
}
@Override
public boolean httpOnly() {
public boolean isHttpOnly() {
return this.httpCookie.isHttpOnly();
}
......@@ -132,7 +128,52 @@ public class ReactorServerHttpResponse extends AbstractServerHttpResponse {
}
@Override
public boolean secure() {
public void setValue(String value) {
}
@Override
public boolean wrap() {
return false;
}
@Override
public void setWrap(boolean wrap) {
throw new UnsupportedOperationException("Read-Only Cookie");
}
@Override
public void setDomain(String domain) {
throw new UnsupportedOperationException("Read-Only Cookie");
}
@Override
public void setPath(String path) {
throw new UnsupportedOperationException("Read-Only Cookie");
}
@Override
public void setMaxAge(long maxAge) {
throw new UnsupportedOperationException("Read-Only Cookie");
}
@Override
public void setSecure(boolean secure) {
throw new UnsupportedOperationException("Read-Only Cookie");
}
@Override
public void setHttpOnly(boolean httpOnly) {
throw new UnsupportedOperationException("Read-Only Cookie");
}
@Override
public int compareTo(Cookie o) {
return httpCookie.getName().compareTo(o.name());
}
@Override
public boolean isSecure() {
return this.httpCookie.isSecure();
}
}
......
......@@ -33,7 +33,7 @@ public class ReactorHttpServer extends HttpServerSupport
private ReactorHttpHandlerAdapter reactorHandler;
private reactor.io.netty.http.HttpServer<Buffer, Buffer> reactorServer;
private reactor.io.netty.http.HttpServer reactorServer;
private DataBufferAllocator allocator = new DefaultDataBufferAllocator();
......
......@@ -39,7 +39,7 @@ import static org.junit.Assert.*;
* @author Stephane Maldini
*/
@SuppressWarnings("ThrowableResultOfMethodCallIgnored")
public class WriteWithOperatorTests {
public class ChannelSendOperatorTests {
private OneByOneAsyncWriter writer;
......@@ -49,14 +49,14 @@ public class WriteWithOperatorTests {
this.writer = new OneByOneAsyncWriter();
}
private <T> Mono<Void> writeWithOperator(Publisher<String> source){
return new WriteWithOperator<>(source, writer::writeWith);
private <T> Mono<Void> sendOperator(Publisher<String> source){
return new ChannelSendOperator<>(source, writer::send);
}
@Test
public void errorBeforeFirstItem() throws Exception {
IllegalStateException error = new IllegalStateException("boo");
Mono<Void> completion = Mono.<String>error(error).as(this::writeWithOperator);
Mono<Void> completion = Mono.<String>error(error).as(this::sendOperator);
Signal<Void> signal = completion.materialize().get();
assertNotNull(signal);
......@@ -65,7 +65,7 @@ public class WriteWithOperatorTests {
@Test
public void completionBeforeFirstItem() throws Exception {
Mono<Void> completion = Flux.<String>empty().as(this::writeWithOperator);
Mono<Void> completion = Flux.<String>empty().as(this::sendOperator);
Signal<Void> signal = completion.materialize().get();
assertNotNull(signal);
......@@ -77,7 +77,7 @@ public class WriteWithOperatorTests {
@Test
public void writeOneItem() throws Exception {
Mono<Void> completion = Flux.just("one").as(this::writeWithOperator);
Mono<Void> completion = Flux.just("one").as(this::sendOperator);
Signal<Void> signal = completion.materialize().get();
assertNotNull(signal);
......@@ -92,7 +92,7 @@ public class WriteWithOperatorTests {
@Test
public void writeMultipleItems() throws Exception {
List<String> items = Arrays.asList("one", "two", "three");
Mono<Void> completion = Flux.fromIterable(items).as(this::writeWithOperator);
Mono<Void> completion = Flux.fromIterable(items).as(this::sendOperator);
Signal<Void> signal = completion.materialize().get();
assertNotNull(signal);
......@@ -115,7 +115,7 @@ public class WriteWithOperatorTests {
subscriber.onError(error);
}
}, subscriber -> new AtomicInteger());
Mono<Void> completion = publisher.as(this::writeWithOperator);
Mono<Void> completion = publisher.as(this::sendOperator);
Signal<Void> signal = completion.materialize().get();
assertNotNull(signal);
......@@ -138,10 +138,9 @@ public class WriteWithOperatorTests {
private Throwable error;
public Publisher<Void> writeWith(Publisher<String> publisher) {
public Publisher<Void> send(Publisher<String> publisher) {
return subscriber -> {
Executors.newSingleThreadScheduledExecutor().schedule(
(Runnable) () -> publisher.subscribe(new WriteSubscriber(subscriber)),
Executors.newSingleThreadScheduledExecutor().schedule(() -> publisher.subscribe(new WriteSubscriber(subscriber)),
50, TimeUnit.MILLISECONDS);
};
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册