提交 807297f1 编写于 作者: R Rossen Stoyanchev

Remove RxNetty (from test sources)

Practically no changes to RxNetty for a year and efforts underway to
rebuild 0.6.x based on a current Reactor Netty base.

Aside from the extra time to run integration tests having two
Netty-based servers can also cause false alarms such as ByteBuf leaks
related to RxNetty.
上级 369d33c3
......@@ -78,9 +78,6 @@ dependencies {
testCompile("org.apache.tomcat.embed:tomcat-embed-core:${tomcatVersion}")
testCompile("org.eclipse.jetty:jetty-server:${jettyVersion}")
testCompile("org.eclipse.jetty:jetty-servlet:${jettyVersion}")
testCompile("io.reactivex:rxnetty-http:0.5.2") {
exclude group: 'io.reactivex', module: 'rxjava'
}
testCompile("com.squareup.okhttp3:mockwebserver:3.9.0")
testCompile("org.jetbrains.kotlin:kotlin-reflect:${kotlinVersion}")
testCompile("org.skyscreamer:jsonassert:1.5.0")
......
......@@ -28,7 +28,6 @@ import org.junit.runners.Parameterized;
import org.springframework.http.server.reactive.bootstrap.HttpServer;
import org.springframework.http.server.reactive.bootstrap.JettyHttpServer;
import org.springframework.http.server.reactive.bootstrap.ReactorHttpServer;
import org.springframework.http.server.reactive.bootstrap.RxNettyHttpServer;
import org.springframework.http.server.reactive.bootstrap.TomcatHttpServer;
import org.springframework.http.server.reactive.bootstrap.UndertowHttpServer;
......@@ -48,7 +47,6 @@ public abstract class AbstractHttpHandlerIntegrationTests {
File base = new File(System.getProperty("java.io.tmpdir"));
return new Object[][] {
{new JettyHttpServer()},
{new RxNettyHttpServer()},
{new ReactorHttpServer()},
{new TomcatHttpServer(base.getAbsolutePath())},
{new UndertowHttpServer()}
......
/*
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http.server.reactive;
import java.net.InetSocketAddress;
import java.net.URISyntaxException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.http.HttpMethod;
import org.springframework.util.Assert;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.reactivex.netty.protocol.http.server.HttpServerRequest;
import io.reactivex.netty.protocol.http.server.HttpServerResponse;
import io.reactivex.netty.protocol.http.server.RequestHandler;
import reactor.core.publisher.Mono;
import rx.Observable;
import rx.RxReactiveStreams;
/**
* Adapt {@link HttpHandler} to the RxNetty {@link RequestHandler}.
* For internal use within the framework.
*
* @author Rossen Stoyanchev
* @since 5.0
*/
public class RxNettyHttpHandlerAdapter implements RequestHandler<ByteBuf, ByteBuf> {
private static final Log logger = LogFactory.getLog(RxNettyHttpHandlerAdapter.class);
private final HttpHandler httpHandler;
public RxNettyHttpHandlerAdapter(HttpHandler httpHandler) {
Assert.notNull(httpHandler, "HttpHandler must not be null");
this.httpHandler = httpHandler;
}
@Override
public Observable<Void> handle(HttpServerRequest<ByteBuf> nativeRequest,
HttpServerResponse<ByteBuf> nativeResponse) {
Channel channel = nativeResponse.unsafeNettyChannel();
NettyDataBufferFactory bufferFactory = new NettyDataBufferFactory(channel.alloc());
InetSocketAddress remoteAddress = (InetSocketAddress) channel.remoteAddress();
ServerHttpRequest request;
ServerHttpResponse response;
try {
request = new RxNettyServerHttpRequest(nativeRequest, bufferFactory, remoteAddress);
response = new RxNettyServerHttpResponse(nativeResponse, bufferFactory);
}
catch (URISyntaxException ex) {
logger.error("Could not complete request", ex);
nativeResponse.setStatus(HttpResponseStatus.BAD_REQUEST);
return Observable.empty();
}
if (HttpMethod.HEAD.equals(request.getMethod())) {
response = new HttpHeadResponseDecorator(response);
}
Publisher<Void> result = this.httpHandler.handle(request, response)
.onErrorResume(ex -> {
logger.error("Could not complete request", ex);
nativeResponse.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
return Mono.empty();
})
.doOnSuccess(aVoid -> logger.debug("Successfully completed request"));
return RxReactiveStreams.toObservable(result);
}
}
/*
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http.server.reactive;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http.cookie.Cookie;
import io.reactivex.netty.protocol.http.server.HttpServerRequest;
import reactor.core.publisher.Flux;
import rx.Observable;
import rx.RxReactiveStreams;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.http.HttpCookie;
import org.springframework.http.HttpHeaders;
import org.springframework.util.Assert;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
/**
* Adapt {@link ServerHttpRequest} to the RxNetty {@link HttpServerRequest}.
* For internal use within the framework.
*
* @author Rossen Stoyanchev
* @author Stephane Maldini
* @since 5.0
*/
class RxNettyServerHttpRequest extends AbstractServerHttpRequest {
private final HttpServerRequest<ByteBuf> request;
private final NettyDataBufferFactory dataBufferFactory;
private final InetSocketAddress remoteAddress;
public RxNettyServerHttpRequest(HttpServerRequest<ByteBuf> request,
NettyDataBufferFactory dataBufferFactory, InetSocketAddress remoteAddress)
throws URISyntaxException {
super(initUri(request, remoteAddress), "", initHeaders(request));
this.request = request;
Assert.notNull(dataBufferFactory, "NettyDataBufferFactory must not be null");
this.dataBufferFactory = dataBufferFactory;
this.remoteAddress = remoteAddress;
}
private static URI initUri(HttpServerRequest<ByteBuf> request, InetSocketAddress remoteAddress)
throws URISyntaxException {
Assert.notNull(request, "HttpServerRequest must not be null");
String requestUri = request.getUri();
return (remoteAddress != null ? createUrl(remoteAddress, requestUri) : URI.create(requestUri));
}
private static URI createUrl(InetSocketAddress address, String requestUri) throws URISyntaxException {
// TODO: determine scheme
URI baseUrl = new URI("http", null, address.getHostString(), address.getPort(), null, null, null);
return new URI(baseUrl.toString() + requestUri);
}
private static HttpHeaders initHeaders(HttpServerRequest<ByteBuf> request) {
HttpHeaders headers = new HttpHeaders();
for (String name : request.getHeaderNames()) {
headers.put(name, request.getAllHeaderValues(name));
}
return headers;
}
@Override
public String getMethodValue() {
return this.request.getHttpMethod().name();
}
@Override
protected MultiValueMap<String, HttpCookie> initCookies() {
MultiValueMap<String, HttpCookie> cookies = new LinkedMultiValueMap<>();
for (String name : this.request.getCookies().keySet()) {
for (Cookie cookie : this.request.getCookies().get(name)) {
HttpCookie httpCookie = new HttpCookie(name, cookie.value());
cookies.add(name, httpCookie);
}
}
return cookies;
}
@Override
public InetSocketAddress getRemoteAddress() {
return this.remoteAddress;
}
@Override
public Flux<DataBuffer> getBody() {
Observable<DataBuffer> content = this.request.getContent().map(dataBufferFactory::wrap);
return Flux.from(RxReactiveStreams.toPublisher(content));
}
@SuppressWarnings("unchecked")
@Override
public <T> T getNativeRequest() {
return (T) this.request;
}
}
/*
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http.server.reactive;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.cookie.Cookie;
import io.netty.handler.codec.http.cookie.DefaultCookie;
import io.reactivex.netty.protocol.http.server.HttpServerResponse;
import io.reactivex.netty.protocol.http.server.ResponseContentWriter;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import rx.Observable;
import rx.RxReactiveStreams;
import rx.functions.Func1;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseCookie;
import org.springframework.util.Assert;
/**
* Adapt {@link ServerHttpResponse} to the RxNetty {@link HttpServerResponse}.
* For internal use within the framework.
*
* @author Rossen Stoyanchev
* @author Stephane Maldini
* @author Sebastien Deleuze
* @since 5.0
*/
class RxNettyServerHttpResponse extends AbstractServerHttpResponse {
private static final ByteBuf FLUSH_SIGNAL = Unpooled.buffer(0, 0);
// 8 Kb flush threshold to avoid blocking RxNetty when the send buffer has reached the high watermark
private static final long FLUSH_THRESHOLD = 8192;
private final HttpServerResponse<ByteBuf> response;
public RxNettyServerHttpResponse(HttpServerResponse<ByteBuf> response, NettyDataBufferFactory dataBufferFactory) {
super(dataBufferFactory);
Assert.notNull(response, "HttpServerResponse must not be null");
this.response = response;
}
@SuppressWarnings("unchecked")
@Override
public <T> T getNativeResponse() {
return (T) this.response;
}
@Override
protected void applyStatusCode() {
HttpStatus statusCode = this.getStatusCode();
if (statusCode != null) {
this.response.setStatus(HttpResponseStatus.valueOf(statusCode.value()));
}
}
@Override
protected Mono<Void> writeWithInternal(Publisher<? extends DataBuffer> body) {
Observable<ByteBuf> content = RxReactiveStreams.toObservable(body)
.map(NettyDataBufferFactory::toByteBuf);
return Flux.from(RxReactiveStreams.toPublisher(this.response.write(content, new FlushSelector(FLUSH_THRESHOLD))))
.then();
}
@Override
protected Mono<Void> writeAndFlushWithInternal(
Publisher<? extends Publisher<? extends DataBuffer>> body) {
Flux<ByteBuf> bodyWithFlushSignals = Flux.from(body).
flatMap(publisher -> Flux.from(publisher).
map(NettyDataBufferFactory::toByteBuf).
concatWith(Mono.just(FLUSH_SIGNAL)));
Observable<ByteBuf> content = RxReactiveStreams.toObservable(bodyWithFlushSignals);
ResponseContentWriter<ByteBuf> writer = this.response.write(content, bb -> bb == FLUSH_SIGNAL);
return Flux.from(RxReactiveStreams.toPublisher(writer)).then();
}
@Override
protected void applyHeaders() {
for (String name : getHeaders().keySet()) {
for (String value : getHeaders().get(name)) {
this.response.addHeader(name, value);
}
}
}
@Override
protected void applyCookies() {
for (String name : getCookies().keySet()) {
for (ResponseCookie httpCookie : getCookies().get(name)) {
Cookie cookie = new DefaultCookie(name, httpCookie.getValue());
if (!httpCookie.getMaxAge().isNegative()) {
cookie.setMaxAge(httpCookie.getMaxAge().getSeconds());
}
if (httpCookie.getDomain() != null) {
cookie.setDomain(httpCookie.getDomain());
}
if (httpCookie.getPath() != null) {
cookie.setPath(httpCookie.getPath());
}
cookie.setSecure(httpCookie.isSecure());
cookie.setHttpOnly(httpCookie.isHttpOnly());
this.response.addCookie(cookie);
}
}
}
private class FlushSelector implements Func1<ByteBuf, Boolean> {
private final long flushEvery;
private long count;
public FlushSelector(long flushEvery) {
this.flushEvery = flushEvery;
}
@Override
public Boolean call(ByteBuf byteBuf) {
this.count += byteBuf.readableBytes();
if (this.count >= this.flushEvery) {
this.count = 0;
return true;
}
return false;
}
}
/*
While the underlying implementation of {@link ZeroCopyHttpOutputMessage} seems to
work; it does bypass {@link #applyBeforeCommit} and more importantly it doesn't change
its {@linkplain #state()). Therefore it's commented out, for now.
We should revisit this code once
https://github.com/ReactiveX/RxNetty/issues/194 has been fixed.
@Override
public Mono<Void> writeWith(File file, long position, long count) {
Channel channel = this.response.unsafeNettyChannel();
HttpResponse httpResponse =
new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
io.netty.handler.codec.http.HttpHeaders headers = httpResponse.headers();
for (Map.Entry<String, List<String>> header : getHeaders().entrySet()) {
String headerName = header.getKey();
for (String headerValue : header.getValue()) {
headers.add(headerName, headerValue);
}
}
Mono<Void> responseWrite = MonoChannelFuture.from(channel.write(httpResponse));
FileRegion fileRegion = new DefaultFileRegion(file, position, count);
Mono<Void> fileWrite = MonoChannelFuture.from(channel.writeAndFlush(fileRegion));
return Flux.concat(applyBeforeCommit(), responseWrite, fileWrite).then();
}
*/
}
\ No newline at end of file
/*
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http.server.reactive.bootstrap;
import io.netty.buffer.ByteBuf;
import org.springframework.http.server.reactive.RxNettyHttpHandlerAdapter;
/**
* @author Rossen Stoyanchev
*/
public class RxNettyHttpServer extends AbstractHttpServer {
private RxNettyHttpHandlerAdapter rxNettyHandler;
private io.reactivex.netty.protocol.http.server.HttpServer<ByteBuf, ByteBuf> rxNettyServer;
@Override
protected void initServer() throws Exception {
this.rxNettyHandler = createHttpHandlerAdapter();
this.rxNettyServer = io.reactivex.netty.protocol.http.server.HttpServer.newServer(getPort());
}
private RxNettyHttpHandlerAdapter createHttpHandlerAdapter() {
return new RxNettyHttpHandlerAdapter(resolveHttpHandler());
}
@Override
protected void startInternal() {
this.rxNettyServer.start(this.rxNettyHandler);
setPort(this.rxNettyServer.getServerPort());
}
@Override
protected void stopInternal() {
this.rxNettyServer.shutdown();
}
@Override
protected void resetInternal() {
this.rxNettyServer = null;
this.rxNettyHandler = null;
}
}
......@@ -48,9 +48,6 @@ dependencies {
testCompile("org.hibernate:hibernate-validator:6.0.4.Final")
testCompile "io.reactivex.rxjava2:rxjava:${rxjava2Version}"
testCompile("io.projectreactor:reactor-test")
testCompile("io.reactivex:rxnetty-http:0.5.2") {
exclude group: 'io.reactivex', module: 'rxjava'
}
testCompile("org.apache.tomcat:tomcat-util:${tomcatVersion}")
testCompile("org.apache.tomcat.embed:tomcat-embed-core:${tomcatVersion}")
testCompile("org.eclipse.jetty:jetty-server:${jettyVersion}")
......
......@@ -32,12 +32,10 @@ import org.springframework.http.server.reactive.AbstractHttpHandlerIntegrationTe
import org.springframework.http.server.reactive.HttpHandler;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.http.server.reactive.bootstrap.ReactorHttpServer;
import org.springframework.http.server.reactive.bootstrap.RxNettyHttpServer;
import org.springframework.web.reactive.function.BodyExtractors;
import org.springframework.web.reactive.function.client.WebClient;
import static org.junit.Assert.*;
import static org.junit.Assert.assertTrue;
/**
* @author Sebastien Deleuze
......@@ -85,9 +83,9 @@ public class FlushingIntegrationTests extends AbstractHttpHandlerIntegrationTest
.verify(Duration.ofSeconds(5L));
}
catch (AssertionError err) {
if (err.getMessage().startsWith("VerifySubscriber timed out") &&
(this.server instanceof RxNettyHttpServer || this.server instanceof ReactorHttpServer)) {
// TODO: RxNetty usually times out here; Reactor does the same on Windows at least...
String os = System.getProperty("os.name").toLowerCase();
if (os.contains("windows") && err.getMessage().startsWith("VerifySubscriber timed out")) {
// TODO: Reactor usually times out on Windows ...
err.printStackTrace();
return;
}
......@@ -102,21 +100,10 @@ public class FlushingIntegrationTests extends AbstractHttpHandlerIntegrationTest
.exchange()
.flatMapMany(response -> response.bodyToFlux(String.class));
try {
StepVerifier.create(result)
.expectNextMatches(s -> s.startsWith("0123456789"))
.thenCancel()
.verify(Duration.ofSeconds(5L));
}
catch (AssertionError err) {
if (err.getMessage().startsWith("VerifySubscriber timed out") &&
this.server instanceof RxNettyHttpServer) {
// TODO: RxNetty usually times out here
err.printStackTrace();
return;
}
throw err;
}
StepVerifier.create(result)
.expectNextMatches(s -> s.startsWith("0123456789"))
.thenCancel()
.verify(Duration.ofSeconds(5L));
}
......
......@@ -20,6 +20,8 @@ import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.tomcat.websocket.WsWebSocketContainer;
import org.apache.tomcat.websocket.server.WsContextListener;
......@@ -42,13 +44,11 @@ import org.springframework.http.server.reactive.HttpHandler;
import org.springframework.http.server.reactive.bootstrap.HttpServer;
import org.springframework.http.server.reactive.bootstrap.JettyHttpServer;
import org.springframework.http.server.reactive.bootstrap.ReactorHttpServer;
import org.springframework.http.server.reactive.bootstrap.RxNettyHttpServer;
import org.springframework.http.server.reactive.bootstrap.TomcatHttpServer;
import org.springframework.http.server.reactive.bootstrap.UndertowHttpServer;
import org.springframework.web.reactive.DispatcherHandler;
import org.springframework.web.reactive.socket.client.JettyWebSocketClient;
import org.springframework.web.reactive.socket.client.ReactorNettyWebSocketClient;
import org.springframework.web.reactive.socket.client.RxNettyWebSocketClient;
import org.springframework.web.reactive.socket.client.TomcatWebSocketClient;
import org.springframework.web.reactive.socket.client.UndertowWebSocketClient;
import org.springframework.web.reactive.socket.client.WebSocketClient;
......@@ -58,12 +58,11 @@ import org.springframework.web.reactive.socket.server.support.HandshakeWebSocket
import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter;
import org.springframework.web.reactive.socket.server.upgrade.JettyRequestUpgradeStrategy;
import org.springframework.web.reactive.socket.server.upgrade.ReactorNettyRequestUpgradeStrategy;
import org.springframework.web.reactive.socket.server.upgrade.RxNettyRequestUpgradeStrategy;
import org.springframework.web.reactive.socket.server.upgrade.TomcatRequestUpgradeStrategy;
import org.springframework.web.reactive.socket.server.upgrade.UndertowRequestUpgradeStrategy;
import org.springframework.web.server.adapter.WebHttpHandlerBuilder;
import static org.junit.Assume.*;
import static org.junit.Assume.assumeFalse;
/**
* Base class for WebSocket integration tests. Sub-classes must implement
......@@ -94,32 +93,25 @@ public abstract class AbstractWebSocketIntegrationTests {
@Parameters(name = "client[{0}] - server [{1}]")
public static Object[][] arguments() throws IOException {
Flux<? extends WebSocketClient> clients = Flux.concat(
Flux.just(new TomcatWebSocketClient(new WsWebSocketContainer())).repeat(5),
Flux.just(new JettyWebSocketClient()).repeat(5),
Flux.just(new ReactorNettyWebSocketClient()).repeat(5),
Flux.just(new RxNettyWebSocketClient()).repeat(5),
Flux.just(new UndertowWebSocketClient(Xnio.getInstance().createWorker(OptionMap.EMPTY))).repeat(5));
Flux<? extends HttpServer> servers = Flux.just(
new TomcatHttpServer(TMP_DIR.getAbsolutePath(), WsContextListener.class),
new JettyHttpServer(),
new ReactorHttpServer(),
new RxNettyHttpServer(),
new UndertowHttpServer()).repeat(5);
Flux<? extends Class<?>> configs = Flux.just(
TomcatConfig.class,
JettyConfig.class,
ReactorNettyConfig.class,
RxNettyConfig.class,
UndertowConfig.class).repeat(5);
return Flux.zip(clients, servers, configs)
.map(Tuple3::toArray)
.collectList()
.block()
.toArray(new Object[25][2]);
WebSocketClient[] clients = new WebSocketClient[] {
new TomcatWebSocketClient(new WsWebSocketContainer()),
new JettyWebSocketClient(),
new ReactorNettyWebSocketClient(),
new UndertowWebSocketClient(Xnio.getInstance().createWorker(OptionMap.EMPTY))
};
Map<HttpServer, Class<?>> servers = new LinkedHashMap<>();
servers.put(new TomcatHttpServer(TMP_DIR.getAbsolutePath(), WsContextListener.class), TomcatConfig.class);
servers.put(new JettyHttpServer(), JettyConfig.class);
servers.put(new ReactorHttpServer(), ReactorNettyConfig.class);
servers.put(new UndertowHttpServer(), UndertowConfig.class);
Flux<WebSocketClient> f1 = Flux.fromArray(clients).concatMap(c -> Flux.just(c).repeat(servers.size()));
Flux<HttpServer> f2 = Flux.fromIterable(servers.keySet()).repeat(clients.length);
Flux<Class<?>> f3 = Flux.fromIterable(servers.values()).repeat(clients.length);
return Flux.zip(f1, f2, f3).map(Tuple3::toArray).collectList().block()
.toArray(new Object[clients.length * servers.size()][2]);
}
......@@ -133,7 +125,7 @@ public abstract class AbstractWebSocketIntegrationTests {
// at org.xnio.http.HttpUpgrade$HttpUpgradeState$UpgradeResultListener.handleEvent(HttpUpgrade.java:400)
// at org.xnio.ChannelListeners.invokeChannelListener(ChannelListeners.java:92)
assumeFalse(this.client instanceof UndertowWebSocketClient && this.server instanceof RxNettyHttpServer);
assumeFalse(this.client instanceof UndertowWebSocketClient);
this.server.setHandler(createHttpHandler());
this.server.afterPropertiesSet();
......@@ -207,16 +199,6 @@ public abstract class AbstractWebSocketIntegrationTests {
}
@Configuration
static class RxNettyConfig extends AbstractHandlerAdapterConfig {
@Override
protected RequestUpgradeStrategy getUpgradeStrategy() {
return new RxNettyRequestUpgradeStrategy();
}
}
@Configuration
static class TomcatConfig extends AbstractHandlerAdapterConfig {
......
......@@ -27,7 +27,6 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.hamcrest.Matchers;
import org.junit.Test;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
......@@ -168,7 +167,7 @@ public class WebSocketIntegrationTests extends AbstractWebSocketIntegrationTests
public Mono<Void> handle(WebSocketSession session) {
String protocol = session.getHandshakeInfo().getSubProtocol();
WebSocketMessage message = session.textMessage(protocol);
return doSend(session, Mono.just(message));
return session.send(Mono.just(message));
}
}
......@@ -180,16 +179,8 @@ public class WebSocketIntegrationTests extends AbstractWebSocketIntegrationTests
HttpHeaders headers = session.getHandshakeInfo().getHeaders();
String payload = "my-header:" + headers.getFirst("my-header");
WebSocketMessage message = session.textMessage(payload);
return doSend(session, Mono.just(message));
return session.send(Mono.just(message));
}
}
// TODO: workaround for suspected RxNetty WebSocket client issue
// https://github.com/ReactiveX/RxNetty/issues/560
private static Mono<Void> doSend(WebSocketSession session, Publisher<WebSocketMessage> output) {
return session.send(Mono.delay(Duration.ofMillis(100)).thenMany(output));
}
}
/*
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.reactive.socket.adapter;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrameAggregator;
import io.reactivex.netty.protocol.http.ws.WebSocketConnection;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import rx.Observable;
import rx.RxReactiveStreams;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.web.reactive.socket.CloseStatus;
import org.springframework.web.reactive.socket.HandshakeInfo;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
/**
* Spring {@link WebSocketSession} implementation that adapts to the RxNetty
* {@link io.reactivex.netty.protocol.http.ws.WebSocketConnection}.
* For internal use within the framework.
*
* @author Rossen Stoyanchev
* @since 5.0
*/
public class RxNettyWebSocketSession extends NettyWebSocketSessionSupport<WebSocketConnection> {
/**
* The {@code ChannelHandler} name to use when inserting a
* {@link WebSocketFrameAggregator} in the channel pipeline.
*/
public static final String FRAME_AGGREGATOR_NAME = "websocket-frame-aggregator";
public RxNettyWebSocketSession(WebSocketConnection conn, HandshakeInfo info, NettyDataBufferFactory factory) {
super(conn, info, factory);
}
/**
* Insert an {@link WebSocketFrameAggregator} after the
* {@code WebSocketFrameDecoder} for receiving full messages.
* @param channel the channel for the session
* @param frameDecoderName the name of the WebSocketFrame decoder
*/
public RxNettyWebSocketSession aggregateFrames(Channel channel, String frameDecoderName) {
ChannelPipeline pipeline = channel.pipeline();
if (pipeline.context(FRAME_AGGREGATOR_NAME) != null) {
return this;
}
ChannelHandlerContext frameDecoder = pipeline.context(frameDecoderName);
if (frameDecoder == null) {
throw new IllegalArgumentException("WebSocketFrameDecoder not found: " + frameDecoderName);
}
ChannelHandler frameAggregator = new WebSocketFrameAggregator(DEFAULT_FRAME_MAX_SIZE);
pipeline.addAfter(frameDecoder.name(), FRAME_AGGREGATOR_NAME, frameAggregator);
return this;
}
@Override
public Flux<WebSocketMessage> receive() {
Observable<WebSocketMessage> messages = getDelegate()
.getInput()
.filter(frame -> !(frame instanceof CloseWebSocketFrame))
.map(super::toMessage);
return Flux.from(RxReactiveStreams.toPublisher(messages));
}
@Override
public Mono<Void> send(Publisher<WebSocketMessage> messages) {
Observable<WebSocketFrame> frames = RxReactiveStreams.toObservable(messages).map(this::toFrame);
Observable<Void> completion = getDelegate().writeAndFlushOnEach(frames);
return Mono.from(RxReactiveStreams.toPublisher(completion));
}
@Override
public Mono<Void> close(CloseStatus status) {
Observable<Void> completion = getDelegate().close();
return Mono.from(RxReactiveStreams.toPublisher(completion));
}
}
/*
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.reactive.socket.client;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.reactivex.netty.protocol.http.HttpHandlerNames;
import io.reactivex.netty.protocol.http.client.HttpClient;
import io.reactivex.netty.protocol.http.client.HttpClientRequest;
import io.reactivex.netty.protocol.http.ws.WebSocketConnection;
import io.reactivex.netty.protocol.http.ws.client.WebSocketRequest;
import io.reactivex.netty.protocol.http.ws.client.WebSocketResponse;
import io.reactivex.netty.threads.RxEventLoopProvider;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;
import rx.Observable;
import rx.RxReactiveStreams;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import org.springframework.web.reactive.socket.HandshakeInfo;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.adapter.RxNettyWebSocketSession;
/**
* {@link WebSocketClient} implementation for use with RxNetty.
* For internal use within the framework.
*
* <p><strong>Note: </strong> RxNetty {@link HttpClient} instances require a host
* and port in order to be created. Hence it is not possible to configure a
* single {@code HttpClient} instance to use upfront. Instead the constructors
* accept a function for obtaining client instances when establishing a
* connection to a specific URI. By default new instances are created per
* connection with a shared Netty {@code EventLoopGroup}. See constructors for
* more details.
*
* @author Rossen Stoyanchev
* @since 5.0
*/
public class RxNettyWebSocketClient extends WebSocketClientSupport implements WebSocketClient {
private final Function<URI, HttpClient<ByteBuf, ByteBuf>> httpClientProvider;
/**
* Default constructor that creates {@code HttpClient} instances via
* {@link HttpClient#newClient(String, int)} using port 80 or 443 depending
* on the target URL scheme.
*
* <p><strong>Note: </strong> By default a new {@link HttpClient} instance
* is created per WebSocket connection. Those instances will share a global
* {@code EventLoopGroup} that RxNetty obtains via
* {@link RxEventLoopProvider#globalClientEventLoop(boolean)}.
*/
public RxNettyWebSocketClient() {
this(RxNettyWebSocketClient::getDefaultHttpClientProvider);
}
/**
* Constructor with a function to use to obtain {@link HttpClient} instances.
*/
public RxNettyWebSocketClient(Function<URI, HttpClient<ByteBuf, ByteBuf>> httpClientProvider) {
this.httpClientProvider = httpClientProvider;
}
private static HttpClient<ByteBuf, ByteBuf> getDefaultHttpClientProvider(URI url) {
boolean secure = "wss".equals(url.getScheme());
int port = (url.getPort() > 0 ? url.getPort() : secure ? 443 : 80);
return HttpClient.newClient(url.getHost(), port);
}
/**
* Return the configured {@link HttpClient} provider depending on which
* constructor was used.
*/
public Function<URI, HttpClient<ByteBuf, ByteBuf>> getHttpClientProvider() {
return this.httpClientProvider;
}
/**
* Return an {@link HttpClient} instance to use to connect to the given URI.
* The default implementation invokes the {@link #getHttpClientProvider()}
* provider} function created or supplied at construction time.
* @param url the full URL of the WebSocket endpoint.
*/
public HttpClient<ByteBuf, ByteBuf> getHttpClient(URI url) {
return this.httpClientProvider.apply(url);
}
@Override
public Mono<Void> execute(URI url, WebSocketHandler handler) {
return execute(url, new HttpHeaders(), handler);
}
@Override
public Mono<Void> execute(URI url, HttpHeaders headers, WebSocketHandler handler) {
Observable<Void> completion = executeInternal(url, headers, handler);
return Mono.from(RxReactiveStreams.toPublisher(completion));
}
@SuppressWarnings("cast")
private Observable<Void> executeInternal(URI url, HttpHeaders headers, WebSocketHandler handler) {
List<String> protocols = beforeHandshake(url, headers, handler);
return createRequest(url, headers, protocols)
.flatMap(response -> {
Observable<WebSocketConnection> conn = response.getWebSocketConnection();
// following cast is necessary to enable compilation on Eclipse 4.6
return (Observable<Tuple2<WebSocketResponse<ByteBuf>, WebSocketConnection>>)
Observable.zip(Observable.just(response), conn, Tuples::of);
})
.flatMap(tuple -> {
WebSocketResponse<ByteBuf> response = tuple.getT1();
WebSocketConnection conn = tuple.getT2();
HandshakeInfo info = afterHandshake(url, toHttpHeaders(response));
ByteBufAllocator allocator = response.unsafeNettyChannel().alloc();
NettyDataBufferFactory factory = new NettyDataBufferFactory(allocator);
RxNettyWebSocketSession session = new RxNettyWebSocketSession(conn, info, factory);
session.aggregateFrames(response.unsafeNettyChannel(), HttpHandlerNames.WsClientDecoder.getName());
return RxReactiveStreams.toObservable(handler.handle(session));
});
}
private WebSocketRequest<ByteBuf> createRequest(URI url, HttpHeaders headers, List<String> protocols) {
String query = url.getRawQuery();
String requestUrl = url.getRawPath() + (query != null ? "?" + query : "");
HttpClientRequest<ByteBuf, ByteBuf> request = getHttpClient(url).createGet(requestUrl);
if (!headers.isEmpty()) {
Map<String, List<Object>> map = new HashMap<>(headers.size());
headers.forEach((key, values) -> map.put(key, new ArrayList<>(headers.get(key))));
request = request.setHeaders(map);
}
return (ObjectUtils.isEmpty(protocols) ? request.requestWebSocketUpgrade() :
request.requestWebSocketUpgrade().requestSubProtocols(StringUtils.toStringArray(protocols)));
}
private HttpHeaders toHttpHeaders(WebSocketResponse<ByteBuf> response) {
HttpHeaders headers = new HttpHeaders();
response.headerIterator().forEachRemaining(entry -> {
String name = entry.getKey().toString();
headers.put(name, response.getAllHeaderValues(name));
});
return headers;
}
}
/*
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.reactive.socket.server.upgrade;
import java.security.Principal;
import io.reactivex.netty.protocol.http.HttpHandlerNames;
import io.reactivex.netty.protocol.http.server.HttpServerResponse;
import io.reactivex.netty.protocol.http.ws.server.WebSocketHandshaker;
import reactor.core.publisher.Mono;
import rx.RxReactiveStreams;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.http.server.reactive.AbstractServerHttpResponse;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.lang.Nullable;
import org.springframework.web.reactive.socket.HandshakeInfo;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.adapter.RxNettyWebSocketSession;
import org.springframework.web.reactive.socket.server.RequestUpgradeStrategy;
import org.springframework.web.server.ServerWebExchange;
/**
* A {@link RequestUpgradeStrategy} for use with RxNetty.
* For internal use within the framework.
*
* @author Rossen Stoyanchev
* @since 5.0
*/
public class RxNettyRequestUpgradeStrategy implements RequestUpgradeStrategy {
@Override
public Mono<Void> upgrade(ServerWebExchange exchange, WebSocketHandler handler, @Nullable String subProtocol) {
ServerHttpResponse response = exchange.getResponse();
HttpServerResponse<?> rxNettyResponse = ((AbstractServerHttpResponse) response).getNativeResponse();
HandshakeInfo info = getHandshakeInfo(exchange, subProtocol);
NettyDataBufferFactory factory = (NettyDataBufferFactory) response.bufferFactory();
WebSocketHandshaker handshaker = rxNettyResponse
.acceptWebSocketUpgrade(conn -> {
RxNettyWebSocketSession session = new RxNettyWebSocketSession(conn, info, factory);
String name = HttpHandlerNames.WsServerDecoder.getName();
session.aggregateFrames(rxNettyResponse.unsafeNettyChannel(), name);
return RxReactiveStreams.toObservable(handler.handle(session));
});
if (subProtocol != null) {
handshaker = handshaker.subprotocol(subProtocol);
}
else {
// TODO: https://github.com/reactor/reactor-netty/issues/20
handshaker = handshaker.subprotocol();
}
return Mono.from(RxReactiveStreams.toPublisher(handshaker));
}
private HandshakeInfo getHandshakeInfo(ServerWebExchange exchange, String protocol) {
ServerHttpRequest request = exchange.getRequest();
Mono<Principal> principal = exchange.getPrincipal();
return new HandshakeInfo(request.getURI(), request.getHeaders(), principal, protocol);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册