提交 c682895d 编写于 作者: R Rossen Stoyanchev

Add RxNetty support

This commit adds RxNetty integration that includes RxNetty-based
implementations of ServerHttpRequest and ServerHttpResponse as well as
an adapter from the RxNetty RequestHandler to the HttpHandler contracts.

Only byte[] is supported at the moment for reading and writing with a
corresponding copy to and from Netty ByteBuf.
上级 2cb32a0f
......@@ -15,6 +15,7 @@ apply plugin: 'propdeps-maven'
repositories {
mavenCentral()
maven { url 'https://oss.jfrog.org/libs-snapshot' } // RxNetty 0.5.x snapshots
}
dependencies {
......@@ -24,6 +25,9 @@ dependencies {
compile "org.slf4j:slf4j-api:1.7.6"
compile "ch.qos.logback:logback-classic:1.1.2"
optional "io.reactivex:rxnetty:0.5.0-SNAPSHOT"
optional "io.reactivex:rxjava-reactive-streams:1.0.1"
provided "javax.servlet:javax.servlet-api:3.1.0"
testCompile "junit:junit:4.12"
......
/*
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.reactive.web.rxnetty;
import io.netty.buffer.ByteBuf;
import io.reactivex.netty.protocol.http.server.HttpServerRequest;
import io.reactivex.netty.protocol.http.server.HttpServerResponse;
import io.reactivex.netty.protocol.http.server.RequestHandler;
import org.reactivestreams.Publisher;
import rx.Observable;
import rx.RxReactiveStreams;
import org.springframework.reactive.web.HttpHandler;
import org.springframework.util.Assert;
/**
* @author Rossen Stoyanchev
*/
public class RequestHandlerAdapter implements RequestHandler<ByteBuf, ByteBuf> {
private final HttpHandler httpHandler;
public RequestHandlerAdapter(HttpHandler httpHandler) {
Assert.notNull(httpHandler, "'httpHandler' is required.");
this.httpHandler = httpHandler;
}
@Override
public Observable<Void> handle(HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) {
RxNettyServerHttpRequest adaptedRequest = new RxNettyServerHttpRequest(request);
RxNettyServerHttpResponse adaptedResponse = new RxNettyServerHttpResponse(response);
Publisher<Void> result = this.httpHandler.handle(adaptedRequest, adaptedResponse);
return RxReactiveStreams.toObservable(result);
}
}
/*
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.reactive.web.rxnetty;
import java.net.URI;
import java.net.URISyntaxException;
import io.netty.buffer.ByteBuf;
import io.reactivex.netty.protocol.http.server.HttpServerRequest;
import org.reactivestreams.Publisher;
import rx.Observable;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.reactive.web.ServerHttpRequest;
import org.springframework.util.Assert;
/**
* @author Rossen Stoyanchev
*/
public class RxNettyServerHttpRequest implements ServerHttpRequest {
private final HttpServerRequest<ByteBuf> request;
private HttpHeaders headers;
public RxNettyServerHttpRequest(HttpServerRequest<ByteBuf> request) {
Assert.notNull("'request', request must not be null.");
this.request = request;
}
@Override
public HttpHeaders getHeaders() {
if (this.headers == null) {
this.headers = new HttpHeaders();
for (String name : this.request.getHeaderNames()) {
for (String value : this.request.getAllHeaderValues(name)) {
this.headers.add(name, value);
}
}
}
return this.headers;
}
@Override
public HttpMethod getMethod() {
return HttpMethod.valueOf(this.request.getHttpMethod().name());
}
@Override
public URI getURI() {
try {
return new URI(this.request.getUri());
}
catch (URISyntaxException ex) {
throw new IllegalStateException("Could not get URI: " + ex.getMessage(), ex);
}
}
@Override
public Publisher<byte[]> getBody() {
Observable<byte[]> bytesContent = this.request.getContent().map(byteBuf -> {
byte[] copy = new byte[byteBuf.readableBytes()];
byteBuf.readBytes(copy);
return copy;
});
return rx.RxReactiveStreams.toPublisher(bytesContent);
}
}
/*
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.reactive.web.rxnetty;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.reactivex.netty.protocol.http.server.HttpServerResponse;
import org.reactivestreams.Publisher;
import rx.Observable;
import rx.RxReactiveStreams;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.reactive.web.ServerHttpResponse;
import org.springframework.util.Assert;
/**
* @author Rossen Stoyanchev
*/
public class RxNettyServerHttpResponse implements ServerHttpResponse {
private final HttpServerResponse<?> response;
private final HttpHeaders headers;
private boolean headersWritten = false;
public RxNettyServerHttpResponse(HttpServerResponse<?> response) {
Assert.notNull("'response', response must not be null.");
this.response = response;
this.headers = new HttpHeaders();
}
@Override
public void setStatusCode(HttpStatus status) {
this.response.setStatus(HttpResponseStatus.valueOf(status.value()));
}
@Override
public HttpHeaders getHeaders() {
return (this.headersWritten ? HttpHeaders.readOnlyHttpHeaders(this.headers) : this.headers);
}
@Override
public Publisher<Void> writeWith(Publisher<byte[]> contentPublisher) {
writeHeaders();
Observable<byte[]> contentObservable = RxReactiveStreams.toObservable(contentPublisher);
return RxReactiveStreams.toPublisher(this.response.writeBytes(contentObservable));
}
private void writeHeaders() {
if (!this.headersWritten) {
for (String name : this.headers.keySet()) {
this.response.setHeader(name, this.headers.get(name));
}
}
}
}
/*
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.reactive.web.rxnetty;
import io.netty.buffer.ByteBuf;
import io.reactivex.netty.protocol.http.server.HttpServer;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.springframework.reactive.web.EchoHandler;
import org.springframework.reactive.web.AbstractHttpHandlerIntegrationTestCase;
/**
* @author Rossen Stoyanchev
*/
public class HttpHandlerRxNettyIntegrationTests extends AbstractHttpHandlerIntegrationTestCase {
private static HttpServer<ByteBuf, ByteBuf> httpServer;
@BeforeClass
public static void startServer() throws Exception {
RequestHandlerAdapter requestHandler = new RequestHandlerAdapter(new EchoHandler());
httpServer = HttpServer.newServer(port);
httpServer.start(requestHandler::handle);
}
@AfterClass
public static void stopServer() throws Exception {
httpServer.shutdown();
}
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册