提交 d9d84ff3 编写于 作者: J Juergen Hoeller

Polishing

上级 d047174c
......@@ -36,6 +36,13 @@ import org.springframework.core.io.buffer.DataBufferFactory;
*/
public interface ReactiveHttpOutputMessage extends HttpMessage {
/**
* Return a {@link DataBufferFactory} that can be used for creating the body.
* @return a buffer factory
* @see #writeWith(Publisher)
*/
DataBufferFactory bufferFactory();
/**
* Register an action to be applied just before the message is committed.
* @param action the action
......@@ -45,7 +52,6 @@ public interface ReactiveHttpOutputMessage extends HttpMessage {
/**
* Use the given {@link Publisher} to write the body of the message to the underlying
* HTTP layer.
*
* @param body the body content publisher
* @return a publisher that indicates completion or error.
*/
......@@ -55,19 +61,11 @@ public interface ReactiveHttpOutputMessage extends HttpMessage {
* Use the given {@link Publisher} of {@code Publishers} to write the body of the
* message to the underlying HTTP layer, flushing after each
* {@code Publisher<DataBuffer>}.
*
* @param body the body content publisher
* @return a publisher that indicates completion or error.
*/
Mono<Void> writeAndFlushWith(Publisher<Publisher<DataBuffer>> body);
/**
* Returns a {@link DataBufferFactory} that can be used for creating the body.
* @return a buffer factory
* @see #writeWith(Publisher)
*/
DataBufferFactory bufferFactory();
/**
* Indicate that message handling is complete, allowing for any cleanup or
* end-of-processing tasks to be performed such as applying header changes
......
......@@ -46,13 +46,13 @@ abstract class AbstractRequestBodyPublisher implements Publisher<DataBuffer> {
protected final Log logger = LogFactory.getLog(getClass());
private final AtomicReference<State> state =
new AtomicReference<>(State.UNSUBSCRIBED);
private final AtomicReference<State> state = new AtomicReference<>(State.UNSUBSCRIBED);
private final AtomicLong demand = new AtomicLong();
private Subscriber<? super DataBuffer> subscriber;
@Override
public void subscribe(Subscriber<? super DataBuffer> subscriber) {
if (this.logger.isTraceEnabled()) {
......@@ -126,14 +126,14 @@ abstract class AbstractRequestBodyPublisher implements Publisher<DataBuffer> {
protected abstract DataBuffer read() throws IOException;
private boolean hasDemand() {
return this.demand.get() > 0;
return (this.demand.get() > 0);
}
private boolean changeState(AbstractRequestBodyPublisher.State oldState,
AbstractRequestBodyPublisher.State newState) {
private boolean changeState(State oldState, State newState) {
return this.state.compareAndSet(oldState, newState);
}
private static final class RequestBodySubscription implements Subscription {
private final AbstractRequestBodyPublisher publisher;
......@@ -158,12 +158,12 @@ abstract class AbstractRequestBodyPublisher implements Publisher<DataBuffer> {
state().cancel(this.publisher);
}
private AbstractRequestBodyPublisher.State state() {
private State state() {
return this.publisher.state.get();
}
}
/**
* Represents a state for the {@link Publisher} to be in. The following figure
* indicate the four different states that exist, and the relationships between them.
......@@ -182,8 +182,8 @@ abstract class AbstractRequestBodyPublisher implements Publisher<DataBuffer> {
* </pre>
* Refer to the individual states for more information.
*/
private enum State {
/**
* The initial unsubscribed state. Will respond to {@link
* #subscribe(AbstractRequestBodyPublisher, Subscriber)} by
......@@ -191,12 +191,10 @@ abstract class AbstractRequestBodyPublisher implements Publisher<DataBuffer> {
*/
UNSUBSCRIBED {
@Override
void subscribe(AbstractRequestBodyPublisher publisher,
Subscriber<? super DataBuffer> subscriber) {
void subscribe(AbstractRequestBodyPublisher publisher, Subscriber<? super DataBuffer> subscriber) {
Objects.requireNonNull(subscriber);
if (publisher.changeState(this, NO_DEMAND)) {
Subscription subscription = new RequestBodySubscription(
publisher);
Subscription subscription = new RequestBodySubscription(publisher);
publisher.subscriber = subscriber;
subscriber.onSubscribe(subscription);
}
......@@ -205,6 +203,7 @@ abstract class AbstractRequestBodyPublisher implements Publisher<DataBuffer> {
}
}
},
/**
* State that gets entered when there is no demand. Responds to {@link
* #request(AbstractRequestBodyPublisher, long)} by increasing the demand,
......@@ -222,6 +221,7 @@ abstract class AbstractRequestBodyPublisher implements Publisher<DataBuffer> {
}
}
},
/**
* State that gets entered when there is demand. Responds to
* {@link #onDataAvailable(AbstractRequestBodyPublisher)} by
......@@ -237,15 +237,18 @@ abstract class AbstractRequestBodyPublisher implements Publisher<DataBuffer> {
if (demandAvailable) {
publisher.changeState(READING, DEMAND);
publisher.checkOnDataAvailable();
} else {
}
else {
publisher.changeState(READING, NO_DEMAND);
}
} catch (IOException ex) {
}
catch (IOException ex) {
publisher.onError(ex);
}
}
}
},
READING {
@Override
void request(AbstractRequestBodyPublisher publisher, long n) {
......@@ -254,34 +257,30 @@ abstract class AbstractRequestBodyPublisher implements Publisher<DataBuffer> {
}
}
},
/**
* The terminal completed state. Does not respond to any events.
*/
COMPLETED {
@Override
void request(AbstractRequestBodyPublisher publisher, long n) {
// ignore
}
@Override
void cancel(AbstractRequestBodyPublisher publisher) {
// ignore
}
@Override
void onAllDataRead(AbstractRequestBodyPublisher publisher) {
// ignore
}
@Override
void onError(AbstractRequestBodyPublisher publisher, Throwable t) {
// ignore
}
};
void subscribe(AbstractRequestBodyPublisher publisher,
Subscriber<? super DataBuffer> subscriber) {
void subscribe(AbstractRequestBodyPublisher publisher, Subscriber<? super DataBuffer> subscriber) {
throw new IllegalStateException(toString());
}
......@@ -312,6 +311,6 @@ abstract class AbstractRequestBodyPublisher implements Publisher<DataBuffer> {
}
}
}
}
}
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http.server.reactive;
import java.net.URI;
......@@ -60,13 +61,6 @@ public abstract class AbstractServerHttpRequest implements ServerHttpRequest {
return this.uri;
}
/**
* Initialize a URI that represents the request. Invoked lazily on the first
* call to {@link #getURI()} and then cached.
* @throws URISyntaxException
*/
protected abstract URI initUri() throws URISyntaxException;
@Override
public MultiValueMap<String, String> getQueryParams() {
if (this.queryParams == null) {
......@@ -99,12 +93,6 @@ public abstract class AbstractServerHttpRequest implements ServerHttpRequest {
return this.headers;
}
/**
* Initialize the headers from the underlying request. Invoked lazily on the
* first call to {@link #getHeaders()} and then cached.
*/
protected abstract HttpHeaders initHeaders();
@Override
public MultiValueMap<String, HttpCookie> getCookies() {
if (this.cookies == null) {
......@@ -113,9 +101,24 @@ public abstract class AbstractServerHttpRequest implements ServerHttpRequest {
return this.cookies;
}
/**
* Initialize a URI that represents the request.
* <p>Invoked lazily on the first call to {@link #getURI()} and then cached.
* @throws URISyntaxException
*/
protected abstract URI initUri() throws URISyntaxException;
/**
* Initialize the headers from the underlying request.
* <p>Invoked lazily on the first call to {@link #getHeaders()} and then cached.
*/
protected abstract HttpHeaders initHeaders();
/**
* Initialize the cookies from the underlying request. Invoked lazily on the
* first access to cookies via {@link #getHeaders()} and then cached.
* Initialize the cookies from the underlying request.
* <p>Invoked lazily on the first access to cookies via {@link #getHeaders()}
* and then cached.
*/
protected abstract MultiValueMap<String, HttpCookie> initCookies();
......
......@@ -36,7 +36,6 @@ import org.springframework.util.CollectionUtils;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
/**
* Base class for {@link ServerHttpResponse} implementations.
*
......@@ -46,8 +45,6 @@ import org.springframework.util.MultiValueMap;
*/
public abstract class AbstractServerHttpResponse implements ServerHttpResponse {
private Log logger = LogFactory.getLog(getClass());
private static final int STATE_NEW = 1;
private static final int STATE_COMMITTING = 2;
......@@ -55,10 +52,9 @@ public abstract class AbstractServerHttpResponse implements ServerHttpResponse {
private static final int STATE_COMMITTED = 3;
private final DataBufferFactory dataBufferFactory;
private final Log logger = LogFactory.getLog(getClass());
private HttpStatus statusCode;
private final DataBufferFactory dataBufferFactory;
private final HttpHeaders headers;
......@@ -68,6 +64,8 @@ public abstract class AbstractServerHttpResponse implements ServerHttpResponse {
private final AtomicInteger state = new AtomicInteger(STATE_NEW);
private HttpStatus statusCode;
public AbstractServerHttpResponse(DataBufferFactory dataBufferFactory) {
Assert.notNull(dataBufferFactory, "'dataBufferFactory' must not be null");
......@@ -83,7 +81,6 @@ public abstract class AbstractServerHttpResponse implements ServerHttpResponse {
return this.dataBufferFactory;
}
@Override
public boolean setStatusCode(HttpStatus statusCode) {
Assert.notNull(statusCode);
......@@ -139,6 +136,11 @@ public abstract class AbstractServerHttpResponse implements ServerHttpResponse {
.then(() -> writeAndFlushWithInternal(writePublisher)));
}
@Override
public Mono<Void> setComplete() {
return applyBeforeCommit();
}
protected Mono<Void> applyBeforeCommit() {
Mono<Void> mono = Mono.empty();
if (this.state.compareAndSet(STATE_NEW, STATE_COMMITTING)) {
......@@ -160,6 +162,7 @@ public abstract class AbstractServerHttpResponse implements ServerHttpResponse {
return mono;
}
/**
* Implement this method to write to the underlying the response.
* @param body the publisher to write with
......@@ -171,8 +174,7 @@ public abstract class AbstractServerHttpResponse implements ServerHttpResponse {
* each {@code Publisher<DataBuffer>}.
* @param body the publisher to write and flush with
*/
protected abstract Mono<Void> writeAndFlushWithInternal(
Publisher<Publisher<DataBuffer>> body);
protected abstract Mono<Void> writeAndFlushWithInternal(Publisher<Publisher<DataBuffer>> body);
/**
* Implement this method to write the status code to the underlying response.
......@@ -192,10 +194,4 @@ public abstract class AbstractServerHttpResponse implements ServerHttpResponse {
*/
protected abstract void writeCookies();
@Override
public Mono<Void> setComplete() {
return applyBeforeCommit();
}
}
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http.server.reactive;
import java.util.function.Function;
......@@ -42,17 +43,18 @@ public class ChannelSendOperator<T> extends MonoSource<T, Void> {
private final Function<Publisher<T>, Publisher<Void>> writeFunction;
public ChannelSendOperator(Publisher<? extends T> source,
Function<Publisher<T>, Publisher<Void>> writeFunction) {
public ChannelSendOperator(Publisher<? extends T> source, Function<Publisher<T>, Publisher<Void>> writeFunction) {
super(source);
this.writeFunction = writeFunction;
}
@Override
public void subscribe(Subscriber<? super Void> s) {
source.subscribe(new WriteWithBarrier(s));
this.source.subscribe(new WriteWithBarrier(s));
}
private class WriteWithBarrier extends Operators.SubscriberAdapter<T, Void> implements Publisher<T> {
/**
......@@ -77,17 +79,14 @@ public class ChannelSendOperator<T> extends MonoSource<T, Void> {
/** The actual writeSubscriber vs the downstream completion subscriber */
private Subscriber<? super T> writeSubscriber;
public WriteWithBarrier(Subscriber<? super Void> subscriber) {
super(subscriber);
}
@Override
protected void doOnSubscribe(Subscription subscription) {
super.doOnSubscribe(subscription);
super.upstream()
.request(1); // bypass doRequest
super.upstream().request(1); // bypass doRequest
}
@Override
......@@ -156,7 +155,7 @@ public class ChannelSendOperator<T> extends MonoSource<T, Void> {
@Override
public void subscribe(Subscriber<? super T> writeSubscriber) {
synchronized (this) {
Assert.isNull(this.writeSubscriber, "Only one writeSubscriber supported.");
Assert.isNull(this.writeSubscriber, "Only one writeSubscriber supported");
this.writeSubscriber = writeSubscriber;
if (this.error != null || this.completed) {
......@@ -210,6 +209,7 @@ public class ChannelSendOperator<T> extends MonoSource<T, Void> {
}
}
private class DownstreamBridge implements Subscriber<Void> {
private final Subscriber<? super Void> downstream;
......
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -28,10 +28,9 @@ public interface HttpHandler {
/**
* Handle the given request and generate a response.
*
* @param request current HTTP request.
* @param response current HTTP response.
* @return {@code Mono<Void>} to indicate when request handling is complete.
* @param request current HTTP request
* @param response current HTTP response
* @return {@code Mono<Void>} to indicate when request handling is complete
*/
Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response);
......
......@@ -34,11 +34,9 @@ import reactor.core.publisher.Operators;
*/
class ResponseBodyWriteResultPublisher implements Publisher<Void> {
private static final Log logger =
LogFactory.getLog(ResponseBodyWriteResultPublisher.class);
private static final Log logger = LogFactory.getLog(ResponseBodyWriteResultPublisher.class);
private final AtomicReference<State> state =
new AtomicReference<>(State.UNSUBSCRIBED);
private final AtomicReference<State> state = new AtomicReference<>(State.UNSUBSCRIBED);
private Subscriber<? super Void> subscriber;
......@@ -46,6 +44,7 @@ class ResponseBodyWriteResultPublisher implements Publisher<Void> {
private volatile Throwable publisherError;
@Override
public final void subscribe(Subscriber<? super Void> subscriber) {
if (logger.isTraceEnabled()) {
......@@ -78,13 +77,12 @@ class ResponseBodyWriteResultPublisher implements Publisher<Void> {
this.state.get().publishError(this, t);
}
private static final class ResponseBodyWriteResultSubscription
implements Subscription {
private static final class ResponseBodyWriteResultSubscription implements Subscription {
private final ResponseBodyWriteResultPublisher publisher;
public ResponseBodyWriteResultSubscription(
ResponseBodyWriteResultPublisher publisher) {
public ResponseBodyWriteResultSubscription(ResponseBodyWriteResultPublisher publisher) {
this.publisher = publisher;
}
......@@ -107,10 +105,11 @@ class ResponseBodyWriteResultPublisher implements Publisher<Void> {
private State state() {
return this.publisher.state.get();
}
}
private enum State {
UNSUBSCRIBED {
@Override
void subscribe(ResponseBodyWriteResultPublisher publisher,
......@@ -132,62 +131,55 @@ class ResponseBodyWriteResultPublisher implements Publisher<Void> {
throw new IllegalStateException(toString());
}
}
@Override
void publishComplete(ResponseBodyWriteResultPublisher publisher) {
publisher.publisherCompleted = true;
}
@Override
void publishError(ResponseBodyWriteResultPublisher publisher, Throwable t) {
publisher.publisherError = t;
}
},
SUBSCRIBED {
@Override
void request(ResponseBodyWriteResultPublisher publisher, long n) {
Operators.checkRequest(n, publisher.subscriber);
}
@Override
void publishComplete(ResponseBodyWriteResultPublisher publisher) {
if (publisher.changeState(this, COMPLETED)) {
publisher.subscriber.onComplete();
}
}
@Override
void publishError(ResponseBodyWriteResultPublisher publisher, Throwable t) {
if (publisher.changeState(this, COMPLETED)) {
publisher.subscriber.onError(t);
}
}
},
COMPLETED {
@Override
void request(ResponseBodyWriteResultPublisher publisher, long n) {
// ignore
}
@Override
void cancel(ResponseBodyWriteResultPublisher publisher) {
// ignore
}
@Override
void publishComplete(ResponseBodyWriteResultPublisher publisher) {
// ignore
}
@Override
void publishError(ResponseBodyWriteResultPublisher publisher, Throwable t) {
// ignore
}
};
void subscribe(ResponseBodyWriteResultPublisher publisher,
Subscriber<? super Void> subscriber) {
void subscribe(ResponseBodyWriteResultPublisher publisher, Subscriber<? super Void> subscriber) {
throw new IllegalStateException(toString());
}
......@@ -206,8 +198,6 @@ class ResponseBodyWriteResultPublisher implements Publisher<Void> {
void publishError(ResponseBodyWriteResultPublisher publisher, Throwable t) {
throw new IllegalStateException(toString());
}
}
}
......@@ -45,9 +45,9 @@ public interface ServerHttpResponse extends ReactiveHttpOutputMessage {
*/
HttpStatus getStatusCode();
/**
* Return a mutable map with the cookies to send to the server.
*/
/**
* Return a mutable map with the cookies to send to the server.
*/
MultiValueMap<String, ResponseCookie> getCookies();
/**
......
......@@ -47,6 +47,7 @@ public class ServletHttpHandlerAdapter extends HttpServlet {
private static final int DEFAULT_BUFFER_SIZE = 8192;
private static Log logger = LogFactory.getLog(ServletHttpHandlerAdapter.class);
private final HttpHandler handler;
......@@ -57,15 +58,17 @@ public class ServletHttpHandlerAdapter extends HttpServlet {
private int bufferSize = DEFAULT_BUFFER_SIZE;
/**
* Create a new {@code ServletHttpHandlerAdapter} with the given HTTP handler.
* @param handler the handler
*/
public ServletHttpHandlerAdapter(HttpHandler handler) {
Assert.notNull(handler, "'handler' must not be null");
Assert.notNull(handler, "HttpHandler must not be null");
this.handler = handler;
}
public void setDataBufferFactory(DataBufferFactory dataBufferFactory) {
Assert.notNull(dataBufferFactory, "'dataBufferFactory' must not be null");
this.dataBufferFactory = dataBufferFactory;
......@@ -76,26 +79,21 @@ public class ServletHttpHandlerAdapter extends HttpServlet {
this.bufferSize = bufferSize;
}
@Override
protected void service(HttpServletRequest servletRequest,
HttpServletResponse servletResponse) throws ServletException, IOException {
protected void service(HttpServletRequest servletRequest, HttpServletResponse servletResponse)
throws ServletException, IOException {
AsyncContext asyncContext = servletRequest.startAsync();
ServletServerHttpRequest request =
new ServletServerHttpRequest(servletRequest, this.dataBufferFactory,
this.bufferSize);
ServletServerHttpResponse response =
new ServletServerHttpResponse(servletResponse, this.dataBufferFactory,
this.bufferSize);
HandlerResultSubscriber resultSubscriber =
new HandlerResultSubscriber(asyncContext);
ServletServerHttpRequest request = new ServletServerHttpRequest(
servletRequest, this.dataBufferFactory, this.bufferSize);
ServletServerHttpResponse response = new ServletServerHttpResponse(
servletResponse, this.dataBufferFactory, this.bufferSize);
HandlerResultSubscriber resultSubscriber = new HandlerResultSubscriber(asyncContext);
this.handler.handle(request, response).subscribe(resultSubscriber);
}
private static class HandlerResultSubscriber implements Subscriber<Void> {
private final AsyncContext asyncContext;
......@@ -117,8 +115,7 @@ public class ServletHttpHandlerAdapter extends HttpServlet {
@Override
public void onError(Throwable ex) {
logger.error("Error from request handling. Completing the request.", ex);
HttpServletResponse response =
(HttpServletResponse) this.asyncContext.getResponse();
HttpServletResponse response = (HttpServletResponse) this.asyncContext.getResponse();
response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
this.asyncContext.complete();
}
......@@ -129,5 +126,4 @@ public class ServletHttpHandlerAdapter extends HttpServlet {
}
}
}
\ No newline at end of file
}
......@@ -49,27 +49,29 @@ import org.springframework.util.StringUtils;
*/
public class ServletServerHttpRequest extends AbstractServerHttpRequest {
private final Object bodyPublisherMonitor = new Object();
private volatile RequestBodyPublisher bodyPublisher;
private final HttpServletRequest request;
private final DataBufferFactory dataBufferFactory;
private final int bufferSize;
private final Object bodyPublisherMonitor = new Object();
private volatile RequestBodyPublisher bodyPublisher;
public ServletServerHttpRequest(HttpServletRequest request,
DataBufferFactory dataBufferFactory, int bufferSize) {
Assert.notNull(request, "'request' must not be null.");
Assert.notNull(dataBufferFactory, "'dataBufferFactory' must not be null");
Assert.isTrue(bufferSize > 0);
Assert.notNull(request, "HttpServletRequest must not be null");
Assert.notNull(dataBufferFactory, "DataBufferFactory must not be null");
Assert.isTrue(bufferSize > 0, "Buffer size must be higher than 0");
this.request = request;
this.dataBufferFactory = dataBufferFactory;
this.bufferSize = bufferSize;
}
public HttpServletRequest getServletRequest() {
return this.request;
}
......@@ -151,7 +153,8 @@ public class ServletServerHttpRequest extends AbstractServerHttpRequest {
synchronized (this.bodyPublisherMonitor) {
bodyPublisher = this.bodyPublisher;
if (bodyPublisher == null) {
this.bodyPublisher = bodyPublisher = createBodyPublisher();
bodyPublisher = createBodyPublisher();
this.bodyPublisher = bodyPublisher;
}
}
}
......@@ -163,13 +166,13 @@ public class ServletServerHttpRequest extends AbstractServerHttpRequest {
}
private RequestBodyPublisher createBodyPublisher() throws IOException {
RequestBodyPublisher bodyPublisher =
new RequestBodyPublisher(request.getInputStream(), this.dataBufferFactory,
this.bufferSize);
RequestBodyPublisher bodyPublisher = new RequestBodyPublisher(
this.request.getInputStream(), this.dataBufferFactory, this.bufferSize);
bodyPublisher.registerListener();
return bodyPublisher;
}
private static class RequestBodyPublisher extends AbstractRequestBodyPublisher {
private final RequestBodyPublisher.RequestBodyReadListener readListener =
......@@ -183,6 +186,7 @@ public class ServletServerHttpRequest extends AbstractServerHttpRequest {
public RequestBodyPublisher(ServletInputStream inputStream,
DataBufferFactory dataBufferFactory, int bufferSize) {
this.inputStream = inputStream;
this.dataBufferFactory = dataBufferFactory;
this.buffer = new byte[bufferSize];
......@@ -216,6 +220,7 @@ public class ServletServerHttpRequest extends AbstractServerHttpRequest {
return null;
}
private class RequestBodyReadListener implements ReadListener {
@Override
......@@ -235,4 +240,5 @@ public class ServletServerHttpRequest extends AbstractServerHttpRequest {
}
}
}
}
......@@ -46,23 +46,22 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons
private final ResponseBodyWriteListener writeListener = new ResponseBodyWriteListener();
private volatile ResponseBodyProcessor bodyProcessor;
private final HttpServletResponse response;
private final int bufferSize;
private volatile boolean flushOnNext;
private volatile ResponseBodyProcessor bodyProcessor;
public ServletServerHttpResponse(HttpServletResponse response,
DataBufferFactory dataBufferFactory, int bufferSize) throws IOException {
super(dataBufferFactory);
Assert.notNull(response, "'response' must not be null");
Assert.notNull(dataBufferFactory, "'dataBufferFactory' must not be null");
Assert.isTrue(bufferSize > 0);
Assert.notNull(response, "HttpServletResponse must not be null");
Assert.notNull(dataBufferFactory, "DataBufferFactory must not be null");
Assert.isTrue(bufferSize > 0, "Buffer size must be higher than 0");
this.response = response;
this.bufferSize = bufferSize;
}
......@@ -159,13 +158,11 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons
private final int bufferSize;
public ResponseBodyProcessor(ServletOutputStream outputStream, int bufferSize) {
this.outputStream = outputStream;
this.bufferSize = bufferSize;
}
@Override
protected boolean isWritePossible() {
return this.outputStream.isReady();
......@@ -179,13 +176,10 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons
}
flush();
}
boolean ready = this.outputStream.isReady();
if (this.logger.isTraceEnabled()) {
this.logger.trace("write: " + dataBuffer + " ready: " + ready);
}
if (ready) {
int total = dataBuffer.readableByteCount();
int written = writeDataBuffer(dataBuffer);
......@@ -202,21 +196,18 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons
private int writeDataBuffer(DataBuffer dataBuffer) throws IOException {
InputStream input = dataBuffer.asInputStream();
int bytesWritten = 0;
byte[] buffer = new byte[this.bufferSize];
int bytesRead = -1;
while (this.outputStream.isReady() && (bytesRead = input.read(buffer)) != -1) {
this.outputStream.write(buffer, 0, bytesRead);
bytesWritten += bytesRead;
}
return bytesWritten;
}
}
private class ResponseBodyWriteListener implements WriteListener {
@Override
......@@ -235,6 +226,7 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons
}
}
private class ResponseBodyFlushProcessor extends AbstractResponseBodyFlushProcessor {
@Override
......@@ -255,7 +247,6 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons
}
ServletServerHttpResponse.this.flush();
}
}
}
\ No newline at end of file
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册