提交 cc751323 编写于 作者: A Allen Wang

Change rxnetty version to 0.3.6. Implementation of ResourceGroup. Change to...

Change rxnetty version to 0.3.6. Implementation of ResourceGroup. Change to ConfigurationBasedServerList to make the server list part of client configuration.
上级 0ea9da49
......@@ -72,8 +72,8 @@ project(':ribbon-transport') {
compile project(':ribbon-core')
compile project(':ribbon-loadbalancer')
compile project(':ribbon-core')
compile 'com.netflix.rxnetty:rx-netty:0.3.5'
compile 'com.netflix.rxnetty:rx-netty-contexts:0.3.5'
compile 'com.netflix.rxnetty:rx-netty:0.3.6'
compile 'com.netflix.rxnetty:rx-netty-contexts:0.3.6'
testCompile 'com.google.mockwebserver:mockwebserver:20130706'
testCompile project(':ribbon-test')
}
......@@ -141,6 +141,7 @@ project(':ribbon-client-extensions') {
compile 'com.netflix.hystrix:hystrix-core:1.4.0-RC4'
compile 'com.netflix.evcache:evcache-client:1.0.5'
compile project(':ribbon-transport')
compile project(':ribbon-eureka')
testCompile 'com.google.mockwebserver:mockwebserver:20130706'
}
......
......@@ -20,7 +20,7 @@ import com.netflix.ribbonclientextensions.hystrix.FallbackHandler;
public class RibbonExamples {
public static void main(String[] args) {
HttpResourceGroup group = Ribbon.createHttpResourceGroup("myclient");
HttpRequestTemplate<ByteBuf> template = group.requestTemplateBuilder().newRequestTemplate("GetUser")
HttpRequestTemplate<ByteBuf> template = group.newRequestTemplate("GetUser")
.withResponseValidator(new ResponseValidator<HttpClientResponse<ByteBuf>>() {
@Override
......
package com.netflix.ribbonclientextensions;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import com.netflix.client.config.CommonClientConfigKey;
import com.netflix.client.config.IClientConfigKey;
import com.netflix.loadbalancer.Server;
import com.netflix.niws.loadbalancer.DiscoveryEnabledNIWSServerList;
public final class ClientOptions {
private Map<IClientConfigKey<?>, Object> options;
private ClientOptions() {
options = new ConcurrentHashMap<IClientConfigKey<?>, Object>();
}
public static ClientOptions create() {
return new ClientOptions();
}
public ClientOptions useEurekaDynamicServerList(String vipAddress) {
options.put(IClientConfigKey.CommonKeys.NIWSServerListClassName, DiscoveryEnabledNIWSServerList.class.getName());
options.put(IClientConfigKey.CommonKeys.DeploymentContextBasedVipAddresses, vipAddress);
return this;
}
public ClientOptions useConfigurationBasedServerList(String serverList) {
options.put(IClientConfigKey.CommonKeys.ListOfServers, serverList);
return this;
}
public ClientOptions withMaxAutoRetries(int value) {
options.put(IClientConfigKey.CommonKeys.MaxAutoRetries, value);
return this;
}
public ClientOptions withMaxAutoRetriesNextServer(int value) {
options.put(IClientConfigKey.CommonKeys.MaxAutoRetriesNextServer, value);
return this;
}
public ClientOptions withRetryOnAllOperations(boolean value) {
options.put(IClientConfigKey.CommonKeys.OkToRetryOnAllOperations, value);
return this;
}
public ClientOptions withMaxConnectionsPerHost(int value) {
options.put(IClientConfigKey.CommonKeys.MaxConnectionsPerHost, value);
return this;
}
public ClientOptions withMaxTotalConnections(int value) {
options.put(IClientConfigKey.CommonKeys.MaxTotalConnections, value);
return this;
}
public ClientOptions withConnectTimeout(int value) {
options.put(IClientConfigKey.CommonKeys.ConnectTimeout, value);
return this;
}
public ClientOptions withReadTimeout(int value) {
options.put(IClientConfigKey.CommonKeys.ReadTimeout, value);
return this;
}
public ClientOptions withFollowRedirects(boolean value) {
options.put(IClientConfigKey.CommonKeys.FollowRedirects, value);
return this;
}
public ClientOptions withConnectionPoolIdleEvictTimeMilliseconds(int value) {
options.put(IClientConfigKey.CommonKeys.ConnIdleEvictTimeMilliSeconds, value);
return this;
}
public ClientOptions withLoadBalancerEnabled(boolean value) {
options.put(IClientConfigKey.CommonKeys.InitializeNFLoadBalancer, value);
return this;
}
Map<IClientConfigKey<?>, Object> getOptions() {
return options;
}
}
......@@ -6,8 +6,7 @@ import com.netflix.ribbonclientextensions.hystrix.FallbackHandler;
/**
* @author awang
*
* @param <I> request input entity type
* @param <O> response entity type
* @param <T> response entity type
* @param <R> response meta data, e.g. HttpClientResponse
*/
public interface RequestTemplate<T, R> {
......
package com.netflix.ribbonclientextensions;
import com.netflix.client.config.IClientConfig;
import com.netflix.loadbalancer.ILoadBalancer;
import com.netflix.client.config.IClientConfigKey;
public interface ResourceGroup {
String name();
public abstract class ResourceGroup<T extends RequestTemplate<?, ?>> {
private String name;
private IClientConfig clientConfig;
ResourceGroup withLoadBalancer(ILoadBalancer loadBalancer);
public ResourceGroup(String name) {
this(name, null);
}
public ResourceGroup(String name, ClientOptions options) {
this.name = name;
clientConfig = loadDefaultConfig(name);
if (options != null) {
for (IClientConfigKey key: options.getOptions().keySet()) {
clientConfig.setPropertyWithType(key, options.getOptions().get(key));
}
}
}
ResourceGroup withClientConfig(IClientConfig config);
ResourceGroup(IClientConfig clientConfig) {
this.clientConfig = clientConfig;
}
<T extends RequestTemplate<?, ?>> RequestTemplateBuilder<T> requestTemplateBuilder();
protected abstract IClientConfig loadDefaultConfig(String name);
public abstract class RequestTemplateBuilder<T extends RequestTemplate<?, ?>> {
public abstract <S> T newRequestTemplate(String name, Class<? extends S> classType);
protected final IClientConfig getClientConfig() {
return clientConfig;
}
public String name() {
return name;
}
public abstract <S> T newRequestTemplate(String name, Class<? extends S> classType);
}
......@@ -2,8 +2,6 @@ package com.netflix.ribbonclientextensions;
import com.netflix.ribbonclientextensions.http.HttpResourceGroup;
import io.reactivex.netty.protocol.http.client.HttpClient;
public final class Ribbon {
private Ribbon() {
......@@ -12,8 +10,12 @@ public final class Ribbon {
public static HttpResourceGroup createHttpResourceGroup(String name) {
return new HttpResourceGroup(name);
}
public static <I, O, T> T from(Class<T> contract, HttpClient<I, O> transportClient) {
public static HttpResourceGroup createHttpResourceGroup(String name, ClientOptions options) {
return new HttpResourceGroup(name, options);
}
public static <T> T from(Class<T> contract) {
return null;
}
}
......@@ -10,76 +10,38 @@ import com.netflix.client.config.IClientConfig;
import com.netflix.client.config.IClientConfigKey;
import com.netflix.client.netty.RibbonTransport;
import com.netflix.loadbalancer.ILoadBalancer;
import com.netflix.ribbonclientextensions.ClientOptions;
import com.netflix.ribbonclientextensions.RequestTemplate;
import com.netflix.ribbonclientextensions.ResourceGroup;
public class HttpResourceGroup implements ResourceGroup {
private String groupName;
private IClientConfig config;
private ILoadBalancer loadBalancer;
public class HttpRequestTemplateBuilder extends RequestTemplateBuilder<HttpRequestTemplate<?>>{
private HttpClient<ByteBuf, ByteBuf> client;
HttpRequestTemplateBuilder() {
if (loadBalancer == null) {
client = RibbonTransport.newHttpClient(config);
} else {
client = RibbonTransport.newHttpClient(loadBalancer, config);
}
}
@Override
public <T> HttpRequestTemplate<T> newRequestTemplate(String name, Class<? extends T> type) {
return new HttpRequestTemplate<T>(name, HttpResourceGroup.this, client, type);
}
public HttpRequestTemplate<ByteBuf> newRequestTemplate(String name) {
return newRequestTemplate(name, ByteBuf.class);
}
}
public class HttpResourceGroup extends ResourceGroup<HttpRequestTemplate<?>> {
private final HttpClient<ByteBuf, ByteBuf> client;
@Override
public String name() {
return groupName;
public HttpResourceGroup(String groupName) {
this(groupName, null);
}
public HttpResourceGroup(String groupName) {
this.groupName = groupName;
config = getDefaultConfig(groupName);
public HttpResourceGroup(String groupName, ClientOptions options) {
super(groupName, options);
client = RibbonTransport.newHttpClient(getClientConfig());
}
protected IClientConfig getDefaultConfig(String groupName) {
protected IClientConfig loadDefaultConfig(String groupName) {
return ClientConfigBuilder.newBuilderWithArchaiusProperties(groupName).build();
}
public HttpResourceGroup withCommonHeader(String name, String value) {
return this;
}
@SuppressWarnings({"rawtypes", "unchecked"})
@Override
public HttpResourceGroup withClientConfig(IClientConfig overrideConfig) {
if (config instanceof DefaultClientConfigImpl) {
((DefaultClientConfigImpl) config).applyOverride(overrideConfig);
} else {
for (IClientConfigKey key: CommonClientConfigKey.keys()) {
Object value = overrideConfig.getPropertyWithType(key);
if (value != null) {
config.setPropertyWithType(key, value);
}
}
}
return this;
public <T> HttpRequestTemplate<T> newRequestTemplate(String name,
Class<? extends T> classType) {
return new HttpRequestTemplate<T>(name, HttpResourceGroup.this, client, classType);
}
public HttpResourceGroup withLoadBalancer(ILoadBalancer loadBalancer) {
this.loadBalancer = loadBalancer;
return this;
public HttpRequestTemplate<ByteBuf> newRequestTemplate(String name) {
return newRequestTemplate(name, ByteBuf.class);
}
@SuppressWarnings("unchecked")
@Override
public HttpRequestTemplateBuilder requestTemplateBuilder() {
return new HttpRequestTemplateBuilder();
}
}
......@@ -11,7 +11,6 @@ import rx.Observable;
* @author awang
*
* @param <T> Output entity type
* @param <R> Response
*/
public interface FallbackHandler<T> {
public Observable<T> getFallback(HystrixExecutableInfo<?> hystrixInfo, Map<String, Object> requestProperties);
......
package com.netflix.ribbonclientextensions;
import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.reactivex.netty.protocol.http.client.HttpClientResponse;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.charset.Charset;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.reactivex.netty.protocol.http.client.HttpClient;
import io.reactivex.netty.protocol.http.client.HttpClientResponse;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.junit.Test;
import rx.Observable;
......@@ -26,10 +24,6 @@ import rx.functions.Func1;
import com.google.common.collect.Lists;
import com.google.mockwebserver.MockResponse;
import com.google.mockwebserver.MockWebServer;
import com.netflix.client.config.CommonClientConfigKey;
import com.netflix.client.config.DefaultClientConfigImpl;
import com.netflix.client.config.IClientConfigKey;
import com.netflix.client.netty.RibbonTransport;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandProperties;
import com.netflix.hystrix.HystrixExecutableInfo;
......@@ -53,15 +47,11 @@ public class RibbonTest {
.setBody(content));
server.play();
ILoadBalancer lb = LoadBalancerBuilder.newBuilder().buildFixedServerListLoadBalancer(Lists.newArrayList(
new Server("localhost", 12345),
new Server("localhost", 10092),
new Server("localhost", server.getPort())));
// HttpClient<ByteBuf, ByteBuf> httpClient = RibbonTransport.newHttpClient(lb, DefaultClientConfigImpl.getClientConfigWithDefaultValues().setPropertyWithType(CommonClientConfigKey.MaxAutoRetriesNextServer, 3));
HttpResourceGroup group = Ribbon.createHttpResourceGroup("myclient");
group.withLoadBalancer(lb)
.withClientConfig(DefaultClientConfigImpl.getEmptyConfig().setPropertyWithType(CommonClientConfigKey.MaxAutoRetriesNextServer, 3));
HttpRequestTemplate<ByteBuf> template = group.requestTemplateBuilder().newRequestTemplate("test", ByteBuf.class);
HttpResourceGroup group = Ribbon.createHttpResourceGroup("myclient",
ClientOptions.create()
.withMaxAutoRetriesNextServer(3)
.useConfigurationBasedServerList("localhost:12345, localhost:10092, localhost:" + server.getPort()));
HttpRequestTemplate<ByteBuf> template = group.newRequestTemplate("test", ByteBuf.class);
RibbonRequest<ByteBuf> request = template.withUri("/").requestBuilder().build();
String result = request.execute().toString(Charset.defaultCharset());
assertEquals(content, result);
......@@ -77,14 +67,11 @@ public class RibbonTest {
.setBody(content));
server.play();
ILoadBalancer lb = LoadBalancerBuilder.newBuilder().buildFixedServerListLoadBalancer(Lists.newArrayList(new Server("localhost", server.getPort())));
HttpResourceGroup group = Ribbon.createHttpResourceGroup("myclient");
group.withLoadBalancer(lb)
.withClientConfig(DefaultClientConfigImpl.getEmptyConfig().setPropertyWithType(CommonClientConfigKey.MaxAutoRetriesNextServer, 3));
HttpResourceGroup group = Ribbon.createHttpResourceGroup("myclient", ClientOptions.create()
.useConfigurationBasedServerList("localhost:" + server.getPort())
.withMaxAutoRetriesNextServer(3));
HttpRequestTemplate<ByteBuf> template = group.withLoadBalancer(lb)
.requestTemplateBuilder()
.newRequestTemplate("test", ByteBuf.class);
HttpRequestTemplate<ByteBuf> template = group.newRequestTemplate("test");
RibbonRequest<ByteBuf> request = template.withUri("/")
.addCacheProvider("somekey", new CacheProvider<ByteBuf>(){
@Override
......@@ -124,14 +111,10 @@ public class RibbonTest {
.setBody(content));
server.play();
ILoadBalancer lb = LoadBalancerBuilder.newBuilder().buildFixedServerListLoadBalancer(Lists.newArrayList(new Server("localhost", server.getPort())));
HttpResourceGroup group = Ribbon.createHttpResourceGroup("myclient");
group.withLoadBalancer(lb);
HttpResourceGroup group = Ribbon.createHttpResourceGroup("myclient", ClientOptions.create()
.useConfigurationBasedServerList("localhost:" + server.getPort()));
HttpRequestTemplate<ByteBuf> template = group.withLoadBalancer(lb)
.requestTemplateBuilder()
.newRequestTemplate("test", ByteBuf.class);
HttpRequestTemplate<ByteBuf> template = group.newRequestTemplate("test", ByteBuf.class);
template.withResponseValidator(new ResponseValidator<HttpClientResponse<ByteBuf>>() {
@Override
......@@ -166,12 +149,10 @@ public class RibbonTest {
@Test
public void testFallback() throws IOException {
ILoadBalancer lb = LoadBalancerBuilder.newBuilder().buildFixedServerListLoadBalancer(Lists.newArrayList(new Server("localhost", 12345)));
HttpResourceGroup group = Ribbon.createHttpResourceGroup("myclient");
group.withLoadBalancer(lb)
.withClientConfig(DefaultClientConfigImpl.getEmptyConfig().setPropertyWithType(IClientConfigKey.CommonKeys.MaxAutoRetriesNextServer, 1));
HttpRequestTemplate<ByteBuf> template = group.requestTemplateBuilder().newRequestTemplate("test", ByteBuf.class);
HttpResourceGroup group = Ribbon.createHttpResourceGroup("myclient", ClientOptions.create()
.useConfigurationBasedServerList("localhost:12345")
.withMaxAutoRetriesNextServer(1));
HttpRequestTemplate<ByteBuf> template = group.newRequestTemplate("test", ByteBuf.class);
final String fallback = "fallback";
RibbonRequest<ByteBuf> request = template.withUri("/")
.withFallbackProvider(new FallbackHandler<ByteBuf>() {
......@@ -179,7 +160,11 @@ public class RibbonTest {
public Observable<ByteBuf> getFallback(
HystrixExecutableInfo<?> hystrixInfo,
Map<String, Object> requestProperties) {
return Observable.just(Unpooled.buffer().writeBytes(fallback.getBytes()));
try {
return Observable.just(Unpooled.buffer().writeBytes(fallback.getBytes("UTF-8")));
} catch (UnsupportedEncodingException e) {
return Observable.error(e);
}
}
})
.requestBuilder().build();
......@@ -208,11 +193,10 @@ public class RibbonTest {
@Test
public void testCacheHit() {
ILoadBalancer lb = LoadBalancerBuilder.newBuilder().buildFixedServerListLoadBalancer(Lists.newArrayList(new Server("localhost", 12345)));
HttpResourceGroup group = Ribbon.createHttpResourceGroup("myclient")
.withClientConfig(DefaultClientConfigImpl.getEmptyConfig().setPropertyWithType(IClientConfigKey.CommonKeys.MaxAutoRetriesNextServer, 1));
HttpRequestTemplate<ByteBuf> template = group.withLoadBalancer(lb)
.requestTemplateBuilder().newRequestTemplate("test");
HttpResourceGroup group = Ribbon.createHttpResourceGroup("myclient", ClientOptions.create()
.useConfigurationBasedServerList("localhost:12345")
.withMaxAutoRetriesNextServer(1));
HttpRequestTemplate<ByteBuf> template = group.newRequestTemplate("test");
final String content = "from cache";
final String cacheKey = "somekey";
RibbonRequest<ByteBuf> request = template.addCacheProvider(cacheKey, new CacheProvider<ByteBuf>(){
......@@ -225,7 +209,11 @@ public class RibbonTest {
@Override
public Observable<ByteBuf> get(String key, Map<String, Object> vars) {
if (key.equals(cacheKey)) {
return Observable.just(Unpooled.buffer().writeBytes(content.getBytes()));
try {
return Observable.just(Unpooled.buffer().writeBytes(content.getBytes("UTF-8")));
} catch (UnsupportedEncodingException e) {
return Observable.error(e);
}
} else {
return Observable.error(new Exception("Cache miss"));
}
......@@ -249,11 +237,11 @@ public class RibbonTest {
.setBody(content));
server.play();
ILoadBalancer lb = LoadBalancerBuilder.newBuilder().buildFixedServerListLoadBalancer(Lists.newArrayList(new Server("localhost", server.getPort())));
HttpResourceGroup group = Ribbon.createHttpResourceGroup("myclient")
.withClientConfig(DefaultClientConfigImpl.getEmptyConfig().setPropertyWithType(IClientConfigKey.CommonKeys.MaxAutoRetriesNextServer, 1));
HttpRequestTemplate<ByteBuf> template = group.withLoadBalancer(lb)
.requestTemplateBuilder().newRequestTemplate("test");
HttpResourceGroup group = Ribbon.createHttpResourceGroup("myclient", ClientOptions.create()
.useConfigurationBasedServerList("localhost:" + server.getPort())
.withMaxAutoRetriesNextServer(1));
HttpRequestTemplate<ByteBuf> template = group.newRequestTemplate("test");
final String cacheKey = "somekey";
RibbonRequest<ByteBuf> request = template.addCacheProvider(cacheKey, new CacheProvider<ByteBuf>(){
@Override
......@@ -274,5 +262,5 @@ public class RibbonTest {
.requestBuilder().build();
String result = request.execute().toString(Charset.defaultCharset());
assertEquals(content, result);
}
}
}
......@@ -13,7 +13,7 @@ public class TemplateBuilderTest {
public void testVarReplacement() {
HttpResourceGroup group = Ribbon.createHttpResourceGroup("test");
HttpRequestTemplate<ByteBuf> template = group.requestTemplateBuilder().newRequestTemplate("resource1", ByteBuf.class);
HttpRequestTemplate<ByteBuf> template = group.newRequestTemplate("resource1", ByteBuf.class);
template.withUri("/foo/{id}?name={name}");
HttpClientRequest<ByteBuf> request = template
.requestBuilder()
......
......@@ -4,7 +4,7 @@ public class ClientConfigBuilder {
private IClientConfig config;
private ClientConfigBuilder() {
ClientConfigBuilder() {
}
public static ClientConfigBuilder newBuilder() {
......@@ -100,13 +100,15 @@ public class ClientConfigBuilder {
return this;
}
public ClientConfigBuilder withMaxHttpConnectionsPerHost(int value) {
public ClientConfigBuilder withMaxConnectionsPerHost(int value) {
config.setPropertyWithType(CommonClientConfigKey.MaxHttpConnectionsPerHost, value);
config.setPropertyWithType(CommonClientConfigKey.MaxConnectionsPerHost, value);
return this;
}
public ClientConfigBuilder withMaxTotalHttpConnections(int value) {
public ClientConfigBuilder withMaxTotalConnections(int value) {
config.setPropertyWithType(CommonClientConfigKey.MaxTotalHttpConnections, value);
config.setPropertyWithType(CommonClientConfigKey.MaxTotalConnections, value);
return this;
}
......
......@@ -182,6 +182,8 @@ public abstract class CommonClientConfigKey<T> implements IClientConfigKey<T> {
public static final IClientConfigKey<String> RequestIdHeaderName = new CommonClientConfigKey<String>("RequestIdHeaderName") {};
public static final IClientConfigKey<String> ListOfServers = new CommonClientConfigKey<String>("listOfServers") {};
private static final Set<IClientConfigKey> keys = new HashSet<IClientConfigKey>();
static {
......
......@@ -23,6 +23,7 @@ import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.commons.configuration.AbstractConfiguration;
import org.apache.commons.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -430,6 +431,7 @@ public class DefaultClientConfigImpl implements IClientConfig {
putDefaultStringProperty(CommonClientConfigKey.VipAddressResolverClassName, getDefaultVipaddressResolverClassname());
putDefaultBooleanProperty(CommonClientConfigKey.IsClientAuthRequired, getDefaultIsClientAuthRequired());
putDefaultStringProperty(CommonClientConfigKey.RequestIdHeaderName, getDefaultRequestIdHeaderName());
putDefaultStringProperty(CommonClientConfigKey.ListOfServers, "");
}
public Boolean getDefaultEnableConnectionPool() {
......@@ -574,9 +576,42 @@ public class DefaultClientConfigImpl implements IClientConfig {
if (prop.startsWith(getNameSpace())){
prop = prop.substring(getNameSpace().length() + 1);
}
setPropertyInternal(prop, props.getProperty(key));
setPropertyInternal(prop, getStringValue(props, key));
}
}
/**
* This is to workaround the issue that {@link AbstractConfiguration} by default
* automatically convert comma delimited string to array
*/
protected static String getStringValue(Configuration config, String key) {
try {
String values[] = config.getStringArray(key);
if (values == null) {
return null;
}
if (values.length == 0) {
return config.getString(key);
} else if (values.length == 1) {
return values[0];
}
StringBuilder sb = new StringBuilder();
for (int i = 0; i < values.length; i++) {
sb.append(values[i]);
if (i != values.length - 1) {
sb.append(",");
}
}
return sb.toString();
} catch (Exception e) {
Object v = config.getProperty(key);
if (v != null) {
return String.valueOf(v);
} else {
return null;
}
}
}
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "DC_DOUBLECHECK")
......
......@@ -10,6 +10,7 @@ import java.util.concurrent.CountDownLatch;
import rx.Observer;
import com.google.common.collect.Lists;
import com.netflix.client.netty.RibbonTransport;
import com.netflix.client.netty.http.NettyHttpClient;
import com.netflix.loadbalancer.BaseLoadBalancer;
import com.netflix.loadbalancer.LoadBalancerBuilder;
......@@ -22,7 +23,7 @@ public class LoadBalancingExample {
BaseLoadBalancer lb = LoadBalancerBuilder.newBuilder()
.buildFixedServerListLoadBalancer(servers);
NettyHttpClient<ByteBuf, ByteBuf> client = NettyHttpClient.createDefaultHttpClient(lb);
NettyHttpClient<ByteBuf, ByteBuf> client = RibbonTransport.newHttpClient(lb);
final CountDownLatch latch = new CountDownLatch(servers.size());
Observer<HttpClientResponse<ByteBuf>> observer = new Observer<HttpClientResponse<ByteBuf>>() {
@Override
......
......@@ -10,12 +10,13 @@ import java.util.concurrent.TimeUnit;
import rx.functions.Action1;
import com.netflix.client.netty.RibbonTransport;
import com.netflix.client.netty.http.NettyHttpClient;
public class SimpleGet {
@edu.umd.cs.findbugs.annotations.SuppressWarnings
public static void main(String[] args) throws Exception {
NettyHttpClient<ByteBuf, ByteBuf> client = NettyHttpClient.createDefaultHttpClient();
NettyHttpClient<ByteBuf, ByteBuf> client = RibbonTransport.newHttpClient();
HttpClientRequest<ByteBuf> request = HttpClientRequest.createGet("/");
final CountDownLatch latch = new CountDownLatch(1);
......
......@@ -22,6 +22,8 @@ import java.util.Collections;
import java.util.List;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.netflix.client.config.CommonClientConfigKey;
import com.netflix.client.config.DefaultClientConfigImpl;
import com.netflix.client.config.IClientConfig;
import com.netflix.config.DynamicPropertyFactory;
......@@ -40,44 +42,31 @@ import com.netflix.config.DynamicStringProperty;
*/
public class ConfigurationBasedServerList extends AbstractServerList<Server> {
public static final String PROP_NAME = "listOfServers";
private String propertyName = DefaultClientConfigImpl.DEFAULT_PROPERTY_NAME_SPACE + "." + PROP_NAME;
private DynamicStringProperty dynamicProp;
private volatile List<Server> list = Collections.emptyList();
private IClientConfig clientConfig;
@Override
public List<Server> getInitialListOfServers() {
return list;
return getUpdatedListOfServers();
}
@Override
public List<Server> getUpdatedListOfServers() {
return list;
String listOfServers = clientConfig.getPropertyWithType(CommonClientConfigKey.ListOfServers);
return derive(listOfServers);
}
@Override
public void initWithNiwsConfig(IClientConfig clientConfig) {
propertyName = clientConfig.getClientName() + "." + clientConfig.getNameSpace() + "." + PROP_NAME;
dynamicProp = DynamicPropertyFactory.getInstance().getStringProperty(propertyName, null);
derive();
dynamicProp.addCallback(new Runnable() {
@Override
public void run() {
derive();
}
});
this.clientConfig = clientConfig;
}
private void derive() {
String value = dynamicProp.get();
if (Strings.isNullOrEmpty(value)) {
list = Collections.emptyList();
} else {
List<Server> newList = new ArrayList<Server>();
private List<Server> derive(String value) {
List<Server> list = Lists.newArrayList();
if (!Strings.isNullOrEmpty(value)) {
for (String s: value.split(",")) {
newList.add(new Server(s.trim()));
list.add(new Server(s.trim()));
}
list = newList;
}
return list;
}
}
......@@ -32,8 +32,7 @@ public class ConfigurationBasedServerListTest {
@Test
public void testList() {
ConfigurationBasedServerList list = new ConfigurationBasedServerList();
DefaultClientConfigImpl config = new DefaultClientConfigImpl();
config.setClientName("junit1");
DefaultClientConfigImpl config = DefaultClientConfigImpl.getClientConfigWithDefaultValues("junit1");
list.initWithNiwsConfig(config);
assertTrue(list.getInitialListOfServers().isEmpty());
ConfigurationManager.getConfigInstance().setProperty("junit1.ribbon.listOfServers", "abc.com:80,microsoft.com,1.2.3.4:8080");
......
......@@ -29,7 +29,7 @@ public abstract class LoadBalancingRxClientWithPoolOptions<I, O, T extends RxCli
protected GlobalPoolStats<T> stats;
private Observable<PoolStateChangeEvent> poolStateChangeEventObservable;
protected ScheduledExecutorService poolCleanerScheduler;
protected boolean poolEnabled;
protected boolean poolEnabled = true;
public LoadBalancingRxClientWithPoolOptions(IClientConfig config,
RetryHandler retryHandler,
......
......@@ -92,57 +92,63 @@ public final class RibbonTransport {
return new LoadBalancingUdpClient<I, O>(config, getDefaultRetryHandlerWithConfig(config), pipelineConfigurator);
}
public static HttpClient<ByteBuf, ByteBuf> newHttpClient() {
public static NettyHttpClient<ByteBuf, ByteBuf> newHttpClient() {
IClientConfig config = DefaultClientConfigImpl.getClientConfigWithDefaultValues();
return newHttpClient(config);
}
public static HttpClient<ByteBuf, ByteBuf> newHttpClient(ILoadBalancer loadBalancer, IClientConfig config) {
public static NettyHttpClient<ByteBuf, ByteBuf> newHttpClient(ILoadBalancer loadBalancer, IClientConfig config) {
return new NettyHttpClient<ByteBuf, ByteBuf>(loadBalancer, config, getDefaultHttpRetryHandlerWithConfig(config), DEFAULT_HTTP_PIPELINE_CONFIGURATOR, poolCleanerScheduler);
}
public static HttpClient<ByteBuf, ByteBuf> newHttpClient(ILoadBalancer loadBalancer, IClientConfig config, RetryHandler retryHandler) {
public static NettyHttpClient<ByteBuf, ByteBuf> newHttpClient(ILoadBalancer loadBalancer, IClientConfig config, RetryHandler retryHandler) {
return new NettyHttpClient<ByteBuf, ByteBuf>(loadBalancer, config, retryHandler, DEFAULT_HTTP_PIPELINE_CONFIGURATOR, poolCleanerScheduler);
}
public static HttpClient<ByteBuf, ByteBuf> newHttpClient(IClientConfig config) {
public static NettyHttpClient<ByteBuf, ByteBuf> newHttpClient(IClientConfig config) {
return new NettyHttpClient<ByteBuf, ByteBuf>(config, getDefaultHttpRetryHandlerWithConfig(config), DEFAULT_HTTP_PIPELINE_CONFIGURATOR, poolCleanerScheduler);
}
public static <I, O> HttpClient<I, O> newHttpClient(PipelineConfigurator<HttpClientResponse<O>, HttpClientRequest<I>> pipelineConfigurator,
public static NettyHttpClient<ByteBuf, ByteBuf> newHttpClient(ILoadBalancer loadBalancer) {
IClientConfig config = DefaultClientConfigImpl.getClientConfigWithDefaultValues();
return newHttpClient(loadBalancer, config);
}
public static <I, O> NettyHttpClient<I, O> newHttpClient(PipelineConfigurator<HttpClientResponse<O>, HttpClientRequest<I>> pipelineConfigurator,
ILoadBalancer loadBalancer, IClientConfig config) {
return new NettyHttpClient<I, O>(loadBalancer, config, getDefaultHttpRetryHandlerWithConfig(config), pipelineConfigurator, poolCleanerScheduler);
}
public static <I, O> HttpClient<I, O> newHttpClient(PipelineConfigurator<HttpClientResponse<O>, HttpClientRequest<I>> pipelineConfigurator,
public static <I, O> NettyHttpClient<I, O> newHttpClient(PipelineConfigurator<HttpClientResponse<O>, HttpClientRequest<I>> pipelineConfigurator,
IClientConfig config) {
return new NettyHttpClient<I, O>(config, getDefaultHttpRetryHandlerWithConfig(config), pipelineConfigurator, poolCleanerScheduler);
}
public static <I, O> HttpClient<I, O> newHttpClient(PipelineConfigurator<HttpClientResponse<O>, HttpClientRequest<I>> pipelineConfigurator,
public static <I, O> NettyHttpClient<I, O> newHttpClient(PipelineConfigurator<HttpClientResponse<O>, HttpClientRequest<I>> pipelineConfigurator,
IClientConfig config, RetryHandler retryHandler) {
return new NettyHttpClient<I, O>(config, retryHandler, pipelineConfigurator, poolCleanerScheduler);
}
public static HttpClient<ByteBuf, ServerSentEvent> newSSEClient(ILoadBalancer loadBalancer, IClientConfig config) {
public static NettyHttpClient<ByteBuf, ServerSentEvent> newSSEClient(ILoadBalancer loadBalancer, IClientConfig config) {
return new SSEClient<ByteBuf>(loadBalancer, config, getDefaultHttpRetryHandlerWithConfig(config), DEFAULT_SSE_PIPELINE_CONFIGURATOR);
}
public static HttpClient<ByteBuf, ServerSentEvent> newSSEClient(IClientConfig config) {
public static NettyHttpClient<ByteBuf, ServerSentEvent> newSSEClient(IClientConfig config) {
return new SSEClient<ByteBuf>(config, getDefaultHttpRetryHandlerWithConfig(config), DEFAULT_SSE_PIPELINE_CONFIGURATOR);
}
public static <I> HttpClient<I, ServerSentEvent> newSSEClient(PipelineConfigurator<HttpClientResponse<ServerSentEvent>, HttpClientRequest<I>> pipelineConfigurator,
public static <I> NettyHttpClient<I, ServerSentEvent> newSSEClient(PipelineConfigurator<HttpClientResponse<ServerSentEvent>, HttpClientRequest<I>> pipelineConfigurator,
ILoadBalancer loadBalancer, IClientConfig config) {
return new SSEClient<I>(loadBalancer, config, getDefaultHttpRetryHandlerWithConfig(config), pipelineConfigurator);
}
public static <I> HttpClient<I, ServerSentEvent> newSSEClient(PipelineConfigurator<HttpClientResponse<ServerSentEvent>, HttpClientRequest<I>> pipelineConfigurator,
public static <I> NettyHttpClient<I, ServerSentEvent> newSSEClient(PipelineConfigurator<HttpClientResponse<ServerSentEvent>, HttpClientRequest<I>> pipelineConfigurator,
IClientConfig config) {
return new SSEClient<I>(config, getDefaultHttpRetryHandlerWithConfig(config), pipelineConfigurator);
}
public static HttpClient<ByteBuf, ServerSentEvent> newSSEClient() {
public static NettyHttpClient<ByteBuf, ServerSentEvent> newSSEClient() {
IClientConfig config = DefaultClientConfigImpl.getClientConfigWithDefaultValues();
return newSSEClient(config);
}
......
......@@ -612,8 +612,7 @@ public class NettyClientTest {
assertEquals("value1", responseContext.get().getContext("Context1"));
}
// temporarily exclude the test due to RxNetty bug
@Ignore
@Test
public void testRedirect() throws Exception {
HttpClientRequest<ByteBuf> request = HttpClientRequest.createGet(SERVICE_URI + "testAsync/redirect?port=" + port);
NettyHttpClient<ByteBuf, ByteBuf> observableClient = (NettyHttpClient<ByteBuf, ByteBuf>) RibbonTransport.newHttpClient();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册