提交 9216170a 编写于 作者: A Allen Wang

Changed method signature of callback interface. Make sure Future returned from async call work.

上级 c248403d
......@@ -38,12 +38,12 @@ public class AsyncLoadBalancingClient<Request extends ClientRequest, Response ex
asyncExecuteOnSingleServer(resolved, new ResponseCallback<Response>() {
@Override
public void onResponseReceived(Response response) {
callback.onResponseReceived(response);
public void completed(Response response) {
callback.completed(response);
}
@Override
public void onException(Throwable e) {
public void failed(Throwable e) {
boolean shouldRetry = false;
if (e instanceof ClientException) {
// we dont want to retry for PUT/POST and DELETE, we can for GET
......@@ -51,7 +51,7 @@ public class AsyncLoadBalancingClient<Request extends ClientRequest, Response ex
}
if (shouldRetry) {
if (retries.incrementAndGet() > numRetriesNextServer) {
callback.onException(new ClientException(
callback.failed(new ClientException(
ClientException.ErrorType.NUMBEROF_RETRIES_NEXTSERVER_EXCEEDED,
"NUMBER_OF_RETRIES_NEXTSERVER_EXCEEDED :"
+ numRetriesNextServer
......@@ -66,19 +66,23 @@ public class AsyncLoadBalancingClient<Request extends ClientRequest, Response ex
try {
asyncExecuteOnSingleServer(computeFinalUriWithLoadBalancer(request), this);
} catch (ClientException e1) {
callback.onException(e1);
callback.failed(e1);
}
} else {
if (e instanceof ClientException) {
callback.onException(e);
callback.failed(e);
} else {
callback.onException(new ClientException(
callback.failed(new ClientException(
ClientException.ErrorType.GENERAL,
"Unable to execute request for URI:" + request.getUri(),
e));
}
}
}
@Override
public void cancelled() {
}
});
return null;
......@@ -104,14 +108,14 @@ public class AsyncLoadBalancingClient<Request extends ClientRequest, Response ex
private Response thisResponse;
private Throwable thisException;
@Override
public void onResponseReceived(Response response) {
public void completed(Response response) {
thisResponse = response;
onComplete();
callback.onResponseReceived(response);
callback.completed(response);
}
@Override
public void onException(Throwable e) {
public void failed(Throwable e) {
thisException = e;
onComplete();
if (serverStats != null) {
......@@ -123,7 +127,7 @@ public class AsyncLoadBalancingClient<Request extends ClientRequest, Response ex
boolean shouldRetry = retryOkayOnOperation && numRetries > 0 && isRetriableException(e);
if (shouldRetry) {
if (!handleSameServerRetry(uri, retries.incrementAndGet(), numRetries, e)) {
callback.onException(new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_EXEEDED,
callback.failed(new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_EXEEDED,
"NUMBEROFRETRIESEXEEDED :" + numRetries + " retries, while making a RestClient call for: " + uri, e));
} else {
tracer.start();
......@@ -131,12 +135,12 @@ public class AsyncLoadBalancingClient<Request extends ClientRequest, Response ex
try {
asyncClient.execute(request, this);
} catch (ClientException ex) {
callback.onException(ex);
callback.failed(ex);
}
}
} else {
ClientException clientException = generateNIWSException(uri.toString(), e);
callback.onException(clientException);
callback.failed(clientException);
}
}
......@@ -144,6 +148,10 @@ public class AsyncLoadBalancingClient<Request extends ClientRequest, Response ex
tracer.stop();
long duration = tracer.getDuration(TimeUnit.MILLISECONDS);
noteRequestCompletion(serverStats, request, thisResponse, thisException, duration);
}
@Override
public void cancelled() {
}
});
}
......
......@@ -3,16 +3,5 @@ package com.netflix.client;
import java.util.concurrent.Future;
public interface AsyncStreamClient<T extends ClientRequest, S extends IResponse, U> {
public interface StreamCallback<S, E> {
public void onResponseReceived(S response);
public void onError(Throwable e);
public void onCompleted();
public void onElement(E element);
}
public <E> Future<S> stream(T request, StreamDecoder<E, U> decooder, StreamCallback<S, E> callback) throws Exception;
public <E> Future<S> stream(T request, StreamDecoder<E, U> decooder, StreamResponseCallback<S, E> callback) throws Exception;
}
......@@ -27,15 +27,20 @@ public class ObservableAsyncClient<T extends ClientRequest, S extends ResponseWi
new ResponseCallback<S>() {
@Override
public void onResponseReceived(S response) {
public void completed(S response) {
observer.onNext(response);
observer.onCompleted();
}
@Override
public void onException(Throwable e) {
public void failed(Throwable e) {
observer.onError(e);
}
@Override
public void cancelled() {
observer.onError(new IllegalStateException("operation cancelled"));
}
})));
} catch (ClientException e) {
throw new RuntimeException(e);
......
package com.netflix.client;
public interface ResponseCallback<R extends ResponseWithTypedEntity> {
public void onResponseReceived(R response);
public interface ResponseCallback<T extends IResponse> {
public void completed(T response);
public void onException(Throwable e);
public void failed(Throwable e);
public void cancelled();
}
package com.netflix.client;
import java.io.IOException;
import java.util.List;
public interface StreamDecoder<E, T> {
List<E> decode(T input) throws IOException;
public interface StreamDecoder<T, S> {
T decode(S input) throws IOException;
}
package com.netflix.client;
public interface StreamResponseCallback<T extends IResponse, S> extends ResponseCallback<T> {
public void onResponseReceived(T response);
public void onContentReceived(S content);
}
package com.netflix.client;
public interface TypedEntityResponseCallback extends ResponseCallback{
}
package com.netflix.httpasyncclient;
import java.net.URI;
import java.util.Collection;
import java.util.Map;
import org.apache.http.Header;
import org.apache.http.HttpResponse;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import com.netflix.client.ClientException;
import com.netflix.client.IResponse;
public class BaseResponse implements IResponse {
protected HttpResponse response;
public BaseResponse(HttpResponse response) {
this.response = response;
}
@Override
public Object getPayload() throws ClientException {
return response.getEntity();
}
@Override
public boolean hasPayload() {
return response.getEntity() != null;
}
@Override
public boolean isSuccess() {
return response.getStatusLine().getStatusCode() == 200;
}
@Override
public URI getRequestedURI() {
return null;
}
@Override
public Map<String, Collection<String>> getHeaders() {
Multimap<String, String> map = ArrayListMultimap.create();
for (Header header: response.getAllHeaders()) {
map.put(header.getName(), header.getValue());
}
return map.asMap();
}
public int getStatus() {
return response.getStatusLine().getStatusCode();
}
public boolean hasEntity() {
return hasPayload();
}
}
......@@ -6,6 +6,7 @@ import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
......@@ -39,6 +40,7 @@ import com.netflix.client.ClientException;
import com.netflix.client.ResponseCallback;
import com.netflix.client.ResponseWithTypedEntity;
import com.netflix.client.StreamDecoder;
import com.netflix.client.StreamResponseCallback;
import com.netflix.client.config.CommonClientConfigKey;
import com.netflix.client.config.IClientConfig;
import com.netflix.client.http.HttpRequest;
......@@ -56,25 +58,18 @@ public class RibbonHttpAsyncClient
private SerializationFactory<ContentTypeBasedSerializerKey> factory = new JacksonSerializationFactory();
private static Logger logger = LoggerFactory.getLogger(RibbonHttpAsyncClient.class);
public static class AsyncResponse implements ResponseWithTypedEntity {
public static class AsyncResponse extends BaseResponse implements ResponseWithTypedEntity {
private HttpResponse response;
private SerializationFactory<ContentTypeBasedSerializerKey> factory;
public AsyncResponse(HttpResponse response, SerializationFactory<ContentTypeBasedSerializerKey> serializationFactory) {
super(response);
this.response = response;
this.factory = serializationFactory;
}
@Override
public Object getPayload() throws ClientException {
return response.getEntity();
}
@Override
public boolean hasPayload() {
// return decoder != null && !decoder.isCompleted();
HttpEntity entity = response.getEntity();
try {
return (entity != null && entity.getContent() != null && entity.getContent().available() > 0);
......@@ -83,26 +78,6 @@ public class RibbonHttpAsyncClient
}
}
@Override
public boolean isSuccess() {
return response.getStatusLine().getStatusCode() == 200;
}
@Override
public URI getRequestedURI() {
return null;
}
@Override
public Map<String, Collection<String>> getHeaders() {
Multimap<String, String> map = ArrayListMultimap.create();
for (Header header: response.getAllHeaders()) {
map.put(header.getName(), header.getValue());
}
return map.asMap();
}
@Override
public <T> T get(Class<T> type) throws ClientException {
ContentTypeBasedSerializerKey key = new ContentTypeBasedSerializerKey(response.getFirstHeader("Content-type").getValue(), type);
......@@ -126,10 +101,7 @@ public class RibbonHttpAsyncClient
}
public int getStatus() {
return response.getStatusLine().getStatusCode();
}
@Override
public boolean hasEntity() {
return hasPayload();
}
......@@ -145,6 +117,17 @@ public class RibbonHttpAsyncClient
}
}
public void releaseResources() {
HttpEntity entity = response.getEntity();
if (entity != null) {
try {
entity.getContent().close();
} catch (IllegalStateException e) {
} catch (IOException e) {
}
}
}
}
public RibbonHttpAsyncClient() {
......@@ -189,7 +172,7 @@ public class RibbonHttpAsyncClient
}
private Future<AsyncResponse> fromHttpResponseFuture(final Future<HttpResponse> future) {
private Future<AsyncResponse> createFuture(final Future<HttpResponse> future, final DelegateCallback callback) {
return new Future<AsyncResponse>() {
@Override
public boolean cancel(boolean arg0) {
......@@ -199,14 +182,14 @@ public class RibbonHttpAsyncClient
@Override
public AsyncResponse get() throws InterruptedException,
ExecutionException {
return new AsyncResponse(future.get(), factory);
return callback.getCompletedResponse();
}
@Override
public AsyncResponse get(long arg0, TimeUnit arg1)
public AsyncResponse get(long time, TimeUnit timeUnit)
throws InterruptedException, ExecutionException,
TimeoutException {
return new AsyncResponse(future.get(arg0, arg1), factory);
return callback.getCompletedResponse(time, timeUnit);
}
@Override
......@@ -216,7 +199,7 @@ public class RibbonHttpAsyncClient
@Override
public boolean isDone() {
return future.isDone();
return callback.isDone();
}
};
}
......@@ -230,7 +213,7 @@ public class RibbonHttpAsyncClient
// MyResponseConsumer consumer = new MyResponseConsumer(callback);
// logger.info("start execute");
Future<HttpResponse> future = httpclient.execute(request, fCallback);
return fromHttpResponseFuture(future);
return createFuture(future, fCallback);
}
private HttpUriRequest getRequest(HttpRequest ribbonRequest) throws ClientException {
......@@ -283,20 +266,35 @@ public class RibbonHttpAsyncClient
public DelegateCallback(ResponseCallback<AsyncResponse> callback) {
this.callback = callback;
}
private CountDownLatch latch = new CountDownLatch(1);
private volatile AsyncResponse completeResponse = null;
AsyncResponse getCompletedResponse() throws InterruptedException {
latch.await();
return completeResponse;
}
AsyncResponse getCompletedResponse(long time, TimeUnit timeUnit) throws InterruptedException {
latch.await(time, timeUnit);
return completeResponse;
}
boolean isDone() {
return latch.getCount() <= 0;
}
@Override
public void completed(HttpResponse result) {
if (callbackInvoked.compareAndSet(false, true)) {
try {
callback.onResponseReceived(new AsyncResponse(result, factory));
} catch (Throwable e) {
e.printStackTrace();
logger.error("Error invoking callback");
} finally {
try {
result.getEntity().getContent().close();
} catch (Exception e) {
}
completeResponse = new AsyncResponse(result, factory);
latch.countDown();
if (callback != null) {
try {
callback.completed(completeResponse);
} catch (Throwable e) {
logger.error("Error invoking callback", e);
}
}
}
}
......@@ -304,34 +302,36 @@ public class RibbonHttpAsyncClient
@Override
public void failed(Exception e) {
if (callbackInvoked.compareAndSet(false, true)) {
callback.onException(e);
latch.countDown();
if (callback != null) {
callback.failed(e);
}
}
}
@Override
public void cancelled() {
if (callbackInvoked.compareAndSet(false, true)) {
callback.onException(new ClientException("request has been cancelled"));
if (callbackInvoked.compareAndSet(false, true) && callback != null) {
callback.failed(new ClientException("request has been cancelled"));
}
}
}
@Override
public <E> Future<AsyncResponse> stream(
public <T> Future<AsyncResponse> stream(
HttpRequest ribbonRequest,
final StreamDecoder<E, ByteBuffer> decoder,
final StreamCallback<AsyncResponse, E> callback) throws ClientException {
final StreamDecoder<T, ByteBuffer> decoder,
final StreamResponseCallback<AsyncResponse, T> callback) throws ClientException {
HttpUriRequest request = getRequest(ribbonRequest);
final DelegateCallback internalCallback = new DelegateCallback(callback);
AsyncByteConsumer<HttpResponse> consumer = new AsyncByteConsumer<HttpResponse>() {
private volatile HttpResponse response;
@Override
protected void onByteReceived(ByteBuffer buf, IOControl ioctrl)
throws IOException {
List<E> elements = decoder.decode(buf);
if (elements != null) {
for (E e: elements) {
callback.onElement(e);
}
T obj = decoder.decode(buf);
if (obj != null) {
callback.onContentReceived(obj);
}
}
......@@ -349,24 +349,8 @@ public class RibbonHttpAsyncClient
}
};
final FutureCallback<HttpResponse> internalCallback = new FutureCallback<HttpResponse>() {
@Override
public void completed(HttpResponse result) {
callback.onCompleted();
}
@Override
public void failed(Exception ex) {
callback.onError(ex);
}
@Override
public void cancelled() {
callback.onError(new ClientException("cancelled"));
}
};
Future<HttpResponse> future = httpclient.execute(HttpAsyncMethods.create(request), consumer, internalCallback);
return fromHttpResponseFuture(future);
return createFuture(future, internalCallback);
}
}
......@@ -14,6 +14,7 @@ import java.nio.ByteBuffer;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
......@@ -21,8 +22,6 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.http.nio.util.ExpandableBuffer;
import org.apache.http.nio.util.HeapByteBufferAllocator;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.junit.BeforeClass;
import org.junit.Test;
......@@ -30,11 +29,11 @@ import rx.util.functions.Action1;
import com.google.common.collect.Lists;
import com.netflix.client.AsyncLoadBalancingClient;
import com.netflix.client.AsyncStreamClient.StreamCallback;
import com.netflix.client.ClientException;
import com.netflix.client.ObservableAsyncClient;
import com.netflix.client.ResponseCallback;
import com.netflix.client.StreamDecoder;
import com.netflix.client.StreamResponseCallback;
import com.netflix.client.config.CommonClientConfigKey;
import com.netflix.client.config.DefaultClientConfigImpl;
import com.netflix.client.http.HttpRequest;
......@@ -101,7 +100,7 @@ public class HttpAsyncClienTest {
}
}
static class SSEDecoder implements StreamDecoder<String, ByteBuffer> {
static class SSEDecoder implements StreamDecoder<List<String>, ByteBuffer> {
final ExpandableByteBuffer dataBuffer = new ExpandableByteBuffer();
@Override
......@@ -135,7 +134,7 @@ public class HttpAsyncClienTest {
e.printStackTrace();
fail(e.getMessage());
}
LogManager.getRootLogger().setLevel((Level)Level.DEBUG);
// LogManager.getRootLogger().setLevel((Level)Level.DEBUG);
}
@Test
......@@ -146,26 +145,44 @@ public class HttpAsyncClienTest {
final AtomicReference<RibbonHttpAsyncClient.AsyncResponse> res = new AtomicReference<RibbonHttpAsyncClient.AsyncResponse>();
client.execute(request, new ResponseCallback<RibbonHttpAsyncClient.AsyncResponse>() {
@Override
public void onResponseReceived(RibbonHttpAsyncClient.AsyncResponse response) {
public void completed(RibbonHttpAsyncClient.AsyncResponse response) {
try {
res.set(response);
person = response.get(Person.class);
} catch (ClientException e) {
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void onException(Throwable e) {
public void failed(Throwable e) {
exception.set(e);
}
@Override
public void cancelled() {
}
});
// System.err.println(future.get().get(Person.class));
Thread.sleep(2000);
assertEquals(EmbeddedResources.defaultPerson, person);
assertNull(exception.get());
assertTrue(res.get().getHeaders().get("Content-type").contains("application/json"));
}
@Test
public void testFuture() throws Exception {
URI uri = new URI(SERVICE_URI + "testNetty/person");
HttpRequest request = HttpRequest.newBuilder().uri(uri).build();
Future<AsyncResponse> future = client.execute(request, null);
AsyncResponse response = future.get();
// System.err.println(future.get().get(Person.class));
person = response.get(Person.class);
assertEquals(EmbeddedResources.defaultPerson, person);
assertTrue(response.getHeaders().get("Content-type").contains("application/json"));
}
@Test
public void testObservable() throws Exception {
URI uri = new URI(SERVICE_URI + "testNetty/person");
......@@ -182,6 +199,7 @@ public class HttpAsyncClienTest {
}
});
assertEquals(Lists.newArrayList(EmbeddedResources.defaultPerson), result);
System.err.println(observableClient.execute(request).toBlockingObservable().single().get(Person.class));
}
......@@ -194,15 +212,19 @@ public class HttpAsyncClienTest {
final AtomicBoolean hasEntity = new AtomicBoolean(true);
client.execute(request, new ResponseCallback<AsyncResponse>() {
@Override
public void onResponseReceived(AsyncResponse response) {
public void completed(AsyncResponse response) {
responseCode.set(response.getStatus());
hasEntity.set(response.hasEntity());
}
@Override
public void onException(Throwable e) {
public void failed(Throwable e) {
exception.set(e);
}
@Override
public void cancelled() {
}
});
Thread.sleep(2000);
assertNull(exception.get());
......@@ -219,7 +241,7 @@ public class HttpAsyncClienTest {
client.execute(request, new ResponseCallback<AsyncResponse>() {
@Override
public void onResponseReceived(AsyncResponse response) {
public void completed(AsyncResponse response) {
try {
person = response.get(Person.class);
} catch (ClientException e) { // NOPMD
......@@ -227,8 +249,14 @@ public class HttpAsyncClienTest {
}
@Override
public void onException(Throwable e) {
public void failed(Throwable e) {
}
@Override
public void cancelled() {
}
});
Thread.sleep(2000);
assertEquals(myPerson, person);
......@@ -243,7 +271,7 @@ public class HttpAsyncClienTest {
client.execute(request, new ResponseCallback<AsyncResponse>() {
@Override
public void onResponseReceived(AsyncResponse response) {
public void completed(AsyncResponse response) {
try {
person = response.get(Person.class);
} catch (ClientException e) {
......@@ -252,7 +280,13 @@ public class HttpAsyncClienTest {
}
@Override
public void onException(Throwable e) {
public void failed(Throwable e) {
}
@Override
public void cancelled() {
// TODO Auto-generated method stub
}
});
Thread.sleep(2000);
......@@ -266,14 +300,20 @@ public class HttpAsyncClienTest {
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
timeoutClient.execute(request, new ResponseCallback<AsyncResponse>() {
@Override
public void onResponseReceived(AsyncResponse response) {
public void completed(AsyncResponse response) {
System.err.println("Got response");
}
@Override
public void onException(Throwable e) {
public void failed(Throwable e) {
exception.set(e);
}
@Override
public void cancelled() {
// TODO Auto-generated method stub
}
});
Thread.sleep(2000);
......@@ -293,7 +333,7 @@ public class HttpAsyncClienTest {
HttpRequest request = HttpRequest.newBuilder().uri(uri).build();
loadBalancingClient.execute(request, new ResponseCallback<AsyncResponse>() {
@Override
public void onResponseReceived(AsyncResponse response) {
public void completed(AsyncResponse response) {
try {
person = response.get(Person.class);
} catch (ClientException e) {
......@@ -302,7 +342,13 @@ public class HttpAsyncClienTest {
}
@Override
public void onException(Throwable e) {
public void failed(Throwable e) {
}
@Override
public void cancelled() {
// TODO Auto-generated method stub
}
});
Thread.sleep(2000);
......@@ -327,7 +373,7 @@ public class HttpAsyncClienTest {
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
loadBalancingClient.execute(request, new ResponseCallback<AsyncResponse>() {
@Override
public void onResponseReceived(AsyncResponse response) {
public void completed(AsyncResponse response) {
try {
person = response.get(Person.class);
} catch (ClientException e) {
......@@ -336,9 +382,15 @@ public class HttpAsyncClienTest {
}
@Override
public void onException(Throwable e) {
public void failed(Throwable e) {
exception.set(e);
}
@Override
public void cancelled() {
// TODO Auto-generated method stub
}
});
Thread.sleep(10000);
assertNull(exception.get());
......@@ -365,14 +417,20 @@ public class HttpAsyncClienTest {
loadBalancingClient.execute(request, new ResponseCallback<AsyncResponse>() {
@Override
public void onResponseReceived(AsyncResponse response) {
public void completed(AsyncResponse response) {
System.err.println(response.getStatus());
}
@Override
public void onException(Throwable e) {
public void failed(Throwable e) {
exception.set(e);
}
@Override
public void cancelled() {
// TODO Auto-generated method stub
}
});
Thread.sleep(10000);
assertNotNull(exception.get());
......@@ -386,24 +444,28 @@ public class HttpAsyncClienTest {
HttpRequest request = HttpRequest.newBuilder().uri(SERVICE_URI + "testNetty/stream").build();
final List<String> results = Lists.newArrayList();
final CountDownLatch latch = new CountDownLatch(1);
client.stream(request, new SSEDecoder(), new StreamCallback<AsyncResponse, String>() {
client.stream(request, new SSEDecoder(), new StreamResponseCallback<AsyncResponse, List<String>>() {
@Override
public void onResponseReceived(AsyncResponse response) {
public void completed(AsyncResponse response) {
latch.countDown();
}
@Override
public void onError(Throwable e) {
public void failed(Throwable e) {
e.printStackTrace();
}
@Override
public void onCompleted() {
latch.countDown();
public void onContentReceived(List<String> element) {
results.addAll(element);
}
@Override
public void cancelled() {
}
@Override
public void onElement(String element) {
results.add(element);
public void onResponseReceived(AsyncResponse response) {
}
});
latch.await(60, TimeUnit.SECONDS);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册