提交 c552aaa6 编写于 作者: A Arjen Poutsma

work, work.

上级 342299ab
buildscript {
repositories {
maven { url 'http://repo.springsource.org/plugins-release' }
}
dependencies {
classpath 'org.springframework.build.gradle:propdeps-plugin:0.0.7'
}
}
apply plugin: 'java' apply plugin: 'java'
apply plugin: 'propdeps'
apply plugin: 'propdeps-idea'
apply plugin: 'propdeps-maven'
repositories { repositories {
mavenCentral() mavenCentral()
...@@ -12,6 +24,8 @@ dependencies { ...@@ -12,6 +24,8 @@ dependencies {
compile "org.slf4j:slf4j-api:1.7.6" compile "org.slf4j:slf4j-api:1.7.6"
compile "ch.qos.logback:logback-classic:1.1.2" compile "ch.qos.logback:logback-classic:1.1.2"
provided "javax.servlet:javax.servlet-api:3.1.0"
testCompile "junit:junit:4.12" testCompile "junit:junit:4.12"
} }
......
...@@ -14,34 +14,40 @@ package org.springframework.rx.io;/* ...@@ -14,34 +14,40 @@ package org.springframework.rx.io;/*
* limitations under the License. * limitations under the License.
*/ */
import java.io.ByteArrayInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
import org.springframework.rx.util.BlockingByteBufQueue; import org.springframework.rx.util.BlockingSignalQueue;
import org.springframework.rx.util.BlockingByteBufQueueSubscriber; import org.springframework.rx.util.BlockingSignalQueueSubscriber;
import org.springframework.util.Assert; import org.springframework.util.Assert;
/** /**
* {@code InputStream} implementation based on a byte array {@link Publisher}.
*
* @author Arjen Poutsma * @author Arjen Poutsma
*/ */
public class ByteBufPublisherInputStream extends InputStream { public class ByteArrayPublisherInputStream extends InputStream {
private final BlockingSignalQueue<byte[]> queue;
private final BlockingByteBufQueue queue; private ByteArrayInputStream currentStream;
private ByteBufInputStream currentStream;
public ByteBufPublisherInputStream(Publisher<ByteBuf> publisher) { /**
* Creates a new {@code ByteArrayPublisherInputStream} based on the given publisher.
* @param publisher the publisher to use
*/
public ByteArrayPublisherInputStream(Publisher<byte[]> publisher) {
Assert.notNull(publisher, "'publisher' must not be null"); Assert.notNull(publisher, "'publisher' must not be null");
this.queue = new BlockingByteBufQueue(); this.queue = new BlockingSignalQueue<byte[]>();
publisher.subscribe(new BlockingByteBufQueueSubscriber(this.queue)); publisher.subscribe(new BlockingSignalQueueSubscriber<byte[]>(this.queue));
} }
ByteBufPublisherInputStream(BlockingByteBufQueue queue) { ByteArrayPublisherInputStream(BlockingSignalQueue<byte[]> queue) {
Assert.notNull(queue, "'queue' must not be null"); Assert.notNull(queue, "'queue' must not be null");
this.queue = queue; this.queue = queue;
} }
...@@ -91,6 +97,7 @@ public class ByteBufPublisherInputStream extends InputStream { ...@@ -91,6 +97,7 @@ public class ByteBufPublisherInputStream extends InputStream {
} }
} }
while (is != null); while (is != null);
return -1; return -1;
} }
...@@ -102,14 +109,14 @@ public class ByteBufPublisherInputStream extends InputStream { ...@@ -102,14 +109,14 @@ public class ByteBufPublisherInputStream extends InputStream {
else if (this.queue.isComplete()) { else if (this.queue.isComplete()) {
return null; return null;
} }
else if (this.queue.isHeadBuffer()) { else if (this.queue.isHeadSignal()) {
ByteBuf current = this.queue.pollBuffer(); byte[] current = this.queue.pollSignal();
this.currentStream = new ByteBufInputStream(current); this.currentStream = new ByteArrayInputStream(current);
return this.currentStream; return this.currentStream;
} }
else if (this.queue.isHeadError()) { else if (this.queue.isHeadError()) {
Throwable t = this.queue.pollError(); Throwable t = this.queue.pollError();
throw toIOException(t); throw t instanceof IOException ? (IOException) t : new IOException(t);
} }
} }
catch (InterruptedException ex) { catch (InterruptedException ex) {
...@@ -118,8 +125,4 @@ public class ByteBufPublisherInputStream extends InputStream { ...@@ -118,8 +125,4 @@ public class ByteBufPublisherInputStream extends InputStream {
return null; return null;
} }
private static IOException toIOException(Throwable t) {
return t instanceof IOException ? (IOException) t : new IOException(t);
}
} }
package org.springframework.rx.io;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Arrays;
import org.reactivestreams.Publisher;
import org.springframework.rx.util.BlockingSignalQueue;
import org.springframework.rx.util.BlockingSignalQueuePublisher;
/**
* {@code OutputStream} implementation that stores all written bytes, to be retrieved
* using {@link #toByteBufPublisher()}.
* @author Arjen Poutsma
*/
public class ByteArrayPublisherOutputStream extends OutputStream {
private final BlockingSignalQueue<byte[]> queue = new BlockingSignalQueue<byte[]>();
/**
* Returns the written data as a {@code Publisher}.
* @return a publisher for the written bytes
*/
public Publisher<byte[]> toByteBufPublisher() {
return new BlockingSignalQueuePublisher<byte[]>(this.queue);
}
@Override
public void write(int b) throws IOException {
write(new byte[]{(byte) b});
}
@Override
public void write(byte[] b, int off, int len) throws IOException {
byte[] copy = Arrays.copyOf(b, len);
try {
this.queue.putSignal(copy);
}
catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
@Override
public void close() throws IOException {
try {
this.queue.complete();
}
catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
}
package org.springframework.rx.io;
import java.io.IOException;
import java.io.OutputStream;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.UnpooledByteBufAllocator;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.springframework.rx.util.BlockingByteBufQueue;
import org.springframework.rx.util.BlockingByteBufQueuePublisher;
import org.springframework.util.Assert;
/**
* @author Arjen Poutsma
*/
public class ByteBufPublisherOutputStream extends OutputStream {
private final BlockingByteBufQueue queue = new BlockingByteBufQueue();
private final ByteBufAllocator bufferAllocator;
public ByteBufPublisherOutputStream() {
this(new UnpooledByteBufAllocator(false));
}
public ByteBufPublisherOutputStream(ByteBufAllocator bufferAllocator) {
Assert.notNull(bufferAllocator, "'bufferAllocator' must not be null");
this.bufferAllocator = bufferAllocator;
}
public Publisher<ByteBuf> toByteBufPublisher() {
return new BlockingByteBufQueuePublisher(this.queue);
}
@Override
public void write(int b) throws IOException {
ByteBuf buffer = this.bufferAllocator.buffer(1, 1);
buffer.writeByte(b);
putBuffer(buffer);
}
@Override
public void write(byte[] b) throws IOException {
ByteBuf buffer = this.bufferAllocator.buffer(b.length, b.length);
buffer.writeBytes(b);
putBuffer(buffer);
}
@Override
public void write(byte[] b, int off, int len) throws IOException {
ByteBuf buffer = this.bufferAllocator.buffer(len, len);
buffer.writeBytes(b, off, len);
putBuffer(buffer);
}
private void putBuffer(ByteBuf buffer) {
try {
this.queue.putBuffer(buffer);
}
catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
@Override
public void close() throws IOException {
try {
this.queue.complete();
}
catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
}
/*
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.rx.util;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.springframework.util.Assert;
/**
* @author Arjen Poutsma
*/
public abstract class AbstractUnicastAsyncSubscriber<T> implements Subscriber<T> {
private final Executor executor;
private Subscription subscription;
private boolean done;
protected AbstractUnicastAsyncSubscriber(Executor executor) {
Assert.notNull(executor, "'executor' must not be null");
this.executor = executor;
}
private void done() {
done = true;
if (subscription != null) {
subscription.cancel();
}
}
// This method is invoked when the OnNext signals arrive
// Returns whether more elements are desired or not, and if no more elements are desired,
// for convenience.
protected abstract boolean whenNext(final T element);
// This method is invoked when the OnComplete signal arrives
// override this method to implement your own custom onComplete logic.
protected void whenComplete() {
}
// This method is invoked if the OnError signal arrives
// override this method to implement your own custom onError logic.
protected void whenError(Throwable error) {
}
private void handleOnSubscribe(Subscription subscription) {
if (subscription == null) {
return;
}
if (this.subscription != null) {
subscription.cancel();
}
else {
this.subscription = subscription;
this.subscription.request(1);
}
}
private void handleOnNext(final T element) {
if (!done) {
try {
if (whenNext(element)) {
subscription.request(1);
}
else {
done();
}
}
catch (final Throwable t) {
done();
onError(t);
}
}
}
private void handleOnComplete() {
done = true;
whenComplete();
}
private void handleOnError(final Throwable error) {
done = true;
whenError(error);
}
// We implement the OnX methods on `Subscriber` to send Signals that we will process asycnhronously, but only one at a time
@Override
public final void onSubscribe(final Subscription s) {
// As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `Subscription` is `null`
if (s == null) {
throw null;
}
signal(new OnSubscribe(s));
}
@Override
public final void onNext(final T element) {
// As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `element` is `null`
if (element == null) {
throw null;
}
signal(new OnNext<T>(element));
}
@Override
public final void onError(final Throwable t) {
// As per rule 2.13, we need to throw a `java.lang.NullPointerException` if the `Throwable` is `null`
if (t == null) {
throw null;
}
signal(new OnError(t));
}
@Override
public final void onComplete() {
signal(OnComplete.INSTANCE);
}
private final ConcurrentLinkedQueue<Signal<T>> inboundSignals =
new ConcurrentLinkedQueue<Signal<T>>();
private final AtomicBoolean enabled = new AtomicBoolean(false);
// What `signal` does is that it sends signals to the `Subscription` asynchronously
private void signal(final Signal signal) {
if (inboundSignals
.offer(signal)) // No need to null-check here as ConcurrentLinkedQueue does this for us
{
tryScheduleToExecute(); // Then we try to schedule it for execution, if it isn't already
}
}
// This method makes sure that this `Subscriber` is only executing on one Thread at a time
private void tryScheduleToExecute() {
if (enabled.compareAndSet(false, true)) {
try {
executor.execute(new SignalRunnable());
}
catch (Throwable t) { // If we can't run on the `Executor`, we need to fail gracefully and not violate rule 2.13
if (!done) {
try {
done(); // First of all, this failure is not recoverable, so we need to cancel our subscription
}
finally {
inboundSignals.clear(); // We're not going to need these anymore
// This subscription is cancelled by now, but letting the Subscriber become schedulable again means
// that we can drain the inboundSignals queue if anything arrives after clearing
enabled.set(false);
}
}
}
}
}
private class SignalRunnable implements Runnable {
@Override
public void run() {
if (enabled.get()) {
try {
Signal<T> s = inboundSignals.poll();
if (!done) {
if (s.isOnNext()) {
handleOnNext(s.next());
}
else if (s.isOnSubscribe()) {
handleOnSubscribe(s.subscription());
}
else if (s.isOnError()) {
handleOnError(s.error());
}
else if (s.isComplete()) {
handleOnComplete();
}
}
}
finally {
enabled.set(false);
if (!inboundSignals.isEmpty()) {
tryScheduleToExecute();
}
}
}
}
}
}
\ No newline at end of file
/*
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.rx.util;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
/**
* @author Arjen Poutsma
*/
public abstract class AbstractUnicastSyncSubscriber<T> implements Subscriber<T> {
private Subscription subscription;
private boolean done = false;
@Override
public final void onSubscribe(Subscription subscription) {
if (subscription == null) {
throw new NullPointerException();
}
if (this.subscription != null) {
subscription.cancel();
}
else {
this.subscription = subscription;
this.subscription.request(1);
}
}
@Override
public final void onNext(T element) {
if (element == null) {
throw new NullPointerException();
}
if (!done) {
try {
if (onNextInternal(element)) {
subscription.request(1);
}
else {
done();
}
}
catch (Throwable t) {
done();
onError(t);
}
}
}
private void done() {
done = true;
subscription.cancel();
}
protected abstract boolean onNextInternal(final T element) throws Exception;
}
...@@ -29,26 +29,26 @@ import org.springframework.util.Assert; ...@@ -29,26 +29,26 @@ import org.springframework.util.Assert;
* streams. * streams.
* *
* <p>Typically, this class will be used by two threads: one thread to put new elements on * <p>Typically, this class will be used by two threads: one thread to put new elements on
* the stack by calling {@link #putBuffer(ByteBuf)}, possibly {@link #putError(Throwable)} * the stack by calling {@link #put(ByteBuf)}, possibly {@link #putError(Throwable)} and
* and finally {@link #complete()}. The other thread will read elements by calling {@link * finally {@link #complete()}. The other thread will read elements by calling {@link
* #isHeadBuffer()} and {@link #isHeadError()}, while keeping an eye on {@link * #isHeadSignal()}/{@link #pollSignal()} and {@link #isHeadError()}/{@link #pollError()},
* #isComplete()}. * while keeping an eye on {@link #isComplete()}.
*
* @author Arjen Poutsma * @author Arjen Poutsma
*/ */
public class BlockingByteBufQueue { public class BlockingSignalQueue<T> {
private final BlockingQueue<Signal<T>> queue = new LinkedBlockingQueue<Signal<T>>();
private final BlockingQueue<Element> queue = new LinkedBlockingQueue<Element>();
/** /**
* Inserts the specified buffer into this queue, waiting if necessary for space to * Inserts the specified signal into this queue, waiting if necessary for space to
* become available. * become available.
* @param buffer the buffer to add * @param t the signal to add
*/ */
public void putBuffer(ByteBuf buffer) throws InterruptedException { public void putSignal(T t) throws InterruptedException {
Assert.notNull(buffer, "'buffer' must not be null"); Assert.notNull(t, "'t' must not be null");
Assert.state(!isComplete(), "Cannot put buffers in queue after complete()"); Assert.state(!isComplete(), "Cannot put signal in queue after complete()");
this.queue.put(new ByteBufElement(buffer)); this.queue.put(new OnNext(t));
} }
/** /**
...@@ -58,24 +58,24 @@ public class BlockingByteBufQueue { ...@@ -58,24 +58,24 @@ public class BlockingByteBufQueue {
*/ */
public void putError(Throwable error) throws InterruptedException { public void putError(Throwable error) throws InterruptedException {
Assert.notNull(error, "'error' must not be null"); Assert.notNull(error, "'error' must not be null");
Assert.state(!isComplete(), "Cannot put errors in queue after complete()"); Assert.state(!isComplete(), "Cannot putSignal errors in queue after complete()");
this.queue.put(new ErrorElement(error)); this.queue.put(new OnError(error));
} }
/** /**
* Marks the queue as complete. * Marks the queue as complete.
*/ */
public void complete() throws InterruptedException { public void complete() throws InterruptedException {
this.queue.put(COMPLETE); this.queue.put(OnComplete.INSTANCE);
} }
/** /**
* Indicates whether the current head of this queue is a {@link ByteBuf}. * Indicates whether the current head of this queue is a signal.
* @return {@code true} if the current head is a buffer; {@code false} otherwise * @return {@code true} if the current head is a signal; {@code false} otherwise
*/ */
public boolean isHeadBuffer() { public boolean isHeadSignal() {
Element element = this.queue.peek(); Signal signal = this.queue.peek();
return element instanceof ByteBufElement; return signal instanceof OnNext;
} }
/** /**
...@@ -83,8 +83,8 @@ public class BlockingByteBufQueue { ...@@ -83,8 +83,8 @@ public class BlockingByteBufQueue {
* @return {@code true} if the current head is an error; {@code false} otherwise * @return {@code true} if the current head is an error; {@code false} otherwise
*/ */
public boolean isHeadError() { public boolean isHeadError() {
Element element = this.queue.peek(); Signal signal = this.queue.peek();
return element instanceof ErrorElement; return signal instanceof OnError;
} }
/** /**
...@@ -92,20 +92,20 @@ public class BlockingByteBufQueue { ...@@ -92,20 +92,20 @@ public class BlockingByteBufQueue {
* @return {@code true} if there more elements in this queue; {@code false} otherwise * @return {@code true} if there more elements in this queue; {@code false} otherwise
*/ */
public boolean isComplete() { public boolean isComplete() {
Element element = this.queue.peek(); Signal signal = this.queue.peek();
return COMPLETE == element; return OnComplete.INSTANCE == signal;
} }
/** /**
* Retrieves and removes the buffer head of this queue. Should only be called after * Retrieves and removes the signal head of this queue. Should only be called after
* {@link #isHeadBuffer()} returns {@code true}. * {@link #isHeadSignal()} returns {@code true}.
* @return the head of the queue, as buffer * @return the head of the queue
* @throws IllegalStateException if the current head of this queue is not a buffer * @throws IllegalStateException if the current head of this queue is not a buffer
* @see #isHeadBuffer() * @see #isHeadSignal()
*/ */
public ByteBuf pollBuffer() throws InterruptedException { public T pollSignal() throws InterruptedException {
Element element = this.queue.take(); Signal<T> signal = this.queue.take();
return element != null ? element.getBuffer() : null; return signal != null ? signal.next() : null;
} }
/** /**
...@@ -116,78 +116,8 @@ public class BlockingByteBufQueue { ...@@ -116,78 +116,8 @@ public class BlockingByteBufQueue {
* @see #isHeadError() * @see #isHeadError()
*/ */
public Throwable pollError() throws InterruptedException { public Throwable pollError() throws InterruptedException {
Element element = this.queue.take(); Signal signal = this.queue.take();
return element != null ? element.getError() : null; return signal != null ? signal.error() : null;
}
/**
* Removes all of the elements from this collection
*/
public void clear() {
this.queue.clear();
}
private interface Element {
ByteBuf getBuffer();
Throwable getError();
}
private static class ByteBufElement implements Element {
private final ByteBuf buffer;
public ByteBufElement(ByteBuf buffer) {
if (buffer == null) {
throw new IllegalArgumentException("'buffer' should not be null");
}
this.buffer = buffer;
}
@Override
public ByteBuf getBuffer() {
return this.buffer;
}
@Override
public Throwable getError() {
throw new IllegalStateException("No error on top of the queue");
}
}
private static class ErrorElement implements Element {
private final Throwable error;
public ErrorElement(Throwable error) {
if (error == null) {
throw new IllegalArgumentException("'error' should not be null");
}
this.error = error;
}
@Override
public ByteBuf getBuffer() {
throw new IllegalStateException("No ByteBuf on top of the queue");
}
@Override
public Throwable getError() {
return this.error;
}
} }
private static final Element COMPLETE = new Element() {
@Override
public ByteBuf getBuffer() {
throw new IllegalStateException("No ByteBuf on top of the queue");
}
@Override
public Throwable getError() {
throw new IllegalStateException("No error on top of the queue");
}
};
} }
...@@ -16,7 +16,6 @@ ...@@ -16,7 +16,6 @@
package org.springframework.rx.util; package org.springframework.rx.util;
import io.netty.buffer.ByteBuf;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber; import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription; import org.reactivestreams.Subscription;
...@@ -26,21 +25,21 @@ import org.springframework.util.Assert; ...@@ -26,21 +25,21 @@ import org.springframework.util.Assert;
/** /**
* @author Arjen Poutsma * @author Arjen Poutsma
*/ */
public class BlockingByteBufQueuePublisher implements Publisher<ByteBuf> { public class BlockingSignalQueuePublisher<T> implements Publisher<T> {
private final BlockingByteBufQueue queue; private final BlockingSignalQueue<T> queue;
private Subscriber<? super ByteBuf> subscriber; private Subscriber<? super T> subscriber;
private final Object subscriberMutex = new Object(); private final Object subscriberMutex = new Object();
public BlockingByteBufQueuePublisher(BlockingByteBufQueue queue) { public BlockingSignalQueuePublisher(BlockingSignalQueue<T> queue) {
Assert.notNull(queue, "'queue' must not be null"); Assert.notNull(queue, "'queue' must not be null");
this.queue = queue; this.queue = queue;
} }
@Override @Override
public void subscribe(Subscriber<? super ByteBuf> subscriber) { public void subscribe(Subscriber<? super T> subscriber) {
synchronized (this.subscriberMutex) { synchronized (this.subscriberMutex) {
if (this.subscriber != null) { if (this.subscriber != null) {
subscriber.onError( subscriber.onError(
...@@ -74,10 +73,10 @@ public class BlockingByteBufQueuePublisher implements Publisher<ByteBuf> { ...@@ -74,10 +73,10 @@ public class BlockingByteBufQueuePublisher implements Publisher<ByteBuf> {
@Override @Override
public void run() { public void run() {
try { try {
while (!Thread.currentThread().isInterrupted()) while (!Thread.currentThread().isInterrupted()) {
if ((l < requestCount || requestCount == Long.MAX_VALUE) && if ((l < requestCount || requestCount == Long.MAX_VALUE) &&
queue.isHeadBuffer()) { queue.isHeadSignal()) {
subscriber.onNext(queue.pollBuffer()); subscriber.onNext(queue.pollSignal());
l++; l++;
} }
else if (queue.isHeadError()) { else if (queue.isHeadError()) {
...@@ -88,6 +87,7 @@ public class BlockingByteBufQueuePublisher implements Publisher<ByteBuf> { ...@@ -88,6 +87,7 @@ public class BlockingByteBufQueuePublisher implements Publisher<ByteBuf> {
subscriber.onComplete(); subscriber.onComplete();
break; break;
} }
}
} }
catch (InterruptedException ex) { catch (InterruptedException ex) {
// Allow thread to exit // Allow thread to exit
......
...@@ -16,42 +16,78 @@ ...@@ -16,42 +16,78 @@
package org.springframework.rx.util; package org.springframework.rx.util;
import io.netty.buffer.ByteBuf;
import org.reactivestreams.Subscriber; import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription; import org.reactivestreams.Subscription;
import org.springframework.util.Assert; import org.springframework.util.Assert;
/** /**
* A simple byte array {@link Subscriber} that puts all published bytes on a
* {@link @BlockingSignalQueue}.
*
* @author Arjen Poutsma * @author Arjen Poutsma
*/ */
public class BlockingByteBufQueueSubscriber implements Subscriber<ByteBuf> { public class BlockingSignalQueueSubscriber<T> implements Subscriber<T> {
/**
* The default request size to use.
*/
public static final int DEFAULT_REQUEST_SIZE = 1;
private final BlockingByteBufQueue queue; private final BlockingSignalQueue<T> queue;
private Subscription subscription; private Subscription subscription;
public BlockingByteBufQueueSubscriber(BlockingByteBufQueue queue) { private int initialRequestSize = DEFAULT_REQUEST_SIZE;
private int requestSize = DEFAULT_REQUEST_SIZE;
/**
* Creates a new {@code BlockingSignalQueueSubscriber} using the given queue.
* @param queue the queue to use
*/
public BlockingSignalQueueSubscriber(BlockingSignalQueue<T> queue) {
Assert.notNull(queue, "'queue' must not be null"); Assert.notNull(queue, "'queue' must not be null");
this.queue = queue; this.queue = queue;
} }
/**
* Sets the request size used when subscribing, in {@link #onSubscribe(Subscription)}.
* Defaults to {@link #DEFAULT_REQUEST_SIZE}.
* @param initialRequestSize the initial request size
* @see Subscription#request(long)
*/
public void setInitialRequestSize(int initialRequestSize) {
this.initialRequestSize = initialRequestSize;
}
/**
* Sets the request size used after data or an error comes in, in {@link
* #onNext(Object)} and {@link #onError(Throwable)}. Defaults to {@link
* #DEFAULT_REQUEST_SIZE}.
* @see Subscription#request(long)
*/
public void setRequestSize(int requestSize) {
this.requestSize = requestSize;
}
@Override @Override
public void onSubscribe(Subscription subscription) { public void onSubscribe(Subscription subscription) {
this.subscription = subscription; this.subscription = subscription;
this.subscription.request(1); this.subscription.request(this.initialRequestSize);
} }
@Override @Override
public void onNext(ByteBuf byteBuf) { public void onNext(T t) {
try { try {
this.queue.putBuffer(byteBuf); this.queue.putSignal(t);
} }
catch (InterruptedException ex) { catch (InterruptedException ex) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
} }
this.subscription.request(1); this.subscription.request(requestSize);
} }
@Override @Override
...@@ -62,7 +98,7 @@ public class BlockingByteBufQueueSubscriber implements Subscriber<ByteBuf> { ...@@ -62,7 +98,7 @@ public class BlockingByteBufQueueSubscriber implements Subscriber<ByteBuf> {
catch (InterruptedException ex) { catch (InterruptedException ex) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
} }
this.subscription.request(1); this.subscription.request(requestSize);
} }
@Override @Override
......
/*
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.rx.util;
import org.reactivestreams.Subscription;
/**
* @author Arjen Poutsma
*/
class OnComplete implements Signal {
public static final OnComplete INSTANCE = new OnComplete();
private OnComplete() {
}
@Override
public boolean isComplete() {
return true;
}
@Override
public boolean isOnNext() {
return false;
}
@Override
public Object next() {
throw new IllegalStateException();
}
@Override
public boolean isOnError() {
return false;
}
@Override
public Throwable error() {
throw new IllegalStateException();
}
@Override
public boolean isOnSubscribe() {
return false;
}
@Override
public Subscription subscription() {
throw new IllegalStateException();
}
}
/*
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.rx.util;
import org.reactivestreams.Subscription;
import org.springframework.util.Assert;
/**
* @author Arjen Poutsma
*/
final class OnError implements Signal {
private final Throwable error;
public OnError(Throwable error) {
Assert.notNull(error, "'error' must not be null");
this.error = error;
}
@Override
public boolean isOnError() {
return true;
}
@Override
public Throwable error() {
return error;
}
@Override
public boolean isOnNext() {
return false;
}
@Override
public Object next() {
throw new IllegalStateException();
}
@Override
public boolean isOnSubscribe() {
return false;
}
@Override
public Subscription subscription() {
throw new IllegalStateException();
}
@Override
public boolean isComplete() {
return false;
}
}
/*
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.rx.util;
import org.reactivestreams.Subscription;
import org.springframework.util.Assert;
/**
* @author Arjen Poutsma
*/
class OnNext<T> implements Signal<T> {
private final T next;
public OnNext(T next) {
Assert.notNull(next, "'next' must not be null");
this.next = next;
}
@Override
public boolean isOnNext() {
return true;
}
@Override
public T next() {
return next;
}
@Override
public boolean isOnError() {
return false;
}
@Override
public Throwable error() {
throw new IllegalStateException();
}
@Override
public boolean isOnSubscribe() {
return false;
}
@Override
public Subscription subscription() {
throw new IllegalStateException();
}
@Override
public boolean isComplete() {
return false;
}
}
/*
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.rx.util;
import org.reactivestreams.Subscription;
import org.springframework.util.Assert;
/**
* @author Arjen Poutsma
*/
class OnSubscribe implements Signal {
private final Subscription subscription;
public OnSubscribe(Subscription subscription) {
Assert.notNull(subscription, "'subscription' must not be null");
this.subscription = subscription;
}
@Override
public boolean isOnSubscribe() {
return true;
}
@Override
public Subscription subscription() {
return null;
}
@Override
public boolean isOnNext() {
return false;
}
@Override
public Object next() {
throw new IllegalStateException();
}
@Override
public boolean isOnError() {
return false;
}
@Override
public Throwable error() {
throw new IllegalStateException();
}
@Override
public boolean isComplete() {
return false;
}
}
/*
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.rx.util;
import org.reactivestreams.Subscription;
/**
* @author Arjen Poutsma
*/
interface Signal<T> {
boolean isOnNext();
T next();
boolean isOnError();
Throwable error();
boolean isOnSubscribe();
Subscription subscription();
boolean isComplete();
}
/*
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.rx.web.servlet;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import javax.servlet.AsyncContext;
import javax.servlet.ServletInputStream;
import javax.servlet.ServletOutputStream;
/**
* @author Arjen Poutsma
*/
class AsyncContextSynchronizer {
private static final int READ_COMPLETE = 1;
private static final int WRITE_COMPLETE = 1 << 1;
private static final int COMPLETE = READ_COMPLETE | WRITE_COMPLETE;
private final AsyncContext asyncContext;
private final AtomicInteger complete = new AtomicInteger(0);
public AsyncContextSynchronizer(AsyncContext asyncContext) {
this.asyncContext = asyncContext;
}
public ServletInputStream getInputStream() throws IOException {
return this.asyncContext.getRequest().getInputStream();
}
public ServletOutputStream getOutputStream() throws IOException {
return this.asyncContext.getResponse().getOutputStream();
}
public void readComplete() {
if (complete.compareAndSet(WRITE_COMPLETE, COMPLETE)) {
this.asyncContext.complete();
}
else {
this.complete.compareAndSet(0, READ_COMPLETE);
}
}
public void writeComplete() {
if (complete.compareAndSet(READ_COMPLETE, COMPLETE)) {
this.asyncContext.complete();
}
else {
this.complete.compareAndSet(0, WRITE_COMPLETE);
}
}
}
...@@ -16,15 +16,10 @@ ...@@ -16,15 +16,10 @@
package org.springframework.rx.io; package org.springframework.rx.io;
import java.io.EOFException;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.springframework.rx.util.BlockingByteBufQueue; import org.springframework.rx.util.BlockingSignalQueue;
import org.springframework.rx.util.BlockingByteBufQueuePublisher;
import static org.junit.Assert.*; import static org.junit.Assert.*;
...@@ -33,25 +28,22 @@ import static org.junit.Assert.*; ...@@ -33,25 +28,22 @@ import static org.junit.Assert.*;
*/ */
public class ByteBufPublisherInputStreamTests { public class ByteBufPublisherInputStreamTests {
private BlockingByteBufQueue queue; private BlockingSignalQueue<byte[]> queue;
private ByteBufPublisherInputStream is; private ByteArrayPublisherInputStream is;
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
queue = new BlockingByteBufQueue(); queue = new BlockingSignalQueue<byte[]>();
is = new ByteBufPublisherInputStream(queue); is = new ByteArrayPublisherInputStream(queue);
} }
@Test @Test
public void readSingleByte() throws Exception { public void readSingleByte() throws Exception {
ByteBuf abc = Unpooled.copiedBuffer(new byte[]{'a', 'b', 'c'}); queue.putSignal(new byte[]{'a', 'b', 'c'});
ByteBuf def = Unpooled.copiedBuffer(new byte[]{'d', 'e', 'f'}); queue.putSignal(new byte[]{'d', 'e', 'f'});
queue.putBuffer(abc);
queue.putBuffer(def);
queue.complete(); queue.complete();
...@@ -75,11 +67,8 @@ public class ByteBufPublisherInputStreamTests { ...@@ -75,11 +67,8 @@ public class ByteBufPublisherInputStreamTests {
@Test @Test
public void readBytes() throws Exception { public void readBytes() throws Exception {
ByteBuf abc = Unpooled.copiedBuffer(new byte[]{'a', 'b', 'c'}); queue.putSignal(new byte[]{'a', 'b', 'c'});
ByteBuf def = Unpooled.copiedBuffer(new byte[]{'d', 'e', 'f'}); queue.putSignal(new byte[]{'d', 'e', 'f'});
queue.putBuffer(abc);
queue.putBuffer(def);
queue.complete(); queue.complete();
byte[] buf = new byte[2]; byte[] buf = new byte[2];
......
...@@ -33,14 +33,14 @@ import org.reactivestreams.Subscription; ...@@ -33,14 +33,14 @@ import org.reactivestreams.Subscription;
*/ */
public class BlockingByteBufQueuePublisherTests { public class BlockingByteBufQueuePublisherTests {
private BlockingByteBufQueue queue; private BlockingSignalQueue queue;
private BlockingByteBufQueuePublisher publisher; private BlockingSignalQueuePublisher publisher;
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
queue = new BlockingByteBufQueue(); queue = new BlockingSignalQueue();
publisher = new BlockingByteBufQueuePublisher(queue); publisher = new BlockingSignalQueuePublisher(queue);
} }
@Test @Test
...@@ -48,8 +48,8 @@ public class BlockingByteBufQueuePublisherTests { ...@@ -48,8 +48,8 @@ public class BlockingByteBufQueuePublisherTests {
ByteBuf abc = Unpooled.copiedBuffer(new byte[]{'a', 'b', 'c'}); ByteBuf abc = Unpooled.copiedBuffer(new byte[]{'a', 'b', 'c'});
ByteBuf def = Unpooled.copiedBuffer(new byte[]{'d', 'e', 'f'}); ByteBuf def = Unpooled.copiedBuffer(new byte[]{'d', 'e', 'f'});
queue.putBuffer(abc); queue.putSignal(abc);
queue.putBuffer(def); queue.putSignal(def);
queue.complete(); queue.complete();
final AtomicBoolean complete = new AtomicBoolean(false); final AtomicBoolean complete = new AtomicBoolean(false);
...@@ -90,8 +90,8 @@ public class BlockingByteBufQueuePublisherTests { ...@@ -90,8 +90,8 @@ public class BlockingByteBufQueuePublisherTests {
ByteBuf abc = Unpooled.copiedBuffer(new byte[]{'a', 'b', 'c'}); ByteBuf abc = Unpooled.copiedBuffer(new byte[]{'a', 'b', 'c'});
ByteBuf def = Unpooled.copiedBuffer(new byte[]{'d', 'e', 'f'}); ByteBuf def = Unpooled.copiedBuffer(new byte[]{'d', 'e', 'f'});
queue.putBuffer(abc); queue.putSignal(abc);
queue.putBuffer(def); queue.putSignal(def);
queue.complete(); queue.complete();
final AtomicBoolean complete = new AtomicBoolean(false); final AtomicBoolean complete = new AtomicBoolean(false);
......
...@@ -27,11 +27,11 @@ import org.junit.Test; ...@@ -27,11 +27,11 @@ import org.junit.Test;
*/ */
public class BlockingByteBufQueueTests { public class BlockingByteBufQueueTests {
private BlockingByteBufQueue queue; private BlockingSignalQueue queue;
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
queue = new BlockingByteBufQueue(); queue = new BlockingSignalQueue();
} }
@Test @Test
...@@ -39,41 +39,37 @@ public class BlockingByteBufQueueTests { ...@@ -39,41 +39,37 @@ public class BlockingByteBufQueueTests {
ByteBuf abc = Unpooled.copiedBuffer(new byte[]{'a', 'b', 'c'}); ByteBuf abc = Unpooled.copiedBuffer(new byte[]{'a', 'b', 'c'});
ByteBuf def = Unpooled.copiedBuffer(new byte[]{'d', 'e', 'f'}); ByteBuf def = Unpooled.copiedBuffer(new byte[]{'d', 'e', 'f'});
queue.putBuffer(abc); queue.putSignal(abc);
queue.putBuffer(def); queue.putSignal(def);
queue.complete(); queue.complete();
assertTrue(queue.isHeadBuffer()); assertTrue(queue.isHeadSignal());
assertFalse(queue.isHeadError()); assertFalse(queue.isHeadError());
assertSame(abc, queue.pollBuffer()); assertSame(abc, queue.pollSignal());
assertTrue(queue.isHeadBuffer()); assertTrue(queue.isHeadSignal());
assertFalse(queue.isHeadError()); assertFalse(queue.isHeadError());
assertSame(def, queue.pollBuffer()); assertSame(def, queue.pollSignal());
assertTrue(queue.isComplete()); assertTrue(queue.isComplete());
} }
@Test
public void empty() throws Exception {
assertNull(queue.pollBuffer());
}
@Test @Test
public void error() throws Exception { public void error() throws Exception {
ByteBuf abc = Unpooled.copiedBuffer(new byte[]{'a', 'b', 'c'}); ByteBuf abc = Unpooled.copiedBuffer(new byte[]{'a', 'b', 'c'});
Throwable error = new IllegalStateException(); Throwable error = new IllegalStateException();
queue.putBuffer(abc); queue.putSignal(abc);
queue.putError(error); queue.putError(error);
queue.complete(); queue.complete();
assertTrue(queue.isHeadBuffer()); assertTrue(queue.isHeadSignal());
assertFalse(queue.isHeadError()); assertFalse(queue.isHeadError());
assertSame(abc, queue.pollBuffer()); assertSame(abc, queue.pollSignal());
assertTrue(queue.isHeadError()); assertTrue(queue.isHeadError());
assertFalse(queue.isHeadBuffer()); assertFalse(queue.isHeadSignal());
assertSame(error, queue.pollError()); assertSame(error, queue.pollError());
assertTrue(queue.isComplete()); assertTrue(queue.isComplete());
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册