From c682895deeaf2f03d9c6b60e1f4da0a469ba796a Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Tue, 11 Aug 2015 16:09:05 -0400 Subject: [PATCH] Add RxNetty support This commit adds RxNetty integration that includes RxNetty-based implementations of ServerHttpRequest and ServerHttpResponse as well as an adapter from the RxNetty RequestHandler to the HttpHandler contracts. Only byte[] is supported at the moment for reading and writing with a corresponding copy to and from Netty ByteBuf. --- spring-web-reactive/build.gradle | 4 + .../web/rxnetty/RequestHandlerAdapter.java | 50 +++++++++++ .../web/rxnetty/RxNettyServerHttpRequest.java | 86 +++++++++++++++++++ .../rxnetty/RxNettyServerHttpResponse.java | 72 ++++++++++++++++ .../HttpHandlerRxNettyIntegrationTests.java | 46 ++++++++++ 5 files changed, 258 insertions(+) create mode 100644 spring-web-reactive/src/main/java/org/springframework/reactive/web/rxnetty/RequestHandlerAdapter.java create mode 100644 spring-web-reactive/src/main/java/org/springframework/reactive/web/rxnetty/RxNettyServerHttpRequest.java create mode 100644 spring-web-reactive/src/main/java/org/springframework/reactive/web/rxnetty/RxNettyServerHttpResponse.java create mode 100644 spring-web-reactive/src/test/java/org/springframework/reactive/web/rxnetty/HttpHandlerRxNettyIntegrationTests.java diff --git a/spring-web-reactive/build.gradle b/spring-web-reactive/build.gradle index 34c0b96a55..bf58b483b6 100644 --- a/spring-web-reactive/build.gradle +++ b/spring-web-reactive/build.gradle @@ -15,6 +15,7 @@ apply plugin: 'propdeps-maven' repositories { mavenCentral() + maven { url 'https://oss.jfrog.org/libs-snapshot' } // RxNetty 0.5.x snapshots } dependencies { @@ -24,6 +25,9 @@ dependencies { compile "org.slf4j:slf4j-api:1.7.6" compile "ch.qos.logback:logback-classic:1.1.2" + optional "io.reactivex:rxnetty:0.5.0-SNAPSHOT" + optional "io.reactivex:rxjava-reactive-streams:1.0.1" + provided "javax.servlet:javax.servlet-api:3.1.0" testCompile "junit:junit:4.12" diff --git a/spring-web-reactive/src/main/java/org/springframework/reactive/web/rxnetty/RequestHandlerAdapter.java b/spring-web-reactive/src/main/java/org/springframework/reactive/web/rxnetty/RequestHandlerAdapter.java new file mode 100644 index 0000000000..2a7b7d7413 --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/reactive/web/rxnetty/RequestHandlerAdapter.java @@ -0,0 +1,50 @@ +/* + * Copyright 2002-2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.reactive.web.rxnetty; + +import io.netty.buffer.ByteBuf; +import io.reactivex.netty.protocol.http.server.HttpServerRequest; +import io.reactivex.netty.protocol.http.server.HttpServerResponse; +import io.reactivex.netty.protocol.http.server.RequestHandler; +import org.reactivestreams.Publisher; +import rx.Observable; +import rx.RxReactiveStreams; + +import org.springframework.reactive.web.HttpHandler; +import org.springframework.util.Assert; + +/** + * @author Rossen Stoyanchev + */ +public class RequestHandlerAdapter implements RequestHandler { + + private final HttpHandler httpHandler; + + + public RequestHandlerAdapter(HttpHandler httpHandler) { + Assert.notNull(httpHandler, "'httpHandler' is required."); + this.httpHandler = httpHandler; + } + + @Override + public Observable handle(HttpServerRequest request, HttpServerResponse response) { + RxNettyServerHttpRequest adaptedRequest = new RxNettyServerHttpRequest(request); + RxNettyServerHttpResponse adaptedResponse = new RxNettyServerHttpResponse(response); + Publisher result = this.httpHandler.handle(adaptedRequest, adaptedResponse); + return RxReactiveStreams.toObservable(result); + } + +} diff --git a/spring-web-reactive/src/main/java/org/springframework/reactive/web/rxnetty/RxNettyServerHttpRequest.java b/spring-web-reactive/src/main/java/org/springframework/reactive/web/rxnetty/RxNettyServerHttpRequest.java new file mode 100644 index 0000000000..5dd01a400f --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/reactive/web/rxnetty/RxNettyServerHttpRequest.java @@ -0,0 +1,86 @@ +/* + * Copyright 2002-2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.reactive.web.rxnetty; + +import java.net.URI; +import java.net.URISyntaxException; + +import io.netty.buffer.ByteBuf; +import io.reactivex.netty.protocol.http.server.HttpServerRequest; +import org.reactivestreams.Publisher; +import rx.Observable; + +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpMethod; +import org.springframework.reactive.web.ServerHttpRequest; +import org.springframework.util.Assert; + +/** + * @author Rossen Stoyanchev + */ +public class RxNettyServerHttpRequest implements ServerHttpRequest { + + private final HttpServerRequest request; + + private HttpHeaders headers; + + + public RxNettyServerHttpRequest(HttpServerRequest request) { + Assert.notNull("'request', request must not be null."); + this.request = request; + } + + + @Override + public HttpHeaders getHeaders() { + if (this.headers == null) { + this.headers = new HttpHeaders(); + for (String name : this.request.getHeaderNames()) { + for (String value : this.request.getAllHeaderValues(name)) { + this.headers.add(name, value); + } + } + } + return this.headers; + } + + @Override + public HttpMethod getMethod() { + return HttpMethod.valueOf(this.request.getHttpMethod().name()); + } + + @Override + public URI getURI() { + try { + return new URI(this.request.getUri()); + } + catch (URISyntaxException ex) { + throw new IllegalStateException("Could not get URI: " + ex.getMessage(), ex); + } + + } + + @Override + public Publisher getBody() { + Observable bytesContent = this.request.getContent().map(byteBuf -> { + byte[] copy = new byte[byteBuf.readableBytes()]; + byteBuf.readBytes(copy); + return copy; + }); + return rx.RxReactiveStreams.toPublisher(bytesContent); + } + +} diff --git a/spring-web-reactive/src/main/java/org/springframework/reactive/web/rxnetty/RxNettyServerHttpResponse.java b/spring-web-reactive/src/main/java/org/springframework/reactive/web/rxnetty/RxNettyServerHttpResponse.java new file mode 100644 index 0000000000..d40c07463a --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/reactive/web/rxnetty/RxNettyServerHttpResponse.java @@ -0,0 +1,72 @@ +/* + * Copyright 2002-2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.reactive.web.rxnetty; + +import io.netty.handler.codec.http.HttpResponseStatus; +import io.reactivex.netty.protocol.http.server.HttpServerResponse; +import org.reactivestreams.Publisher; +import rx.Observable; +import rx.RxReactiveStreams; + +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpStatus; +import org.springframework.reactive.web.ServerHttpResponse; +import org.springframework.util.Assert; + +/** + * @author Rossen Stoyanchev + */ +public class RxNettyServerHttpResponse implements ServerHttpResponse { + + private final HttpServerResponse response; + + private final HttpHeaders headers; + + private boolean headersWritten = false; + + + public RxNettyServerHttpResponse(HttpServerResponse response) { + Assert.notNull("'response', response must not be null."); + this.response = response; + this.headers = new HttpHeaders(); + } + + + @Override + public void setStatusCode(HttpStatus status) { + this.response.setStatus(HttpResponseStatus.valueOf(status.value())); + } + + @Override + public HttpHeaders getHeaders() { + return (this.headersWritten ? HttpHeaders.readOnlyHttpHeaders(this.headers) : this.headers); + } + + @Override + public Publisher writeWith(Publisher contentPublisher) { + writeHeaders(); + Observable contentObservable = RxReactiveStreams.toObservable(contentPublisher); + return RxReactiveStreams.toPublisher(this.response.writeBytes(contentObservable)); + } + + private void writeHeaders() { + if (!this.headersWritten) { + for (String name : this.headers.keySet()) { + this.response.setHeader(name, this.headers.get(name)); + } + } + } +} diff --git a/spring-web-reactive/src/test/java/org/springframework/reactive/web/rxnetty/HttpHandlerRxNettyIntegrationTests.java b/spring-web-reactive/src/test/java/org/springframework/reactive/web/rxnetty/HttpHandlerRxNettyIntegrationTests.java new file mode 100644 index 0000000000..eef884033d --- /dev/null +++ b/spring-web-reactive/src/test/java/org/springframework/reactive/web/rxnetty/HttpHandlerRxNettyIntegrationTests.java @@ -0,0 +1,46 @@ +/* + * Copyright 2002-2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.reactive.web.rxnetty; + +import io.netty.buffer.ByteBuf; +import io.reactivex.netty.protocol.http.server.HttpServer; +import org.junit.AfterClass; +import org.junit.BeforeClass; + +import org.springframework.reactive.web.EchoHandler; +import org.springframework.reactive.web.AbstractHttpHandlerIntegrationTestCase; + +/** + * @author Rossen Stoyanchev + */ +public class HttpHandlerRxNettyIntegrationTests extends AbstractHttpHandlerIntegrationTestCase { + + private static HttpServer httpServer; + + + @BeforeClass + public static void startServer() throws Exception { + RequestHandlerAdapter requestHandler = new RequestHandlerAdapter(new EchoHandler()); + httpServer = HttpServer.newServer(port); + httpServer.start(requestHandler::handle); + } + + @AfterClass + public static void stopServer() throws Exception { + httpServer.shutdown(); + } + +} \ No newline at end of file -- GitLab