提交 342299ab 编写于 作者: A Arjen Poutsma

Initial commit

上级
target
.project
.classpath
.settings
*.iml
/.idea/
bin
.gradle
apply plugin: 'java'
repositories {
mavenCentral()
}
dependencies {
compile "org.springframework:spring-core:4.1.2.RELEASE"
compile "org.reactivestreams:reactive-streams:1.0.0.RC3"
compile "io.netty:netty-buffer:4.0.25.Final"
compile "org.slf4j:slf4j-api:1.7.6"
compile "ch.qos.logback:logback-classic:1.1.2"
testCompile "junit:junit:4.12"
}
version=1.0.0.BUILD-SNAPSHOT
package org.springframework.rx.io;/*
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.IOException;
import java.io.InputStream;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import org.reactivestreams.Publisher;
import org.springframework.rx.util.BlockingByteBufQueue;
import org.springframework.rx.util.BlockingByteBufQueueSubscriber;
import org.springframework.util.Assert;
/**
* @author Arjen Poutsma
*/
public class ByteBufPublisherInputStream extends InputStream {
private final BlockingByteBufQueue queue;
private ByteBufInputStream currentStream;
public ByteBufPublisherInputStream(Publisher<ByteBuf> publisher) {
Assert.notNull(publisher, "'publisher' must not be null");
this.queue = new BlockingByteBufQueue();
publisher.subscribe(new BlockingByteBufQueueSubscriber(this.queue));
}
ByteBufPublisherInputStream(BlockingByteBufQueue queue) {
Assert.notNull(queue, "'queue' must not be null");
this.queue = queue;
}
@Override
public int available() throws IOException {
InputStream is = currentStream();
return is != null ? is.available() : 0;
}
@Override
public int read() throws IOException {
InputStream is = currentStream();
while (is != null) {
int ch = is.read();
if (ch != -1) {
return ch;
} else {
is = currentStream();
}
}
return -1;
}
@Override
public int read(byte[] b, int off, int len) throws IOException {
InputStream is = currentStream();
if (is == null) {
return -1;
}
else if (b == null) {
throw new NullPointerException();
}
else if (off < 0 || len < 0 || len > b.length - off) {
throw new IndexOutOfBoundsException();
}
else if (len == 0) {
return 0;
}
do {
int n = is.read(b, off, len);
if (n > 0) {
return n;
}
else {
is = currentStream();
}
}
while (is != null);
return -1;
}
private InputStream currentStream() throws IOException {
try {
if (this.currentStream != null && this.currentStream.available() > 0) {
return this.currentStream;
}
else if (this.queue.isComplete()) {
return null;
}
else if (this.queue.isHeadBuffer()) {
ByteBuf current = this.queue.pollBuffer();
this.currentStream = new ByteBufInputStream(current);
return this.currentStream;
}
else if (this.queue.isHeadError()) {
Throwable t = this.queue.pollError();
throw toIOException(t);
}
}
catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
return null;
}
private static IOException toIOException(Throwable t) {
return t instanceof IOException ? (IOException) t : new IOException(t);
}
}
package org.springframework.rx.io;
import java.io.IOException;
import java.io.OutputStream;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.UnpooledByteBufAllocator;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.springframework.rx.util.BlockingByteBufQueue;
import org.springframework.rx.util.BlockingByteBufQueuePublisher;
import org.springframework.util.Assert;
/**
* @author Arjen Poutsma
*/
public class ByteBufPublisherOutputStream extends OutputStream {
private final BlockingByteBufQueue queue = new BlockingByteBufQueue();
private final ByteBufAllocator bufferAllocator;
public ByteBufPublisherOutputStream() {
this(new UnpooledByteBufAllocator(false));
}
public ByteBufPublisherOutputStream(ByteBufAllocator bufferAllocator) {
Assert.notNull(bufferAllocator, "'bufferAllocator' must not be null");
this.bufferAllocator = bufferAllocator;
}
public Publisher<ByteBuf> toByteBufPublisher() {
return new BlockingByteBufQueuePublisher(this.queue);
}
@Override
public void write(int b) throws IOException {
ByteBuf buffer = this.bufferAllocator.buffer(1, 1);
buffer.writeByte(b);
putBuffer(buffer);
}
@Override
public void write(byte[] b) throws IOException {
ByteBuf buffer = this.bufferAllocator.buffer(b.length, b.length);
buffer.writeBytes(b);
putBuffer(buffer);
}
@Override
public void write(byte[] b, int off, int len) throws IOException {
ByteBuf buffer = this.bufferAllocator.buffer(len, len);
buffer.writeBytes(b, off, len);
putBuffer(buffer);
}
private void putBuffer(ByteBuf buffer) {
try {
this.queue.putBuffer(buffer);
}
catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
@Override
public void close() throws IOException {
try {
this.queue.complete();
}
catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
}
/*
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.rx.util;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import io.netty.buffer.ByteBuf;
import org.springframework.util.Assert;
/**
* A {@link BlockingQueue} aimed at working with {@code Publisher<ByteBuf>} instances.
* Mainly meant to bridge between reactive and non-reactive APIs, such as blocking
* streams.
*
* <p>Typically, this class will be used by two threads: one thread to put new elements on
* the stack by calling {@link #putBuffer(ByteBuf)}, possibly {@link #putError(Throwable)}
* and finally {@link #complete()}. The other thread will read elements by calling {@link
* #isHeadBuffer()} and {@link #isHeadError()}, while keeping an eye on {@link
* #isComplete()}.
*
* @author Arjen Poutsma
*/
public class BlockingByteBufQueue {
private final BlockingQueue<Element> queue = new LinkedBlockingQueue<Element>();
/**
* Inserts the specified buffer into this queue, waiting if necessary for space to
* become available.
* @param buffer the buffer to add
*/
public void putBuffer(ByteBuf buffer) throws InterruptedException {
Assert.notNull(buffer, "'buffer' must not be null");
Assert.state(!isComplete(), "Cannot put buffers in queue after complete()");
this.queue.put(new ByteBufElement(buffer));
}
/**
* Inserts the specified error into this queue, waiting if necessary for space to
* become available.
* @param error the error to add
*/
public void putError(Throwable error) throws InterruptedException {
Assert.notNull(error, "'error' must not be null");
Assert.state(!isComplete(), "Cannot put errors in queue after complete()");
this.queue.put(new ErrorElement(error));
}
/**
* Marks the queue as complete.
*/
public void complete() throws InterruptedException {
this.queue.put(COMPLETE);
}
/**
* Indicates whether the current head of this queue is a {@link ByteBuf}.
* @return {@code true} if the current head is a buffer; {@code false} otherwise
*/
public boolean isHeadBuffer() {
Element element = this.queue.peek();
return element instanceof ByteBufElement;
}
/**
* Indicates whether the current head of this queue is a {@link Throwable}.
* @return {@code true} if the current head is an error; {@code false} otherwise
*/
public boolean isHeadError() {
Element element = this.queue.peek();
return element instanceof ErrorElement;
}
/**
* Indicates whether there are more buffers or errors in this queue.
* @return {@code true} if there more elements in this queue; {@code false} otherwise
*/
public boolean isComplete() {
Element element = this.queue.peek();
return COMPLETE == element;
}
/**
* Retrieves and removes the buffer head of this queue. Should only be called after
* {@link #isHeadBuffer()} returns {@code true}.
* @return the head of the queue, as buffer
* @throws IllegalStateException if the current head of this queue is not a buffer
* @see #isHeadBuffer()
*/
public ByteBuf pollBuffer() throws InterruptedException {
Element element = this.queue.take();
return element != null ? element.getBuffer() : null;
}
/**
* Retrieves and removes the buffer error of this queue. Should only be called after
* {@link #isHeadError()} returns {@code true}.
* @return the head of the queue, as error
* @throws IllegalStateException if the current head of this queue is not a error
* @see #isHeadError()
*/
public Throwable pollError() throws InterruptedException {
Element element = this.queue.take();
return element != null ? element.getError() : null;
}
/**
* Removes all of the elements from this collection
*/
public void clear() {
this.queue.clear();
}
private interface Element {
ByteBuf getBuffer();
Throwable getError();
}
private static class ByteBufElement implements Element {
private final ByteBuf buffer;
public ByteBufElement(ByteBuf buffer) {
if (buffer == null) {
throw new IllegalArgumentException("'buffer' should not be null");
}
this.buffer = buffer;
}
@Override
public ByteBuf getBuffer() {
return this.buffer;
}
@Override
public Throwable getError() {
throw new IllegalStateException("No error on top of the queue");
}
}
private static class ErrorElement implements Element {
private final Throwable error;
public ErrorElement(Throwable error) {
if (error == null) {
throw new IllegalArgumentException("'error' should not be null");
}
this.error = error;
}
@Override
public ByteBuf getBuffer() {
throw new IllegalStateException("No ByteBuf on top of the queue");
}
@Override
public Throwable getError() {
return this.error;
}
}
private static final Element COMPLETE = new Element() {
@Override
public ByteBuf getBuffer() {
throw new IllegalStateException("No ByteBuf on top of the queue");
}
@Override
public Throwable getError() {
throw new IllegalStateException("No error on top of the queue");
}
};
}
/*
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.rx.util;
import io.netty.buffer.ByteBuf;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.springframework.util.Assert;
/**
* @author Arjen Poutsma
*/
public class BlockingByteBufQueuePublisher implements Publisher<ByteBuf> {
private final BlockingByteBufQueue queue;
private Subscriber<? super ByteBuf> subscriber;
private final Object subscriberMutex = new Object();
public BlockingByteBufQueuePublisher(BlockingByteBufQueue queue) {
Assert.notNull(queue, "'queue' must not be null");
this.queue = queue;
}
@Override
public void subscribe(Subscriber<? super ByteBuf> subscriber) {
synchronized (this.subscriberMutex) {
if (this.subscriber != null) {
subscriber.onError(
new IllegalStateException("Only one subscriber allowed"));
}
else {
this.subscriber = subscriber;
final SubscriptionThread thread = new SubscriptionThread();
this.subscriber.onSubscribe(new Subscription() {
@Override
public void request(long n) {
thread.request(n);
}
@Override
public void cancel() {
thread.cancel();
}
});
thread.start();
}
}
}
private class SubscriptionThread extends Thread {
private volatile long requestCount = 0;
private long l = 0;
@Override
public void run() {
try {
while (!Thread.currentThread().isInterrupted())
if ((l < requestCount || requestCount == Long.MAX_VALUE) &&
queue.isHeadBuffer()) {
subscriber.onNext(queue.pollBuffer());
l++;
}
else if (queue.isHeadError()) {
subscriber.onError(queue.pollError());
break;
}
else if (queue.isComplete()) {
subscriber.onComplete();
break;
}
}
catch (InterruptedException ex) {
// Allow thread to exit
}
}
public void request(long n) {
if (n != Long.MAX_VALUE) {
this.requestCount += n;
}
else {
this.requestCount = Long.MAX_VALUE;
}
}
public void cancel() {
interrupt();
}
}
}
/*
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.rx.util;
import io.netty.buffer.ByteBuf;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.springframework.util.Assert;
/**
* @author Arjen Poutsma
*/
public class BlockingByteBufQueueSubscriber implements Subscriber<ByteBuf> {
private final BlockingByteBufQueue queue;
private Subscription subscription;
public BlockingByteBufQueueSubscriber(BlockingByteBufQueue queue) {
Assert.notNull(queue, "'queue' must not be null");
this.queue = queue;
}
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
this.subscription.request(1);
}
@Override
public void onNext(ByteBuf byteBuf) {
try {
this.queue.putBuffer(byteBuf);
}
catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
this.subscription.request(1);
}
@Override
public void onError(Throwable t) {
try {
this.queue.putError(t);
}
catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
this.subscription.request(1);
}
@Override
public void onComplete() {
try {
this.queue.complete();
}
catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
}
/*
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.rx.io;
import java.io.EOFException;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.junit.Before;
import org.junit.Test;
import org.springframework.rx.util.BlockingByteBufQueue;
import org.springframework.rx.util.BlockingByteBufQueuePublisher;
import static org.junit.Assert.*;
/**
* @author Arjen Poutsma
*/
public class ByteBufPublisherInputStreamTests {
private BlockingByteBufQueue queue;
private ByteBufPublisherInputStream is;
@Before
public void setUp() throws Exception {
queue = new BlockingByteBufQueue();
is = new ByteBufPublisherInputStream(queue);
}
@Test
public void readSingleByte() throws Exception {
ByteBuf abc = Unpooled.copiedBuffer(new byte[]{'a', 'b', 'c'});
ByteBuf def = Unpooled.copiedBuffer(new byte[]{'d', 'e', 'f'});
queue.putBuffer(abc);
queue.putBuffer(def);
queue.complete();
int ch = is.read();
assertEquals('a', ch);
ch = is.read();
assertEquals('b', ch);
ch = is.read();
assertEquals('c', ch);
ch = is.read();
assertEquals('d', ch);
ch = is.read();
assertEquals('e', ch);
ch = is.read();
assertEquals('f', ch);
ch = is.read();
assertEquals(-1, ch);
}
@Test
public void readBytes() throws Exception {
ByteBuf abc = Unpooled.copiedBuffer(new byte[]{'a', 'b', 'c'});
ByteBuf def = Unpooled.copiedBuffer(new byte[]{'d', 'e', 'f'});
queue.putBuffer(abc);
queue.putBuffer(def);
queue.complete();
byte[] buf = new byte[2];
int read = this.is.read(buf);
assertEquals(2, read);
assertArrayEquals(new byte[] { 'a', 'b'}, buf);
read = this.is.read(buf);
assertEquals(1, read);
assertEquals('c', buf[0]);
read = this.is.read(buf);
assertEquals(2, read);
assertArrayEquals(new byte[] { 'd', 'e'}, buf);
read = this.is.read(buf);
assertEquals(1, read);
assertEquals('f', buf[0]);
read = this.is.read(buf);
assertEquals(-1, read);
}
}
\ No newline at end of file
/*
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.rx.util;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import static org.junit.Assert.*;
import org.junit.Before;
import org.junit.Test;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
/**
* @author Arjen Poutsma
*/
public class BlockingByteBufQueuePublisherTests {
private BlockingByteBufQueue queue;
private BlockingByteBufQueuePublisher publisher;
@Before
public void setUp() throws Exception {
queue = new BlockingByteBufQueue();
publisher = new BlockingByteBufQueuePublisher(queue);
}
@Test
public void normal() throws Exception {
ByteBuf abc = Unpooled.copiedBuffer(new byte[]{'a', 'b', 'c'});
ByteBuf def = Unpooled.copiedBuffer(new byte[]{'d', 'e', 'f'});
queue.putBuffer(abc);
queue.putBuffer(def);
queue.complete();
final AtomicBoolean complete = new AtomicBoolean(false);
final List<ByteBuf> received = new ArrayList<ByteBuf>(2);
publisher.subscribe(new Subscriber<ByteBuf>() {
@Override
public void onSubscribe(Subscription s) {
s.request(2);
}
@Override
public void onNext(ByteBuf byteBuf) {
received.add(byteBuf);
}
@Override
public void onError(Throwable t) {
fail("onError not expected");
}
@Override
public void onComplete() {
complete.set(true);
}
});
while (!complete.get()) {
}
assertEquals(2, received.size());
assertSame(abc, received.get(0));
assertSame(def, received.get(1));
}
@Test
public void unbounded() throws Exception {
ByteBuf abc = Unpooled.copiedBuffer(new byte[]{'a', 'b', 'c'});
ByteBuf def = Unpooled.copiedBuffer(new byte[]{'d', 'e', 'f'});
queue.putBuffer(abc);
queue.putBuffer(def);
queue.complete();
final AtomicBoolean complete = new AtomicBoolean(false);
final List<ByteBuf> received = new ArrayList<ByteBuf>(2);
publisher.subscribe(new Subscriber<ByteBuf>() {
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(ByteBuf byteBuf) {
received.add(byteBuf);
}
@Override
public void onError(Throwable t) {
fail("onError not expected");
}
@Override
public void onComplete() {
complete.set(true);
}
});
while (!complete.get()) {
}
assertEquals(2, received.size());
assertSame(abc, received.get(0));
assertSame(def, received.get(1));
}
@Test
public void multipleSubscribe() throws Exception {
publisher.subscribe(new Subscriber<ByteBuf>() {
@Override
public void onSubscribe(Subscription s) {
}
@Override
public void onNext(ByteBuf byteBuf) {
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
}
});
publisher.subscribe(new Subscriber<ByteBuf>() {
@Override
public void onSubscribe(Subscription s) {
fail("onSubscribe not expected");
}
@Override
public void onNext(ByteBuf byteBuf) {
fail("onNext not expected");
}
@Override
public void onError(Throwable t) {
assertTrue(t instanceof IllegalStateException);
}
@Override
public void onComplete() {
fail("onComplete not expected");
}
});
}
}
\ No newline at end of file
/*
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.rx.util;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import static org.junit.Assert.*;
import org.junit.Before;
import org.junit.Test;
/**
* @author Arjen Poutsma
*/
public class BlockingByteBufQueueTests {
private BlockingByteBufQueue queue;
@Before
public void setUp() throws Exception {
queue = new BlockingByteBufQueue();
}
@Test
public void normal() throws Exception {
ByteBuf abc = Unpooled.copiedBuffer(new byte[]{'a', 'b', 'c'});
ByteBuf def = Unpooled.copiedBuffer(new byte[]{'d', 'e', 'f'});
queue.putBuffer(abc);
queue.putBuffer(def);
queue.complete();
assertTrue(queue.isHeadBuffer());
assertFalse(queue.isHeadError());
assertSame(abc, queue.pollBuffer());
assertTrue(queue.isHeadBuffer());
assertFalse(queue.isHeadError());
assertSame(def, queue.pollBuffer());
assertTrue(queue.isComplete());
}
@Test
public void empty() throws Exception {
assertNull(queue.pollBuffer());
}
@Test
public void error() throws Exception {
ByteBuf abc = Unpooled.copiedBuffer(new byte[]{'a', 'b', 'c'});
Throwable error = new IllegalStateException();
queue.putBuffer(abc);
queue.putError(error);
queue.complete();
assertTrue(queue.isHeadBuffer());
assertFalse(queue.isHeadError());
assertSame(abc, queue.pollBuffer());
assertTrue(queue.isHeadError());
assertFalse(queue.isHeadBuffer());
assertSame(error, queue.pollError());
assertTrue(queue.isComplete());
}
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册