未验证 提交 c4839ae8 编写于 作者: E elandau 提交者: GitHub

Merge pull request #406 from elandau/feature/decouple_archaius_config

config: Decouple dynamic config reloading from archaius
package com.netflix.client.config;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
/**
* Base implementation of an IClientConfig with configuration that can be reloaded at runtime from an underlying
* property source while optimizing access to property values.
*
* Properties can either be scoped to a specific client or default properties that span all clients. By default
* properties follow the name convention `{clientname}.{namespace}.{key}` and then fallback to `{namespace}.{key}`
* if not found
*
* Internally the config tracks two maps, one for dynamic properties and one for code settable default values to use
* when a property is not defined in the underlying property source.
*/
public abstract class AbstractReloadableClientConfig implements IClientConfig {
private static final Logger LOG = LoggerFactory.getLogger(AbstractReloadableClientConfig.class);
private static final String DEFAULT_CLIENT_NAME = "";
private static final String DEFAULT_NAMESPACE = "ribbon";
// Map of raw property names (without namespace or client) to values set via code
private final Map<String, Object> defaultProperties = new HashMap<>();
// Map of all seen dynamic properties. This map will hold on properties requested with the exception of
// those returned from getGlobalProperty().
private final Map<IClientConfigKey, ReloadableProperty<?>> dynamicProperties = new ConcurrentHashMap<>();
// List of actions to perform when configuration changes. This includes both updating the Property instances
// as well as external consumers.
private final List<Runnable> changeActions = new CopyOnWriteArrayList<>();
private final AtomicLong refreshCounter = new AtomicLong();
private String clientName;
private String namespace = DEFAULT_NAMESPACE;
private boolean isDynamic;
public AbstractReloadableClientConfig() {
this(DEFAULT_CLIENT_NAME);
this.isDynamic = false;
}
public AbstractReloadableClientConfig(String clientName) {
this.clientName = clientName;
this.isDynamic = true;
}
/**
* Refresh all seen properties from the underlying property storage
*/
public final void reload() {
changeActions.forEach(Runnable::run);
cachedToString = null;
}
public void setClientName(String clientName){
this.clientName = clientName;
}
@Override
public final String getClientName() {
return clientName;
}
@Override
public String getNameSpace() {
return namespace;
}
@Override
public final void setNameSpace(String nameSpace) {
this.namespace = nameSpace;
}
@Override
public void loadProperties(String clientName) {
this.isDynamic = true;
this.clientName = clientName;
}
@Override
public void loadDefaultValues() {
isDynamic = true;
}
@Override
public final Map<String, Object> getProperties() {
Map<String, Object> result = new HashMap<>(dynamicProperties.size());
dynamicProperties.forEach((key, prop) ->
prop.getOptional().ifPresent(value -> result.put(key.key(), value.toString()))
);
LOG.info(result.toString());
return result;
}
/**
* Get a typed property value from the underlying storage mechanism.
* @param key
* @param type
* @param <T>
* @return Property value or null if not found
*/
protected abstract <T> Optional<T> loadProperty(String key, Class<T> type);
private <T> ReloadableProperty<T> createProperty(final Supplier<Optional<T>> valueSupplier, final Supplier<T> defaultValue, final boolean isDynamic) {
Preconditions.checkNotNull(valueSupplier, "defaultValueSupplier cannot be null");
return new ReloadableProperty<T>() {
private volatile Optional<T> value = Optional.empty();
{
refresh();
if (isDynamic) {
changeActions.add(this::refresh);
}
}
@Override
public void onChange(Consumer<T> consumer) {
final AtomicReference<Optional<T>> previous = new AtomicReference<>(getOptional());
changeActions.add(() -> {
Optional<T> current = getOptional();
if (!current.equals(Optional.ofNullable(previous.get()))) {
previous.set(current);
consumer.accept(current.orElseGet(defaultValue));
}
});
}
@Override
public T get() {
return value.orElseGet(defaultValue);
}
@Override
public Optional<T> getOptional() {
return value;
}
@Override
public void refresh() {
refreshCounter.incrementAndGet();
value = valueSupplier.get();
}
@Override
public String toString() {
return String.valueOf(get());
}
};
}
@Override
public final <T> T get(IClientConfigKey<T> key) {
return (T)Optional.ofNullable(getInternal(key)).flatMap(Property::getOptional).orElse(null);
}
public final <T> ReloadableProperty<T> getInternal(IClientConfigKey<T> key) {
return (ReloadableProperty<T>)dynamicProperties.computeIfAbsent(key, ignore -> getClientDynamicProperty(key, isDynamic));
}
@Override
public final <T> Property<T> getGlobalProperty(IClientConfigKey<T> key) {
LOG.debug("Get global property {} default {}", key.key(), key.defaultValue());
return createProperty(
() -> loadProperty(key.key(), key.type()),
key::defaultValue,
true);
}
interface ReloadableProperty<T> extends Property<T> {
void refresh();
}
private <T> ReloadableProperty<T> getClientDynamicProperty(IClientConfigKey<T> key, boolean isDynamic) {
LOG.debug("Get dynamic property key={} ns={} client={}", key.key(), getNameSpace(), clientName);
return createProperty(
() -> resolveFinalProperty(key),
key::defaultValue,
isDynamic);
}
/**
* Resolve a properties final value in the following order or precedence
* - client scope
* - default scope
* - internally set default
* - IClientConfigKey defaultValue
* @param key
* @param <T>
* @return
*/
private <T> Optional<T> resolveFinalProperty(IClientConfigKey<T> key) {
Optional<T> value;
if (!StringUtils.isEmpty(clientName)) {
value = loadProperty(clientName + "." + getNameSpace() + "." + key.key(), key.type());
if (value.isPresent()) {
return value;
}
}
value = loadProperty(getNameSpace() + "." + key.key(), key.type());
if (value.isPresent()) {
return value;
}
value = resolveDefaultProperty(key);
if (value.isPresent()) {
return value;
}
return Optional.empty();
}
/**
* Returns the internal property to the desiredn type
*/
protected <T> Optional<T> resolveDefaultProperty(IClientConfigKey<T> key) {
return Optional.ofNullable(defaultProperties.get(key.key()))
.map(value -> {
final Class type = key.type();
// Unfortunately there's some legacy code setting string values for typed keys. Here are do our best to parse
// and store the typed value
if (!value.getClass().equals(type)) {
try {
if (value.getClass().equals(String.class)) {
final String strValue = (String) value;
if (Integer.class.equals(type)) {
return (T) Integer.valueOf(strValue);
} else if (Boolean.class.equals(type)) {
return (T) Boolean.valueOf(strValue);
} else if (Float.class.equals(type)) {
return (T) Float.valueOf(strValue);
} else if (Long.class.equals(type)) {
return (T) Long.valueOf(strValue);
} else if (Double.class.equals(type)) {
return (T) Double.valueOf(strValue);
} else if (TimeUnit.class.equals(type)) {
return (T) TimeUnit.valueOf(strValue);
} else {
throw new IllegalArgumentException("Unsupported value type `" + type + "'");
}
} else {
throw new IllegalArgumentException("Incompatible value type `" + value.getClass() + "` while expecting '" + type + "`");
}
} catch (Exception e) {
throw new IllegalArgumentException("Error parsing value '" + value + "' for '" + key.key() + "'", e);
}
} else {
return (T)value;
}
});
}
@Override
public final <T> Property<T> getDynamicProperty(IClientConfigKey<T> key) {
return getClientDynamicProperty(key, true);
}
@Override
public final <T> T get(IClientConfigKey<T> key, T defaultValue) {
return Optional.ofNullable(get(key)).orElse(defaultValue);
}
@Override
public final <T> IClientConfig set(IClientConfigKey<T> key, T value) {
Preconditions.checkArgument(key != null, "key cannot be null");
// Treat nulls as deletes
if (value == null) {
defaultProperties.remove(key.key());
} else {
defaultProperties.put(key.key(), value);
}
getInternal(key).refresh();
cachedToString = null;
return this;
}
@Override
@Deprecated
public void setProperty(IClientConfigKey key, Object value) {
Preconditions.checkArgument(value != null, "Value may not be null");
set(key, value);
}
@Override
@Deprecated
public Object getProperty(IClientConfigKey key) {
return getInternal(key).get();
}
@Override
@Deprecated
public Object getProperty(IClientConfigKey key, Object defaultVal) {
return Optional.ofNullable(getInternal(key).get()).orElse(defaultVal);
}
@Override
@Deprecated
public boolean containsProperty(IClientConfigKey key) {
return dynamicProperties.containsKey(key.key());
}
@Override
@Deprecated
public int getPropertyAsInteger(IClientConfigKey key, int defaultValue) {
return Optional.ofNullable(getProperty(key)).map(Integer.class::cast).orElse(defaultValue);
}
@Override
@Deprecated
public String getPropertyAsString(IClientConfigKey key, String defaultValue) {
return Optional.ofNullable(getProperty(key)).map(Object::toString).orElse(defaultValue);
}
@Override
@Deprecated
public boolean getPropertyAsBoolean(IClientConfigKey key, boolean defaultValue) {
return Optional.ofNullable(getProperty(key)).map(Boolean.class::cast).orElse(defaultValue);
}
public IClientConfig applyOverride(IClientConfig override) {
if (override == null) {
return this;
}
this.defaultProperties.putAll(override.getProperties());
reload();
return this;
}
private volatile String cachedToString = null;
@Override
public String toString() {
if (cachedToString == null) {
String newToString = generateToString();
cachedToString = newToString;
return newToString;
}
return cachedToString;
}
/**
* @return Number of individual properties refreshed. This can be used to identify patterns of excessive updates.
*/
public long getRefreshCount() {
return refreshCounter.get();
}
private String generateToString() {
return "ClientConfig:" + dynamicProperties.entrySet().stream()
.map((t) -> {
if (t.getKey().key().endsWith("Password")) {
return t.getKey() + ":***";
}
Object value = t.getValue().get();
Object defaultValue = t.getKey().defaultValue();
return t.getKey() + ":" + MoreObjects.firstNonNull(value, defaultValue);
})
.collect(Collectors.joining(", "));
}
}
......@@ -21,4 +21,9 @@ public final class FallbackProperty<T> implements Property<T> {
public T get() {
return primary.getOptional().orElseGet(fallback::get);
}
@Override
public String toString() {
return String.valueOf(get());
}
}
......@@ -15,10 +15,13 @@ public interface Property<T> {
void onChange(Consumer<T> consumer);
/**
* @return Get the current value or default value
* @return Get the current value. Can be null if no default value was defined
*/
T get();
/**
* @return Return the value only if not set. Will return Optional.empty() instead of default value if not set
*/
default Optional<T> getOptional() { return Optional.ofNullable(get()); }
default Property<T> fallbackWith(Property<T> fallback) {
......@@ -29,13 +32,18 @@ public interface Property<T> {
return new Property<T>() {
@Override
public void onChange(Consumer<T> consumer) {
// It's a static property so no need to track the consumer
}
@Override
public T get() {
return value;
}
@Override
public String toString( ){
return String.valueOf(value);
}
};
}
}
......@@ -71,14 +71,11 @@ public class ClientConfigTest {
props.setProperty("netflix.appinfo.stack","xbox");
props.setProperty("netflix.environment","test");
props.setProperty("appname", "movieservice");
clientConfig.setProperty(props, restClientName, CommonClientConfigKey.AppName.key(), "movieservice");
clientConfig.setProperty(props, restClientName, CommonClientConfigKey.DeploymentContextBasedVipAddresses.key(),
"${appname}-${netflix.appinfo.stack}-${netflix.environment},movieservice--${netflix.environment}");
clientConfig.setProperty(props, restClientName, CommonClientConfigKey.EnableZoneAffinity.key(), "false");
props.setProperty(restClientName + ".ribbon." + CommonClientConfigKey.AppName.key(), "movieservice");
props.setProperty(restClientName + ".ribbon." + CommonClientConfigKey.DeploymentContextBasedVipAddresses.key(), "${appname}-${netflix.appinfo.stack}-${netflix.environment},movieservice--${netflix.environment}");
props.setProperty(restClientName + ".ribbon." + CommonClientConfigKey.EnableZoneAffinity.key(), "false");
ConfigurationManager.loadProperties(props);
ConfigurationManager.getConfigInstance().setProperty("testRestClient.ribbon.customProperty", "abc");
......@@ -94,32 +91,29 @@ public class ClientConfigTest {
Assert.assertEquals(5000, clientConfig.get(CommonClientConfigKey.ConnectTimeout).longValue());
Assert.assertEquals(8000, clientConfig.get(CommonClientConfigKey.Port).longValue());
assertEquals("abc", clientConfig.getProperties().get("customProperty"));
System.out.println("AutoVipAddress:" + clientConfig.resolveDeploymentContextbasedVipAddresses());
ConfigurationManager.getConfigInstance().setProperty("testRestClient.ribbon.EnableZoneAffinity", "true");
ConfigurationManager.getConfigInstance().setProperty("testRestClient.ribbon.customProperty", "xyz");
assertEquals(true, clientConfig.get(CommonClientConfigKey.EnableZoneAffinity));
assertEquals("xyz", clientConfig.getProperties().get("customProperty"));
}
@Test
public void testresolveDeploymentContextbasedVipAddresses() throws Exception {
final String restClientName = "testRestClient2";
DefaultClientConfigImpl clientConfig = new DefaultClientConfigImpl();
clientConfig.loadDefaultValues();
Properties props = new Properties();
final String restClientName = "testRestClient2";
clientConfig.setProperty(props, restClientName,CommonClientConfigKey.AppName.key(), "movieservice");
clientConfig.setProperty(props, restClientName, CommonClientConfigKey.DeploymentContextBasedVipAddresses.key(),
"${<appname>}-${netflix.appinfo.stack}-${netflix.environment}:${<port>},${<appname>}--${netflix.environment}:${<port>}");
clientConfig.setProperty(props, restClientName, CommonClientConfigKey.Port.key(), "7001");
clientConfig.setProperty(props, restClientName, CommonClientConfigKey.EnableZoneAffinity.key(), "true");
props.setProperty(restClientName + ".ribbon." + CommonClientConfigKey.AppName.key(), "movieservice");
props.setProperty(restClientName + ".ribbon." + CommonClientConfigKey.DeploymentContextBasedVipAddresses.key(), "${<appname>}-${netflix.appinfo.stack}-${netflix.environment}:${<port>},${<appname>}--${netflix.environment}:${<port>}");
props.setProperty(restClientName + ".ribbon." + CommonClientConfigKey.Port.key(), "7001");
props.setProperty(restClientName + ".ribbon." + CommonClientConfigKey.EnableZoneAffinity.key(), "true");
ConfigurationManager.loadProperties(props);
clientConfig.loadProperties(restClientName);
Assert.assertEquals("movieservice", clientConfig.get(CommonClientConfigKey.AppName));
Assert.assertEquals(true, clientConfig.get(CommonClientConfigKey.EnableZoneAffinity));
......@@ -127,7 +121,7 @@ public class ClientConfigTest {
assertEquals("movieservice-xbox-test:7001", clientConfig.get(CommonClientConfigKey.DeploymentContextBasedVipAddresses));
ConfigurationManager.getConfigInstance().clearProperty("testRestClient2.ribbon.EnableZoneAffinity");
assertNull(clientConfig.get(CommonClientConfigKey.EnableZoneAffinity));
assertFalse(clientConfig.get(CommonClientConfigKey.EnableZoneAffinity));
}
@Test
......
......@@ -27,19 +27,6 @@ public class DefaultClientConfigImplTest {
assertEquals(1000, config.get(CommonClientConfigKey.ConnectTimeout).intValue());
}
@Test
public void shouldThrowExceptionForIncorrectProperties() {
ConfigurationManager.getConfigInstance().setProperty("myclient.ribbon", "bar");
DefaultClientConfigImpl config = new DefaultClientConfigImpl();
String message = "";
try {
config.loadProperties("myclient");
} catch (RuntimeException ex) {
message = ex.getMessage();
}
assertEquals("Property ribbon is invalid", message);
}
@Test
public void testNewType() {
CommonClientConfigKey<Date> key = new CommonClientConfigKey<Date>("date") {};
......
......@@ -64,4 +64,9 @@ public class ConfigurationBasedServerList extends AbstractServerList<Server> {
}
return list;
}
@Override
public String toString() {
return "ConfigurationBasedServerList:" + getUpdatedListOfServers();
}
}
......@@ -61,8 +61,8 @@ public class ExecutionContext<T> {
this.request = request;
this.requestConfig = requestConfig;
this.clientConfig = clientConfig;
this.context = new ConcurrentHashMap<String, Object>();
this.subContexts = new ConcurrentHashMap<Object, ChildContext<T>>();
this.context = new ConcurrentHashMap<>();
this.subContexts = new ConcurrentHashMap<>();
this.retryHandler = retryHandler;
}
......@@ -70,7 +70,7 @@ public class ExecutionContext<T> {
this.request = request;
this.requestConfig = requestConfig;
this.clientConfig = clientConfig;
this.context = new ConcurrentHashMap<String, Object>();
this.context = new ConcurrentHashMap<>();
this.subContexts = subContexts;
this.retryHandler = retryHandler;
}
......
......@@ -10,6 +10,7 @@ dependencies {
compile "org.slf4j:slf4j-api:${slf4j_version}"
compile "com.google.guava:guava:${guava_version}"
testCompile 'junit:junit:4.11'
testCompile "org.slf4j:slf4j-log4j12:${slf4j_version}"
testCompile "com.sun.jersey:jersey-server:${jersey_version}"
testCompile 'com.google.mockwebserver:mockwebserver:20130706'
testCompile project(':ribbon-eureka')
......
......@@ -36,10 +36,12 @@ import com.netflix.ribbon.transport.netty.RibbonTransport;
import io.netty.buffer.ByteBuf;
import io.reactivex.netty.protocol.http.client.HttpClientRequest;
import io.reactivex.netty.protocol.http.client.HttpClientResponse;
import org.junit.Ignore;
import org.junit.Test;
import rx.functions.Action1;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
......@@ -54,9 +56,13 @@ public class ListenerTest {
@Test
public void testFailedExecution() {
IClientConfig config = DefaultClientConfigImpl.getClientConfigWithDefaultValues().withProperty(CommonClientConfigKey.ConnectTimeout, "100")
.withProperty(CommonClientConfigKey.MaxAutoRetries, 1)
.withProperty(CommonClientConfigKey.MaxAutoRetriesNextServer, 1);
IClientConfig config = DefaultClientConfigImpl.getClientConfigWithDefaultValues()
.withProperty(CommonClientConfigKey.ConnectTimeout, "100")
.withProperty(CommonClientConfigKey.MaxAutoRetries, 1)
.withProperty(CommonClientConfigKey.MaxAutoRetriesNextServer, 1);
System.out.println(config);
HttpClientRequest<ByteBuf> request = HttpClientRequest.createGet("/testAsync/person");
Server badServer = new Server("localhost:12345");
Server badServer2 = new Server("localhost:34567");
......@@ -128,7 +134,9 @@ public class ListenerTest {
public void testSuccessExecution() throws IOException {
MockWebServer server = new MockWebServer();
String content = "OK";
server.enqueue(new MockResponse().setResponseCode(200).setHeader("Content-type", "application/json")
server.enqueue(new MockResponse()
.setResponseCode(200)
.setHeader("Content-type", "application/json")
.setBody(content));
server.play();
......@@ -137,21 +145,30 @@ public class ListenerTest {
.withProperty(CommonClientConfigKey.ConnectTimeout, "2000")
.withProperty(CommonClientConfigKey.MaxAutoRetries, 1)
.withProperty(CommonClientConfigKey.MaxAutoRetriesNextServer, 1);
System.out.println(config);
HttpClientRequest<ByteBuf> request = HttpClientRequest.createGet("/testAsync/person");
Server badServer = new Server("localhost:12345");
Server goodServer = new Server("localhost:" + server.getPort());
List<Server> servers = Lists.newArrayList(goodServer, badServer);
BaseLoadBalancer lb = LoadBalancerBuilder.<Server>newBuilder()
BaseLoadBalancer lb = LoadBalancerBuilder.newBuilder()
.withRule(new AvailabilityFilteringRule())
.withPing(new DummyPing())
.buildFixedServerListLoadBalancer(servers);
IClientConfig overrideConfig = DefaultClientConfigImpl.getEmptyConfig().set(CommonClientConfigKey.ConnectTimeout, 500);
TestExecutionListener<ByteBuf, ByteBuf> listener = new TestExecutionListener<ByteBuf, ByteBuf>(request, overrideConfig);
List<ExecutionListener<HttpClientRequest<ByteBuf>, HttpClientResponse<ByteBuf>>> listeners = Lists.<ExecutionListener<HttpClientRequest<ByteBuf>, HttpClientResponse<ByteBuf>>>newArrayList(listener);
IClientConfig overrideConfig = DefaultClientConfigImpl
.getEmptyConfig()
.set(CommonClientConfigKey.ConnectTimeout, 500);
TestExecutionListener<ByteBuf, ByteBuf> listener = new TestExecutionListener<>(request, overrideConfig);
List<ExecutionListener<HttpClientRequest<ByteBuf>, HttpClientResponse<ByteBuf>>> listeners = Lists.newArrayList(listener);
LoadBalancingHttpClient<ByteBuf, ByteBuf> client = RibbonTransport.newHttpClient(lb, config, new NettyHttpLoadBalancerErrorHandler(config), listeners);
HttpClientResponse<ByteBuf> response = client.submit(request, null, overrideConfig).toBlocking().last();
System.out.println(listener);
assertEquals(200, response.getStatus().code());
assertEquals(1, listener.executionStartCounter.get());
assertEquals(3, listener.startWithServerCounter.get());
......
......@@ -137,4 +137,26 @@ public class TestExecutionListener<I, O> implements ExecutionListener<HttpClient
public ExecutionContext<HttpClientRequest<I>> getContext() {
return this.context;
}
@Override
public String toString() {
return "TestExecutionListener{" +
"executionStartCounter=" + executionStartCounter +
", startWithServerCounter=" + startWithServerCounter +
", exceptionWithServerCounter=" + exceptionWithServerCounter +
", executionFailedCounter=" + executionFailedCounter +
", executionSuccessCounter=" + executionSuccessCounter +
", expectedRequest=" + expectedRequest +
", requestConfig=" + requestConfig +
", checkContext=" + checkContext +
", checkExecutionInfo=" + checkExecutionInfo +
", finalThrowable=" + finalThrowable +
", response=" + response +
", errors=" + errors +
", numAttemptsOnServer=" + numAttemptsOnServer +
", numServers=" + numServers +
", lastServer=" + lastServer +
", context=" + context +
'}';
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册