提交 4aeb4f10 编写于 作者: A Allen Wang

Updated API in LoadBalancerExecutor to retry execution on the same server....

Updated API in LoadBalancerExecutor to retry execution on the same server. Added the capability that if the URI is absolute the NettyHttpClient will submit request directly to server without consulting load balancer.
上级 cc751323
......@@ -39,12 +39,12 @@ public class RibbonExamples {
})
.withHystrixProperties((HystrixObservableCommand.Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("mygroup"))
.andCommandPropertiesDefaults(HystrixCommandProperties.Setter().withExecutionIsolationThreadTimeoutInMilliseconds(2000))))
.withUri("/{id}");
.withUriTemplate("/{id}");
template.requestBuilder().withRequestProperty("id", 1).build().execute();
// example showing the use case of getting the entity with Hystrix meta data
template.withUri("/{id}").requestBuilder().withRequestProperty("id", 3).build().withMetadata().observe()
template.withUriTemplate("/{id}").requestBuilder().withRequestProperty("id", 3).build().withMetadata().observe()
.flatMap(new Func1<RibbonResponse<Observable<ByteBuf>>, Observable<String>>() {
@Override
public Observable<String> call(RibbonResponse<Observable<ByteBuf>> t1) {
......
......@@ -24,7 +24,7 @@ public interface RequestTemplate<T, R> {
/**
* Calling this method will enable both Hystrix request cache and supplied external cache providers
* on the supplied cache key. Caller can explicitly disable Hystrix request cache by calling
* {@link #withHystrixCommandPropertiesDefaults(com.netflix.hystrix.HystrixCommandProperties.Setter)}
* {@link #withHystrixProperties(com.netflix.hystrix.HystrixObservableCommand.Setter)}
*
* @param cacheKeyTemplate
* @return
......
......@@ -91,7 +91,7 @@ public class HttpRequestTemplate<T> implements RequestTemplate<T, HttpClientResp
return parsedTemplate;
}
public HttpRequestTemplate<T> withUri(String uri) {
public HttpRequestTemplate<T> withUriTemplate(String uri) {
this.parsedUriTemplate = createParsedTemplate(uri);
return this;
}
......
......@@ -52,7 +52,7 @@ public class RibbonTest {
.withMaxAutoRetriesNextServer(3)
.useConfigurationBasedServerList("localhost:12345, localhost:10092, localhost:" + server.getPort()));
HttpRequestTemplate<ByteBuf> template = group.newRequestTemplate("test", ByteBuf.class);
RibbonRequest<ByteBuf> request = template.withUri("/").requestBuilder().build();
RibbonRequest<ByteBuf> request = template.withUriTemplate("/").requestBuilder().build();
String result = request.execute().toString(Charset.defaultCharset());
assertEquals(content, result);
}
......@@ -72,7 +72,7 @@ public class RibbonTest {
.withMaxAutoRetriesNextServer(3));
HttpRequestTemplate<ByteBuf> template = group.newRequestTemplate("test");
RibbonRequest<ByteBuf> request = template.withUri("/")
RibbonRequest<ByteBuf> request = template.withUriTemplate("/")
.addCacheProvider("somekey", new CacheProvider<ByteBuf>(){
@Override
public Observable<ByteBuf> get(String key, Map<String, Object> vars) {
......@@ -122,7 +122,7 @@ public class RibbonTest {
throw new UnsuccessfulResponseException("error", new IllegalArgumentException());
}
});
RibbonRequest<ByteBuf> request = template.withUri("/").requestBuilder().build();
RibbonRequest<ByteBuf> request = template.withUriTemplate("/").requestBuilder().build();
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
request.toObservable().subscribe(new Action1<ByteBuf>() {
......@@ -154,7 +154,7 @@ public class RibbonTest {
.withMaxAutoRetriesNextServer(1));
HttpRequestTemplate<ByteBuf> template = group.newRequestTemplate("test", ByteBuf.class);
final String fallback = "fallback";
RibbonRequest<ByteBuf> request = template.withUri("/")
RibbonRequest<ByteBuf> request = template.withUriTemplate("/")
.withFallbackProvider(new FallbackHandler<ByteBuf>() {
@Override
public Observable<ByteBuf> getFallback(
......@@ -219,7 +219,7 @@ public class RibbonTest {
}
}
})
.withUri("/")
.withUriTemplate("/")
.withHystrixProperties(HystrixObservableCommand.Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("group"))
.andCommandPropertiesDefaults(HystrixCommandProperties.Setter().withRequestCacheEnabled(false))
)
......@@ -255,7 +255,7 @@ public class RibbonTest {
return Observable.error(new Exception("Cache miss again"));
}
})
.withUri("/")
.withUriTemplate("/")
.withHystrixProperties(HystrixObservableCommand.Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("group"))
.andCommandPropertiesDefaults(HystrixCommandProperties.Setter().withRequestCacheEnabled(false))
)
......
......@@ -14,7 +14,7 @@ public class TemplateBuilderTest {
HttpResourceGroup group = Ribbon.createHttpResourceGroup("test");
HttpRequestTemplate<ByteBuf> template = group.newRequestTemplate("resource1", ByteBuf.class);
template.withUri("/foo/{id}?name={name}");
template.withUriTemplate("/foo/{id}?name={name}");
HttpClientRequest<ByteBuf> request = template
.requestBuilder()
.withRequestProperty("id", "3")
......
......@@ -21,6 +21,8 @@ import static com.netflix.loadbalancer.LoadBalancerExecutor.CallableToObservable
import java.net.URI;
import rx.Observable;
import com.google.common.base.Preconditions;
import com.netflix.client.config.CommonClientConfigKey;
import com.netflix.client.config.IClientConfig;
......@@ -89,17 +91,18 @@ extends LoadBalancerExecutor implements IClient<S, T>, IClientConfigAware {
Preconditions.checkNotNull(host);
int port = request.getUri().getPort();
Preconditions.checkArgument(port > 0, "port is undefined");
final Server server = new Server(host, port);
RequestSpecificRetryHandler handler = getRequestSpecificRetryHandler(request, requestConfig);
ClientCallableProvider<T> callableProvider = new ClientCallableProvider<T>() {
@Override
public T executeOnServer(Server server) throws Exception {
return execute(request, requestConfig);
}
};
RequestSpecificRetryHandler handler = getRequestSpecificRetryHandler(request, requestConfig);
try {
return RxUtils.getSingleValueWithRealErrorCause(retrySameServer(new Server(host, port), toObsevableProvider(callableProvider), handler));
Observable<T> result = execute(server, toObsevableProvider(callableProvider).getObservableForEndpoint(server), handler);
return RxUtils.getSingleValueWithRealErrorCause(result);
} catch (Exception e) {
if (e instanceof ClientException) {
throw (ClientException) e;
......
......@@ -59,7 +59,7 @@ public class LoadBalancerExecutor extends LoadBalancerContext {
public static <T> ClientObservableProvider<T> toObsevableProvider(ClientCallableProvider<T> callableProvider) {
return new CallableToObservable<T>(callableProvider);
}
public CallableToObservable(ClientCallableProvider<T> callableProvider) {
this.callableProvider = callableProvider;
}
......@@ -268,7 +268,7 @@ public class LoadBalancerExecutor extends LoadBalancerContext {
t1.onError(e);
return Subscriptions.empty();
}
return retrySameServer(server, clientObservableProvider, retryHandler).subscribe(t1);
return execute(server, clientObservableProvider.getObservableForEndpoint(server), retryHandler).subscribe(t1);
}
};
Observable<T> observable = createObservableFromOnSubscribeFunc(onSubscribe);
......@@ -282,7 +282,7 @@ public class LoadBalancerExecutor extends LoadBalancerContext {
* by the external {@link Observer}. If number of retries exceeds the maximal retries allowed on one server, a final error will
* be emitted by the returned {@link Observable}.
*/
protected <T> Observable<T> retrySameServer(final Server server, final ClientObservableProvider<T> clientObservableProvider, final RetryHandler errorHandler) {
public <T> Observable<T> execute(final Server server, final Observable<T> singleHostObservable, final RetryHandler errorHandler) {
OnSubscribe<T> onSubscribe = new OnSubscribe<T>() {
@Override
public void call(final Subscriber<? super T> t1) {
......@@ -321,7 +321,7 @@ public class LoadBalancerExecutor extends LoadBalancerContext {
noteRequestCompletion(serverStats, entity, exception, duration, errorHandler);
}
};
clientObservableProvider.getObservableForEndpoint(server).subscribe(delegate);
singleHostObservable.subscribe(delegate);
}
};
......
......@@ -8,6 +8,8 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Test;
import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.Subscriber;
import com.google.common.collect.Lists;
import com.netflix.client.RetryHandler;
......@@ -26,12 +28,19 @@ public class LoadBalancerExecutorTest {
ClientObservableProvider<String> observableProvider = new ClientObservableProvider<String>() {
AtomicInteger count = new AtomicInteger();
@Override
public Observable<String> getObservableForEndpoint(Server server) {
if (count.incrementAndGet() < 3) {
return Observable.error(new IllegalArgumentException());
} else {
return Observable.from(server.getHost());
}
public Observable<String> getObservableForEndpoint(final Server server) {
return Observable.create(new OnSubscribe<String>(){
@Override
public void call(Subscriber<? super String> t1) {
if (count.incrementAndGet() < 3) {
t1.onError(new IllegalArgumentException());
} else {
t1.onNext(server.getHost());
t1.onCompleted();
}
}
});
}
};
RetryHandler handler = new RetryHandler() {
......@@ -52,7 +61,7 @@ public class LoadBalancerExecutorTest {
return 0;
}
};
String result = lbExecutor.retrySameServer(server1, observableProvider, handler).toBlockingObservable().single();
String result = lbExecutor.execute(server1, observableProvider.getObservableForEndpoint(server1), handler).toBlockingObservable().single();
assertEquals(3, lbExecutor.getServerStats(server1).getTotalRequestsCount());
assertEquals("1", result);
}
......@@ -63,12 +72,19 @@ public class LoadBalancerExecutorTest {
ClientObservableProvider<String> observableProvider = new ClientObservableProvider<String>() {
AtomicInteger count = new AtomicInteger();
@Override
public Observable<String> getObservableForEndpoint(Server server) {
if (count.incrementAndGet() < 3) {
return Observable.error(new IllegalArgumentException());
} else {
return Observable.from(server.getHost());
}
public Observable<String> getObservableForEndpoint(final Server server) {
return Observable.create(new OnSubscribe<String>(){
@Override
public void call(Subscriber<? super String> t1) {
if (count.incrementAndGet() < 3) {
t1.onError(new IllegalArgumentException());
} else {
t1.onNext(server.getHost());
t1.onCompleted();
}
}
});
}
};
RetryHandler handler = new RetryHandler() {
......
......@@ -38,6 +38,8 @@ import io.reactivex.netty.protocol.http.client.HttpClientResponse;
import io.reactivex.netty.protocol.http.client.RepeatableContentHttpRequest;
import io.reactivex.netty.protocol.text.sse.ServerSentEvent;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
......@@ -178,11 +180,16 @@ public class NettyHttpClient<I, O> extends LoadBalancingRxClientWithPoolOptions<
public Observable<HttpClientResponse<O>> submit(final HttpClientRequest<I> request, final RetryHandler errorHandler, final IClientConfig requestConfig) {
final RepeatableContentHttpRequest<I> repeatableRequest = getRepeatableRequest(request);
final RetryHandler retryHandler = (errorHandler == null) ? getRequestRetryHandler(request, requestConfig) : errorHandler;
final ClientConfig rxClientConfig = getRxClientConfig(requestConfig);
Observable<HttpClientResponse<O>> result = submitToServerInURI(repeatableRequest, rxClientConfig, errorHandler);
if (result != null) {
return result;
}
return lbExecutor.executeWithLoadBalancer(new ClientObservableProvider<HttpClientResponse<O>>() {
@Override
public Observable<HttpClientResponse<O>> getObservableForEndpoint(
Server server) {
return submit(server.getHost(), server.getPort(), repeatableRequest, requestConfig);
return submit(server.getHost(), server.getPort(), repeatableRequest, rxClientConfig);
}
}, retryHandler);
}
......@@ -211,15 +218,43 @@ public class NettyHttpClient<I, O> extends LoadBalancingRxClientWithPoolOptions<
@Override
public Observable<HttpClientResponse<O>> submit(HttpClientRequest<I> request, final ClientConfig config) {
final RepeatableContentHttpRequest<I> repeatableRequest = getRepeatableRequest(request);
final RetryHandler retryHandler = getRequestRetryHandler(request, null);
Observable<HttpClientResponse<O>> result = submitToServerInURI(request, config, retryHandler);
if (result != null) {
return result;
}
return lbExecutor.executeWithLoadBalancer(new ClientObservableProvider<HttpClientResponse<O>>() {
@Override
public Observable<HttpClientResponse<O>> getObservableForEndpoint(
Server server) {
return submit(server.getHost(), server.getPort(), repeatableRequest, config);
}
});
}, retryHandler);
}
private Observable<HttpClientResponse<O>> submitToServerInURI(HttpClientRequest<I> request, ClientConfig config, RetryHandler errorHandler) {
URI uri;
try {
uri = new URI(request.getUri());
} catch (URISyntaxException e) {
return Observable.error(e);
}
String host = uri.getHost();
if (host == null) {
return null;
}
int port = uri.getPort();
if (port < 0) {
if (clientConfig.getPropertyAsBoolean(IClientConfigKey.CommonKeys.IsSecure, false)) {
port = 443;
} else {
port = 80;
}
}
Server server = new Server(host, port);
return lbExecutor.execute(server, submit(server.getHost(), server.getPort(), request, config), errorHandler);
}
/**
* Create an {@link ObservableConnection} with a server chosen by the load balancer.
*/
......
......@@ -29,6 +29,7 @@ import io.reactivex.netty.contexts.ContextsContainer;
import io.reactivex.netty.contexts.ContextsContainerImpl;
import io.reactivex.netty.contexts.MapBackedKeySupplier;
import io.reactivex.netty.contexts.RxContexts;
import io.reactivex.netty.protocol.http.client.HttpClient.HttpClientConfig;
import io.reactivex.netty.protocol.http.client.HttpClientRequest;
import io.reactivex.netty.protocol.http.client.HttpClientResponse;
import io.reactivex.netty.protocol.text.sse.ServerSentEvent;
......@@ -139,7 +140,7 @@ public class NettyClientTest {
NettyHttpClient<ByteBuf, ByteBuf> observableClient = (NettyHttpClient<ByteBuf, ByteBuf>) RibbonTransport.newHttpClient();
// final List<Person> result = Lists.newArrayList();
Observable<HttpClientResponse<ByteBuf>> response = observableClient.submit(host, port, request);
Person person = getPersonObservable(response).toBlockingObservable().single();
Person person = getPersonObservable(response).toBlocking().single();
assertEquals(EmbeddedResources.defaultPerson, person);
// need to sleep to wait until connection is released
Thread.sleep(1000);
......@@ -150,18 +151,36 @@ public class NettyClientTest {
assertEquals(1, stats.getTotalConnectionCount());
}
@Test
public void testSubmitToAbsoluteURI() throws Exception {
HttpClientRequest<ByteBuf> request = HttpClientRequest.createGet(SERVICE_URI + "testAsync/person");
NettyHttpClient<ByteBuf, ByteBuf> observableClient = (NettyHttpClient<ByteBuf, ByteBuf>) RibbonTransport.newHttpClient();
// final List<Person> result = Lists.newArrayList();
Observable<HttpClientResponse<ByteBuf>> response = observableClient.submit(request);
Person person = getPersonObservable(response).toBlocking().single();
assertEquals(EmbeddedResources.defaultPerson, person);
// need to sleep to wait until connection is released
Thread.sleep(1000);
GlobalPoolStats stats = (GlobalPoolStats) observableClient.getStats();
assertEquals(1, stats.getIdleCount());
assertEquals(1, stats.getAcquireSucceededCount());
assertEquals(1, stats.getReleaseSucceededCount());
assertEquals(1, stats.getTotalConnectionCount());
}
@Test
public void testPoolReuse() throws Exception {
HttpClientRequest<ByteBuf> request = HttpClientRequest.createGet(SERVICE_URI + "testAsync/person");
NettyHttpClient<ByteBuf, ByteBuf> observableClient = (NettyHttpClient<ByteBuf, ByteBuf>) RibbonTransport.newHttpClient();
// final List<Person> result = Lists.newArrayList();
Observable<HttpClientResponse<ByteBuf>> response = observableClient.submit(host, port, request);
Person person = getPersonObservable(response).toBlockingObservable().single();
Observable<HttpClientResponse<ByteBuf>> response = observableClient.submit(request);
Person person = getPersonObservable(response).toBlocking().single();
assertEquals(EmbeddedResources.defaultPerson, person);
Thread.sleep(1000);
assertEquals(1, observableClient.getStats().getIdleCount());
response = observableClient.submit(host, port, request);
person = getPersonObservable(response).toBlockingObservable().single();
person = getPersonObservable(response).toBlocking().single();
assertEquals(EmbeddedResources.defaultPerson, person);
Thread.sleep(1000);
GlobalPoolStats stats = (GlobalPoolStats) observableClient.getStats();
......@@ -181,7 +200,7 @@ public class NettyClientTest {
NettyHttpClient<ByteBuf, ByteBuf> observableClient = (NettyHttpClient<ByteBuf, ByteBuf>) RibbonTransport.newHttpClient(
DefaultClientConfigImpl.getClientConfigWithDefaultValues().setPropertyWithType(CommonClientConfigKey.ReadTimeout, 10000));
Observable<HttpClientResponse<ByteBuf>> response = observableClient.submit(host, port, request);
Person person = getPersonObservable(response).toBlockingObservable().single();
Person person = getPersonObservable(response).toBlocking().single();
assertEquals(myPerson, person);
}
......@@ -198,8 +217,8 @@ public class NettyClientTest {
.withContent(buffer);
NettyHttpClient<ByteBuf, ByteBuf> observableClient = (NettyHttpClient<ByteBuf, ByteBuf>) RibbonTransport.newHttpClient(
DefaultClientConfigImpl.getClientConfigWithDefaultValues().setPropertyWithType(CommonClientConfigKey.ReadTimeout, 10000));
Observable<HttpClientResponse<ByteBuf>> response = observableClient.submit(host, port, request);
Person person = getPersonObservable(response).toBlockingObservable().single();
Observable<HttpClientResponse<ByteBuf>> response = observableClient.submit(request);
Person person = getPersonObservable(response).toBlocking().single();
assertEquals(myPerson, person);
}
......@@ -223,7 +242,7 @@ public class NettyClientTest {
NettyHttpClient<ByteBuf, ByteBuf> observableClient = (NettyHttpClient<ByteBuf, ByteBuf>) RibbonTransport.newHttpClient(
DefaultClientConfigImpl.getClientConfigWithDefaultValues().withProperty(CommonClientConfigKey.ReadTimeout, "100"));
HttpClientRequest<ByteBuf> request = HttpClientRequest.createGet(SERVICE_URI + "testAsync/readTimeout");
Observable<HttpClientResponse<ByteBuf>> observable = observableClient.submit(host, port, request);
Observable<HttpClientResponse<ByteBuf>> observable = observableClient.submit(request);
ObserverWithLatch<HttpClientResponse<ByteBuf>> observer = new ObserverWithLatch<HttpClientResponse<ByteBuf>>();
observable.subscribe(observer);
observer.await();
......@@ -245,7 +264,7 @@ public class NettyClientTest {
NettyHttpClient<ByteBuf, ByteBuf> lbObservables = (NettyHttpClient<ByteBuf, ByteBuf>) RibbonTransport.newHttpClient(lb, config,
new NettyHttpLoadBalancerErrorHandler(1, 3, true));
Person person = getPersonObservable(lbObservables.submit(request)).toBlockingObservable().single();
Person person = getPersonObservable(lbObservables.submit(request)).toBlocking().single();
assertEquals(EmbeddedResources.defaultPerson, person);
ServerStats stats = lbObservables.getServerStats(badServer);
// two requests to bad server because retry same server is set to 1
......@@ -260,6 +279,38 @@ public class NettyClientTest {
assertEquals(0, stats.getSuccessiveConnectionFailureCount());
}
@Test
public void testObservableWithMultipleServersWithOverrideRxConfig() throws Exception {
IClientConfig config = DefaultClientConfigImpl.getClientConfigWithDefaultValues().withProperty(CommonClientConfigKey.ConnectTimeout, "1000");
HttpClientRequest<ByteBuf> request = HttpClientRequest.createGet("/testAsync/person");
Server badServer = new Server("localhost:12345");
Server goodServer = new Server("localhost:" + port);
List<Server> servers = Lists.newArrayList(badServer, badServer, badServer, goodServer);
BaseLoadBalancer lb = LoadBalancerBuilder.<Server>newBuilder()
.withRule(new AvailabilityFilteringRule())
.withPing(new DummyPing())
.buildFixedServerListLoadBalancer(servers);
NettyHttpClient<ByteBuf, ByteBuf> lbObservables = (NettyHttpClient<ByteBuf, ByteBuf>) RibbonTransport.newHttpClient(lb, config,
new NettyHttpLoadBalancerErrorHandler(1, 3, true));
HttpClientConfig rxconfig = HttpClientConfig.Builder.newDefaultConfig();
Person person = getPersonObservable(lbObservables.submit(request, rxconfig)).toBlocking().single();
assertEquals(EmbeddedResources.defaultPerson, person);
ServerStats stats = lbObservables.getServerStats(badServer);
// two requests to bad server because retry same server is set to 1
assertEquals(4, stats.getTotalRequestsCount());
assertEquals(0, stats.getActiveRequestsCount());
assertEquals(4, stats.getSuccessiveConnectionFailureCount());
stats = lbObservables.getServerStats(goodServer);
// two requests to bad server because retry same server is set to 1
assertEquals(1, stats.getTotalRequestsCount());
assertEquals(0, stats.getActiveRequestsCount());
assertEquals(0, stats.getSuccessiveConnectionFailureCount());
}
@Test
public void testObservableWithRetrySameServer() throws Exception {
IClientConfig config = DefaultClientConfigImpl.getClientConfigWithDefaultValues().withProperty(CommonClientConfigKey.ConnectTimeout, "1000");
......@@ -474,7 +525,7 @@ public class NettyClientTest {
return null;
}
}
}).toBlockingObservable().getIterator();
}).toBlocking().getIterator();
while (iterator.hasNext()) {
result.add(iterator.next());
}
......@@ -516,7 +567,7 @@ public class NettyClientTest {
Person myPerson = new Person("hello_world", 4);
HttpClientRequest<ByteBuf> request = HttpClientRequest.createGet(SERVICE_URI + "testAsync/personQuery?name=" + myPerson.name + "&age=" + myPerson.age);
NettyHttpClient<ByteBuf, ByteBuf> observableClient = (NettyHttpClient<ByteBuf, ByteBuf>) RibbonTransport.newHttpClient();
Person person = getPersonObservable(observableClient.submit(host, port, request)).toBlockingObservable().single();
Person person = getPersonObservable(observableClient.submit(host, port, request)).toBlocking().single();
assertEquals(myPerson, person);
}
......@@ -531,10 +582,8 @@ public class NettyClientTest {
@Override
public void call(HttpClientResponse<ByteBuf> t1) {
System.err.println("Get response: " + t1.getStatus().code());
latch.countDown();
}
}, new Action1<Throwable>(){
@Override
......@@ -551,7 +600,7 @@ public class NettyClientTest {
@Test
public void testLoadBalancerThrottle() throws Exception {
HttpClientRequest<ByteBuf> request = HttpClientRequest.createGet(SERVICE_URI + "testAsync/throttle");
HttpClientRequest<ByteBuf> request = HttpClientRequest.createGet("/testAsync/throttle");
IClientConfig config = DefaultClientConfigImpl.getClientConfigWithDefaultValues().setPropertyWithType(IClientConfigKey.CommonKeys.MaxAutoRetriesNextServer, 1);
BaseLoadBalancer lb = new BaseLoadBalancer(new DummyPing(), new AvailabilityFilteringRule());
NettyHttpClient<ByteBuf, ByteBuf> lbObservables = (NettyHttpClient<ByteBuf, ByteBuf>) RibbonTransport.newHttpClient(lb, config);
......@@ -607,7 +656,7 @@ public class NettyClientTest {
responseContext.set(RxContexts.DEFAULT_CORRELATOR.getContextForClientRequest(requestId));
return t1.toString(Charset.defaultCharset());
}
}).toBlockingObservable().single();
}).toBlocking().single();
assertEquals(requestId, requestIdSent);
assertEquals("value1", responseContext.get().getContext("Context1"));
}
......@@ -616,7 +665,7 @@ public class NettyClientTest {
public void testRedirect() throws Exception {
HttpClientRequest<ByteBuf> request = HttpClientRequest.createGet(SERVICE_URI + "testAsync/redirect?port=" + port);
NettyHttpClient<ByteBuf, ByteBuf> observableClient = (NettyHttpClient<ByteBuf, ByteBuf>) RibbonTransport.newHttpClient();
Person person = getPersonObservable(observableClient.submit(host, port, request)).toBlockingObservable().single();
Person person = getPersonObservable(observableClient.submit(host, port, request)).toBlocking().single();
assertEquals(EmbeddedResources.defaultPerson, person);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册