提交 d219054b 编写于 作者: R Rossen Stoyanchev

Polish

上级 30152866
...@@ -26,7 +26,9 @@ import org.springframework.core.io.buffer.DataBuffer; ...@@ -26,7 +26,9 @@ import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.core.io.buffer.DataBufferFactory;
/** /**
* Abstract base class for listener-based server responses, i.e. Servlet 3.1 and Undertow. * Abstract base class for listener-based server responses, e.g. Servlet 3.1
* and Undertow.
*
* @author Arjen Poutsma * @author Arjen Poutsma
* @since 5.0 * @since 5.0
*/ */
...@@ -34,10 +36,12 @@ public abstract class AbstractListenerServerHttpResponse extends AbstractServerH ...@@ -34,10 +36,12 @@ public abstract class AbstractListenerServerHttpResponse extends AbstractServerH
private final AtomicBoolean writeCalled = new AtomicBoolean(); private final AtomicBoolean writeCalled = new AtomicBoolean();
public AbstractListenerServerHttpResponse(DataBufferFactory dataBufferFactory) { public AbstractListenerServerHttpResponse(DataBufferFactory dataBufferFactory) {
super(dataBufferFactory); super(dataBufferFactory);
} }
@Override @Override
protected final Mono<Void> writeWithInternal(Publisher<DataBuffer> body) { protected final Mono<Void> writeWithInternal(Publisher<DataBuffer> body) {
return writeAndFlushWithInternal(Mono.just(body)); return writeAndFlushWithInternal(Mono.just(body));
...@@ -46,13 +50,13 @@ public abstract class AbstractListenerServerHttpResponse extends AbstractServerH ...@@ -46,13 +50,13 @@ public abstract class AbstractListenerServerHttpResponse extends AbstractServerH
@Override @Override
protected final Mono<Void> writeAndFlushWithInternal(Publisher<Publisher<DataBuffer>> body) { protected final Mono<Void> writeAndFlushWithInternal(Publisher<Publisher<DataBuffer>> body) {
if (this.writeCalled.compareAndSet(false, true)) { if (this.writeCalled.compareAndSet(false, true)) {
Processor<Publisher<DataBuffer>, Void> bodyProcessor = Processor<Publisher<DataBuffer>, Void> bodyProcessor = createBodyFlushProcessor();
createBodyFlushProcessor();
return Mono.from(subscriber -> { return Mono.from(subscriber -> {
body.subscribe(bodyProcessor); body.subscribe(bodyProcessor);
bodyProcessor.subscribe(subscriber); bodyProcessor.subscribe(subscriber);
}); });
} else { }
else {
return Mono.error(new IllegalStateException( return Mono.error(new IllegalStateException(
"writeWith() or writeAndFlushWith() has already been called")); "writeWith() or writeAndFlushWith() has already been called"));
} }
...@@ -64,4 +68,5 @@ public abstract class AbstractListenerServerHttpResponse extends AbstractServerH ...@@ -64,4 +68,5 @@ public abstract class AbstractListenerServerHttpResponse extends AbstractServerH
* {@link #writeAndFlushWithInternal(Publisher)}. * {@link #writeAndFlushWithInternal(Publisher)}.
*/ */
protected abstract Processor<Publisher<DataBuffer>, Void> createBodyFlushProcessor(); protected abstract Processor<Publisher<DataBuffer>, Void> createBodyFlushProcessor();
} }
...@@ -35,26 +35,25 @@ import org.springframework.core.io.buffer.DataBuffer; ...@@ -35,26 +35,25 @@ import org.springframework.core.io.buffer.DataBuffer;
* Servlet 3.1 and Undertow support. * Servlet 3.1 and Undertow support.
* *
* @author Arjen Poutsma * @author Arjen Poutsma
* @author Violeta Georgieva
* @since 5.0 * @since 5.0
* @see ServletServerHttpRequest * @see ServletServerHttpRequest
* @see UndertowHttpHandlerAdapter * @see UndertowHttpHandlerAdapter
* @see ServerHttpResponse#writeAndFlushWith(Publisher) * @see ServerHttpResponse#writeAndFlushWith(Publisher)
*/ */
abstract class AbstractResponseBodyFlushProcessor abstract class AbstractResponseBodyFlushProcessor implements Processor<Publisher<DataBuffer>, Void> {
implements Processor<Publisher<DataBuffer>, Void> {
protected final Log logger = LogFactory.getLog(getClass()); protected final Log logger = LogFactory.getLog(getClass());
private final ResponseBodyWriteResultPublisher publisherDelegate = private final ResponseBodyWriteResultPublisher resultPublisher = new ResponseBodyWriteResultPublisher();
new ResponseBodyWriteResultPublisher();
private final AtomicReference<State> state = private final AtomicReference<State> state = new AtomicReference<>(State.UNSUBSCRIBED);
new AtomicReference<>(State.UNSUBSCRIBED);
private volatile boolean subscriberCompleted; private volatile boolean subscriberCompleted;
private Subscription subscription; private Subscription subscription;
// Subscriber // Subscriber
@Override @Override
...@@ -89,13 +88,15 @@ abstract class AbstractResponseBodyFlushProcessor ...@@ -89,13 +88,15 @@ abstract class AbstractResponseBodyFlushProcessor
this.state.get().onComplete(this); this.state.get().onComplete(this);
} }
// Publisher // Publisher
@Override @Override
public final void subscribe(Subscriber<? super Void> subscriber) { public final void subscribe(Subscriber<? super Void> subscriber) {
this.publisherDelegate.subscribe(subscriber); this.resultPublisher.subscribe(subscriber);
} }
/** /**
* Creates a new processor for subscribing to a body chunk. * Creates a new processor for subscribing to a body chunk.
*/ */
...@@ -106,8 +107,9 @@ abstract class AbstractResponseBodyFlushProcessor ...@@ -106,8 +107,9 @@ abstract class AbstractResponseBodyFlushProcessor
*/ */
protected abstract void flush() throws IOException; protected abstract void flush() throws IOException;
private void cancel() {
this.subscription.cancel(); private boolean changeState(State oldState, State newState) {
return this.state.compareAndSet(oldState, newState);
} }
private void writeComplete() { private void writeComplete() {
...@@ -118,15 +120,17 @@ abstract class AbstractResponseBodyFlushProcessor ...@@ -118,15 +120,17 @@ abstract class AbstractResponseBodyFlushProcessor
} }
private boolean changeState(State oldState, State newState) { private void cancel() {
return this.state.compareAndSet(oldState, newState); this.subscription.cancel();
} }
private enum State { private enum State {
UNSUBSCRIBED { UNSUBSCRIBED {
@Override @Override
public void onSubscribe(AbstractResponseBodyFlushProcessor processor, public void onSubscribe(AbstractResponseBodyFlushProcessor processor, Subscription subscription) {
Subscription subscription) {
Objects.requireNonNull(subscription, "Subscription cannot be null"); Objects.requireNonNull(subscription, "Subscription cannot be null");
if (processor.changeState(this, REQUESTED)) { if (processor.changeState(this, REQUESTED)) {
processor.subscription = subscription; processor.subscription = subscription;
...@@ -138,25 +142,25 @@ abstract class AbstractResponseBodyFlushProcessor ...@@ -138,25 +142,25 @@ abstract class AbstractResponseBodyFlushProcessor
} }
}, },
REQUESTED { REQUESTED {
@Override @Override
public void onNext(AbstractResponseBodyFlushProcessor processor, public void onNext(AbstractResponseBodyFlushProcessor processor, Publisher<DataBuffer> chunk) {
Publisher<DataBuffer> chunk) {
if (processor.changeState(this, RECEIVED)) { if (processor.changeState(this, RECEIVED)) {
Processor<DataBuffer, Void> chunkProcessor = Processor<DataBuffer, Void> chunkProcessor = processor.createBodyProcessor();
processor.createBodyProcessor();
chunk.subscribe(chunkProcessor); chunk.subscribe(chunkProcessor);
chunkProcessor.subscribe(new WriteSubscriber(processor)); chunkProcessor.subscribe(new WriteSubscriber(processor));
} }
} }
@Override @Override
void onComplete(AbstractResponseBodyFlushProcessor processor) { public void onComplete(AbstractResponseBodyFlushProcessor processor) {
if (processor.changeState(this, COMPLETED)) { if (processor.changeState(this, COMPLETED)) {
processor.publisherDelegate.publishComplete(); processor.resultPublisher.publishComplete();
} }
} }
}, },
RECEIVED { RECEIVED {
@Override @Override
public void writeComplete(AbstractResponseBodyFlushProcessor processor) { public void writeComplete(AbstractResponseBodyFlushProcessor processor) {
try { try {
...@@ -169,7 +173,7 @@ abstract class AbstractResponseBodyFlushProcessor ...@@ -169,7 +173,7 @@ abstract class AbstractResponseBodyFlushProcessor
if (processor.subscriberCompleted) { if (processor.subscriberCompleted) {
if (processor.changeState(this, COMPLETED)) { if (processor.changeState(this, COMPLETED)) {
processor.publisherDelegate.publishComplete(); processor.resultPublisher.publishComplete();
} }
} }
else { else {
...@@ -180,11 +184,12 @@ abstract class AbstractResponseBodyFlushProcessor ...@@ -180,11 +184,12 @@ abstract class AbstractResponseBodyFlushProcessor
} }
@Override @Override
void onComplete(AbstractResponseBodyFlushProcessor processor) { public void onComplete(AbstractResponseBodyFlushProcessor processor) {
processor.subscriberCompleted = true; processor.subscriberCompleted = true;
} }
}, },
COMPLETED { COMPLETED {
@Override @Override
public void onNext(AbstractResponseBodyFlushProcessor processor, public void onNext(AbstractResponseBodyFlushProcessor processor,
Publisher<DataBuffer> publisher) { Publisher<DataBuffer> publisher) {
...@@ -193,33 +198,31 @@ abstract class AbstractResponseBodyFlushProcessor ...@@ -193,33 +198,31 @@ abstract class AbstractResponseBodyFlushProcessor
} }
@Override @Override
void onError(AbstractResponseBodyFlushProcessor processor, Throwable t) { public void onError(AbstractResponseBodyFlushProcessor processor, Throwable t) {
// ignore // ignore
} }
@Override @Override
void onComplete(AbstractResponseBodyFlushProcessor processor) { public void onComplete(AbstractResponseBodyFlushProcessor processor) {
// ignore // ignore
} }
}; };
public void onSubscribe(AbstractResponseBodyFlushProcessor processor, public void onSubscribe(AbstractResponseBodyFlushProcessor processor, Subscription subscription) {
Subscription subscription) {
subscription.cancel(); subscription.cancel();
} }
public void onNext(AbstractResponseBodyFlushProcessor processor, public void onNext(AbstractResponseBodyFlushProcessor processor, Publisher<DataBuffer> publisher) {
Publisher<DataBuffer> publisher) {
throw new IllegalStateException(toString()); throw new IllegalStateException(toString());
} }
void onError(AbstractResponseBodyFlushProcessor processor, Throwable t) { public void onError(AbstractResponseBodyFlushProcessor processor, Throwable ex) {
if (processor.changeState(this, COMPLETED)) { if (processor.changeState(this, COMPLETED)) {
processor.publisherDelegate.publishError(t); processor.resultPublisher.publishError(ex);
} }
} }
void onComplete(AbstractResponseBodyFlushProcessor processor) { public void onComplete(AbstractResponseBodyFlushProcessor processor) {
throw new IllegalStateException(toString()); throw new IllegalStateException(toString());
} }
...@@ -227,6 +230,7 @@ abstract class AbstractResponseBodyFlushProcessor ...@@ -227,6 +230,7 @@ abstract class AbstractResponseBodyFlushProcessor
// ignore // ignore
} }
private static class WriteSubscriber implements Subscriber<Void> { private static class WriteSubscriber implements Subscriber<Void> {
private final AbstractResponseBodyFlushProcessor processor; private final AbstractResponseBodyFlushProcessor processor;
...@@ -236,8 +240,8 @@ abstract class AbstractResponseBodyFlushProcessor ...@@ -236,8 +240,8 @@ abstract class AbstractResponseBodyFlushProcessor
} }
@Override @Override
public void onSubscribe(Subscription s) { public void onSubscribe(Subscription subscription) {
s.request(Long.MAX_VALUE); subscription.request(Long.MAX_VALUE);
} }
@Override @Override
...@@ -245,14 +249,14 @@ abstract class AbstractResponseBodyFlushProcessor ...@@ -245,14 +249,14 @@ abstract class AbstractResponseBodyFlushProcessor
} }
@Override @Override
public void onError(Throwable t) { public void onError(Throwable ex) {
processor.cancel(); this.processor.cancel();
processor.onError(t); this.processor.onError(ex);
} }
@Override @Override
public void onComplete() { public void onComplete() {
processor.writeComplete(); this.processor.writeComplete();
} }
} }
} }
......
...@@ -48,11 +48,9 @@ abstract class AbstractResponseBodyProcessor implements Processor<DataBuffer, Vo ...@@ -48,11 +48,9 @@ abstract class AbstractResponseBodyProcessor implements Processor<DataBuffer, Vo
protected final Log logger = LogFactory.getLog(getClass()); protected final Log logger = LogFactory.getLog(getClass());
private final ResponseBodyWriteResultPublisher publisherDelegate = private final ResponseBodyWriteResultPublisher resultPublisher = new ResponseBodyWriteResultPublisher();
new ResponseBodyWriteResultPublisher();
private final AtomicReference<State> state = private final AtomicReference<State> state = new AtomicReference<>(State.UNSUBSCRIBED);
new AtomicReference<>(State.UNSUBSCRIBED);
private volatile DataBuffer currentBuffer; private volatile DataBuffer currentBuffer;
...@@ -60,6 +58,7 @@ abstract class AbstractResponseBodyProcessor implements Processor<DataBuffer, Vo ...@@ -60,6 +58,7 @@ abstract class AbstractResponseBodyProcessor implements Processor<DataBuffer, Vo
private Subscription subscription; private Subscription subscription;
// Subscriber // Subscriber
@Override @Override
...@@ -94,13 +93,15 @@ abstract class AbstractResponseBodyProcessor implements Processor<DataBuffer, Vo ...@@ -94,13 +93,15 @@ abstract class AbstractResponseBodyProcessor implements Processor<DataBuffer, Vo
this.state.get().onComplete(this); this.state.get().onComplete(this);
} }
// Publisher // Publisher
@Override @Override
public final void subscribe(Subscriber<? super Void> subscriber) { public final void subscribe(Subscriber<? super Void> subscriber) {
this.publisherDelegate.subscribe(subscriber); this.resultPublisher.subscribe(subscriber);
} }
// listener methods // listener methods
/** /**
...@@ -167,6 +168,7 @@ abstract class AbstractResponseBodyProcessor implements Processor<DataBuffer, Vo ...@@ -167,6 +168,7 @@ abstract class AbstractResponseBodyProcessor implements Processor<DataBuffer, Vo
return this.state.compareAndSet(oldState, newState); return this.state.compareAndSet(oldState, newState);
} }
/** /**
* Represents a state for the {@link Subscriber} to be in. The following figure * Represents a state for the {@link Subscriber} to be in. The following figure
* indicate the four different states that exist, and the relationships between them. * indicate the four different states that exist, and the relationships between them.
...@@ -193,9 +195,9 @@ abstract class AbstractResponseBodyProcessor implements Processor<DataBuffer, Vo ...@@ -193,9 +195,9 @@ abstract class AbstractResponseBodyProcessor implements Processor<DataBuffer, Vo
* #REQUESTED}. * #REQUESTED}.
*/ */
UNSUBSCRIBED { UNSUBSCRIBED {
@Override @Override
void onSubscribe(AbstractResponseBodyProcessor processor, public void onSubscribe(AbstractResponseBodyProcessor processor, Subscription subscription) {
Subscription subscription) {
Objects.requireNonNull(subscription, "Subscription cannot be null"); Objects.requireNonNull(subscription, "Subscription cannot be null");
if (processor.changeState(this, REQUESTED)) { if (processor.changeState(this, REQUESTED)) {
processor.subscription = subscription; processor.subscription = subscription;
...@@ -213,8 +215,9 @@ abstract class AbstractResponseBodyProcessor implements Processor<DataBuffer, Vo ...@@ -213,8 +215,9 @@ abstract class AbstractResponseBodyProcessor implements Processor<DataBuffer, Vo
* changing state to {@link #COMPLETED}. * changing state to {@link #COMPLETED}.
*/ */
REQUESTED { REQUESTED {
@Override @Override
void onNext(AbstractResponseBodyProcessor processor, DataBuffer dataBuffer) { public void onNext(AbstractResponseBodyProcessor processor, DataBuffer dataBuffer) {
if (processor.changeState(this, RECEIVED)) { if (processor.changeState(this, RECEIVED)) {
processor.receiveBuffer(dataBuffer); processor.receiveBuffer(dataBuffer);
processor.writeIfPossible(); processor.writeIfPossible();
...@@ -222,9 +225,9 @@ abstract class AbstractResponseBodyProcessor implements Processor<DataBuffer, Vo ...@@ -222,9 +225,9 @@ abstract class AbstractResponseBodyProcessor implements Processor<DataBuffer, Vo
} }
@Override @Override
void onComplete(AbstractResponseBodyProcessor processor) { public void onComplete(AbstractResponseBodyProcessor processor) {
if (processor.changeState(this, COMPLETED)) { if (processor.changeState(this, COMPLETED)) {
processor.publisherDelegate.publishComplete(); processor.resultPublisher.publishComplete();
} }
} }
}, },
...@@ -238,8 +241,9 @@ abstract class AbstractResponseBodyProcessor implements Processor<DataBuffer, Vo ...@@ -238,8 +241,9 @@ abstract class AbstractResponseBodyProcessor implements Processor<DataBuffer, Vo
* be written completely the state will be changed to {@link #RECEIVED}. * be written completely the state will be changed to {@link #RECEIVED}.
*/ */
RECEIVED { RECEIVED {
@Override @Override
void onWritePossible(AbstractResponseBodyProcessor processor) { public void onWritePossible(AbstractResponseBodyProcessor processor) {
if (processor.changeState(this, WRITING)) { if (processor.changeState(this, WRITING)) {
DataBuffer dataBuffer = processor.currentBuffer; DataBuffer dataBuffer = processor.currentBuffer;
try { try {
...@@ -252,7 +256,7 @@ abstract class AbstractResponseBodyProcessor implements Processor<DataBuffer, Vo ...@@ -252,7 +256,7 @@ abstract class AbstractResponseBodyProcessor implements Processor<DataBuffer, Vo
} }
else { else {
processor.changeState(WRITING, COMPLETED); processor.changeState(WRITING, COMPLETED);
processor.publisherDelegate.publishComplete(); processor.resultPublisher.publishComplete();
} }
} }
else { else {
...@@ -268,7 +272,7 @@ abstract class AbstractResponseBodyProcessor implements Processor<DataBuffer, Vo ...@@ -268,7 +272,7 @@ abstract class AbstractResponseBodyProcessor implements Processor<DataBuffer, Vo
} }
@Override @Override
void onComplete(AbstractResponseBodyProcessor processor) { public void onComplete(AbstractResponseBodyProcessor processor) {
processor.subscriberCompleted = true; processor.subscriberCompleted = true;
} }
}, },
...@@ -277,8 +281,9 @@ abstract class AbstractResponseBodyProcessor implements Processor<DataBuffer, Vo ...@@ -277,8 +281,9 @@ abstract class AbstractResponseBodyProcessor implements Processor<DataBuffer, Vo
* {@code onWritePossible started}. * {@code onWritePossible started}.
*/ */
WRITING { WRITING {
@Override @Override
void onComplete(AbstractResponseBodyProcessor processor) { public void onComplete(AbstractResponseBodyProcessor processor) {
processor.subscriberCompleted = true; processor.subscriberCompleted = true;
} }
}, },
...@@ -286,43 +291,44 @@ abstract class AbstractResponseBodyProcessor implements Processor<DataBuffer, Vo ...@@ -286,43 +291,44 @@ abstract class AbstractResponseBodyProcessor implements Processor<DataBuffer, Vo
* The terminal completed state. Does not respond to any events. * The terminal completed state. Does not respond to any events.
*/ */
COMPLETED { COMPLETED {
@Override @Override
void onNext(AbstractResponseBodyProcessor processor, DataBuffer dataBuffer) { public void onNext(AbstractResponseBodyProcessor processor, DataBuffer dataBuffer) {
// ignore // ignore
} }
@Override @Override
void onError(AbstractResponseBodyProcessor processor, Throwable t) { public void onError(AbstractResponseBodyProcessor processor, Throwable ex) {
// ignore // ignore
} }
@Override @Override
void onComplete(AbstractResponseBodyProcessor processor) { public void onComplete(AbstractResponseBodyProcessor processor) {
// ignore // ignore
} }
}; };
void onSubscribe(AbstractResponseBodyProcessor processor, Subscription s) { public void onSubscribe(AbstractResponseBodyProcessor processor, Subscription subscription) {
s.cancel(); subscription.cancel();
} }
void onNext(AbstractResponseBodyProcessor processor, DataBuffer dataBuffer) { public void onNext(AbstractResponseBodyProcessor processor, DataBuffer dataBuffer) {
throw new IllegalStateException(toString()); throw new IllegalStateException(toString());
} }
void onError(AbstractResponseBodyProcessor processor, Throwable t) { public void onError(AbstractResponseBodyProcessor processor, Throwable ex) {
if (processor.changeState(this, COMPLETED)) { if (processor.changeState(this, COMPLETED)) {
processor.publisherDelegate.publishError(t); processor.resultPublisher.publishError(ex);
} }
} }
void onComplete(AbstractResponseBodyProcessor processor) { public void onComplete(AbstractResponseBodyProcessor processor) {
throw new IllegalStateException(toString()); throw new IllegalStateException(toString());
} }
void onWritePossible(AbstractResponseBodyProcessor processor) {
public void onWritePossible(AbstractResponseBodyProcessor processor) {
// ignore // ignore
} }
} }
} }
...@@ -54,8 +54,10 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons ...@@ -54,8 +54,10 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons
private volatile boolean flushOnNext; private volatile boolean flushOnNext;
public ServletServerHttpResponse(HttpServletResponse response, public ServletServerHttpResponse(HttpServletResponse response,
DataBufferFactory dataBufferFactory, int bufferSize) throws IOException { DataBufferFactory dataBufferFactory, int bufferSize) throws IOException {
super(dataBufferFactory); super(dataBufferFactory);
Assert.notNull(response, "'response' must not be null"); Assert.notNull(response, "'response' must not be null");
Assert.notNull(dataBufferFactory, "'dataBufferFactory' must not be null"); Assert.notNull(dataBufferFactory, "'dataBufferFactory' must not be null");
...@@ -65,6 +67,7 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons ...@@ -65,6 +67,7 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons
this.bufferSize = bufferSize; this.bufferSize = bufferSize;
} }
public HttpServletResponse getServletResponse() { public HttpServletResponse getServletResponse() {
return this.response; return this.response;
} }
...@@ -112,6 +115,13 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons ...@@ -112,6 +115,13 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons
} }
} }
@Override
protected Processor<Publisher<DataBuffer>, Void> createBodyFlushProcessor() {
Processor<Publisher<DataBuffer>, Void> processor = new ResponseBodyFlushProcessor();
registerListener();
return processor;
}
private void registerListener() { private void registerListener() {
try { try {
outputStream().setWriteListener(writeListener); outputStream().setWriteListener(writeListener);
...@@ -121,6 +131,10 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons ...@@ -121,6 +131,10 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons
} }
} }
private ServletOutputStream outputStream() throws IOException {
return this.response.getOutputStream();
}
private void flush() throws IOException { private void flush() throws IOException {
ServletOutputStream outputStream = outputStream(); ServletOutputStream outputStream = outputStream();
if (outputStream.isReady()) { if (outputStream.isReady()) {
...@@ -138,16 +152,6 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons ...@@ -138,16 +152,6 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons
} }
} }
private ServletOutputStream outputStream() throws IOException {
return this.response.getOutputStream();
}
@Override
protected Processor<Publisher<DataBuffer>, Void> createBodyFlushProcessor() {
Processor<Publisher<DataBuffer>, Void> processor = new ResponseBodyFlushProcessor();
registerListener();
return processor;
}
private class ResponseBodyProcessor extends AbstractResponseBodyProcessor { private class ResponseBodyProcessor extends AbstractResponseBodyProcessor {
...@@ -155,11 +159,13 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons ...@@ -155,11 +159,13 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons
private final int bufferSize; private final int bufferSize;
public ResponseBodyProcessor(ServletOutputStream outputStream, int bufferSize) { public ResponseBodyProcessor(ServletOutputStream outputStream, int bufferSize) {
this.outputStream = outputStream; this.outputStream = outputStream;
this.bufferSize = bufferSize; this.bufferSize = bufferSize;
} }
@Override @Override
protected boolean isWritePossible() { protected boolean isWritePossible() {
return this.outputStream.isReady(); return this.outputStream.isReady();
...@@ -201,8 +207,7 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons ...@@ -201,8 +207,7 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons
byte[] buffer = new byte[this.bufferSize]; byte[] buffer = new byte[this.bufferSize];
int bytesRead = -1; int bytesRead = -1;
while (this.outputStream.isReady() && while (this.outputStream.isReady() && (bytesRead = input.read(buffer)) != -1) {
(bytesRead = input.read(buffer)) != -1) {
this.outputStream.write(buffer, 0, bytesRead); this.outputStream.write(buffer, 0, bytesRead);
bytesWritten += bytesRead; bytesWritten += bytesRead;
} }
......
...@@ -55,13 +55,14 @@ public class UndertowServerHttpResponse extends AbstractListenerServerHttpRespon ...@@ -55,13 +55,14 @@ public class UndertowServerHttpResponse extends AbstractListenerServerHttpRespon
private StreamSinkChannel responseChannel; private StreamSinkChannel responseChannel;
public UndertowServerHttpResponse(HttpServerExchange exchange,
DataBufferFactory dataBufferFactory) { public UndertowServerHttpResponse(HttpServerExchange exchange, DataBufferFactory bufferFactory) {
super(dataBufferFactory); super(bufferFactory);
Assert.notNull(exchange, "'exchange' is required."); Assert.notNull(exchange, "'exchange' is required.");
this.exchange = exchange; this.exchange = exchange;
} }
public HttpServerExchange getUndertowExchange() { public HttpServerExchange getUndertowExchange() {
return this.exchange; return this.exchange;
} }
...@@ -78,10 +79,8 @@ public class UndertowServerHttpResponse extends AbstractListenerServerHttpRespon ...@@ -78,10 +79,8 @@ public class UndertowServerHttpResponse extends AbstractListenerServerHttpRespon
public Mono<Void> writeWith(File file, long position, long count) { public Mono<Void> writeWith(File file, long position, long count) {
writeHeaders(); writeHeaders();
writeCookies(); writeCookies();
try { try {
StreamSinkChannel responseChannel = StreamSinkChannel responseChannel = getUndertowExchange().getResponseChannel();
getUndertowExchange().getResponseChannel();
@SuppressWarnings("resource") @SuppressWarnings("resource")
FileChannel in = new FileInputStream(file).getChannel(); FileChannel in = new FileInputStream(file).getChannel();
long result = responseChannel.transferFrom(in, position, count); long result = responseChannel.transferFrom(in, position, count);
...@@ -123,20 +122,20 @@ public class UndertowServerHttpResponse extends AbstractListenerServerHttpRespon ...@@ -123,20 +122,20 @@ public class UndertowServerHttpResponse extends AbstractListenerServerHttpRespon
} }
} }
@Override
protected AbstractResponseBodyFlushProcessor createBodyFlushProcessor() {
return new ResponseBodyFlushProcessor();
}
private ResponseBodyProcessor createBodyProcessor() { private ResponseBodyProcessor createBodyProcessor() {
if (this.responseChannel == null) { if (this.responseChannel == null) {
this.responseChannel = this.exchange.getResponseChannel(); this.responseChannel = this.exchange.getResponseChannel();
} }
ResponseBodyProcessor bodyProcessor = ResponseBodyProcessor bodyProcessor = new ResponseBodyProcessor( this.responseChannel);
new ResponseBodyProcessor( this.responseChannel);
bodyProcessor.registerListener(); bodyProcessor.registerListener();
return bodyProcessor; return bodyProcessor;
} }
@Override
protected AbstractResponseBodyFlushProcessor createBodyFlushProcessor() {
return new ResponseBodyFlushProcessor();
}
private static class ResponseBodyProcessor extends AbstractResponseBodyProcessor { private static class ResponseBodyProcessor extends AbstractResponseBodyProcessor {
...@@ -146,11 +145,13 @@ public class UndertowServerHttpResponse extends AbstractListenerServerHttpRespon ...@@ -146,11 +145,13 @@ public class UndertowServerHttpResponse extends AbstractListenerServerHttpRespon
private volatile ByteBuffer byteBuffer; private volatile ByteBuffer byteBuffer;
public ResponseBodyProcessor(StreamSinkChannel responseChannel) { public ResponseBodyProcessor(StreamSinkChannel responseChannel) {
Assert.notNull(responseChannel, "'responseChannel' must not be null"); Assert.notNull(responseChannel, "'responseChannel' must not be null");
this.responseChannel = responseChannel; this.responseChannel = responseChannel;
} }
public void registerListener() { public void registerListener() {
this.responseChannel.getWriteSetter().set(this.listener); this.responseChannel.getWriteSetter().set(this.listener);
this.responseChannel.resumeWrites(); this.responseChannel.resumeWrites();
...@@ -202,9 +203,7 @@ public class UndertowServerHttpResponse extends AbstractListenerServerHttpRespon ...@@ -202,9 +203,7 @@ public class UndertowServerHttpResponse extends AbstractListenerServerHttpRespon
public void handleEvent(StreamSinkChannel channel) { public void handleEvent(StreamSinkChannel channel) {
onWritePossible(); onWritePossible();
} }
} }
} }
private class ResponseBodyFlushProcessor extends AbstractResponseBodyFlushProcessor { private class ResponseBodyFlushProcessor extends AbstractResponseBodyFlushProcessor {
...@@ -223,6 +222,6 @@ public class UndertowServerHttpResponse extends AbstractListenerServerHttpRespon ...@@ -223,6 +222,6 @@ public class UndertowServerHttpResponse extends AbstractListenerServerHttpRespon
UndertowServerHttpResponse.this.responseChannel.flush(); UndertowServerHttpResponse.this.responseChannel.flush();
} }
} }
} }
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册