提交 f518d76a 编写于 作者: A Arjen Poutsma

Working version of Servlet 3.1 <-> RS bridge.

上级 3b45087c
......@@ -27,6 +27,15 @@ dependencies {
provided "javax.servlet:javax.servlet-api:3.1.0"
testCompile "junit:junit:4.12"
testCompile "org.springframework:spring-web:4.1.2.RELEASE"
testCompile 'org.apache.tomcat:tomcat-util:8.0.23'
testCompile 'org.apache.tomcat.embed:tomcat-embed-core:8.0.23'
testCompile 'org.eclipse.jetty:jetty-server:9.3.0.v20150612'
testCompile 'org.eclipse.jetty:jetty-servlet:9.3.0.v20150612'
testCompile("log4j:log4j:1.2.16")
}
......@@ -21,7 +21,6 @@ import java.io.InputStream;
import org.reactivestreams.Publisher;
import org.springframework.rx.util.BlockingSignalQueue;
import org.springframework.rx.util.BlockingSignalQueueSubscriber;
import org.springframework.util.Assert;
/**
......@@ -44,7 +43,7 @@ public class ByteArrayPublisherInputStream extends InputStream {
Assert.notNull(publisher, "'publisher' must not be null");
this.queue = new BlockingSignalQueue<byte[]>();
publisher.subscribe(new BlockingSignalQueueSubscriber<byte[]>(this.queue));
publisher.subscribe(this.queue.subscriber());
}
ByteArrayPublisherInputStream(BlockingSignalQueue<byte[]> queue) {
......
......@@ -7,7 +7,6 @@ import java.util.Arrays;
import org.reactivestreams.Publisher;
import org.springframework.rx.util.BlockingSignalQueue;
import org.springframework.rx.util.BlockingSignalQueuePublisher;
/**
* {@code OutputStream} implementation that stores all written bytes, to be retrieved
......@@ -23,7 +22,7 @@ public class ByteArrayPublisherOutputStream extends OutputStream {
* @return a publisher for the written bytes
*/
public Publisher<byte[]> toByteBufPublisher() {
return new BlockingSignalQueuePublisher<byte[]>(this.queue);
return this.queue.publisher();
}
@Override
......
......@@ -19,6 +19,10 @@ package org.springframework.rx.util;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.springframework.util.Assert;
/**
......@@ -35,6 +39,8 @@ import org.springframework.util.Assert;
*/
public class BlockingSignalQueue<T> {
private static final int DEFAULT_REQUEST_SIZE_SUBSCRIBER = 1;
private final BlockingQueue<Signal<T>> queue = new LinkedBlockingQueue<Signal<T>>();
......@@ -119,121 +125,151 @@ public class BlockingSignalQueue<T> {
return signal != null ? signal.error() : null;
}
private interface Signal<T> {
boolean isOnNext();
T next();
boolean isOnError();
Throwable error();
boolean isComplete();
/**
* Returns a {@code Publisher} backed by this queue.
*/
public Publisher<T> publisher() {
return new BlockingSignalQueuePublisher();
}
private static class OnNext<T> implements Signal<T> {
private final T next;
public OnNext(T next) {
Assert.notNull(next, "'next' must not be null");
this.next = next;
}
@Override
public boolean isOnNext() {
return true;
}
@Override
public T next() {
return next;
}
@Override
public boolean isOnError() {
return false;
}
@Override
public Throwable error() {
throw new IllegalStateException();
}
@Override
public boolean isComplete() {
return false;
}
/**
* Returns a {@code Subscriber} backed by this queue.
*/
public Subscriber<T> subscriber() {
return subscriber(DEFAULT_REQUEST_SIZE_SUBSCRIBER);
}
private static final class OnError<T> implements Signal<T> {
/**
* Returns a {@code Subscriber} backed by this queue, with the given request size.
* @see Subscription#request(long)
*/
public Subscriber<T> subscriber(long requestSize) {
return new BlockingSignalQueueSubscriber(requestSize);
}
private final Throwable error;
private class BlockingSignalQueuePublisher implements Publisher<T> {
public OnError(Throwable error) {
Assert.notNull(error, "'error' must not be null");
this.error = error;
}
private Subscriber<? super T> subscriber;
@Override
public boolean isOnError() {
return true;
}
private final Object subscriberMutex = new Object();
@Override
public Throwable error() {
return error;
public void subscribe(Subscriber<? super T> subscriber) {
synchronized (this.subscriberMutex) {
if (this.subscriber != null) {
subscriber.onError(
new IllegalStateException("Only one subscriber allowed"));
}
else {
this.subscriber = subscriber;
final SubscriptionThread thread = new SubscriptionThread();
this.subscriber.onSubscribe(new Subscription() {
@Override
public void request(long n) {
thread.request(n);
}
@Override
public void cancel() {
thread.cancel();
}
});
thread.start();
}
}
}
@Override
public boolean isOnNext() {
return false;
}
@Override
public T next() {
throw new IllegalStateException();
}
@Override
public boolean isComplete() {
return false;
private class SubscriptionThread extends Thread {
private volatile long demand = 0;
@Override
public void run() {
try {
while (!Thread.currentThread().isInterrupted()) {
if (demand > 0 && isHeadSignal()) {
subscriber.onNext(pollSignal());
if (demand != Long.MAX_VALUE) {
demand--;
}
}
else if (isHeadError()) {
subscriber.onError(pollError());
break;
}
else if (isComplete()) {
subscriber.onComplete();
break;
}
}
}
catch (InterruptedException ex) {
// Allow thread to exit
}
}
public void request(long n) {
if (n != Long.MAX_VALUE) {
this.demand += n;
}
else {
this.demand = Long.MAX_VALUE;
}
}
public void cancel() {
interrupt();
}
}
}
private static class OnComplete<T> implements Signal<T> {
private class BlockingSignalQueueSubscriber implements Subscriber<T> {
private static final OnComplete INSTANCE = new OnComplete();
private final long requestSize;
private OnComplete() {
}
private Subscription subscription;
@Override
public boolean isComplete() {
return true;
public BlockingSignalQueueSubscriber(long requestSize) {
this.requestSize = requestSize;
}
@Override
public boolean isOnNext() {
return false;
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
this.subscription.request(this.requestSize);
}
@Override
public T next() {
throw new IllegalStateException();
public void onNext(T t) {
try {
putSignal(t);
}
catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
this.subscription.request(requestSize);
}
@Override
public boolean isOnError() {
return false;
public void onError(Throwable t) {
try {
putError(t);
}
catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
this.subscription.request(requestSize);
}
@Override
public Throwable error() {
throw new IllegalStateException();
public void onComplete() {
try {
complete();
}
catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
}
}
/*
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.rx.util;
/**
* @author Arjen Poutsma
*/
class OnComplete<T> implements Signal<T> {
public static final OnComplete INSTANCE = new OnComplete();
private OnComplete() {
}
@Override
public boolean isComplete() {
return true;
}
@Override
public boolean isOnNext() {
return false;
}
@Override
public T next() {
throw new IllegalStateException();
}
@Override
public boolean isOnError() {
return false;
}
@Override
public Throwable error() {
throw new IllegalStateException();
}
}
/*
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.rx.util;
import org.springframework.util.Assert;
/**
* @author Arjen Poutsma
*/
final class OnError<T> implements Signal<T> {
private final Throwable error;
public OnError(Throwable error) {
Assert.notNull(error, "'error' must not be null");
this.error = error;
}
@Override
public boolean isOnError() {
return true;
}
@Override
public Throwable error() {
return error;
}
@Override
public boolean isOnNext() {
return false;
}
@Override
public T next() {
throw new IllegalStateException();
}
@Override
public boolean isComplete() {
return false;
}
}
/*
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.rx.util;
import org.springframework.util.Assert;
/**
* @author Arjen Poutsma
*/
class OnNext<T> implements Signal<T> {
private final T next;
public OnNext(T next) {
Assert.notNull(next, "'next' must not be null");
this.next = next;
}
@Override
public boolean isOnNext() {
return true;
}
@Override
public T next() {
return next;
}
@Override
public boolean isOnError() {
return false;
}
@Override
public Throwable error() {
throw new IllegalStateException();
}
@Override
public boolean isComplete() {
return false;
}
}
/*
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.rx.util;
/**
* @author Arjen Poutsma
*/
interface Signal<T> {
boolean isOnNext();
T next();
boolean isOnError();
Throwable error();
boolean isComplete();
}
......@@ -22,11 +22,16 @@ import javax.servlet.AsyncContext;
import javax.servlet.ServletInputStream;
import javax.servlet.ServletOutputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* @author Arjen Poutsma
*/
class AsyncContextSynchronizer {
private static final Log logger = LogFactory.getLog(AsyncContextSynchronizer.class);
private static final int READ_COMPLETE = 1;
private static final int WRITE_COMPLETE = 1 << 1;
......@@ -50,7 +55,9 @@ class AsyncContextSynchronizer {
}
public void readComplete() {
logger.debug("Read complete");
if (complete.compareAndSet(WRITE_COMPLETE, COMPLETE)) {
logger.debug("Complete");
this.asyncContext.complete();
}
else {
......@@ -59,7 +66,9 @@ class AsyncContextSynchronizer {
}
public void writeComplete() {
logger.debug("Write complete");
if (complete.compareAndSet(READ_COMPLETE, COMPLETE)) {
logger.debug("Complete");
this.asyncContext.complete();
}
else {
......
/*
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.rx.web.servlet;
import org.reactivestreams.Publisher;
/**
* @author Arjen Poutsma
*/
public interface HttpHandler {
Publisher<byte[]> handle(Publisher<byte[]> request);
}
/*
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.rx.web.servlet;
import java.io.IOException;
import javax.servlet.AsyncContext;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.reactivestreams.Publisher;
/**
* @author Arjen Poutsma
*/
@WebServlet(asyncSupported = true )
public class HttpHandlerServlet extends HttpServlet {
private static final int BUFFER_SIZE = 4096;
private HttpHandler handler;
public void setHandler(HttpHandler handler) {
this.handler = handler;
}
@Override
protected void service(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException {
AsyncContext context = request.startAsync();
final AsyncContextSynchronizer contextSynchronizer =
new AsyncContextSynchronizer(context);
RequestBodyPublisher requestPublisher = new RequestBodyPublisher(contextSynchronizer, BUFFER_SIZE);
request.getInputStream().setReadListener(requestPublisher);
ResponseBodySubscriber responseSubscriber = new ResponseBodySubscriber(contextSynchronizer);
response.getOutputStream().setWriteListener(responseSubscriber);
Publisher<byte[]> responsePublisher = this.handler.handle(requestPublisher);
responsePublisher.subscribe(responseSubscriber);
}
}
/*
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.rx.web.servlet;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Arrays;
import javax.servlet.ReadListener;
import javax.servlet.ServletInputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
/**
* @author Arjen Poutsma
*/
public class RequestBodyPublisher implements ReadListener, Publisher<byte[]> {
private final Charset UTF_8 = Charset.forName("UTF-8");
private static final Log logger = LogFactory.getLog(RequestBodyPublisher.class);
private final AsyncContextSynchronizer synchronizer;
private final byte[] buffer;
private long demand;
private Subscriber<? super byte[]> subscriber;
public RequestBodyPublisher(AsyncContextSynchronizer synchronizer, int bufferSize) {
this.synchronizer = synchronizer;
this.buffer = new byte[bufferSize];
}
@Override
public void subscribe(Subscriber<? super byte[]> s) {
this.subscriber = s;
this.subscriber.onSubscribe(new RequestBodySubscription());
}
@Override
public void onDataAvailable() throws IOException {
ServletInputStream input = this.synchronizer.getInputStream();
while (true) {
logger.debug("Demand: " + this.demand);
if (demand <= 0) {
break;
}
boolean ready = input.isReady();
logger.debug("Input " + ready + "/" + input.isFinished());
if (!ready) {
break;
}
int read = input.read(buffer);
logger.debug("Input read:" + read);
if (read == -1) {
break;
}
else if (read > 0) {
if (demand != Long.MAX_VALUE) {
demand--;
}
byte[] copy = Arrays.copyOf(this.buffer, read);
// logger.debug("Next: " + new String(copy, UTF_8));
this.subscriber.onNext(copy);
}
}
}
@Override
public void onAllDataRead() throws IOException {
logger.debug("All data read");
this.synchronizer.readComplete();
this.subscriber.onComplete();
}
@Override
public void onError(Throwable t) {
logger.error("RequestBodyPublisher Error", t);
this.subscriber.onError(t);
}
private class RequestBodySubscription implements Subscription {
@Override
public void request(long n) {
logger.debug("Updating demand " + demand + " by " + n);
boolean stalled = demand <= 0;
if (n != Long.MAX_VALUE && demand != Long.MAX_VALUE) {
demand += n;
}
else {
demand = Long.MAX_VALUE;
}
if (stalled) {
try {
onDataAvailable();
}
catch (IOException ex) {
onError(ex);
}
}
}
@Override
public void cancel() {
synchronizer.readComplete();
demand = 0;
}
}
}
......@@ -14,100 +14,98 @@
* limitations under the License.
*/
package org.springframework.rx.util;
package org.springframework.rx.web.servlet;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.servlet.ServletOutputStream;
import javax.servlet.WriteListener;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.springframework.util.Assert;
/**
* A simple byte array {@link Subscriber} that puts all published bytes on a
* {@link @BlockingSignalQueue}.
*
* @author Arjen Poutsma
*/
public class BlockingSignalQueueSubscriber<T> implements Subscriber<T> {
public class ResponseBodySubscriber implements WriteListener, Subscriber<byte[]> {
/**
* The default request size to use.
*/
public static final int DEFAULT_REQUEST_SIZE = 1;
private static final Log logger = LogFactory.getLog(ResponseBodySubscriber.class);
private final BlockingSignalQueue<T> queue;
private final AsyncContextSynchronizer synchronizer;
private Subscription subscription;
private int initialRequestSize = DEFAULT_REQUEST_SIZE;
private int requestSize = DEFAULT_REQUEST_SIZE;
/**
* Creates a new {@code BlockingSignalQueueSubscriber} using the given queue.
* @param queue the queue to use
*/
public BlockingSignalQueueSubscriber(BlockingSignalQueue<T> queue) {
Assert.notNull(queue, "'queue' must not be null");
this.queue = queue;
}
private byte[] buffer;
/**
* Sets the request size used when subscribing, in {@link #onSubscribe(Subscription)}.
* Defaults to {@link #DEFAULT_REQUEST_SIZE}.
* @param initialRequestSize the initial request size
* @see Subscription#request(long)
*/
public void setInitialRequestSize(int initialRequestSize) {
this.initialRequestSize = initialRequestSize;
}
private AtomicBoolean complete = new AtomicBoolean(false);
/**
* Sets the request size used after data or an error comes in, in {@link
* #onNext(Object)} and {@link #onError(Throwable)}. Defaults to {@link
* #DEFAULT_REQUEST_SIZE}.
* @see Subscription#request(long)
*/
public void setRequestSize(int requestSize) {
this.requestSize = requestSize;
public ResponseBodySubscriber(AsyncContextSynchronizer synchronizer) {
this.synchronizer = synchronizer;
}
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
this.subscription.request(this.initialRequestSize);
this.subscription.request(1);
}
@Override
public void onNext(T t) {
public void onNext(byte[] bytes) {
logger.debug("Next: " + bytes.length + " bytes");
Assert.isNull(buffer);
this.buffer = bytes;
try {
this.queue.putSignal(t);
onWritePossible();
}
catch (InterruptedException ex) {
Thread.currentThread().interrupt();
catch (IOException e) {
onError(e);
}
this.subscription.request(requestSize);
}
@Override
public void onError(Throwable t) {
try {
this.queue.putError(t);
}
catch (InterruptedException ex) {
Thread.currentThread().interrupt();
public void onComplete() {
logger.debug("Complete buffer: " + (buffer == null));
if (complete.compareAndSet(false, true) && buffer == null) {
this.synchronizer.writeComplete();
}
this.subscription.request(requestSize);
}
@Override
public void onComplete() {
try {
this.queue.complete();
public void onWritePossible() throws IOException {
ServletOutputStream output = this.synchronizer.getOutputStream();
boolean ready = output.isReady();
logger.debug("Output: " + ready + " buffer: " + (buffer == null));
if (this.buffer != null && ready) {
output.write(this.buffer);
this.buffer = null;
if (!complete.get()) {
this.subscription.request(1);
}
else {
this.synchronizer.writeComplete();
}
}
catch (InterruptedException ex) {
Thread.currentThread().interrupt();
else if (this.buffer == null && ready) {
this.subscription.request(1);
}
}
@Override
public void onError(Throwable t) {
logger.error("ResponseBodySubscriber error", t);
}
private void complete() {
}
}
log4j.rootCategory=INFO, stdout
log4j.logger.org.springframework.rx=DEBUG
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%25.25c{1}] <%t> - %m%n
\ No newline at end of file
......@@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.Before;
import org.junit.Test;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
......@@ -34,12 +35,12 @@ public class BlockingByteBufQueuePublisherTests {
private BlockingSignalQueue<byte[]> queue;
private BlockingSignalQueuePublisher<byte[]> publisher;
private Publisher<byte[]> publisher;
@Before
public void setUp() throws Exception {
queue = new BlockingSignalQueue<byte[]>();
publisher = new BlockingSignalQueuePublisher<byte[]>(queue);
publisher = queue.publisher();
}
@Test
......
/*
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.rx.web.servlet;
import java.net.URI;
import java.util.Random;
import org.junit.Test;
import org.springframework.http.HttpMethod;
import org.springframework.http.RequestEntity;
import org.springframework.http.ResponseEntity;
import org.springframework.util.SocketUtils;
import org.springframework.web.client.RestTemplate;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
public abstract class AbstractHttpHandlerServletIntegrationTestCase {
private static final int REQUEST_SIZE = 4096 * 3;
protected static int port = SocketUtils.findAvailableTcpPort();
private Random rnd = new Random();
@Test
public void bytes() throws Exception {
RestTemplate restTemplate = new RestTemplate();
byte[] body = randomBytes();
RequestEntity<byte[]>
request = new RequestEntity<byte[]>(body, HttpMethod.POST, new URI(url()));
ResponseEntity<byte[]> response = restTemplate.exchange(request, byte[].class);
assertArrayEquals(body, response.getBody());
}
@Test
public void string() throws Exception {
RestTemplate restTemplate = new RestTemplate();
String body = randomString();
RequestEntity<String> request = new RequestEntity<String>(body, HttpMethod.POST, new URI(url()));
ResponseEntity<String> response = restTemplate.exchange(request, String.class);
assertEquals(body, response.getBody());
}
private static String url() {
return "http://localhost:" + port + "/rx";
}
private String randomString() {
StringBuilder builder = new StringBuilder();
int i = 1;
while (builder.length() < REQUEST_SIZE) {
builder.append(randomChar());
if (i % 5 == 0) {
builder.append(' ');
}
if (i % 80 == 0) {
builder.append('\n');
}
i++;
}
return builder.toString();
}
private char randomChar() {
return (char) (rnd.nextInt(26) + 'a');
}
private byte[] randomBytes() {
byte[] buffer = new byte[REQUEST_SIZE];
rnd.nextBytes(buffer);
return buffer;
}
}
......@@ -14,97 +14,51 @@
* limitations under the License.
*/
package org.springframework.rx.util;
package org.springframework.rx.web.servlet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.springframework.util.Assert;
/**
* @author Arjen Poutsma
*/
public class BlockingSignalQueuePublisher<T> implements Publisher<T> {
private final BlockingSignalQueue<T> queue;
public class CountingHttpHandler implements HttpHandler {
private Subscriber<? super T> subscriber;
private static final Log logger = LogFactory.getLog(CountingHttpHandler.class);
private final Object subscriberMutex = new Object();
@Override
public Publisher<byte[]> handle(Publisher<byte[]> request) {
request.subscribe(new Subscriber<byte[]>() {
private Subscription subscription;
public BlockingSignalQueuePublisher(BlockingSignalQueue<T> queue) {
Assert.notNull(queue, "'queue' must not be null");
this.queue = queue;
}
private int byteCount = 0;
@Override
public void subscribe(Subscriber<? super T> subscriber) {
synchronized (this.subscriberMutex) {
if (this.subscriber != null) {
subscriber.onError(
new IllegalStateException("Only one subscriber allowed"));
@Override
public void onSubscribe(Subscription s) {
this.subscription = s;
this.subscription.request(1);
}
else {
this.subscriber = subscriber;
final SubscriptionThread thread = new SubscriptionThread();
this.subscriber.onSubscribe(new Subscription() {
@Override
public void request(long n) {
thread.request(n);
}
@Override
public void cancel() {
thread.cancel();
}
});
thread.start();
@Override
public void onNext(byte[] bytes) {
byteCount += bytes.length;
this.subscription.request(1);
}
}
}
private class SubscriptionThread extends Thread {
private volatile long requestCount = 0;
private long l = 0;
@Override
public void run() {
try {
while (!Thread.currentThread().isInterrupted()) {
if ((l < requestCount || requestCount == Long.MAX_VALUE) &&
queue.isHeadSignal()) {
subscriber.onNext(queue.pollSignal());
l++;
}
else if (queue.isHeadError()) {
subscriber.onError(queue.pollError());
break;
}
else if (queue.isComplete()) {
subscriber.onComplete();
break;
}
}
@Override
public void onError(Throwable t) {
logger.error("CountingHttpHandler Error", t);
t.printStackTrace();
}
catch (InterruptedException ex) {
// Allow thread to exit
}
}
public void request(long n) {
if (n != Long.MAX_VALUE) {
this.requestCount += n;
}
else {
this.requestCount = Long.MAX_VALUE;
@Override
public void onComplete() {
logger.info("Processed " + byteCount + " bytes");
}
}
public void cancel() {
interrupt();
}
});
return null;
}
}
/*
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.rx.web.servlet;
import org.reactivestreams.Publisher;
/**
* @author Arjen Poutsma
*/
public class EchoHandler implements HttpHandler {
@Override
public Publisher<byte[]> handle(Publisher<byte[]> request) {
return request;
}
}
/*
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.rx.web.servlet;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.springframework.util.SocketUtils;
/**
* @author Arjen Poutsma
*/
public class HttpHandlerServletJettyIntegrationTests
extends AbstractHttpHandlerServletIntegrationTestCase {
private static Server jettyServer;
@BeforeClass
public static void startServer() throws Exception {
jettyServer = new Server();
ServerConnector connector = new ServerConnector(jettyServer);
port = SocketUtils.findAvailableTcpPort();
connector.setPort(port);
ServletContextHandler handler = new ServletContextHandler(jettyServer, "", false, false);
HttpHandlerServlet servlet = new HttpHandlerServlet();
servlet.setHandler(new EchoHandler());
ServletHolder servletHolder = new ServletHolder(servlet);
handler.addServlet(servletHolder, "/rx");
jettyServer.addConnector(connector);
jettyServer.start();
}
@AfterClass
public static void stopServer() throws Exception {
jettyServer.stop();
}
}
\ No newline at end of file
/*
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.rx.web.servlet;
import java.io.File;
import org.apache.catalina.Context;
import org.apache.catalina.LifecycleException;
import org.apache.catalina.startup.Tomcat;
import org.junit.AfterClass;
import org.junit.BeforeClass;
/**
* @author Arjen Poutsma
*/
public class HttpHandlerServletTomcatIntegrationTests extends AbstractHttpHandlerServletIntegrationTestCase {
private static Tomcat tomcatServer;
@BeforeClass
public static void startServer() throws LifecycleException, InterruptedException {
tomcatServer = new Tomcat();
tomcatServer.setPort(port);
File base = new File(System.getProperty("java.io.tmpdir"));
Context rootCtx = tomcatServer.addContext("", base.getAbsolutePath());
HttpHandlerServlet servlet = new HttpHandlerServlet();
servlet.setHandler(new EchoHandler());
tomcatServer.addServlet(rootCtx, "handlerServlet", servlet);
rootCtx.addServletMapping("/rx", "handlerServlet");
tomcatServer.start();
}
@AfterClass
public static void stopServer() throws LifecycleException {
tomcatServer.stop();
}
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册