未验证 提交 be048d42 编写于 作者: wu-sheng's avatar wu-sheng 提交者: GitHub

Support change detection mechanism in DCS (#4438)

* Add UUID to DCS
上级 85cc9c36
......@@ -20,6 +20,7 @@ package org.apache.skywalking.oap.server.configuration.api;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
......@@ -67,41 +68,52 @@ public abstract class ConfigWatcherRegister implements DynamicConfigurationServi
logger.info("Current configurations after the bootstrap sync." + LINE_SEPARATOR + register.toString());
Executors.newSingleThreadScheduledExecutor()
.scheduleAtFixedRate(new RunnableWithExceptionProtection(this::configSync, t -> logger.error("Sync config center error.", t)), syncPeriod, syncPeriod, TimeUnit.SECONDS);
.scheduleAtFixedRate(
new RunnableWithExceptionProtection(
this::configSync,
t -> logger.error("Sync config center error.", t)
), syncPeriod, syncPeriod, TimeUnit.SECONDS);
}
void configSync() {
ConfigTable configTable = readConfig(register.keys());
configTable.getItems().forEach(item -> {
String itemName = item.getName();
WatcherHolder holder = register.get(itemName);
if (holder != null) {
ConfigChangeWatcher watcher = holder.getWatcher();
String newItemValue = item.getValue();
if (newItemValue == null) {
if (watcher.value() != null) {
// Notify watcher, the new value is null with delete event type.
watcher.notify(new ConfigChangeWatcher.ConfigChangeEvent(null, ConfigChangeWatcher.EventType.DELETE));
Optional<ConfigTable> configTable = readConfig(register.keys());
// Config table would be null if no change detected from the implementation.
configTable.ifPresent(config -> {
config.getItems().forEach(item -> {
String itemName = item.getName();
WatcherHolder holder = register.get(itemName);
if (holder != null) {
ConfigChangeWatcher watcher = holder.getWatcher();
String newItemValue = item.getValue();
if (newItemValue == null) {
if (watcher.value() != null) {
// Notify watcher, the new value is null with delete event type.
watcher.notify(
new ConfigChangeWatcher.ConfigChangeEvent(null, ConfigChangeWatcher.EventType.DELETE));
} else {
// Don't need to notify, stay in null.
}
} else {
// Don't need to notify, stay in null.
if (!newItemValue.equals(watcher.value())) {
watcher.notify(new ConfigChangeWatcher.ConfigChangeEvent(
newItemValue,
ConfigChangeWatcher.EventType.MODIFY
));
} else {
// Don't need to notify, stay in the same config value.
}
}
} else {
if (!newItemValue.equals(watcher.value())) {
watcher.notify(new ConfigChangeWatcher.ConfigChangeEvent(newItemValue, ConfigChangeWatcher.EventType.MODIFY));
} else {
// Don't need to notify, stay in the same config value.
}
logger.warn("Config {} from configuration center, doesn't match any watcher, ignore.", itemName);
}
} else {
logger.warn("Config {} from configuration center, doesn't match any watcher, ignore.", itemName);
}
});
});
logger.trace("Current configurations after the sync." + LINE_SEPARATOR + register.toString());
logger.trace("Current configurations after the sync." + LINE_SEPARATOR + register.toString());
});
}
public abstract ConfigTable readConfig(Set<String> keys);
public abstract Optional<ConfigTable> readConfig(Set<String> keys);
public class Register {
private Map<String, WatcherHolder> register = new HashMap<>();
......
......@@ -18,6 +18,7 @@
package org.apache.skywalking.oap.server.configuration.api;
import java.util.Optional;
import java.util.Set;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
......@@ -88,14 +89,14 @@ public class ConfigWatcherRegisterTest {
public static class MockConfigWatcherRegister extends ConfigWatcherRegister {
@Override
public ConfigTable readConfig(Set<String> keys) {
public Optional<ConfigTable> readConfig(Set<String> keys) {
ConfigTable.ConfigItem item1 = new ConfigTable.ConfigItem("module.provider.prop1", "abc");
ConfigTable.ConfigItem item2 = new ConfigTable.ConfigItem("MockModule.provider.prop2", "abc2");
ConfigTable table = new ConfigTable();
table.add(item1);
table.add(item2);
return table;
return Optional.of(table);
}
}
......
......@@ -21,6 +21,7 @@ package org.apache.skywalking.oap.server.configuration.apollo;
import com.ctrip.framework.apollo.Config;
import com.ctrip.framework.apollo.ConfigService;
import com.google.common.base.Strings;
import java.util.Optional;
import java.util.Set;
import org.apache.skywalking.oap.server.configuration.api.ConfigTable;
import org.apache.skywalking.oap.server.configuration.api.ConfigWatcherRegister;
......@@ -53,7 +54,7 @@ public class ApolloConfigWatcherRegister extends ConfigWatcherRegister {
}
@Override
public ConfigTable readConfig(Set<String> keys) {
public Optional<ConfigTable> readConfig(Set<String> keys) {
final ConfigTable configTable = new ConfigTable();
for (final String name : keys) {
......@@ -61,6 +62,6 @@ public class ApolloConfigWatcherRegister extends ConfigWatcherRegister {
configTable.add(new ConfigTable.ConfigItem(name, value));
}
return configTable;
return Optional.of(configTable);
}
}
......@@ -76,7 +76,7 @@ public class ConsulConfigurationWatcherRegister extends ConfigWatcherRegister {
}
@Override
public ConfigTable readConfig(Set<String> keys) {
public Optional<ConfigTable> readConfig(Set<String> keys) {
removeUninterestedKeys(keys);
registerKeyListeners(keys);
......@@ -91,7 +91,7 @@ public class ConsulConfigurationWatcherRegister extends ConfigWatcherRegister {
}
});
return table;
return Optional.of(table);
}
private void registerKeyListeners(final Set<String> keys) {
......
......@@ -65,7 +65,7 @@ public class EtcdConfigWatcherRegister extends ConfigWatcherRegister {
}
@Override
public ConfigTable readConfig(Set<String> keys) {
public Optional<ConfigTable> readConfig(Set<String> keys) {
removeUninterestedKeys(keys);
registerKeyListeners(keys);
final ConfigTable table = new ConfigTable();
......@@ -81,7 +81,7 @@ public class EtcdConfigWatcherRegister extends ConfigWatcherRegister {
}
}
return table;
return Optional.of(table);
}
private void registerKeyListeners(final Set<String> keys) {
......
......@@ -121,7 +121,7 @@ public class EtcdConfigWatcherRegisterTest {
when(node1.getKey()).thenReturn("/skywalking/testKey");
when(node1.getValue()).thenReturn("testVal");
final ConfigTable configTable = mockRegister.readConfig(Sets.newHashSet(testKey1, testKey2));
final ConfigTable configTable = mockRegister.readConfig(Sets.newHashSet(testKey1, testKey2)).get();
assertEquals(2, configTable.getItems().size());
Map<String, String> kvs = new HashMap<>();
......
......@@ -60,7 +60,7 @@ public class NacosConfigWatcherRegister extends ConfigWatcherRegister {
}
@Override
public ConfigTable readConfig(Set<String> keys) {
public Optional<ConfigTable> readConfig(Set<String> keys) {
removeUninterestedKeys(keys);
registerKeyListeners(keys);
......@@ -77,7 +77,7 @@ public class NacosConfigWatcherRegister extends ConfigWatcherRegister {
}
}
return table;
return Optional.of(table);
}
private void registerKeyListeners(final Set<String> keys) {
......
......@@ -52,7 +52,7 @@ public class NacosConfigWatcherRegisterTest {
Whitebox.setInternalState(mockRegister, "configService", mockConfigService);
final ConfigTable configTable = mockRegister.readConfig(Sets.newHashSet(testKey1, testKey2));
final ConfigTable configTable = mockRegister.readConfig(Sets.newHashSet(testKey1, testKey2)).get();
assertEquals(2, configTable.getItems().size());
Map<String, String> kvs = new HashMap<>();
......
......@@ -18,6 +18,7 @@
package org.apache.skywalking.oap.server.configuration.zookeeper;
import java.util.Optional;
import java.util.Set;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
......@@ -43,12 +44,12 @@ public class ZookeeperConfigWatcherRegister extends ConfigWatcherRegister {
}
@Override
public ConfigTable readConfig(Set<String> keys) {
public Optional<ConfigTable> readConfig(Set<String> keys) {
ConfigTable table = new ConfigTable();
keys.forEach(s -> {
ChildData data = this.childrenCache.getCurrentData(this.prefix + s);
table.add(new ConfigTable.ConfigItem(s, data == null ? null : new String(data.getData())));
});
return table;
return Optional.of(table);
}
}
......@@ -18,6 +18,7 @@
package org.apache.skywalking.oap.server.configuration.zookeeper.ut;
import java.util.Optional;
import java.util.Set;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
......@@ -35,12 +36,12 @@ public class MockZookeeperConfigWatcherRegister extends ConfigWatcherRegister {
}
@Override
public ConfigTable readConfig(Set<String> keys) {
public Optional<ConfigTable> readConfig(Set<String> keys) {
ConfigTable table = new ConfigTable();
keys.forEach(s -> {
ChildData data = this.childrenCache.getCurrentData(this.prefix + s);
table.add(new ConfigTable.ConfigItem(s, data == null ? null : new String(data.getData())));
});
return table;
return Optional.of(table);
}
}
\ No newline at end of file
......@@ -48,7 +48,7 @@ public class ZookeeperConfigWatcherRegisterTestCase {
Whitebox.setInternalState(mockRegister, "childrenCache", mockPathChildrenCache);
final ConfigTable configTable = mockRegister.readConfig(Sets.newHashSet(key));
final ConfigTable configTable = mockRegister.readConfig(Sets.newHashSet(key)).get();
assertEquals(1, configTable.getItems().size());
assertEquals(key, configTable.getItems().get(0).getName());
......
......@@ -19,6 +19,8 @@
package org.apache.skywalking.oap.server.configuration.grpc;
import io.grpc.netty.NettyChannelBuilder;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import org.apache.skywalking.oap.server.configuration.api.ConfigTable;
import org.apache.skywalking.oap.server.configuration.api.ConfigWatcherRegister;
......@@ -33,22 +35,32 @@ public class GRPCConfigWatcherRegister extends ConfigWatcherRegister {
private RemoteEndpointSettings settings;
private ConfigurationServiceGrpc.ConfigurationServiceBlockingStub stub;
private String uuid = null;
public GRPCConfigWatcherRegister(RemoteEndpointSettings settings) {
super(settings.getPeriod());
this.settings = settings;
stub = ConfigurationServiceGrpc.newBlockingStub(NettyChannelBuilder.forAddress(settings.getHost(), settings.getPort())
.usePlaintext()
.build());
stub = ConfigurationServiceGrpc.newBlockingStub(
NettyChannelBuilder.forAddress(settings.getHost(), settings.getPort())
.usePlaintext()
.build());
}
@Override
public ConfigTable readConfig(Set<String> keys) {
public Optional<ConfigTable> readConfig(Set<String> keys) {
ConfigTable table = new ConfigTable();
try {
ConfigurationResponse response = stub.call(ConfigurationRequest.newBuilder()
.setClusterName(settings.getClusterName())
.build());
ConfigurationRequest.Builder builder = ConfigurationRequest.newBuilder()
.setClusterName(settings.getClusterName());
if (uuid != null) {
builder.setUuid(uuid);
}
ConfigurationResponse response = stub.call(builder.build());
String responseUuid = response.getUuid();
if (Objects.equals(uuid, responseUuid)) {
// If UUID matched, the config table is expected as empty.
return Optional.empty();
}
response.getConfigTableList().forEach(config -> {
final String name = config.getName();
if (keys.contains(name)) {
......@@ -58,6 +70,6 @@ public class GRPCConfigWatcherRegister extends ConfigWatcherRegister {
} catch (Exception e) {
logger.error("Remote config center [" + settings + "] is not available.", e);
}
return table;
return Optional.of(table);
}
}
......@@ -31,6 +31,8 @@ message ConfigurationRequest {
// in case the remote configuration center implementation support
// configuration management for multiple clusters.
string clusterName = 1;
// The config UUID response from the config server side.
string uuid = 2;
}
message ConfigurationResponse {
......@@ -42,6 +44,9 @@ message ConfigurationResponse {
// If the config center wants to set the value to NULL or empty,
// must set the name with empty value explicitly.
repeated Config configTable = 1;
// UUID is literal string represents the content of the config table.
// If config table is unchanged, then could response the same uuid, and config table is not required.
string uuid = 2;
}
message Config {
......
......@@ -18,6 +18,7 @@
package org.apache.skywalking.oap.server.core.analysis;
import java.util.Optional;
import java.util.Set;
import org.apache.skywalking.oap.server.configuration.api.ConfigTable;
import org.apache.skywalking.oap.server.configuration.api.ConfigWatcherRegister;
......@@ -68,10 +69,10 @@ public class ApdexThresholdConfigTest {
}
@Override
public ConfigTable readConfig(Set<String> keys) {
public Optional<ConfigTable> readConfig(Set<String> keys) {
ConfigTable table = new ConfigTable();
table.add(new ConfigTable.ConfigItem("core.default.apdexThreshold", "default: 1000 \nfoo: 200"));
return table;
return Optional.of(table);
}
}
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册