未验证 提交 e89493b2 编写于 作者: W wankai123 提交者: GitHub

Support etcd grouped dynamic configurations. (#7672)

上级 ec03cca3
......@@ -57,6 +57,7 @@ Release Notes.
* Fix `H2MetadataQueryDAO.searchService` doesn't support auto grouping.
* Rebuilt ElasticSearch client on top of their REST API.
* Fix ElasticSearch storage plugin doesn't work when hot reloading from `secretsManagementFile`.
* Support etcd grouped dynamic configurations.
#### UI
......
......@@ -241,7 +241,7 @@ core|default|role|Option values: `Mixed/Receiver/Aggregator`. **Receiver** mode
| - | - | baseSleepTimeMs|The period of Zookeeper client between two retries (in milliseconds). |SW_CONFIG_ZK_BASE_SLEEP_TIME_MS|1000|
| - | - | maxRetries| The maximum retry time. |SW_CONFIG_ZK_MAX_RETRIES|3|
| - | - | period | The period of data sync (in seconds). | SW_CONFIG_ZK_PERIOD | 60 |
| - | etcd| endpoints | Hosts and ports for etcd cluster (separated by commas if multiple). | SW_CONFIG_ETCD_ENDPOINTS | localhost:2379 |
| - | etcd| endpoints | Hosts and ports for etcd cluster (separated by commas if multiple). | SW_CONFIG_ETCD_ENDPOINTS | http://localhost:2379 |
| - | - | namespace | Namespace for SkyWalking cluster. |SW_CONFIG_ETCD_NAMESPACE | /skywalking |
| - | - | authentication | Indicates whether there is authentication. | SW_CONFIG_ETCD_AUTHENTICATION | false |
| - | - | user | Etcd auth username. | SW_CONFIG_ETCD_USER | |
......
......@@ -7,11 +7,54 @@ configuration:
selector: ${SW_CONFIGURATION:etcd}
etcd:
period: ${SW_CONFIG_ETCD_PERIOD:60} # Unit seconds, sync period. Default fetch every 60 seconds.
endpoints: ${SW_CONFIG_ETCD_ENDPOINTS:localhost:2379}
endpoints: ${SW_CONFIG_ETCD_ENDPOINTS:http://localhost:2379}
namespace: ${SW_CONFIG_ETCD_NAMESPACE:/skywalking}
authentication: ${SW_CONFIG_ETCD_AUTHENTICATION:false}
user: ${SW_CONFIG_ETCD_USER:}
password: ${SW_CONFIG_ETCD_password:}
```
**NOTE**: Only the v3 protocol is supported since 8.7.0.
\ No newline at end of file
**NOTE**: Only the v3 protocol is supported since 8.7.0.
## Config Storage
### Single Config
Single configs in etcd are key/value pairs:
| Key | Value |
|-----|-----|
| {namespace}/configKey | configVaule |
e.g. The config is:
```
{agent-analyzer.default.slowDBAccessThreshold}:{default:200,mongodb:50}
```
If `namespace = /skywalking` the config in etcd is:
| Key | Value |
|-----|-----|
| /skywalking/agent-analyzer.default.slowDBAccessThreshold | default:200,mongodb:50 |
| ... | ... |
### Group Config
Group config in etcd are key/value pairs as well and the key is composited by configKey and subItemKey with `/`.
| Key | Value |
|-----|-----|
| {namespace}/configKey/subItemkey1 | subItemValue1 |
| {namespace}/configKey/subItemkey2 | subItemValue2 |
| ... | ... |
e.g. The config is:
```
{core.default.endpoint-name-grouping-openapi}:|{customerAPI-v1}:{value of customerAPI-v1}
|{productAPI-v1}:{value of productAPI-v1}
|{productAPI-v2}:{value of productAPI-v2}
```
If `namespace = /skywalking` the config in etcd is:
| Key | Value |
|-----|-----|
| /skywalking/core.default.endpoint-name-grouping-openapi/customerAPI-v1 | value of customerAPI-v1 |
| /skywalking/core.default.endpoint-name-grouping-openapi/productAPI-v1 | value of productAPI-v1 |
| /skywalking/core.default.endpoint-name-grouping-openapi/productAPI-v2 | value of productAPI-v2 |
\ No newline at end of file
......@@ -21,8 +21,11 @@ import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.ClientBuilder;
import io.etcd.jetcd.KV;
import io.etcd.jetcd.KeyValue;
import io.etcd.jetcd.kv.GetResponse;
import io.etcd.jetcd.options.GetOption;
import java.nio.charset.Charset;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import lombok.extern.slf4j.Slf4j;
......@@ -77,8 +80,37 @@ public class EtcdConfigWatcherRegister extends ConfigWatcherRegister {
@Override
public Optional<GroupConfigTable> readGroupConfig(final Set<String> keys) {
// TODO: implement readGroupConfig
return Optional.empty();
GroupConfigTable groupConfigTable = new GroupConfigTable();
keys.forEach(key -> {
GroupConfigTable.GroupConfigItems groupConfigItems = new GroupConfigTable.GroupConfigItems(key);
groupConfigTable.addGroupConfigItems(groupConfigItems);
String groupKey = key + "/";
GetOption option = GetOption.newBuilder()
.withPrefix(ByteSequence.from(groupKey, Charset.defaultCharset()))
.build();
try {
GetResponse response = client.get(ByteSequence.from(groupKey, Charset.defaultCharset()), option).get();
if (0 != response.getCount()) {
List<KeyValue> groupItemKeys = response.getKvs();
if (groupItemKeys != null) {
groupItemKeys.forEach(groupItem -> {
String groupItemKey = groupItem.getKey().toString(Charset.defaultCharset());
if (!groupKey.equals(groupItemKey)) {
String itemValue = groupItem.getValue().toString(Charset.defaultCharset());
String itemName = groupItemKey.substring(groupKey.length());
groupConfigItems.add(
new ConfigTable.ConfigItem(itemName, itemValue));
}
});
}
}
} catch (Exception exp) {
throw new EtcdConfigException("Failed to read configuration", exp);
}
});
return Optional.of(groupConfigTable);
}
}
......@@ -18,22 +18,22 @@
package org.apache.skywalking.oap.server.configuration.etcd;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.configuration.api.ConfigChangeWatcher;
import org.apache.skywalking.oap.server.configuration.api.ConfigurationModule;
import org.apache.skywalking.oap.server.configuration.api.DynamicConfigurationService;
import org.apache.skywalking.oap.server.configuration.api.GroupConfigChangeWatcher;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Slf4j
public class EtcdConfigurationTestProvider extends ModuleProvider {
private final static Logger LOGGER = LoggerFactory.getLogger(EtcdConfigurationTestProvider.class);
ConfigChangeWatcher watcher;
GroupConfigChangeWatcher groupWatcher;
@Override
public String name() {
......@@ -52,13 +52,13 @@ public class EtcdConfigurationTestProvider extends ModuleProvider {
}
@Override
public void prepare() throws ServiceNotProvidedException, ModuleStartException {
public void prepare() throws ServiceNotProvidedException {
watcher = new ConfigChangeWatcher(EtcdConfigurationTestModule.NAME, this, "testKey") {
private volatile String testValue;
@Override
public void notify(ConfigChangeEvent value) {
LOGGER.info("ConfigChangeWatcher.ConfigChangeEvent: {}", value);
log.info("ConfigChangeWatcher.ConfigChangeEvent: {}", value);
if (EventType.DELETE.equals(value.getEventType())) {
testValue = null;
} else {
......@@ -71,18 +71,44 @@ public class EtcdConfigurationTestProvider extends ModuleProvider {
return testValue;
}
};
groupWatcher = new GroupConfigChangeWatcher(EtcdConfigurationTestModule.NAME, this, "testKeyGroup") {
private Map<String, String> config = new ConcurrentHashMap<>();
@Override
public void notifyGroup(Map<String, ConfigChangeEvent> groupItems) {
log.info("GroupConfigChangeWatcher.ConfigChangeEvents: {}", groupItems);
groupItems.forEach((groupItemName, event) -> {
if (EventType.DELETE.equals(event.getEventType())) {
config.remove(groupItemName);
} else {
config.put(groupItemName, event.getNewValue());
}
});
}
@Override
public Map<String, String> groupItems() {
return config;
}
};
}
@Override
public void start() throws ServiceNotProvidedException, ModuleStartException {
public void start() throws ServiceNotProvidedException {
getManager().find(ConfigurationModule.NAME)
.provider()
.getService(DynamicConfigurationService.class)
.registerConfigChangeWatcher(watcher);
getManager().find(ConfigurationModule.NAME)
.provider()
.getService(DynamicConfigurationService.class)
.registerConfigChangeWatcher(groupWatcher);
}
@Override
public void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException {
public void notifyAfterCompleted() throws ServiceNotProvidedException {
}
......
......@@ -108,6 +108,56 @@ public class ITEtcdConfigurationTest {
assertNull(provider.watcher.value());
}
@Test(timeout = 20000)
public void shouldReadUpdated4Group() throws Exception {
assertEquals("{}", provider.groupWatcher.groupItems().toString());
KV client = Client.builder()
.endpoints("http://localhost:" + container.getMappedPort(2379))
.namespace(ByteSequence.from("/skywalking/", Charset.defaultCharset()))
.build()
.getKVClient();
client.put(
ByteSequence.from("test-module.default.testKeyGroup/item1", Charset.defaultCharset()),
ByteSequence.from("100", Charset.defaultCharset())
).get();
client.put(
ByteSequence.from("test-module.default.testKeyGroup/item2", Charset.defaultCharset()),
ByteSequence.from("200", Charset.defaultCharset())
).get();
for (String v = provider.groupWatcher.groupItems().get("item1"); v == null; v = provider.groupWatcher.groupItems().get("item1")) {
log.info("value is : {}", provider.groupWatcher.groupItems().get("item1"));
TimeUnit.MILLISECONDS.sleep(200L);
}
for (String v = provider.groupWatcher.groupItems().get("item2"); v == null; v = provider.groupWatcher.groupItems().get("item2")) {
log.info("value is : {}", provider.groupWatcher.groupItems().get("item2"));
TimeUnit.MILLISECONDS.sleep(200L);
}
assertEquals("100", provider.groupWatcher.groupItems().get("item1"));
assertEquals("200", provider.groupWatcher.groupItems().get("item2"));
//test remove item1
client.delete(ByteSequence.from("test-module.default.testKeyGroup/item1", Charset.defaultCharset())).get();
for (String v = provider.groupWatcher.groupItems().get("item1"); v != null; v = provider.groupWatcher.groupItems().get("item1")) {
log.info("value is : {}", provider.groupWatcher.groupItems().get("item1"));
TimeUnit.MILLISECONDS.sleep(200L);
}
assertNull(provider.groupWatcher.groupItems().get("item1"));
//test modify item2
client.put(
ByteSequence.from("test-module.default.testKeyGroup/item2", Charset.defaultCharset()),
ByteSequence.from("300", Charset.defaultCharset())
).get();
for (String v = provider.groupWatcher.groupItems().get("item2"); v.equals("200"); v = provider.groupWatcher.groupItems().get("item2")) {
log.info("value is : {}", provider.groupWatcher.groupItems().get("item2"));
TimeUnit.MILLISECONDS.sleep(200L);
}
assertEquals("300", provider.groupWatcher.groupItems().get("item2"));
}
@SuppressWarnings("unchecked")
private static void loadConfig(ApplicationConfiguration configuration) throws FileNotFoundException {
final Yaml yaml = new Yaml();
......
......@@ -432,7 +432,7 @@ configuration:
maxRetries: ${SW_CONFIG_ZK_MAX_RETRIES:3} # max number of times to retry
etcd:
period: ${SW_CONFIG_ETCD_PERIOD:60} # Unit seconds, sync period. Default fetch every 60 seconds.
endpoints: ${SW_CONFIG_ETCD_ENDPOINTS:localhost:2379}
endpoints: ${SW_CONFIG_ETCD_ENDPOINTS:http://localhost:2379}
namespace: ${SW_CONFIG_ETCD_NAMESPACE:/skywalking}
authentication: ${SW_CONFIG_ETCD_AUTHENTICATION:false}
user: ${SW_CONFIG_ETCD_USER:}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册