未验证 提交 abcd87f8 编写于 作者: W wankai123 提交者: GitHub

Support collection type in dynamic configuration core and add zookeeper implementation (#7509)

* Support grouped dynamic configurations in DCS.
Support zookeeper grouped dynamic configurations.

* change to @Slf4j
Co-authored-by: wu-sheng's avatar吴晟 Wu Sheng <wu.sheng@foxmail.com>
上级 a2e011f9
......@@ -45,6 +45,8 @@ Release Notes.
* Add a new API to test log analysis language.
* Harden the security of Groovy-based DSL, MAL and LAL.
* Fix distinct in Service/Instance/Endpoint query is not working.
* Support collection type in dynamic configuration core.
* Support zookeeper grouped dynamic configurations.
* Fix NPE when OAP nodes synchronize events with each other in cluster mode.
#### UI
......
......@@ -24,6 +24,7 @@ import org.apache.skywalking.oap.server.analyzer.provider.AnalyzerModuleProvider
import org.apache.skywalking.oap.server.configuration.api.ConfigChangeWatcher;
import org.apache.skywalking.oap.server.configuration.api.ConfigTable;
import org.apache.skywalking.oap.server.configuration.api.ConfigWatcherRegister;
import org.apache.skywalking.oap.server.configuration.api.GroupConfigTable;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
......@@ -108,6 +109,11 @@ public class TraceLatencyThresholdsAndWatcherTest {
table.add(new ConfigTable.ConfigItem("agent-analyzer.default.slowTraceSegmentThreshold", "3000"));
return Optional.of(table);
}
@Override
public Optional<GroupConfigTable> readGroupConfig(final Set<String> keys) {
return Optional.empty();
}
}
}
......@@ -22,6 +22,7 @@ import org.apache.skywalking.oap.server.analyzer.provider.AnalyzerModuleProvider
import org.apache.skywalking.oap.server.configuration.api.ConfigChangeWatcher;
import org.apache.skywalking.oap.server.configuration.api.ConfigTable;
import org.apache.skywalking.oap.server.configuration.api.ConfigWatcherRegister;
import org.apache.skywalking.oap.server.configuration.api.GroupConfigTable;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
......@@ -109,6 +110,11 @@ public class TraceSampleRateWatcherTest {
table.add(new ConfigTable.ConfigItem("agent-analyzer.default.sampleRate", "9000"));
return Optional.of(table);
}
@Override
public Optional<GroupConfigTable> readGroupConfig(final Set<String> keys) {
return Optional.empty();
}
}
}
......@@ -31,11 +31,13 @@ public abstract class ConfigChangeWatcher {
private final String module;
private final ModuleProvider provider;
private final String itemName;
protected WatchType watchType;
public ConfigChangeWatcher(String module, ModuleProvider provider, String itemName) {
this.module = module;
this.provider = provider;
this.itemName = itemName;
this.watchType = WatchType.SINGLE;
}
/**
......@@ -70,4 +72,8 @@ public abstract class ConfigChangeWatcher {
public enum EventType {
ADD, MODIFY, DELETE
}
public enum WatchType {
SINGLE, GROUP
}
}
......@@ -25,7 +25,7 @@ import lombok.Setter;
import lombok.ToString;
/**
* ConfigTable contains all config.
* ConfigTable contains all WatchType.SINGLE config.
*/
@ToString
public class ConfigTable {
......
......@@ -25,18 +25,18 @@ import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The default implementor of Config Watcher register.
*/
@Slf4j
public abstract class ConfigWatcherRegister implements DynamicConfigurationService {
private static final Logger LOGGER = LoggerFactory.getLogger(ConfigWatcherRegister.class);
public static final String LINE_SEPARATOR = System.getProperty("line.separator", "\n");
private Register register = new Register();
private Register singleConfigChangeWatcherRegister = new Register();
@Getter
private Register groupConfigChangeWatcherRegister = new Register();
private volatile boolean isStarted = false;
private final long syncPeriod;
......@@ -55,65 +55,159 @@ public abstract class ConfigWatcherRegister implements DynamicConfigurationServi
}
WatcherHolder holder = new WatcherHolder(watcher);
if (register.containsKey(holder.getKey())) {
if (singleConfigChangeWatcherRegister.containsKey(
holder.getKey()) || groupConfigChangeWatcherRegister.containsKey(holder.getKey())) {
throw new IllegalStateException("Duplicate register, watcher=" + watcher);
}
register.put(holder.getKey(), holder);
switch (holder.getWatcher().getWatchType()) {
case SINGLE:
singleConfigChangeWatcherRegister.put(holder.getKey(), holder);
break;
case GROUP:
groupConfigChangeWatcherRegister.put(holder.getKey(), holder);
break;
default:
throw new IllegalArgumentException(
"Unexpected watch type of ConfigChangeWatcher " + watcher.toString());
}
}
public void start() {
isStarted = true;
LOGGER.info("Current configurations after the bootstrap sync." + LINE_SEPARATOR + register.toString());
log.info("Current configurations after the bootstrap sync." + LINE_SEPARATOR + singleConfigChangeWatcherRegister.toString());
Executors.newSingleThreadScheduledExecutor()
.scheduleAtFixedRate(
new RunnableWithExceptionProtection(
this::configSync,
t -> LOGGER.error("Sync config center error.", t)
t -> log.error("Sync config center error.", t)
), 0, syncPeriod, TimeUnit.SECONDS);
}
void configSync() {
Optional<ConfigTable> configTable = readConfig(register.keys());
singleConfigsSync();
groupConfigsSync();
}
private void singleConfigsSync() {
Optional<ConfigTable> configTable = readConfig(singleConfigChangeWatcherRegister.keys());
// Config table would be null if no change detected from the implementation.
configTable.ifPresent(config -> {
config.getItems().forEach(item -> {
String itemName = item.getName();
WatcherHolder holder = register.get(itemName);
if (holder != null) {
ConfigChangeWatcher watcher = holder.getWatcher();
String newItemValue = item.getValue();
WatcherHolder holder = singleConfigChangeWatcherRegister.get(itemName);
if (holder == null) {
log.warn(
"Config {} from configuration center, doesn't match any WatchType.SINGLE watcher, ignore.",
itemName
);
return;
}
ConfigChangeWatcher watcher = holder.getWatcher();
String newItemValue = item.getValue();
if (newItemValue == null) {
if (watcher.value() != null) {
// Notify watcher, the new value is null with delete event type.
watcher.notify(
new ConfigChangeWatcher.ConfigChangeEvent(null, ConfigChangeWatcher.EventType.DELETE));
} else {
// Don't need to notify, stay in null.
}
} else {
if (!newItemValue.equals(watcher.value())) {
watcher.notify(new ConfigChangeWatcher.ConfigChangeEvent(
newItemValue,
ConfigChangeWatcher.EventType.MODIFY
));
} else {
// Don't need to notify, stay in the same config value.
}
}
});
if (log.isTraceEnabled()) {
log.trace(
"Current configurations after the sync." + LINE_SEPARATOR + singleConfigChangeWatcherRegister.toString());
}
});
}
private void groupConfigsSync() {
Optional<GroupConfigTable> groupConfigTable = readGroupConfig(groupConfigChangeWatcherRegister.keys());
// Config table would be null if no change detected from the implementation.
groupConfigTable.ifPresent(config -> {
config.getGroupItems().forEach(groupConfigItems -> {
String groupConfigItemName = groupConfigItems.getName();
WatcherHolder holder = groupConfigChangeWatcherRegister.get(groupConfigItemName);
if (holder == null) {
log.warn(
"Config {} from configuration center, doesn't match any WatchType.GROUP watcher, ignore.",
groupConfigItemName
);
return;
}
GroupConfigChangeWatcher watcher = (GroupConfigChangeWatcher) holder.getWatcher();
Map<String, ConfigTable.ConfigItem> groupItems = groupConfigItems.getItems();
Map<String, ConfigChangeWatcher.ConfigChangeEvent> changedGroupItems = new HashMap<>();
Map<String, String> currentGroupItems = Optional.ofNullable(watcher.groupItems())
.orElse(new HashMap<>());
groupItems.forEach((groupItemName, groupItem) -> {
String newItemValue = groupItem.getValue();
if (newItemValue == null) {
if (watcher.value() != null) {
if (currentGroupItems.get(groupItemName) != null) {
// Notify watcher, the new value is null with delete event type.
watcher.notify(
new ConfigChangeWatcher.ConfigChangeEvent(null, ConfigChangeWatcher.EventType.DELETE));
changedGroupItems.put(groupItemName, new ConfigChangeWatcher.ConfigChangeEvent(
null,
ConfigChangeWatcher.EventType.DELETE
));
} else {
// Don't need to notify, stay in null.
}
} else {
if (!newItemValue.equals(watcher.value())) {
watcher.notify(new ConfigChangeWatcher.ConfigChangeEvent(
} else { //add and modify
if (!newItemValue.equals(currentGroupItems.get(groupItemName))) {
changedGroupItems.put(groupItemName, new ConfigChangeWatcher.ConfigChangeEvent(
newItemValue,
ConfigChangeWatcher.EventType.MODIFY
));
} else {
// Don't need to notify, stay in the same config value.
}
}
} else {
LOGGER.warn("Config {} from configuration center, doesn't match any watcher, ignore.", itemName);
});
currentGroupItems.forEach((oldGroupItemName, oldGroupItemValue) -> {
//delete item
if (null == groupItems.get(oldGroupItemName)) {
// Notify watcher, the item is deleted with delete event type.
changedGroupItems.put(oldGroupItemName, new ConfigChangeWatcher.ConfigChangeEvent(
null,
ConfigChangeWatcher.EventType.DELETE
));
}
});
if (changedGroupItems.size() > 0) {
watcher.notifyGroup(changedGroupItems);
}
});
LOGGER.trace("Current configurations after the sync." + LINE_SEPARATOR + register.toString());
if (log.isTraceEnabled()) {
log.trace(
"Current configurations after the sync." + LINE_SEPARATOR + groupConfigChangeWatcherRegister.toString());
}
});
}
public abstract Optional<ConfigTable> readConfig(Set<String> keys);
public abstract Optional<GroupConfigTable> readGroupConfig(Set<String> keys);
public class Register {
private Map<String, WatcherHolder> register = new HashMap<>();
......@@ -145,23 +239,32 @@ public abstract class ConfigWatcherRegister implements DynamicConfigurationServi
.append(" module:")
.append(watcher.getModule())
.append(" provider:")
.append(watcher.getProvider().name())
.append(" value(current):")
.append(watcher.value())
.append(LINE_SEPARATOR);
.append(watcher.getProvider().name());
if (watcher.watchType.equals(ConfigChangeWatcher.WatchType.GROUP)) {
GroupConfigChangeWatcher groupWatcher = (GroupConfigChangeWatcher) watcher;
registerTableDescription.append(" groupItems(current):")
.append(groupWatcher.groupItems());
} else {
registerTableDescription.append(" value(current):")
.append(watcher.value());
}
registerTableDescription.append(LINE_SEPARATOR);
});
return registerTableDescription.toString();
}
}
@Getter
private class WatcherHolder {
protected class WatcherHolder {
private ConfigChangeWatcher watcher;
private final String key;
public WatcherHolder(ConfigChangeWatcher watcher) {
this.watcher = watcher;
this.key = String.join(".", watcher.getModule(), watcher.getProvider().name(), watcher.getItemName());
this.key = String.join(
".", watcher.getModule(), watcher.getProvider().name(),
watcher.getItemName()
);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.configuration.api;
import java.util.Map;
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
public abstract class GroupConfigChangeWatcher extends ConfigChangeWatcher {
public GroupConfigChangeWatcher(final String module,
final ModuleProvider provider,
final String itemName) {
super(module, provider, itemName);
super.watchType = WatchType.GROUP;
}
@Override
public String value() {
throw new UnsupportedOperationException("Unsupported method value() in GroupConfigChangeWatcher");
}
@Override
public void notify(ConfigChangeEvent value) {
throw new UnsupportedOperationException("Unsupported method notify() in GroupConfigChangeWatcher");
}
/**
* @return current groupConfigs.
*/
public abstract Map<String, String> groupItems();
public abstract void notifyGroup(Map<String , ConfigChangeEvent> groupItems);
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.configuration.api;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
/**
* ConfigTable contains all WatchType.GROUP config.
*/
@ToString
public class GroupConfigTable {
@Getter
private List<GroupConfigItems> groupItems = new ArrayList<>();
public void addGroupConfigItems(GroupConfigItems items) {
groupItems.add(items);
}
@Getter
@Setter
@ToString
public static class GroupConfigItems {
private String name;
private Map<String, ConfigTable.ConfigItem> items = new ConcurrentHashMap<>();
public GroupConfigItems(final String name) {
this.name = name;
}
public void add(ConfigTable.ConfigItem item) {
items.put(item.getName(), item);
}
}
}
......@@ -18,8 +18,10 @@
package org.apache.skywalking.oap.server.configuration.api;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
......@@ -65,6 +67,30 @@ public class ConfigWatcherRegisterTest {
Assert.assertEquals("abc2", newValue[0]);
}
@Test
public void testGroupConfInit() {
final Map<String, String> config = new ConcurrentHashMap<>();
register.registerConfigChangeWatcher(new GroupConfigChangeWatcher("MockModule", new MockProvider(), "groupItems1") {
@Override
public void notifyGroup(Map<String , ConfigChangeEvent> groupItems) {
groupItems.forEach((groupItemName , event) -> {
config.put(groupItemName, event.getNewValue());
});
}
@Override
public Map<String, String> groupItems() {
return config;
}
});
register.configSync();
Assert.assertEquals("abc", config.get("item1"));
Assert.assertEquals("abc2", config.get("item2"));
}
@Test
public void testRegisterTableLog() {
register.registerConfigChangeWatcher(new ConfigChangeWatcher("MockModule", new MockProvider(), "prop2") {
......@@ -78,19 +104,34 @@ public class ConfigWatcherRegisterTest {
}
});
register.registerConfigChangeWatcher(new GroupConfigChangeWatcher("MockModule", new MockProvider(), "groupItems1") {
@Override
public Map<String, String> groupItems() {
return null;
}
@Override
public void notifyGroup(final Map<String, ConfigChangeEvent> groupItems) {
}
});
register.configSync();
ConfigWatcherRegister.Register registerTable = Whitebox.getInternalState(this.register, "register");
ConfigWatcherRegister.Register registerTable = Whitebox.getInternalState(this.register, "singleConfigChangeWatcherRegister");
ConfigWatcherRegister.Register groupRegisterTable = Whitebox.getInternalState(this.register, "groupConfigChangeWatcherRegister");
String expected = "Following dynamic config items are available." + ConfigWatcherRegister.LINE_SEPARATOR + "---------------------------------------------" + ConfigWatcherRegister.LINE_SEPARATOR + "key:MockModule.provider.prop2 module:MockModule provider:provider value(current):null" + ConfigWatcherRegister.LINE_SEPARATOR;
String groupConfigExpected = "Following dynamic config items are available." + ConfigWatcherRegister.LINE_SEPARATOR + "---------------------------------------------" + ConfigWatcherRegister.LINE_SEPARATOR + "key:MockModule.provider.groupItems1 module:MockModule provider:provider groupItems(current):null" + ConfigWatcherRegister.LINE_SEPARATOR;
Assert.assertEquals(expected, registerTable.toString());
Assert.assertEquals(groupConfigExpected, groupRegisterTable.toString());
}
public static class MockConfigWatcherRegister extends ConfigWatcherRegister {
@Override
public Optional<ConfigTable> readConfig(Set<String> keys) {
ConfigTable.ConfigItem item1 = new ConfigTable.ConfigItem("module.provider.prop1", "abc");
ConfigTable.ConfigItem item1 = new ConfigTable.ConfigItem("MockModule.provider.prop1", "abc");
ConfigTable.ConfigItem item2 = new ConfigTable.ConfigItem("MockModule.provider.prop2", "abc2");
ConfigTable table = new ConfigTable();
......@@ -98,6 +139,23 @@ public class ConfigWatcherRegisterTest {
table.add(item2);
return Optional.of(table);
}
@Override
public Optional<GroupConfigTable> readGroupConfig(Set<String> keys) {
ConfigTable.ConfigItem item1 = new ConfigTable.ConfigItem("item1", "abc");
ConfigTable.ConfigItem item2 = new ConfigTable.ConfigItem("item2", "abc2");
ConfigTable.ConfigItem item3 = new ConfigTable.ConfigItem("item3", "abc3");
GroupConfigTable.GroupConfigItems groupConfigItems1 = new GroupConfigTable.GroupConfigItems("MockModule.provider.groupItems1");
GroupConfigTable.GroupConfigItems groupConfigItems2 = new GroupConfigTable.GroupConfigItems("MockModule.provider.groupItems2");
groupConfigItems1.add(item1);
groupConfigItems1.add(item2);
groupConfigItems2.add(item3);
GroupConfigTable table = new GroupConfigTable();
table.addGroupConfigItems(groupConfigItems1);
table.addGroupConfigItems(groupConfigItems2);
return Optional.of(table);
}
}
public static class MockModule extends ModuleDefine {
......
......@@ -25,6 +25,7 @@ import java.util.Optional;
import java.util.Set;
import org.apache.skywalking.oap.server.configuration.api.ConfigTable;
import org.apache.skywalking.oap.server.configuration.api.ConfigWatcherRegister;
import org.apache.skywalking.oap.server.configuration.api.GroupConfigTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -64,4 +65,10 @@ public class ApolloConfigWatcherRegister extends ConfigWatcherRegister {
return Optional.of(configTable);
}
@Override
public Optional<GroupConfigTable> readGroupConfig(final Set<String> keys) {
// TODO: implement readGroupConfig
return Optional.empty();
}
}
......@@ -34,6 +34,7 @@ import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.skywalking.oap.server.configuration.api.ConfigTable;
import org.apache.skywalking.oap.server.configuration.api.ConfigWatcherRegister;
import org.apache.skywalking.oap.server.configuration.api.GroupConfigTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -94,6 +95,12 @@ public class ConsulConfigurationWatcherRegister extends ConfigWatcherRegister {
return Optional.of(table);
}
@Override
public Optional<GroupConfigTable> readGroupConfig(final Set<String> keys) {
// TODO: implement readGroupConfig
return Optional.empty();
}
private void registerKeyListeners(final Set<String> keys) {
final Set<String> unregisterKeys = new HashSet<>(keys);
unregisterKeys.removeAll(cachesByKey.keySet());
......
......@@ -29,6 +29,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.configuration.api.ConfigTable;
import org.apache.skywalking.oap.server.configuration.api.ConfigWatcherRegister;
import org.apache.skywalking.oap.server.configuration.api.GroupConfigTable;
@Slf4j
public class EtcdConfigWatcherRegister extends ConfigWatcherRegister {
......@@ -74,4 +75,10 @@ public class EtcdConfigWatcherRegister extends ConfigWatcherRegister {
return Optional.of(table);
}
@Override
public Optional<GroupConfigTable> readGroupConfig(final Set<String> keys) {
// TODO: implement readGroupConfig
return Optional.empty();
}
}
......@@ -24,6 +24,7 @@ import java.util.Set;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.configuration.api.ConfigTable;
import org.apache.skywalking.oap.server.configuration.api.ConfigWatcherRegister;
import org.apache.skywalking.oap.server.configuration.api.GroupConfigTable;
@Slf4j
public class ConfigmapConfigurationWatcherRegister extends ConfigWatcherRegister {
......@@ -50,4 +51,10 @@ public class ConfigmapConfigurationWatcherRegister extends ConfigWatcherRegister
return Optional.of(configTable);
}
@Override
public Optional<GroupConfigTable> readGroupConfig(final Set<String> keys) {
// TODO: implement readGroupConfig
return Optional.empty();
}
}
......@@ -34,6 +34,7 @@ import java.util.concurrent.Executor;
import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.configuration.api.ConfigTable;
import org.apache.skywalking.oap.server.configuration.api.ConfigWatcherRegister;
import org.apache.skywalking.oap.server.configuration.api.GroupConfigTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -89,6 +90,12 @@ public class NacosConfigWatcherRegister extends ConfigWatcherRegister {
return Optional.of(table);
}
@Override
public Optional<GroupConfigTable> readGroupConfig(final Set<String> keys) {
// TODO: implement readGroupConfig
return Optional.empty();
}
private void registerKeyListeners(final Set<String> keys) {
final String group = settings.getGroup();
......
......@@ -20,6 +20,7 @@ package org.apache.skywalking.oap.server.configuration.zookeeper;
import java.util.Optional;
import java.util.Set;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
......@@ -28,8 +29,11 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.skywalking.oap.server.configuration.api.ConfigTable;
import org.apache.skywalking.oap.server.configuration.api.ConfigWatcherRegister;
import org.apache.skywalking.oap.server.configuration.api.GroupConfigTable;
@Slf4j
public class ZookeeperConfigWatcherRegister extends ConfigWatcherRegister {
private final CuratorFramework client;
private final PathChildrenCache childrenCache;
private final String prefix;
......@@ -37,7 +41,7 @@ public class ZookeeperConfigWatcherRegister extends ConfigWatcherRegister {
super(settings.getPeriod());
prefix = settings.getNameSpace() + "/";
RetryPolicy retryPolicy = new ExponentialBackoffRetry(settings.getBaseSleepTimeMs(), settings.getMaxRetries());
CuratorFramework client = CuratorFrameworkFactory.newClient(settings.getHostPort(), retryPolicy);
this.client = CuratorFrameworkFactory.newClient(settings.getHostPort(), retryPolicy);
client.start();
this.childrenCache = new PathChildrenCache(client, settings.getNameSpace(), true);
this.childrenCache.start();
......@@ -52,4 +56,28 @@ public class ZookeeperConfigWatcherRegister extends ConfigWatcherRegister {
});
return Optional.of(table);
}
@Override
public Optional<GroupConfigTable> readGroupConfig(final Set<String> keys) {
GroupConfigTable table = new GroupConfigTable();
keys.forEach(key -> {
GroupConfigTable.GroupConfigItems groupConfigItems = new GroupConfigTable.GroupConfigItems(key);
try {
client.getChildren().forPath(this.prefix + key).forEach(itemName -> {
byte[] data = null;
try {
data = client.getData().forPath(this.prefix + key + "/" + itemName);
} catch (Exception e) {
log.error(e.getMessage(), e);
}
groupConfigItems.add(
new ConfigTable.ConfigItem(itemName, data == null ? null : new String(data)));
});
} catch (Exception e) {
log.error(e.getMessage(), e);
}
table.addGroupConfigItems(groupConfigItems);
});
return Optional.of(table);
}
}
......@@ -22,6 +22,7 @@ import java.io.FileNotFoundException;
import java.io.Reader;
import java.util.Map;
import java.util.Properties;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
......@@ -33,17 +34,15 @@ import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.library.util.ResourceUtils;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.yaml.snakeyaml.Yaml;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@Slf4j
public class ITZookeeperConfigurationTest {
private static final Logger LOGGER = LoggerFactory.getLogger(ITZookeeperConfigurationTest.class);
private final Yaml yaml = new Yaml();
private MockZookeeperConfigurationProvider provider;
......@@ -70,17 +69,16 @@ public class ITZookeeperConfigurationTest {
assertNull(provider.watcher.value());
String zkAddress = System.getProperty("zk.address");
LOGGER.info("zkAddress: " + zkAddress);
log.info("zkAddress: " + zkAddress);
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.newClient(zkAddress, retryPolicy);
client.start();
LOGGER.info("per path: " + nameSpace + "/" + key);
log.info("per path: " + nameSpace + "/" + key);
assertTrue(client.create().creatingParentsIfNeeded().forPath(nameSpace + "/" + key, "500".getBytes()) != null);
LOGGER.info("data: " + new String(client.getData().forPath(nameSpace + "/" + key)));
log.info("data: " + new String(client.getData().forPath(nameSpace + "/" + key)));
for (String v = provider.watcher.value(); v == null; v = provider.watcher.value()) {
}
......@@ -93,6 +91,43 @@ public class ITZookeeperConfigurationTest {
assertNull(provider.watcher.value());
}
@Test(timeout = 20000)
public void shouldReadUpdated4GroupConfig() throws Exception {
String nameSpace = "/default";
String key = "test-module.default.testKeyGroup";
assertEquals("{}", provider.groupWatcher.groupItems().toString());
String zkAddress = System.getProperty("zk.address");
log.info("zkAddress: " + zkAddress);
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.newClient(zkAddress, retryPolicy);
client.start();
log.info("per path: " + nameSpace + "/" + key);
assertTrue(client.create().creatingParentsIfNeeded().forPath(nameSpace + "/" + key + "/item1", "100".getBytes()) != null);
assertTrue(client.create().creatingParentsIfNeeded().forPath(nameSpace + "/" + key + "/item2", "200".getBytes()) != null);
log.info("data: " + new String(client.getData().forPath(nameSpace + "/" + key + "/item1")));
log.info("data: " + new String(client.getData().forPath(nameSpace + "/" + key + "/item2")));
for (String v = provider.groupWatcher.groupItems().get("item1"); v == null; v = provider.groupWatcher.groupItems().get("item1")) {
}
for (String v = provider.groupWatcher.groupItems().get("item2"); v == null; v = provider.groupWatcher.groupItems().get("item2")) {
}
assertTrue(client.delete().forPath(nameSpace + "/" + key + "/item1") == null);
assertTrue(client.delete().forPath(nameSpace + "/" + key + "/item2") == null);
for (String v = provider.groupWatcher.groupItems().get("item1"); v != null; v = provider.groupWatcher.groupItems().get("item1")) {
}
for (String v = provider.groupWatcher.groupItems().get("item2"); v != null; v = provider.groupWatcher.groupItems().get("item2")) {
}
assertNull(provider.groupWatcher.groupItems().get("item1"));
assertNull(provider.groupWatcher.groupItems().get("item2"));
}
@SuppressWarnings("unchecked")
private void loadConfig(ApplicationConfiguration configuration) throws FileNotFoundException {
Reader applicationReader = ResourceUtils.read("application.yml");
......@@ -115,7 +150,7 @@ public class ITZookeeperConfigurationTest {
moduleConfiguration.addProviderConfiguration(name, properties);
});
}
});
});
}
}
}
......@@ -18,20 +18,22 @@
package org.apache.skywalking.oap.server.configuration.zookeeper.it;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.configuration.api.ConfigChangeWatcher;
import org.apache.skywalking.oap.server.configuration.api.ConfigurationModule;
import org.apache.skywalking.oap.server.configuration.api.DynamicConfigurationService;
import org.apache.skywalking.oap.server.configuration.api.GroupConfigChangeWatcher;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Slf4j
public class MockZookeeperConfigurationProvider extends ModuleProvider {
private static final Logger LOGGER = LoggerFactory.getLogger(MockZookeeperConfigurationProvider.class);
ConfigChangeWatcher watcher;
GroupConfigChangeWatcher groupWatcher;
@Override
public String name() {
......@@ -56,7 +58,7 @@ public class MockZookeeperConfigurationProvider extends ModuleProvider {
@Override
public void notify(ConfigChangeEvent value) {
LOGGER.info("ConfigChangeWatcher.ConfigChangeEvent: {}", value);
log.info("ConfigChangeWatcher.ConfigChangeEvent: {}", value);
if (EventType.DELETE.equals(value.getEventType())) {
testValue = null;
} else {
......@@ -69,6 +71,27 @@ public class MockZookeeperConfigurationProvider extends ModuleProvider {
return testValue;
}
};
groupWatcher = new GroupConfigChangeWatcher(MockZookeeperConfigurationModule.NAME, this, "testKeyGroup") {
private Map<String, String> config = new ConcurrentHashMap<>();
@Override
public void notifyGroup(Map<String , ConfigChangeEvent> groupItems) {
log.info("GroupConfigChangeWatcher.ConfigChangeEvents: {}", groupItems);
groupItems.forEach((groupItemName , event) -> {
if (EventType.DELETE.equals(event.getEventType())) {
config.remove(groupItemName);
} else {
config.put(groupItemName, event.getNewValue());
}
});
}
@Override
public Map<String, String> groupItems() {
return config;
}
};
}
@Override
......@@ -77,6 +100,11 @@ public class MockZookeeperConfigurationProvider extends ModuleProvider {
.provider()
.getService(DynamicConfigurationService.class)
.registerConfigChangeWatcher(watcher);
getManager().find(ConfigurationModule.NAME)
.provider()
.getService(DynamicConfigurationService.class)
.registerConfigChangeWatcher(groupWatcher);
}
@Override
......
......@@ -24,6 +24,7 @@ import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.skywalking.oap.server.configuration.api.ConfigTable;
import org.apache.skywalking.oap.server.configuration.api.ConfigWatcherRegister;
import org.apache.skywalking.oap.server.configuration.api.GroupConfigTable;
import org.apache.skywalking.oap.server.configuration.zookeeper.ZookeeperServerSettings;
public class MockZookeeperConfigWatcherRegister extends ConfigWatcherRegister {
......@@ -44,4 +45,9 @@ public class MockZookeeperConfigWatcherRegister extends ConfigWatcherRegister {
});
return Optional.of(table);
}
@Override
public Optional<GroupConfigTable> readGroupConfig(final Set<String> keys) {
return Optional.empty();
}
}
\ No newline at end of file
......@@ -24,6 +24,7 @@ import java.util.Optional;
import java.util.Set;
import org.apache.skywalking.oap.server.configuration.api.ConfigTable;
import org.apache.skywalking.oap.server.configuration.api.ConfigWatcherRegister;
import org.apache.skywalking.oap.server.configuration.api.GroupConfigTable;
import org.apache.skywalking.oap.server.configuration.service.ConfigurationRequest;
import org.apache.skywalking.oap.server.configuration.service.ConfigurationResponse;
import org.apache.skywalking.oap.server.configuration.service.ConfigurationServiceGrpc;
......@@ -73,4 +74,10 @@ public class GRPCConfigWatcherRegister extends ConfigWatcherRegister {
}
return Optional.of(table);
}
@Override
public Optional<GroupConfigTable> readGroupConfig(final Set<String> keys) {
// TODO: implement readGroupConfig
return Optional.empty();
}
}
......@@ -22,6 +22,7 @@ import java.util.Optional;
import java.util.Set;
import org.apache.skywalking.oap.server.configuration.api.ConfigTable;
import org.apache.skywalking.oap.server.configuration.api.ConfigWatcherRegister;
import org.apache.skywalking.oap.server.configuration.api.GroupConfigTable;
import org.apache.skywalking.oap.server.core.CoreModuleProvider;
import org.junit.Test;
import org.junit.runner.RunWith;
......@@ -74,5 +75,10 @@ public class ApdexThresholdConfigTest {
table.add(new ConfigTable.ConfigItem("core.default.apdexThreshold", "default: 1000 \nfoo: 200"));
return Optional.of(table);
}
@Override
public Optional<GroupConfigTable> readGroupConfig(final Set<String> keys) {
return Optional.empty();
}
}
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册