提交 b35e3762 编写于 作者: A Allen Wang

Merge pull request #137 from allenxwang/cp

Fix #136
......@@ -46,7 +46,7 @@ project(':ribbon-loadbalancer') {
dependencies {
compile project(':ribbon-core')
compile 'com.netflix.netflix-commons:netflix-statistics:0.1.1'
compile 'com.netflix.rxjava:rxjava-core:[0.17,)'
compile 'com.netflix.rxjava:rxjava-core:0.18+'
}
}
......@@ -102,7 +102,10 @@ project(':ribbon-examples') {
project(':ribbon-test') {
dependencies {
compile project(':ribbon-core')
compile 'com.netflix.rxjava:rxjava-core:[0.17,)'
compile project(':ribbon-eureka')
compile 'org.powermock:powermock-easymock-release-full:1.5.4'
compile 'org.easymock:easymock:3.2'
compile 'com.netflix.rxjava:rxjava-core:0.18+'
compile 'com.sun.jersey:jersey-server:1.11'
compile 'javax.ws.rs:jsr311-api:1.1.1'
compile 'com.sun.jersey:jersey-core:1.11'
......@@ -143,6 +146,7 @@ project(':ribbon') {
compile 'com.netflix.hystrix:hystrix-core:1.4.0-RC4'
compile 'com.netflix.evcache:evcache-client:1.0.5'
compile project(':ribbon-transport')
testCompile project(':ribbon-test')
testCompile 'com.google.mockwebserver:mockwebserver:20130706'
}
......@@ -150,4 +154,4 @@ project(':ribbon') {
task wrapper(type: Wrapper) {
gradleVersion = '1.12'
}
\ No newline at end of file
}
......@@ -345,7 +345,7 @@ public interface IClientConfig {
config.set(CommonClientConfigKey.ServerListRefreshInterval, value);
return this;
}
public Builder withZoneAffinityEnabled(boolean value) {
config.set(CommonClientConfigKey.EnableZoneAffinity, value);
return this;
......
......@@ -17,8 +17,20 @@
*/
package com.netflix.loadbalancer;
import java.util.Date;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.netflix.client.ClientFactory;
import com.netflix.client.config.CommonClientConfigKey;
import com.netflix.client.config.DefaultClientConfigImpl;
import com.netflix.client.config.IClientConfig;
import com.netflix.config.DynamicIntProperty;
import com.netflix.servo.annotations.DataSourceType;
import com.netflix.servo.annotations.Monitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
......@@ -29,19 +41,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.netflix.client.ClientFactory;
import com.netflix.client.config.CommonClientConfigKey;
import com.netflix.client.config.DefaultClientConfigImpl;
import com.netflix.client.config.IClientConfig;
import com.netflix.config.DynamicIntProperty;
import com.netflix.servo.annotations.DataSourceType;
import com.netflix.servo.annotations.Monitor;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* A LoadBalancer that has the capabilities to obtain the candidate list of
* servers using a dynamic source. i.e. The list of servers can potentially be
......@@ -171,7 +170,7 @@ public class DynamicServerListLoadBalancer<T extends Server> extends
.primeConnections(getServerList(true));
}
this.setEnablePrimingConnections(primeConnection);
LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());
}
......
......@@ -7,6 +7,7 @@ import com.netflix.client.RetryHandler;
import com.netflix.client.config.CommonClientConfigKey;
import com.netflix.client.config.DefaultClientConfigImpl;
import com.netflix.client.config.IClientConfig;
import com.netflix.client.config.IClientConfigKey;
public class LoadBalancerBuilder<T extends Server> {
......@@ -57,35 +58,61 @@ public class LoadBalancerBuilder<T extends Server> {
public BaseLoadBalancer buildFixedServerListLoadBalancer(List<T> servers) {
if (rule == null) {
rule = createDefaultRule(config);
rule = createRuleFromConfig(config);
}
BaseLoadBalancer lb = new BaseLoadBalancer(config, rule, ping);
lb.setServersList(servers);
return lb;
}
private static IRule createDefaultRule(IClientConfig config) {
AvailabilityFilteringRule rule = new AvailabilityFilteringRule();
rule.initWithNiwsConfig(config);
private static IRule createRuleFromConfig(IClientConfig config) {
String ruleClassName = config.get(IClientConfigKey.Keys.NFLoadBalancerRuleClassName);
if (ruleClassName == null) {
throw new IllegalArgumentException("NFLoadBalancerRuleClassName is not specified in the config");
}
IRule rule;
try {
rule = (IRule) ClientFactory.instantiateInstanceWithClientConfig(ruleClassName, config);
} catch (Exception e) {
throw new RuntimeException(e);
}
return rule;
}
private static ServerList<Server> createDefaultServerList(IClientConfig config) {
ConfigurationBasedServerList list = new ConfigurationBasedServerList();
list.initWithNiwsConfig(config);
private static ServerList<Server> createServerListFromConfig(IClientConfig config) {
String serverListClassName = config.get(IClientConfigKey.Keys.NIWSServerListClassName);
if (serverListClassName == null) {
throw new IllegalArgumentException("NIWSServerListClassName is not specified in the config");
}
ServerList<Server> list;
try {
list = (ServerList<Server>) ClientFactory.instantiateInstanceWithClientConfig(serverListClassName, config);
} catch (Exception e) {
throw new RuntimeException(e);
}
return list;
}
/**
* Build a {@link ZoneAwareLoadBalancer} with a dynamic {@link ServerList} and an {@link IRule}. The {@link ServerList} can be
* either set in the {@link #withDynamicServerList(ServerList)} or in the {@link IClientConfig} using {@link CommonClientConfigKey#NIWSServerListClassName}.
* The {@link IRule} can be either set by {@link #withRule(IRule)} or in the {@link IClientConfig} using
* {@link CommonClientConfigKey#NFLoadBalancerRuleClassName}.
*/
public ZoneAwareLoadBalancer<T> buildDynamicServerListLoadBalancer() {
if (serverListImpl == null) {
serverListImpl = createDefaultServerList(config);
serverListImpl = createServerListFromConfig(config);
}
if (rule == null) {
rule = createDefaultRule(config);
rule = createRuleFromConfig(config);
}
return new ZoneAwareLoadBalancer<T>(config, rule, ping, serverListImpl, serverListFilter);
}
/**
* Build a load balancer using the configuration from the {@link IClientConfig} only. It uses reflection to initialize necessary load balancer
* components.
*/
public ILoadBalancer buildLoadBalancerFromConfigWithReflection() {
String loadBalancerClassName = config.get(CommonClientConfigKey.NFLoadBalancerClassName);
if (loadBalancerClassName == null) {
......
package com.netflix.loadbalancer;
import java.net.URI;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
import com.netflix.client.ClientException;
import com.netflix.client.DefaultLoadBalancerRetryHandler;
import com.netflix.client.RetryHandler;
import com.netflix.client.config.IClientConfig;
import com.netflix.servo.monitor.Stopwatch;
import com.netflix.utils.RxUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.Observable.Operator;
import rx.observers.SafeSubscriber;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.SerialSubscription;
import rx.Observer;
import rx.Subscriber;
import rx.Subscription;
import rx.observers.SafeSubscriber;
import rx.subscriptions.SerialSubscription;
import com.netflix.client.ClientException;
import com.netflix.client.DefaultLoadBalancerRetryHandler;
import com.netflix.client.RetryHandler;
import com.netflix.client.config.IClientConfig;
import com.netflix.servo.monitor.Stopwatch;
import com.netflix.utils.RxUtils;
import javax.annotation.Nullable;
import java.net.URI;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Provides APIs to execute and retry tasks on a server chosen by the associated load balancer.
......@@ -265,7 +260,7 @@ public class LoadBalancerExecutor extends LoadBalancerContext {
@Override
public void onError(Throwable e) {
logger.debug("Got error %s when executed on server %s", e, server);
logger.debug("Got error {} when executed on server {}", e, server);
recordStats(entity, e);
int maxRetries = errorHandler.getMaxRetriesOnSameServer();
boolean shouldRetry = maxRetries > 0 && errorHandler.isRetriableException(e, true);
......
/*
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.ribbon.testutils;
import com.netflix.appinfo.InstanceInfo;
import com.netflix.discovery.DiscoveryClient;
import com.netflix.discovery.DiscoveryManager;
import com.netflix.loadbalancer.Server;
import org.easymock.EasyMock;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.runner.RunWith;
import org.powermock.api.easymock.PowerMock;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import java.util.ArrayList;
import java.util.List;
import static org.easymock.EasyMock.expect;
import static org.powermock.api.easymock.PowerMock.createMock;
import static org.powermock.api.easymock.PowerMock.replay;
@RunWith(PowerMockRunner.class)
@PrepareForTest( {DiscoveryManager.class, DiscoveryClient.class} )
@PowerMockIgnore({"javax.management.*", "com.sun.jersey.*", "com.sun.*", "org.apache.*", "weblogic.*", "com.netflix.config.*", "com.sun.jndi.dns.*",
"javax.naming.*", "com.netflix.logging.*", "javax.ws.*"})
@Ignore
public abstract class MockedDiscoveryServerListTest {
protected abstract List<Server> getMockServerList();
protected abstract String getVipAddress();
static List<InstanceInfo> getDummyInstanceInfo(String appName, List<Server> serverList){
List<InstanceInfo> list = new ArrayList<InstanceInfo>();
for (Server server: serverList) {
InstanceInfo info = InstanceInfo.Builder.newBuilder().setAppName(appName)
.setHostName(server.getHost())
.setPort(server.getPort())
.build();
list.add(info);
}
return list;
}
@Before
public void setupMock(){
List<InstanceInfo> instances = getDummyInstanceInfo("dummy", getMockServerList());
PowerMock.mockStatic(DiscoveryManager.class);
PowerMock.mockStatic(DiscoveryClient.class);
DiscoveryClient mockedDiscoveryClient = createMock(DiscoveryClient.class);
DiscoveryManager mockedDiscoveryManager = createMock(DiscoveryManager.class);
expect(DiscoveryClient.getZone((InstanceInfo) EasyMock.anyObject())).andReturn("dummyZone").anyTimes();
expect(DiscoveryManager.getInstance()).andReturn(mockedDiscoveryManager).anyTimes();
expect(mockedDiscoveryManager.getDiscoveryClient()).andReturn(mockedDiscoveryClient).anyTimes();
expect(mockedDiscoveryClient.getInstancesByVipAddress(getVipAddress(), false, null)).andReturn(instances).anyTimes();
replay(DiscoveryManager.class);
replay(DiscoveryClient.class);
replay(mockedDiscoveryManager);
replay(mockedDiscoveryClient);
}
}
......@@ -65,6 +65,7 @@ import com.netflix.client.ssl.ClientSslSocketFactoryException;
import com.netflix.client.ssl.URLSslContextFactory;
import com.netflix.loadbalancer.ILoadBalancer;
import com.netflix.loadbalancer.LoadBalancerBuilder;
import com.netflix.loadbalancer.LoadBalancerExecutor;
import com.netflix.loadbalancer.LoadBalancerObservableCommand;
import com.netflix.loadbalancer.Server;
import com.netflix.loadbalancer.ServerStats;
......@@ -97,7 +98,7 @@ public class NettyHttpClient<I, O> extends LoadBalancingRxClientWithPoolOptions<
RetryHandler retryHandler,
PipelineConfigurator<HttpClientResponse<O>, HttpClientRequest<I>> pipelineConfigurator,
ScheduledExecutorService poolCleanerScheduler) {
this(LoadBalancerBuilder.newBuilder().withClientConfig(config).buildDynamicServerListLoadBalancer(),
this(LoadBalancerBuilder.newBuilder().withClientConfig(config).buildLoadBalancerFromConfigWithReflection(),
config, retryHandler, pipelineConfigurator, poolCleanerScheduler);
}
......@@ -299,6 +300,10 @@ public class NettyHttpClient<I, O> extends LoadBalancingRxClientWithPoolOptions<
return (HttpClientListener) listener;
}
LoadBalancerExecutor getLoadBalancerExecutor() {
return lbExecutor;
}
@Override
protected MetricEventsListener<? extends ClientMetricsEvent<?>> createListener(
String name) {
......
/*
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.client.netty.http;
import com.google.common.collect.Lists;
import com.netflix.client.config.IClientConfig;
import com.netflix.client.config.IClientConfigKey;
import com.netflix.client.netty.RibbonTransport;
import com.netflix.loadbalancer.LoadBalancerExecutor;
import com.netflix.loadbalancer.Server;
import com.netflix.niws.loadbalancer.DiscoveryEnabledNIWSServerList;
import com.netflix.ribbon.testutils.MockedDiscoveryServerListTest;
import io.netty.buffer.ByteBuf;
import org.junit.Test;
import java.util.List;
import static org.junit.Assert.assertEquals;
public class DiscoveryLoadBalancerTest extends MockedDiscoveryServerListTest {
@Override
protected List<Server> getMockServerList() {
return Lists.newArrayList(new Server("www.google.com", 80), new Server("www.microsoft.com", 80));
}
@Override
protected String getVipAddress() {
return "myvip";
}
@Test
public void testLoadBalancer() {
IClientConfig config = IClientConfig.Builder.newBuilder().withDefaultValues()
.withDeploymentContextBasedVipAddresses(getVipAddress()).build()
.set(IClientConfigKey.Keys.NIWSServerListClassName, DiscoveryEnabledNIWSServerList.class.getName());
NettyHttpClient<ByteBuf, ByteBuf> client = RibbonTransport.newHttpClient(config);
LoadBalancerExecutor lbExecutor = client.getLoadBalancerExecutor();
List<Server> serverList = lbExecutor.getLoadBalancer().getServerList(false);
assertEquals(getMockServerList(), serverList);
}
}
......@@ -179,7 +179,7 @@ public class NettyClientTest {
public void testPoolReuse() throws Exception {
HttpClientRequest<ByteBuf> request = HttpClientRequest.createGet(SERVICE_URI + "testAsync/person");
NettyHttpClient<ByteBuf, ByteBuf> observableClient = (NettyHttpClient<ByteBuf, ByteBuf>) RibbonTransport.newHttpClient(
IClientConfig.Builder.newBuilder()
IClientConfig.Builder.newBuilder().withDefaultValues()
.withMaxAutoRetries(1)
.withMaxAutoRetriesNextServer(1).build());
Observable<HttpClientResponse<ByteBuf>> response = observableClient.submit(request);
......
......@@ -15,11 +15,11 @@
*/
package com.netflix.ribbon;
import com.netflix.client.config.IClientConfigKey;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import com.netflix.client.config.IClientConfigKey;
/**
* API to construct Ribbon client options to be used by {@link ResourceGroup}
*
......@@ -97,7 +97,7 @@ public final class ClientOptions {
options.put(IClientConfigKey.Keys.InitializeNFLoadBalancer, value);
return this;
}
Map<IClientConfigKey<?>, Object> getOptions() {
return options;
}
......
......@@ -23,10 +23,25 @@ public final class Ribbon {
private Ribbon() {
}
/**
* Create the {@link com.netflix.ribbon.http.HttpResourceGroup} with a name. The underlying transport client
* will be created from the client configuration created via
* {@link com.netflix.client.config.IClientConfig.Builder#newBuilder(String)}
*
* @param name name of the resource group, as well as the transport client
*/
public static HttpResourceGroup createHttpResourceGroup(String name) {
return new HttpResourceGroup(name);
}
/**
* Create the {@link com.netflix.ribbon.http.HttpResourceGroup} with a name. The underlying transport client
* will be created from the client configuration created via
* {@link com.netflix.client.config.IClientConfig.Builder#newBuilder(String)}
*
* @param name name of the resource group, as well as the transport client
* @param options Options to override the client configuration created
*/
public static HttpResourceGroup createHttpResourceGroup(String name, ClientOptions options) {
return new HttpResourceGroup(name, options);
}
......
......@@ -38,7 +38,8 @@ public class HttpResourceGroup extends ResourceGroup<HttpRequestTemplate<?>> {
client = RibbonTransport.newHttpClient(getClientConfig());
headers = new DefaultHttpHeaders();
}
@Override
protected IClientConfig loadDefaultConfig(String groupName) {
return IClientConfig.Builder.newBuilder(groupName).build();
}
......
/*
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.ribbon;
import com.google.mockwebserver.MockResponse;
import com.google.mockwebserver.MockWebServer;
import com.netflix.client.config.IClientConfigKey.Keys;
import com.netflix.config.ConfigurationManager;
import com.netflix.loadbalancer.Server;
import com.netflix.niws.loadbalancer.DiscoveryEnabledNIWSServerList;
import com.netflix.ribbon.http.HttpRequestTemplate;
import com.netflix.ribbon.http.HttpResourceGroup;
import com.netflix.ribbon.testutils.MockedDiscoveryServerListTest;
import io.netty.buffer.ByteBuf;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import static org.junit.Assert.assertEquals;
/**
* Created by awang on 7/15/14.
*/
public class DiscoveryEnabledServerListTest extends MockedDiscoveryServerListTest {
static MockWebServer server;
@BeforeClass
public static void init() throws IOException {
server = new MockWebServer();
String content = "Hello world";
MockResponse response = new MockResponse().setResponseCode(200).setHeader("Content-type", "text/plain")
.setBody(content);
server.enqueue(response);
server.play();
}
@AfterClass
public static void shutdown() {
try {
server.shutdown();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
protected List<Server> getMockServerList() {
List<Server> servers = new ArrayList<Server>();
servers.add(new Server("localhost", 12345));
servers.add(new Server("localhost", server.getPort()));
return servers;
}
@Override
protected String getVipAddress() {
return "MyService";
}
@Test
public void testDynamicServers() {
ConfigurationManager.getConfigInstance().setProperty("MyService.ribbon." + Keys.DeploymentContextBasedVipAddresses, getVipAddress());
ConfigurationManager.getConfigInstance().setProperty("MyService.ribbon." + Keys.NIWSServerListClassName, DiscoveryEnabledNIWSServerList.class.getName());
HttpResourceGroup group = Ribbon.createHttpResourceGroup("MyService",
ClientOptions.create()
.withMaxAutoRetriesNextServer(3)
.withReadTimeout(300000));
HttpRequestTemplate<ByteBuf> template = group.newRequestTemplate("test", ByteBuf.class);
RibbonRequest<ByteBuf> request = template
.withUriTemplate("/")
.withMethod("GET")
.requestBuilder().build();
String result = request.execute().toString(Charset.defaultCharset());
assertEquals("Hello world", result);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册