提交 501961cf 编写于 作者: A Allen Wang

Merge pull request #132 from allenxwang/2.x-ssl

Fix issue #130 and JUnit/Javadoc improvements
......@@ -102,6 +102,7 @@ project(':ribbon-examples') {
project(':ribbon-test') {
dependencies {
compile project(':ribbon-core')
compile 'com.netflix.rxjava:rxjava-core:[0.17,)'
compile 'com.sun.jersey:jersey-server:1.11'
compile 'javax.ws.rs:jsr311-api:1.1.1'
compile 'com.sun.jersey:jersey-core:1.11'
......
/*
*
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package com.netflix.ribbon.testutils;
import static org.junit.Assert.assertTrue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import rx.functions.Func0;
public class TestUtils {
public static void waitUntilTrueOrTimeout(int timeoutMilliseconds, final Func0<Boolean> func) {
final Lock lock = new ReentrantLock();
final Condition condition = lock.newCondition();
final AtomicBoolean stopThread = new AtomicBoolean(false);
if (!func.call()) {
(new Thread() {
@Override
public void run() {
while (!stopThread.get()) {
if (func.call()) {
lock.lock();
try {
condition.signalAll();
} finally {
lock.unlock();
}
}
try {
Thread.sleep(20);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}).start();
lock.lock();
try {
condition.await(timeoutMilliseconds, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
stopThread.set(true);
}
}
assertTrue(func.call());
}
}
......@@ -17,6 +17,7 @@
*/
package com.netflix.client.netty.http;
import static com.netflix.ribbon.testutils.TestUtils.waitUntilTrueOrTimeout;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
......@@ -45,8 +46,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.codehaus.jackson.map.ObjectMapper;
import org.junit.BeforeClass;
import org.junit.Ignore;
......@@ -55,6 +54,7 @@ import org.junit.Test;
import rx.Observable;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func0;
import rx.functions.Func1;
import com.google.common.collect.Lists;
......@@ -138,17 +138,20 @@ public class NettyClientTest {
@Test
public void testObservable() throws Exception {
HttpClientRequest<ByteBuf> request = HttpClientRequest.createGet(SERVICE_URI + "testAsync/person");
NettyHttpClient<ByteBuf, ByteBuf> observableClient = (NettyHttpClient<ByteBuf, ByteBuf>) RibbonTransport.newHttpClient();
Observable<HttpClientResponse<ByteBuf>> response = observableClient.submit(request);
Person person = getPersonObservable(response).toBlocking().single();
assertEquals(EmbeddedResources.defaultPerson, person);
HttpClientListener listener = observableClient.getListener();
Thread.sleep(1000);
final HttpClientListener listener = observableClient.getListener();
assertEquals(1, listener.getPoolAcquires());
assertEquals(1, listener.getConnectionCount());
assertEquals(1, listener.getPoolReleases());
waitUntilTrueOrTimeout(1000, new Func0<Boolean>() {
@Override
public Boolean call() {
return listener.getPoolReleases() == 1;
}
});
}
@Test
......@@ -160,11 +163,15 @@ public class NettyClientTest {
Person person = getPersonObservable(response).toBlocking().single();
assertEquals(EmbeddedResources.defaultPerson, person);
// need to sleep to wait until connection is released
Thread.sleep(1000);
HttpClientListener listener = observableClient.getListener();
final HttpClientListener listener = observableClient.getListener();
assertEquals(1, listener.getConnectionCount());
assertEquals(1, listener.getPoolAcquires());
assertEquals(1, listener.getPoolReleases());
waitUntilTrueOrTimeout(1000, new Func0<Boolean>() {
@Override
public Boolean call() {
return listener.getPoolReleases() == 1;
}
});
}
......@@ -178,14 +185,17 @@ public class NettyClientTest {
Observable<HttpClientResponse<ByteBuf>> response = observableClient.submit(request);
Person person = getPersonObservable(response).toBlocking().single();
assertEquals(EmbeddedResources.defaultPerson, person);
Thread.sleep(1000);
response = observableClient.submit(request);
person = getPersonObservable(response).toBlocking().single();
assertEquals(EmbeddedResources.defaultPerson, person);
Thread.sleep(1000);
HttpClientListener listener = observableClient.getListener();
final HttpClientListener listener = observableClient.getListener();
assertEquals(2, listener.getPoolAcquires());
assertEquals(2, listener.getPoolReleases());
waitUntilTrueOrTimeout(1000, new Func0<Boolean>() {
@Override
public Boolean call() {
return listener.getPoolReleases() == 2;
}
});
assertEquals(1, listener.getConnectionCount());
assertEquals(1, listener.getPoolReuse());
}
......@@ -307,10 +317,14 @@ public class NettyClientTest {
assertEquals(0, stats.getActiveRequestsCount());
assertEquals(0, stats.getSuccessiveConnectionFailureCount());
Thread.sleep(1000);
HttpClientListener listener = lbObservables.getListener();
final HttpClientListener listener = lbObservables.getListener();
assertEquals(1, listener.getConnectionCount());
assertEquals(1, listener.getPoolReleases());
waitUntilTrueOrTimeout(1000, new Func0<Boolean>() {
@Override
public Boolean call() {
return listener.getPoolReleases() == 1;
}
});
}
......@@ -374,6 +388,16 @@ public class NettyClientTest {
assertEquals(2, observer.obj.age);
ServerStats stats = lbObservables.getServerStats(badServer);
server.shutdown();
final HttpClientListener listener = lbObservables.getListener();
waitUntilTrueOrTimeout(1000, new Func0<Boolean>() {
@Override
public Boolean call() {
return listener.getPoolReleases() == 5;
}
});
assertEquals(0, listener.getPoolReuse());
// two requests to bad server because retry same server is set to 1
assertEquals(4, stats.getTotalRequestsCount());
assertEquals(0, stats.getActiveRequestsCount());
......@@ -443,10 +467,14 @@ public class NettyClientTest {
assertEquals(1, stats.getTotalRequestsCount());
assertEquals(0, stats.getActiveRequestsCount());
assertEquals(0, stats.getSuccessiveConnectionFailureCount());
Thread.sleep(1000);
HttpClientListener listener = lbObservables.getListener();
final HttpClientListener listener = lbObservables.getListener();
assertEquals(2, listener.getPoolAcquires());
assertEquals(2, listener.getPoolReleases());
waitUntilTrueOrTimeout(1000, new Func0<Boolean>() {
@Override
public Boolean call() {
return listener.getPoolReleases() == 2;
}
});
assertEquals(2, listener.getConnectionCount());
assertEquals(0, listener.getPoolReuse());
......@@ -747,5 +775,5 @@ public class NettyClientTest {
.build());
Person person = getPersonObservable(observableClient.submit(host, port, request)).toBlocking().single();
assertEquals(EmbeddedResources.defaultPerson, person);
}
}
}
......@@ -21,14 +21,32 @@ import java.util.concurrent.Future;
import rx.Observable;
/**
* A decorated request object whose response content contains the execution meta data.
*
* @author Allen Wang
*
* @param <T>
*/
public interface RequestWithMetaData<T> {
/**
* Non blocking API that returns an {@link Observable} while the execution is started asynchronously.
* Subscribing to the returned {@link Observable} is guaranteed to get the complete sequence from
* the beginning, which might be replayed by the framework.
*/
Observable<RibbonResponse<Observable<T>>> observe();
/**
* Non blocking API that returns an Observable. The execution is not started until the returned Observable is subscribed to.
*/
Observable<RibbonResponse<Observable<T>>> toObservable();
/**
* Non blocking API that returns a {@link Future}, where its {@link Future#get()} method is blocking and returns a
* single (or last element if there is a sequence of objects from the execution) element
*/
Future<RibbonResponse<T>> queue();
/**
* Blocking API that returns a single (or last element if there is a sequence of objects from the execution) element
*/
RibbonResponse<T> execute();
}
......@@ -19,16 +19,41 @@ import java.util.concurrent.Future;
import rx.Observable;
/**
* Request that provides blocking and non-blocking APIs to fetch the content.
*
* @author Allen Wang
*
*/
public interface RibbonRequest<T> {
/**
* Blocking API that returns a single (or last element if there is a sequence of objects from the execution) element
*/
public T execute();
/**
* Non blocking API that returns a {@link Future}, where its {@link Future#get()} method is blocking and returns a
* single (or last element if there is a sequence of objects from the execution) element
*/
public Future<T> queue();
/**
* Non blocking API that returns an {@link Observable} while the execution is started asynchronously.
* Subscribing to the returned {@link Observable} is guaranteed to get the complete sequence from
* the beginning, which might be replayed by the framework. Use this API for "fire and forget".
*/
public Observable<T> observe();
/**
* Non blocking API that returns an Observable. The execution is not started until the returned Observable is subscribed to.
*/
public Observable<T> toObservable();
/**
* Create a decorated {@link RequestWithMetaData} where you can call its similar blocking or non blocking
* APIs to get {@link RibbonResponse}, which in turn contains returned object(s) and
* some meta data from Hystrix execution.
*/
public RequestWithMetaData<T> withMetadata();
}
......@@ -18,6 +18,14 @@ package com.netflix.ribbon;
import com.netflix.hystrix.HystrixExecutableInfo;
/**
* Response object from {@link RequestWithMetaData} that contains the content
* and the meta data from execution.
*
* @author Allen Wang
*
* @param <T> Data type of the returned object
*/
public abstract class RibbonResponse<T> {
public abstract T content();
......
......@@ -26,6 +26,7 @@ import rx.Observer;
import rx.Observable.OnSubscribe;
import rx.Subscriber;
import rx.subjects.PublishSubject;
import rx.subjects.ReplaySubject;
import rx.subjects.Subject;
import com.netflix.hystrix.HystrixExecutableInfo;
......@@ -66,8 +67,9 @@ class HttpMetaRequest<T> implements RequestWithMetaData<T> {
@Override
public Observable<RibbonResponse<Observable<T>>> observe() {
RibbonHystrixObservableCommand<T> hystrixCommand = request.createHystrixCommand();
final Observable<T> output = hystrixCommand.observe();
return convertToRibbonResponse(output, hystrixCommand);
ReplaySubject<T> subject = ReplaySubject.create();
hystrixCommand.getObservable().subscribe(subject);
return convertToRibbonResponse(subject, hystrixCommand);
}
@Override
......
......@@ -20,18 +20,15 @@ import io.reactivex.netty.protocol.http.client.HttpClient;
import io.reactivex.netty.protocol.http.client.HttpClientRequest;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import rx.Observable;
import rx.functions.Func1;
import rx.subjects.ReplaySubject;
import com.netflix.ribbon.CacheProvider;
import com.netflix.ribbon.RequestWithMetaData;
import com.netflix.ribbon.RibbonRequest;
import com.netflix.ribbon.http.HttpRequestTemplate.CacheProviderWithKeyTemplate;
import com.netflix.ribbon.template.TemplateParser;
import com.netflix.ribbon.template.TemplateParsingException;
......@@ -95,7 +92,9 @@ class HttpRequest<T> implements RibbonRequest<T> {
@Override
public Observable<T> observe() {
return createHystrixCommand().observe();
ReplaySubject<T> subject = ReplaySubject.create();
createHystrixCommand().getObservable().subscribe(subject);
return subject;
}
@Override
......
......@@ -310,6 +310,64 @@ public class RibbonTest {
assertEquals(content, result);
}
@Test
public void testObserve() throws IOException, InterruptedException {
MockWebServer server = new MockWebServer();
String content = "Hello world";
server.enqueue(new MockResponse().setResponseCode(200).setHeader("Content-type", "text/plain")
.setBody(content));
server.enqueue(new MockResponse().setResponseCode(200).setHeader("Content-type", "text/plain")
.setBody(content));
server.play();
HttpResourceGroup group = Ribbon.createHttpResourceGroup("myclient",
ClientOptions.create()
.withMaxAutoRetriesNextServer(3)
.withReadTimeout(300000)
.withConfigurationBasedServerList("localhost:12345, localhost:10092, localhost:" + server.getPort()));
HttpRequestTemplate<ByteBuf> template = group.newRequestTemplate("test", ByteBuf.class);
RibbonRequest<ByteBuf> request = template
.withUriTemplate("/")
.withMethod("GET")
.requestBuilder().build();
Observable<ByteBuf> result = request.observe();
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<String> fromCommand = new AtomicReference<String>();
// We need to wait until the response is received and processed by event loop
// and make sure that subscribing to it again will not cause ByteBuf ref count issue
result.toBlocking().last();
result.subscribe(new Action1<ByteBuf>() {
@Override
public void call(ByteBuf t1) {
try {
fromCommand.set(t1.toString(Charset.defaultCharset()));
} catch (Exception e) {
e.printStackTrace();
}
latch.countDown();
}
});
latch.await();
assertEquals(content, fromCommand.get());
Observable<RibbonResponse<Observable<ByteBuf>>> metaResult = request.withMetadata().observe();
String result2 = "";
// We need to wait until the response is received and processed by event loop
// and make sure that subscribing to it again will not cause ByteBuf ref count issue
metaResult.toBlocking().last();
result2 = metaResult.flatMap(new Func1<RibbonResponse<Observable<ByteBuf>>, Observable<ByteBuf>>(){
@Override
public Observable<ByteBuf> call(
RibbonResponse<Observable<ByteBuf>> t1) {
return t1.content();
}
}).map(new Func1<ByteBuf, String>(){
@Override
public String call(ByteBuf t1) {
return t1.toString(Charset.defaultCharset());
}
}).toBlocking().single();
assertEquals(content, result2);
}
@Test
public void testCacheMiss() throws IOException, InterruptedException {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册