diff --git a/CHANGES.md b/CHANGES.md index 3724d8ad060ce3ef91cbbaf87964f315a6bd2822..a933392d24bfdc04bfa0b40ee7bb9b174d5e92a0 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -45,6 +45,8 @@ Release Notes. * Add a new API to test log analysis language. * Harden the security of Groovy-based DSL, MAL and LAL. * Fix distinct in Service/Instance/Endpoint query is not working. +* Support collection type in dynamic configuration core. +* Support zookeeper grouped dynamic configurations. * Fix NPE when OAP nodes synchronize events with each other in cluster mode. #### UI diff --git a/oap-server/analyzer/agent-analyzer/src/test/java/org/apache/skywalking/oap/server/analyzer/provider/trace/TraceLatencyThresholdsAndWatcherTest.java b/oap-server/analyzer/agent-analyzer/src/test/java/org/apache/skywalking/oap/server/analyzer/provider/trace/TraceLatencyThresholdsAndWatcherTest.java index b552be909b2d2ca63c1fa86d272fed4e04c3d717..bb1e03c42c3d11a7a9447be14b4226cab49ec589 100644 --- a/oap-server/analyzer/agent-analyzer/src/test/java/org/apache/skywalking/oap/server/analyzer/provider/trace/TraceLatencyThresholdsAndWatcherTest.java +++ b/oap-server/analyzer/agent-analyzer/src/test/java/org/apache/skywalking/oap/server/analyzer/provider/trace/TraceLatencyThresholdsAndWatcherTest.java @@ -24,6 +24,7 @@ import org.apache.skywalking.oap.server.analyzer.provider.AnalyzerModuleProvider import org.apache.skywalking.oap.server.configuration.api.ConfigChangeWatcher; import org.apache.skywalking.oap.server.configuration.api.ConfigTable; import org.apache.skywalking.oap.server.configuration.api.ConfigWatcherRegister; +import org.apache.skywalking.oap.server.configuration.api.GroupConfigTable; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -108,6 +109,11 @@ public class TraceLatencyThresholdsAndWatcherTest { table.add(new ConfigTable.ConfigItem("agent-analyzer.default.slowTraceSegmentThreshold", "3000")); return Optional.of(table); } + + @Override + public Optional readGroupConfig(final Set keys) { + return Optional.empty(); + } } } diff --git a/oap-server/analyzer/agent-analyzer/src/test/java/org/apache/skywalking/oap/server/analyzer/provider/trace/TraceSampleRateWatcherTest.java b/oap-server/analyzer/agent-analyzer/src/test/java/org/apache/skywalking/oap/server/analyzer/provider/trace/TraceSampleRateWatcherTest.java index 2a16e7801906c278d0408cfdf8fdf89e96de3e33..5e325b15f2fd285900c3d1e32f295668ee57eb1c 100644 --- a/oap-server/analyzer/agent-analyzer/src/test/java/org/apache/skywalking/oap/server/analyzer/provider/trace/TraceSampleRateWatcherTest.java +++ b/oap-server/analyzer/agent-analyzer/src/test/java/org/apache/skywalking/oap/server/analyzer/provider/trace/TraceSampleRateWatcherTest.java @@ -22,6 +22,7 @@ import org.apache.skywalking.oap.server.analyzer.provider.AnalyzerModuleProvider import org.apache.skywalking.oap.server.configuration.api.ConfigChangeWatcher; import org.apache.skywalking.oap.server.configuration.api.ConfigTable; import org.apache.skywalking.oap.server.configuration.api.ConfigWatcherRegister; +import org.apache.skywalking.oap.server.configuration.api.GroupConfigTable; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -109,6 +110,11 @@ public class TraceSampleRateWatcherTest { table.add(new ConfigTable.ConfigItem("agent-analyzer.default.sampleRate", "9000")); return Optional.of(table); } + + @Override + public Optional readGroupConfig(final Set keys) { + return Optional.empty(); + } } } diff --git a/oap-server/server-configuration/configuration-api/src/main/java/org/apache/skywalking/oap/server/configuration/api/ConfigChangeWatcher.java b/oap-server/server-configuration/configuration-api/src/main/java/org/apache/skywalking/oap/server/configuration/api/ConfigChangeWatcher.java index 5925a2d99c96fd7404aec0f8016e055d22ec59c7..5d9da3be6d626a3717d3583e12dd437fcea62310 100644 --- a/oap-server/server-configuration/configuration-api/src/main/java/org/apache/skywalking/oap/server/configuration/api/ConfigChangeWatcher.java +++ b/oap-server/server-configuration/configuration-api/src/main/java/org/apache/skywalking/oap/server/configuration/api/ConfigChangeWatcher.java @@ -31,11 +31,13 @@ public abstract class ConfigChangeWatcher { private final String module; private final ModuleProvider provider; private final String itemName; + protected WatchType watchType; public ConfigChangeWatcher(String module, ModuleProvider provider, String itemName) { this.module = module; this.provider = provider; this.itemName = itemName; + this.watchType = WatchType.SINGLE; } /** @@ -70,4 +72,8 @@ public abstract class ConfigChangeWatcher { public enum EventType { ADD, MODIFY, DELETE } + + public enum WatchType { + SINGLE, GROUP + } } diff --git a/oap-server/server-configuration/configuration-api/src/main/java/org/apache/skywalking/oap/server/configuration/api/ConfigTable.java b/oap-server/server-configuration/configuration-api/src/main/java/org/apache/skywalking/oap/server/configuration/api/ConfigTable.java index 02d7364e833d2e20192f24d4e9a7cf790fed638c..54c56ac4d4ee1a86f16e07abff171cb22f645a7e 100644 --- a/oap-server/server-configuration/configuration-api/src/main/java/org/apache/skywalking/oap/server/configuration/api/ConfigTable.java +++ b/oap-server/server-configuration/configuration-api/src/main/java/org/apache/skywalking/oap/server/configuration/api/ConfigTable.java @@ -25,7 +25,7 @@ import lombok.Setter; import lombok.ToString; /** - * ConfigTable contains all config. + * ConfigTable contains all WatchType.SINGLE config. */ @ToString public class ConfigTable { diff --git a/oap-server/server-configuration/configuration-api/src/main/java/org/apache/skywalking/oap/server/configuration/api/ConfigWatcherRegister.java b/oap-server/server-configuration/configuration-api/src/main/java/org/apache/skywalking/oap/server/configuration/api/ConfigWatcherRegister.java index 503ae15b511bc91d4e6ab8d30cb633ae380b9f33..b6809c7386645db8888981ad9a1ddaa2c6ef6bbe 100644 --- a/oap-server/server-configuration/configuration-api/src/main/java/org/apache/skywalking/oap/server/configuration/api/ConfigWatcherRegister.java +++ b/oap-server/server-configuration/configuration-api/src/main/java/org/apache/skywalking/oap/server/configuration/api/ConfigWatcherRegister.java @@ -25,18 +25,18 @@ import java.util.Set; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import lombok.Getter; +import lombok.extern.slf4j.Slf4j; import org.apache.skywalking.apm.util.RunnableWithExceptionProtection; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * The default implementor of Config Watcher register. */ +@Slf4j public abstract class ConfigWatcherRegister implements DynamicConfigurationService { - private static final Logger LOGGER = LoggerFactory.getLogger(ConfigWatcherRegister.class); public static final String LINE_SEPARATOR = System.getProperty("line.separator", "\n"); - - private Register register = new Register(); + private Register singleConfigChangeWatcherRegister = new Register(); + @Getter + private Register groupConfigChangeWatcherRegister = new Register(); private volatile boolean isStarted = false; private final long syncPeriod; @@ -55,65 +55,159 @@ public abstract class ConfigWatcherRegister implements DynamicConfigurationServi } WatcherHolder holder = new WatcherHolder(watcher); - if (register.containsKey(holder.getKey())) { + if (singleConfigChangeWatcherRegister.containsKey( + holder.getKey()) || groupConfigChangeWatcherRegister.containsKey(holder.getKey())) { throw new IllegalStateException("Duplicate register, watcher=" + watcher); } - register.put(holder.getKey(), holder); + + switch (holder.getWatcher().getWatchType()) { + case SINGLE: + singleConfigChangeWatcherRegister.put(holder.getKey(), holder); + break; + case GROUP: + groupConfigChangeWatcherRegister.put(holder.getKey(), holder); + break; + default: + throw new IllegalArgumentException( + "Unexpected watch type of ConfigChangeWatcher " + watcher.toString()); + } } public void start() { isStarted = true; - LOGGER.info("Current configurations after the bootstrap sync." + LINE_SEPARATOR + register.toString()); + log.info("Current configurations after the bootstrap sync." + LINE_SEPARATOR + singleConfigChangeWatcherRegister.toString()); Executors.newSingleThreadScheduledExecutor() .scheduleAtFixedRate( new RunnableWithExceptionProtection( this::configSync, - t -> LOGGER.error("Sync config center error.", t) + t -> log.error("Sync config center error.", t) ), 0, syncPeriod, TimeUnit.SECONDS); } void configSync() { - Optional configTable = readConfig(register.keys()); + singleConfigsSync(); + groupConfigsSync(); + } + + private void singleConfigsSync() { + Optional configTable = readConfig(singleConfigChangeWatcherRegister.keys()); // Config table would be null if no change detected from the implementation. configTable.ifPresent(config -> { config.getItems().forEach(item -> { String itemName = item.getName(); - WatcherHolder holder = register.get(itemName); - if (holder != null) { - ConfigChangeWatcher watcher = holder.getWatcher(); - String newItemValue = item.getValue(); + WatcherHolder holder = singleConfigChangeWatcherRegister.get(itemName); + if (holder == null) { + log.warn( + "Config {} from configuration center, doesn't match any WatchType.SINGLE watcher, ignore.", + itemName + ); + return; + } + ConfigChangeWatcher watcher = holder.getWatcher(); + String newItemValue = item.getValue(); + if (newItemValue == null) { + if (watcher.value() != null) { + // Notify watcher, the new value is null with delete event type. + watcher.notify( + new ConfigChangeWatcher.ConfigChangeEvent(null, ConfigChangeWatcher.EventType.DELETE)); + } else { + // Don't need to notify, stay in null. + } + } else { + if (!newItemValue.equals(watcher.value())) { + watcher.notify(new ConfigChangeWatcher.ConfigChangeEvent( + newItemValue, + ConfigChangeWatcher.EventType.MODIFY + )); + } else { + // Don't need to notify, stay in the same config value. + } + } + }); + if (log.isTraceEnabled()) { + log.trace( + "Current configurations after the sync." + LINE_SEPARATOR + singleConfigChangeWatcherRegister.toString()); + } + }); + } + + private void groupConfigsSync() { + Optional groupConfigTable = readGroupConfig(groupConfigChangeWatcherRegister.keys()); + // Config table would be null if no change detected from the implementation. + groupConfigTable.ifPresent(config -> { + config.getGroupItems().forEach(groupConfigItems -> { + String groupConfigItemName = groupConfigItems.getName(); + WatcherHolder holder = groupConfigChangeWatcherRegister.get(groupConfigItemName); + + if (holder == null) { + log.warn( + "Config {} from configuration center, doesn't match any WatchType.GROUP watcher, ignore.", + groupConfigItemName + ); + return; + } + + GroupConfigChangeWatcher watcher = (GroupConfigChangeWatcher) holder.getWatcher(); + Map groupItems = groupConfigItems.getItems(); + Map changedGroupItems = new HashMap<>(); + Map currentGroupItems = Optional.ofNullable(watcher.groupItems()) + .orElse(new HashMap<>()); + + groupItems.forEach((groupItemName, groupItem) -> { + String newItemValue = groupItem.getValue(); if (newItemValue == null) { - if (watcher.value() != null) { + if (currentGroupItems.get(groupItemName) != null) { // Notify watcher, the new value is null with delete event type. - watcher.notify( - new ConfigChangeWatcher.ConfigChangeEvent(null, ConfigChangeWatcher.EventType.DELETE)); + changedGroupItems.put(groupItemName, new ConfigChangeWatcher.ConfigChangeEvent( + null, + ConfigChangeWatcher.EventType.DELETE + )); + } else { // Don't need to notify, stay in null. } - } else { - if (!newItemValue.equals(watcher.value())) { - watcher.notify(new ConfigChangeWatcher.ConfigChangeEvent( + } else { //add and modify + if (!newItemValue.equals(currentGroupItems.get(groupItemName))) { + changedGroupItems.put(groupItemName, new ConfigChangeWatcher.ConfigChangeEvent( newItemValue, ConfigChangeWatcher.EventType.MODIFY )); + } else { // Don't need to notify, stay in the same config value. } } - } else { - LOGGER.warn("Config {} from configuration center, doesn't match any watcher, ignore.", itemName); + }); + + currentGroupItems.forEach((oldGroupItemName, oldGroupItemValue) -> { + //delete item + if (null == groupItems.get(oldGroupItemName)) { + // Notify watcher, the item is deleted with delete event type. + changedGroupItems.put(oldGroupItemName, new ConfigChangeWatcher.ConfigChangeEvent( + null, + ConfigChangeWatcher.EventType.DELETE + )); + } + }); + + if (changedGroupItems.size() > 0) { + watcher.notifyGroup(changedGroupItems); } }); - - LOGGER.trace("Current configurations after the sync." + LINE_SEPARATOR + register.toString()); + if (log.isTraceEnabled()) { + log.trace( + "Current configurations after the sync." + LINE_SEPARATOR + groupConfigChangeWatcherRegister.toString()); + } }); } public abstract Optional readConfig(Set keys); + public abstract Optional readGroupConfig(Set keys); + public class Register { private Map register = new HashMap<>(); @@ -145,23 +239,32 @@ public abstract class ConfigWatcherRegister implements DynamicConfigurationServi .append(" module:") .append(watcher.getModule()) .append(" provider:") - .append(watcher.getProvider().name()) - .append(" value(current):") - .append(watcher.value()) - .append(LINE_SEPARATOR); + .append(watcher.getProvider().name()); + if (watcher.watchType.equals(ConfigChangeWatcher.WatchType.GROUP)) { + GroupConfigChangeWatcher groupWatcher = (GroupConfigChangeWatcher) watcher; + registerTableDescription.append(" groupItems(current):") + .append(groupWatcher.groupItems()); + } else { + registerTableDescription.append(" value(current):") + .append(watcher.value()); + } + registerTableDescription.append(LINE_SEPARATOR); }); return registerTableDescription.toString(); } } @Getter - private class WatcherHolder { + protected class WatcherHolder { private ConfigChangeWatcher watcher; private final String key; public WatcherHolder(ConfigChangeWatcher watcher) { this.watcher = watcher; - this.key = String.join(".", watcher.getModule(), watcher.getProvider().name(), watcher.getItemName()); + this.key = String.join( + ".", watcher.getModule(), watcher.getProvider().name(), + watcher.getItemName() + ); } } } diff --git a/oap-server/server-configuration/configuration-api/src/main/java/org/apache/skywalking/oap/server/configuration/api/GroupConfigChangeWatcher.java b/oap-server/server-configuration/configuration-api/src/main/java/org/apache/skywalking/oap/server/configuration/api/GroupConfigChangeWatcher.java new file mode 100644 index 0000000000000000000000000000000000000000..7451a380b98b7252e0e98fc895a092465caf6780 --- /dev/null +++ b/oap-server/server-configuration/configuration-api/src/main/java/org/apache/skywalking/oap/server/configuration/api/GroupConfigChangeWatcher.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.configuration.api; + +import java.util.Map; +import org.apache.skywalking.oap.server.library.module.ModuleProvider; + +public abstract class GroupConfigChangeWatcher extends ConfigChangeWatcher { + public GroupConfigChangeWatcher(final String module, + final ModuleProvider provider, + final String itemName) { + super(module, provider, itemName); + super.watchType = WatchType.GROUP; + } + + @Override + public String value() { + throw new UnsupportedOperationException("Unsupported method value() in GroupConfigChangeWatcher"); + } + + @Override + public void notify(ConfigChangeEvent value) { + throw new UnsupportedOperationException("Unsupported method notify() in GroupConfigChangeWatcher"); + } + + /** + * @return current groupConfigs. + */ + public abstract Map groupItems(); + + public abstract void notifyGroup(Map groupItems); +} diff --git a/oap-server/server-configuration/configuration-api/src/main/java/org/apache/skywalking/oap/server/configuration/api/GroupConfigTable.java b/oap-server/server-configuration/configuration-api/src/main/java/org/apache/skywalking/oap/server/configuration/api/GroupConfigTable.java new file mode 100644 index 0000000000000000000000000000000000000000..638f803b7ced96c5de60fc5d71243cb273a3cdf7 --- /dev/null +++ b/oap-server/server-configuration/configuration-api/src/main/java/org/apache/skywalking/oap/server/configuration/api/GroupConfigTable.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.configuration.api; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; + +/** + * ConfigTable contains all WatchType.GROUP config. + */ +@ToString +public class GroupConfigTable { + @Getter + private List groupItems = new ArrayList<>(); + + public void addGroupConfigItems(GroupConfigItems items) { + groupItems.add(items); + } + + @Getter + @Setter + @ToString + public static class GroupConfigItems { + private String name; + private Map items = new ConcurrentHashMap<>(); + + public GroupConfigItems(final String name) { + this.name = name; + } + + public void add(ConfigTable.ConfigItem item) { + items.put(item.getName(), item); + } + } +} diff --git a/oap-server/server-configuration/configuration-api/src/test/java/org/apache/skywalking/oap/server/configuration/api/ConfigWatcherRegisterTest.java b/oap-server/server-configuration/configuration-api/src/test/java/org/apache/skywalking/oap/server/configuration/api/ConfigWatcherRegisterTest.java index 944ab3c2cd8717d8a3d71cc8542f331a2650db86..4397a320697c7476ceb91998357f1d4f50caeafc 100644 --- a/oap-server/server-configuration/configuration-api/src/test/java/org/apache/skywalking/oap/server/configuration/api/ConfigWatcherRegisterTest.java +++ b/oap-server/server-configuration/configuration-api/src/test/java/org/apache/skywalking/oap/server/configuration/api/ConfigWatcherRegisterTest.java @@ -18,8 +18,10 @@ package org.apache.skywalking.oap.server.configuration.api; +import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import org.apache.skywalking.oap.server.library.module.ModuleConfig; import org.apache.skywalking.oap.server.library.module.ModuleDefine; import org.apache.skywalking.oap.server.library.module.ModuleProvider; @@ -65,6 +67,30 @@ public class ConfigWatcherRegisterTest { Assert.assertEquals("abc2", newValue[0]); } + @Test + public void testGroupConfInit() { + final Map config = new ConcurrentHashMap<>(); + + register.registerConfigChangeWatcher(new GroupConfigChangeWatcher("MockModule", new MockProvider(), "groupItems1") { + @Override + public void notifyGroup(Map groupItems) { + groupItems.forEach((groupItemName , event) -> { + config.put(groupItemName, event.getNewValue()); + }); + } + + @Override + public Map groupItems() { + return config; + } + }); + + register.configSync(); + + Assert.assertEquals("abc", config.get("item1")); + Assert.assertEquals("abc2", config.get("item2")); + } + @Test public void testRegisterTableLog() { register.registerConfigChangeWatcher(new ConfigChangeWatcher("MockModule", new MockProvider(), "prop2") { @@ -78,19 +104,34 @@ public class ConfigWatcherRegisterTest { } }); + register.registerConfigChangeWatcher(new GroupConfigChangeWatcher("MockModule", new MockProvider(), "groupItems1") { + @Override + public Map groupItems() { + return null; + } + + @Override + public void notifyGroup(final Map groupItems) { + + } + }); + register.configSync(); - ConfigWatcherRegister.Register registerTable = Whitebox.getInternalState(this.register, "register"); + ConfigWatcherRegister.Register registerTable = Whitebox.getInternalState(this.register, "singleConfigChangeWatcherRegister"); + ConfigWatcherRegister.Register groupRegisterTable = Whitebox.getInternalState(this.register, "groupConfigChangeWatcherRegister"); String expected = "Following dynamic config items are available." + ConfigWatcherRegister.LINE_SEPARATOR + "---------------------------------------------" + ConfigWatcherRegister.LINE_SEPARATOR + "key:MockModule.provider.prop2 module:MockModule provider:provider value(current):null" + ConfigWatcherRegister.LINE_SEPARATOR; + String groupConfigExpected = "Following dynamic config items are available." + ConfigWatcherRegister.LINE_SEPARATOR + "---------------------------------------------" + ConfigWatcherRegister.LINE_SEPARATOR + "key:MockModule.provider.groupItems1 module:MockModule provider:provider groupItems(current):null" + ConfigWatcherRegister.LINE_SEPARATOR; Assert.assertEquals(expected, registerTable.toString()); + Assert.assertEquals(groupConfigExpected, groupRegisterTable.toString()); } public static class MockConfigWatcherRegister extends ConfigWatcherRegister { @Override public Optional readConfig(Set keys) { - ConfigTable.ConfigItem item1 = new ConfigTable.ConfigItem("module.provider.prop1", "abc"); + ConfigTable.ConfigItem item1 = new ConfigTable.ConfigItem("MockModule.provider.prop1", "abc"); ConfigTable.ConfigItem item2 = new ConfigTable.ConfigItem("MockModule.provider.prop2", "abc2"); ConfigTable table = new ConfigTable(); @@ -98,6 +139,23 @@ public class ConfigWatcherRegisterTest { table.add(item2); return Optional.of(table); } + + @Override + public Optional readGroupConfig(Set keys) { + ConfigTable.ConfigItem item1 = new ConfigTable.ConfigItem("item1", "abc"); + ConfigTable.ConfigItem item2 = new ConfigTable.ConfigItem("item2", "abc2"); + ConfigTable.ConfigItem item3 = new ConfigTable.ConfigItem("item3", "abc3"); + GroupConfigTable.GroupConfigItems groupConfigItems1 = new GroupConfigTable.GroupConfigItems("MockModule.provider.groupItems1"); + GroupConfigTable.GroupConfigItems groupConfigItems2 = new GroupConfigTable.GroupConfigItems("MockModule.provider.groupItems2"); + groupConfigItems1.add(item1); + groupConfigItems1.add(item2); + groupConfigItems2.add(item3); + + GroupConfigTable table = new GroupConfigTable(); + table.addGroupConfigItems(groupConfigItems1); + table.addGroupConfigItems(groupConfigItems2); + return Optional.of(table); + } } public static class MockModule extends ModuleDefine { diff --git a/oap-server/server-configuration/configuration-apollo/src/main/java/org/apache/skywalking/oap/server/configuration/apollo/ApolloConfigWatcherRegister.java b/oap-server/server-configuration/configuration-apollo/src/main/java/org/apache/skywalking/oap/server/configuration/apollo/ApolloConfigWatcherRegister.java index 60d9319c3b4c832c1dac165b61d4b43e57506e2a..fdc3656389a0d5f29f4771dd10f6b85c897c5db9 100644 --- a/oap-server/server-configuration/configuration-apollo/src/main/java/org/apache/skywalking/oap/server/configuration/apollo/ApolloConfigWatcherRegister.java +++ b/oap-server/server-configuration/configuration-apollo/src/main/java/org/apache/skywalking/oap/server/configuration/apollo/ApolloConfigWatcherRegister.java @@ -25,6 +25,7 @@ import java.util.Optional; import java.util.Set; import org.apache.skywalking.oap.server.configuration.api.ConfigTable; import org.apache.skywalking.oap.server.configuration.api.ConfigWatcherRegister; +import org.apache.skywalking.oap.server.configuration.api.GroupConfigTable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -64,4 +65,10 @@ public class ApolloConfigWatcherRegister extends ConfigWatcherRegister { return Optional.of(configTable); } + + @Override + public Optional readGroupConfig(final Set keys) { + // TODO: implement readGroupConfig + return Optional.empty(); + } } diff --git a/oap-server/server-configuration/configuration-consul/src/main/java/org/apache/skywalking/oap/server/configuration/consul/ConsulConfigurationWatcherRegister.java b/oap-server/server-configuration/configuration-consul/src/main/java/org/apache/skywalking/oap/server/configuration/consul/ConsulConfigurationWatcherRegister.java index 73c3d5592c8cc84c519c8e8977eb178eb35cebf3..a51aee1d43e11a8f0ee36dafcb07c365ca15032c 100644 --- a/oap-server/server-configuration/configuration-consul/src/main/java/org/apache/skywalking/oap/server/configuration/consul/ConsulConfigurationWatcherRegister.java +++ b/oap-server/server-configuration/configuration-consul/src/main/java/org/apache/skywalking/oap/server/configuration/consul/ConsulConfigurationWatcherRegister.java @@ -34,6 +34,7 @@ import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; import org.apache.skywalking.oap.server.configuration.api.ConfigTable; import org.apache.skywalking.oap.server.configuration.api.ConfigWatcherRegister; +import org.apache.skywalking.oap.server.configuration.api.GroupConfigTable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -94,6 +95,12 @@ public class ConsulConfigurationWatcherRegister extends ConfigWatcherRegister { return Optional.of(table); } + @Override + public Optional readGroupConfig(final Set keys) { + // TODO: implement readGroupConfig + return Optional.empty(); + } + private void registerKeyListeners(final Set keys) { final Set unregisterKeys = new HashSet<>(keys); unregisterKeys.removeAll(cachesByKey.keySet()); diff --git a/oap-server/server-configuration/configuration-etcd/src/main/java/org/apache/skywalking/oap/server/configuration/etcd/EtcdConfigWatcherRegister.java b/oap-server/server-configuration/configuration-etcd/src/main/java/org/apache/skywalking/oap/server/configuration/etcd/EtcdConfigWatcherRegister.java index 44df608ceec22d8631dc7ae63d62200a6595330a..e0341c7cd4363791db6043ca0ad999c93b09fb2a 100644 --- a/oap-server/server-configuration/configuration-etcd/src/main/java/org/apache/skywalking/oap/server/configuration/etcd/EtcdConfigWatcherRegister.java +++ b/oap-server/server-configuration/configuration-etcd/src/main/java/org/apache/skywalking/oap/server/configuration/etcd/EtcdConfigWatcherRegister.java @@ -29,6 +29,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.skywalking.apm.util.StringUtil; import org.apache.skywalking.oap.server.configuration.api.ConfigTable; import org.apache.skywalking.oap.server.configuration.api.ConfigWatcherRegister; +import org.apache.skywalking.oap.server.configuration.api.GroupConfigTable; @Slf4j public class EtcdConfigWatcherRegister extends ConfigWatcherRegister { @@ -74,4 +75,10 @@ public class EtcdConfigWatcherRegister extends ConfigWatcherRegister { return Optional.of(table); } + @Override + public Optional readGroupConfig(final Set keys) { + // TODO: implement readGroupConfig + return Optional.empty(); + } + } diff --git a/oap-server/server-configuration/configuration-k8s-configmap/src/main/java/org/apache/skywalking/oap/server/configuration/configmap/ConfigmapConfigurationWatcherRegister.java b/oap-server/server-configuration/configuration-k8s-configmap/src/main/java/org/apache/skywalking/oap/server/configuration/configmap/ConfigmapConfigurationWatcherRegister.java index f3ca25c4bc733d19276a1d86e59428dc3a1c63e5..df2c2d0088e5a668d9223c752b2e85e2a6343ba4 100644 --- a/oap-server/server-configuration/configuration-k8s-configmap/src/main/java/org/apache/skywalking/oap/server/configuration/configmap/ConfigmapConfigurationWatcherRegister.java +++ b/oap-server/server-configuration/configuration-k8s-configmap/src/main/java/org/apache/skywalking/oap/server/configuration/configmap/ConfigmapConfigurationWatcherRegister.java @@ -24,6 +24,7 @@ import java.util.Set; import lombok.extern.slf4j.Slf4j; import org.apache.skywalking.oap.server.configuration.api.ConfigTable; import org.apache.skywalking.oap.server.configuration.api.ConfigWatcherRegister; +import org.apache.skywalking.oap.server.configuration.api.GroupConfigTable; @Slf4j public class ConfigmapConfigurationWatcherRegister extends ConfigWatcherRegister { @@ -50,4 +51,10 @@ public class ConfigmapConfigurationWatcherRegister extends ConfigWatcherRegister return Optional.of(configTable); } + @Override + public Optional readGroupConfig(final Set keys) { + // TODO: implement readGroupConfig + return Optional.empty(); + } + } diff --git a/oap-server/server-configuration/configuration-nacos/src/main/java/org/apache/skywalking/oap/server/configuration/nacos/NacosConfigWatcherRegister.java b/oap-server/server-configuration/configuration-nacos/src/main/java/org/apache/skywalking/oap/server/configuration/nacos/NacosConfigWatcherRegister.java index e9761d04b1fbdb023c55257dd42b29927ed35575..47c741aefb56f3fa8806c97af7797a3dd2a1a995 100644 --- a/oap-server/server-configuration/configuration-nacos/src/main/java/org/apache/skywalking/oap/server/configuration/nacos/NacosConfigWatcherRegister.java +++ b/oap-server/server-configuration/configuration-nacos/src/main/java/org/apache/skywalking/oap/server/configuration/nacos/NacosConfigWatcherRegister.java @@ -34,6 +34,7 @@ import java.util.concurrent.Executor; import org.apache.skywalking.apm.util.StringUtil; import org.apache.skywalking.oap.server.configuration.api.ConfigTable; import org.apache.skywalking.oap.server.configuration.api.ConfigWatcherRegister; +import org.apache.skywalking.oap.server.configuration.api.GroupConfigTable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -89,6 +90,12 @@ public class NacosConfigWatcherRegister extends ConfigWatcherRegister { return Optional.of(table); } + @Override + public Optional readGroupConfig(final Set keys) { + // TODO: implement readGroupConfig + return Optional.empty(); + } + private void registerKeyListeners(final Set keys) { final String group = settings.getGroup(); diff --git a/oap-server/server-configuration/configuration-zookeeper/src/main/java/org/apache/skywalking/oap/server/configuration/zookeeper/ZookeeperConfigWatcherRegister.java b/oap-server/server-configuration/configuration-zookeeper/src/main/java/org/apache/skywalking/oap/server/configuration/zookeeper/ZookeeperConfigWatcherRegister.java index b8f0d1cb2662e16cd0f2e78da943e91d7fe49208..104d6cc809ab3543a8bf02ff146509823e9cb8e5 100644 --- a/oap-server/server-configuration/configuration-zookeeper/src/main/java/org/apache/skywalking/oap/server/configuration/zookeeper/ZookeeperConfigWatcherRegister.java +++ b/oap-server/server-configuration/configuration-zookeeper/src/main/java/org/apache/skywalking/oap/server/configuration/zookeeper/ZookeeperConfigWatcherRegister.java @@ -20,6 +20,7 @@ package org.apache.skywalking.oap.server.configuration.zookeeper; import java.util.Optional; import java.util.Set; +import lombok.extern.slf4j.Slf4j; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; @@ -28,8 +29,11 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.skywalking.oap.server.configuration.api.ConfigTable; import org.apache.skywalking.oap.server.configuration.api.ConfigWatcherRegister; +import org.apache.skywalking.oap.server.configuration.api.GroupConfigTable; +@Slf4j public class ZookeeperConfigWatcherRegister extends ConfigWatcherRegister { + private final CuratorFramework client; private final PathChildrenCache childrenCache; private final String prefix; @@ -37,7 +41,7 @@ public class ZookeeperConfigWatcherRegister extends ConfigWatcherRegister { super(settings.getPeriod()); prefix = settings.getNameSpace() + "/"; RetryPolicy retryPolicy = new ExponentialBackoffRetry(settings.getBaseSleepTimeMs(), settings.getMaxRetries()); - CuratorFramework client = CuratorFrameworkFactory.newClient(settings.getHostPort(), retryPolicy); + this.client = CuratorFrameworkFactory.newClient(settings.getHostPort(), retryPolicy); client.start(); this.childrenCache = new PathChildrenCache(client, settings.getNameSpace(), true); this.childrenCache.start(); @@ -52,4 +56,28 @@ public class ZookeeperConfigWatcherRegister extends ConfigWatcherRegister { }); return Optional.of(table); } + + @Override + public Optional readGroupConfig(final Set keys) { + GroupConfigTable table = new GroupConfigTable(); + keys.forEach(key -> { + GroupConfigTable.GroupConfigItems groupConfigItems = new GroupConfigTable.GroupConfigItems(key); + try { + client.getChildren().forPath(this.prefix + key).forEach(itemName -> { + byte[] data = null; + try { + data = client.getData().forPath(this.prefix + key + "/" + itemName); + } catch (Exception e) { + log.error(e.getMessage(), e); + } + groupConfigItems.add( + new ConfigTable.ConfigItem(itemName, data == null ? null : new String(data))); + }); + } catch (Exception e) { + log.error(e.getMessage(), e); + } + table.addGroupConfigItems(groupConfigItems); + }); + return Optional.of(table); + } } diff --git a/oap-server/server-configuration/configuration-zookeeper/src/test/java/org/apache/skywalking/oap/server/configuration/zookeeper/it/ITZookeeperConfigurationTest.java b/oap-server/server-configuration/configuration-zookeeper/src/test/java/org/apache/skywalking/oap/server/configuration/zookeeper/it/ITZookeeperConfigurationTest.java index 76be999fc6e0fd1f6fec8f9e74f68a15f5efb121..4efb0b792596a28d4b443e6e8b329a910818d18c 100644 --- a/oap-server/server-configuration/configuration-zookeeper/src/test/java/org/apache/skywalking/oap/server/configuration/zookeeper/it/ITZookeeperConfigurationTest.java +++ b/oap-server/server-configuration/configuration-zookeeper/src/test/java/org/apache/skywalking/oap/server/configuration/zookeeper/it/ITZookeeperConfigurationTest.java @@ -22,6 +22,7 @@ import java.io.FileNotFoundException; import java.io.Reader; import java.util.Map; import java.util.Properties; +import lombok.extern.slf4j.Slf4j; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; @@ -33,17 +34,15 @@ import org.apache.skywalking.oap.server.library.util.CollectionUtils; import org.apache.skywalking.oap.server.library.util.ResourceUtils; import org.junit.Before; import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.yaml.snakeyaml.Yaml; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +@Slf4j public class ITZookeeperConfigurationTest { - private static final Logger LOGGER = LoggerFactory.getLogger(ITZookeeperConfigurationTest.class); - private final Yaml yaml = new Yaml(); private MockZookeeperConfigurationProvider provider; @@ -70,17 +69,16 @@ public class ITZookeeperConfigurationTest { assertNull(provider.watcher.value()); String zkAddress = System.getProperty("zk.address"); - LOGGER.info("zkAddress: " + zkAddress); + log.info("zkAddress: " + zkAddress); RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); CuratorFramework client = CuratorFrameworkFactory.newClient(zkAddress, retryPolicy); client.start(); - - LOGGER.info("per path: " + nameSpace + "/" + key); + log.info("per path: " + nameSpace + "/" + key); assertTrue(client.create().creatingParentsIfNeeded().forPath(nameSpace + "/" + key, "500".getBytes()) != null); - LOGGER.info("data: " + new String(client.getData().forPath(nameSpace + "/" + key))); + log.info("data: " + new String(client.getData().forPath(nameSpace + "/" + key))); for (String v = provider.watcher.value(); v == null; v = provider.watcher.value()) { } @@ -93,6 +91,43 @@ public class ITZookeeperConfigurationTest { assertNull(provider.watcher.value()); } + @Test(timeout = 20000) + public void shouldReadUpdated4GroupConfig() throws Exception { + String nameSpace = "/default"; + String key = "test-module.default.testKeyGroup"; + assertEquals("{}", provider.groupWatcher.groupItems().toString()); + + String zkAddress = System.getProperty("zk.address"); + log.info("zkAddress: " + zkAddress); + + RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); + CuratorFramework client = CuratorFrameworkFactory.newClient(zkAddress, retryPolicy); + client.start(); + log.info("per path: " + nameSpace + "/" + key); + + assertTrue(client.create().creatingParentsIfNeeded().forPath(nameSpace + "/" + key + "/item1", "100".getBytes()) != null); + assertTrue(client.create().creatingParentsIfNeeded().forPath(nameSpace + "/" + key + "/item2", "200".getBytes()) != null); + + log.info("data: " + new String(client.getData().forPath(nameSpace + "/" + key + "/item1"))); + log.info("data: " + new String(client.getData().forPath(nameSpace + "/" + key + "/item2"))); + + for (String v = provider.groupWatcher.groupItems().get("item1"); v == null; v = provider.groupWatcher.groupItems().get("item1")) { + } + for (String v = provider.groupWatcher.groupItems().get("item2"); v == null; v = provider.groupWatcher.groupItems().get("item2")) { + } + + assertTrue(client.delete().forPath(nameSpace + "/" + key + "/item1") == null); + assertTrue(client.delete().forPath(nameSpace + "/" + key + "/item2") == null); + + for (String v = provider.groupWatcher.groupItems().get("item1"); v != null; v = provider.groupWatcher.groupItems().get("item1")) { + } + for (String v = provider.groupWatcher.groupItems().get("item2"); v != null; v = provider.groupWatcher.groupItems().get("item2")) { + } + + assertNull(provider.groupWatcher.groupItems().get("item1")); + assertNull(provider.groupWatcher.groupItems().get("item2")); + } + @SuppressWarnings("unchecked") private void loadConfig(ApplicationConfiguration configuration) throws FileNotFoundException { Reader applicationReader = ResourceUtils.read("application.yml"); @@ -115,7 +150,7 @@ public class ITZookeeperConfigurationTest { moduleConfiguration.addProviderConfiguration(name, properties); }); } - }); + }); } } } diff --git a/oap-server/server-configuration/configuration-zookeeper/src/test/java/org/apache/skywalking/oap/server/configuration/zookeeper/it/MockZookeeperConfigurationProvider.java b/oap-server/server-configuration/configuration-zookeeper/src/test/java/org/apache/skywalking/oap/server/configuration/zookeeper/it/MockZookeeperConfigurationProvider.java index 99427f68f93dfd6343e104c578bd47f773c93f40..2a9e59b2b2345017c2f85ae40586eeaa53fde9f4 100644 --- a/oap-server/server-configuration/configuration-zookeeper/src/test/java/org/apache/skywalking/oap/server/configuration/zookeeper/it/MockZookeeperConfigurationProvider.java +++ b/oap-server/server-configuration/configuration-zookeeper/src/test/java/org/apache/skywalking/oap/server/configuration/zookeeper/it/MockZookeeperConfigurationProvider.java @@ -18,20 +18,22 @@ package org.apache.skywalking.oap.server.configuration.zookeeper.it; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import lombok.extern.slf4j.Slf4j; import org.apache.skywalking.oap.server.configuration.api.ConfigChangeWatcher; import org.apache.skywalking.oap.server.configuration.api.ConfigurationModule; import org.apache.skywalking.oap.server.configuration.api.DynamicConfigurationService; +import org.apache.skywalking.oap.server.configuration.api.GroupConfigChangeWatcher; import org.apache.skywalking.oap.server.library.module.ModuleConfig; import org.apache.skywalking.oap.server.library.module.ModuleDefine; import org.apache.skywalking.oap.server.library.module.ModuleProvider; import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +@Slf4j public class MockZookeeperConfigurationProvider extends ModuleProvider { - private static final Logger LOGGER = LoggerFactory.getLogger(MockZookeeperConfigurationProvider.class); - ConfigChangeWatcher watcher; + GroupConfigChangeWatcher groupWatcher; @Override public String name() { @@ -56,7 +58,7 @@ public class MockZookeeperConfigurationProvider extends ModuleProvider { @Override public void notify(ConfigChangeEvent value) { - LOGGER.info("ConfigChangeWatcher.ConfigChangeEvent: {}", value); + log.info("ConfigChangeWatcher.ConfigChangeEvent: {}", value); if (EventType.DELETE.equals(value.getEventType())) { testValue = null; } else { @@ -69,6 +71,27 @@ public class MockZookeeperConfigurationProvider extends ModuleProvider { return testValue; } }; + + groupWatcher = new GroupConfigChangeWatcher(MockZookeeperConfigurationModule.NAME, this, "testKeyGroup") { + private Map config = new ConcurrentHashMap<>(); + + @Override + public void notifyGroup(Map groupItems) { + log.info("GroupConfigChangeWatcher.ConfigChangeEvents: {}", groupItems); + groupItems.forEach((groupItemName , event) -> { + if (EventType.DELETE.equals(event.getEventType())) { + config.remove(groupItemName); + } else { + config.put(groupItemName, event.getNewValue()); + } + }); + } + + @Override + public Map groupItems() { + return config; + } + }; } @Override @@ -77,6 +100,11 @@ public class MockZookeeperConfigurationProvider extends ModuleProvider { .provider() .getService(DynamicConfigurationService.class) .registerConfigChangeWatcher(watcher); + + getManager().find(ConfigurationModule.NAME) + .provider() + .getService(DynamicConfigurationService.class) + .registerConfigChangeWatcher(groupWatcher); } @Override diff --git a/oap-server/server-configuration/configuration-zookeeper/src/test/java/org/apache/skywalking/oap/server/configuration/zookeeper/ut/MockZookeeperConfigWatcherRegister.java b/oap-server/server-configuration/configuration-zookeeper/src/test/java/org/apache/skywalking/oap/server/configuration/zookeeper/ut/MockZookeeperConfigWatcherRegister.java index 3cf419787b3d28e3f38acad53c670ee4b8bcf22f..8145a660de5548b2f4c05d7f20e2b1e0e7e7d688 100644 --- a/oap-server/server-configuration/configuration-zookeeper/src/test/java/org/apache/skywalking/oap/server/configuration/zookeeper/ut/MockZookeeperConfigWatcherRegister.java +++ b/oap-server/server-configuration/configuration-zookeeper/src/test/java/org/apache/skywalking/oap/server/configuration/zookeeper/ut/MockZookeeperConfigWatcherRegister.java @@ -24,6 +24,7 @@ import org.apache.curator.framework.recipes.cache.ChildData; import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.skywalking.oap.server.configuration.api.ConfigTable; import org.apache.skywalking.oap.server.configuration.api.ConfigWatcherRegister; +import org.apache.skywalking.oap.server.configuration.api.GroupConfigTable; import org.apache.skywalking.oap.server.configuration.zookeeper.ZookeeperServerSettings; public class MockZookeeperConfigWatcherRegister extends ConfigWatcherRegister { @@ -44,4 +45,9 @@ public class MockZookeeperConfigWatcherRegister extends ConfigWatcherRegister { }); return Optional.of(table); } + + @Override + public Optional readGroupConfig(final Set keys) { + return Optional.empty(); + } } \ No newline at end of file diff --git a/oap-server/server-configuration/grpc-configuration-sync/src/main/java/org/apache/skywalking/oap/server/configuration/grpc/GRPCConfigWatcherRegister.java b/oap-server/server-configuration/grpc-configuration-sync/src/main/java/org/apache/skywalking/oap/server/configuration/grpc/GRPCConfigWatcherRegister.java index d5adc67d572a2d5ca34f370d9abdbe199776b8cf..e3ad840f05b71510d9c07a0dbfac8fa251dd80d0 100644 --- a/oap-server/server-configuration/grpc-configuration-sync/src/main/java/org/apache/skywalking/oap/server/configuration/grpc/GRPCConfigWatcherRegister.java +++ b/oap-server/server-configuration/grpc-configuration-sync/src/main/java/org/apache/skywalking/oap/server/configuration/grpc/GRPCConfigWatcherRegister.java @@ -24,6 +24,7 @@ import java.util.Optional; import java.util.Set; import org.apache.skywalking.oap.server.configuration.api.ConfigTable; import org.apache.skywalking.oap.server.configuration.api.ConfigWatcherRegister; +import org.apache.skywalking.oap.server.configuration.api.GroupConfigTable; import org.apache.skywalking.oap.server.configuration.service.ConfigurationRequest; import org.apache.skywalking.oap.server.configuration.service.ConfigurationResponse; import org.apache.skywalking.oap.server.configuration.service.ConfigurationServiceGrpc; @@ -73,4 +74,10 @@ public class GRPCConfigWatcherRegister extends ConfigWatcherRegister { } return Optional.of(table); } + + @Override + public Optional readGroupConfig(final Set keys) { + // TODO: implement readGroupConfig + return Optional.empty(); + } } diff --git a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/ApdexThresholdConfigTest.java b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/ApdexThresholdConfigTest.java index 4ba736f32fe3299bcc20e78d27c5df77a289efd2..18afa118b5c52427b9e636a354cf45bba9f6fc7d 100644 --- a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/ApdexThresholdConfigTest.java +++ b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/analysis/ApdexThresholdConfigTest.java @@ -22,6 +22,7 @@ import java.util.Optional; import java.util.Set; import org.apache.skywalking.oap.server.configuration.api.ConfigTable; import org.apache.skywalking.oap.server.configuration.api.ConfigWatcherRegister; +import org.apache.skywalking.oap.server.configuration.api.GroupConfigTable; import org.apache.skywalking.oap.server.core.CoreModuleProvider; import org.junit.Test; import org.junit.runner.RunWith; @@ -74,5 +75,10 @@ public class ApdexThresholdConfigTest { table.add(new ConfigTable.ConfigItem("core.default.apdexThreshold", "default: 1000 \nfoo: 200")); return Optional.of(table); } + + @Override + public Optional readGroupConfig(final Set keys) { + return Optional.empty(); + } } } \ No newline at end of file