未验证 提交 97f54114 编写于 作者: Z zifeihan 提交者: GitHub

Add the implementation of ConfigurationDiscovery on the OAP side. (#6220)

上级 eb4b1215
......@@ -69,6 +69,7 @@ Release Notes.
* Fix receiver analysis error count metrics
* Log collecting and query implementation
* Support Alarm to feishu
* Add the implementation of ConfigurationDiscovery on the OAP side.
#### UI
* Fix un-removed tags in trace query.
......
......@@ -25,6 +25,8 @@ public class CommandDeserializer {
final String commandName = command.getCommand();
if (ProfileTaskCommand.NAME.equals(commandName)) {
return ProfileTaskCommand.DESERIALIZER.deserialize(command);
} else if (ConfigurationDiscoveryCommand.NAME.equals(commandName)) {
return ConfigurationDiscoveryCommand.DESERIALIZER.deserialize(command);
}
throw new UnsupportedCommandException(command);
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.network.trace.component.command;
import java.util.ArrayList;
import java.util.List;
import org.apache.skywalking.apm.network.common.v3.Command;
import org.apache.skywalking.apm.network.common.v3.KeyStringValuePair;
public class ConfigurationDiscoveryCommand extends BaseCommand implements Serializable, Deserializable<ConfigurationDiscoveryCommand> {
public static final Deserializable<ConfigurationDiscoveryCommand> DESERIALIZER = new ConfigurationDiscoveryCommand(
"", "", new ArrayList<>());
public static final String NAME = ConfigurationDiscoveryCommand.class.getSimpleName();
public static final String UUID_CONST_NAME = "UUID";
public static final String SERIAL_NUMBER_CONST_NAME = "SerialNumber";
/*
* If config is unchanged, then could response the same uuid, and config is not required.
*/
private String uuid;
/*
* The configuration of service.
*/
private List<KeyStringValuePair> config;
public ConfigurationDiscoveryCommand(String serialNumber,
String uuid,
List<KeyStringValuePair> config) {
super(NAME, serialNumber);
this.uuid = uuid;
this.config = config;
}
@Override
public ConfigurationDiscoveryCommand deserialize(Command command) {
String serialNumber = null;
String uuid = null;
List<KeyStringValuePair> config = new ArrayList<>();
for (final KeyStringValuePair pair : command.getArgsList()) {
if (SERIAL_NUMBER_CONST_NAME.equals(pair.getKey())) {
serialNumber = pair.getValue();
} else if (UUID_CONST_NAME.equals(pair.getKey())) {
uuid = pair.getValue();
} else {
config.add(pair);
}
}
return new ConfigurationDiscoveryCommand(serialNumber, uuid, config);
}
@Override
public Command.Builder serialize() {
final Command.Builder builder = commandBuilder();
builder.addArgs(KeyStringValuePair.newBuilder().setKey(UUID_CONST_NAME).setValue(uuid));
builder.addAllArgs(config);
return builder;
}
public String getUuid() {
return uuid;
}
public List<KeyStringValuePair> getConfig() {
return config;
}
@Override
public String toString() {
return "ConfigurationDiscoveryCommand{" +
"uuid='" + uuid + '\'' +
", config=" + config +
'}';
}
}
Subproject commit ea906c1ace2b5eaf19b1c36ead0fd6e1489feaeb
Subproject commit 101dc50429c98147b1109cb15c8a6c623e751759
......@@ -17,6 +17,7 @@ We have following receivers, and `default` implementors are provided in our Apac
1. **receiver-meter**. See [details](backend-meter.md).
1. **receiver-browser**. gRPC services to accept browser performance data and error log.
1. **receiver-log**. gRPC services accept log data.
1. **configuration-discovery**. gRPC services handle configurationDiscovery.
The sample settings of these receivers should be already in default `application.yml`, and also list here
```yaml
......@@ -66,6 +67,10 @@ receiver-browser:
receiver-log:
selector: ${SW_RECEIVER_LOG:default}
default:
configuration-discovery:
selector: ${SW_CONFIGURATION_DISCOVERY:default}
default:
```
## gRPC/HTTP server for receiver
......@@ -153,7 +158,7 @@ receiver_jaeger:
default:
gRPCHost: ${SW_RECEIVER_JAEGER_HOST:0.0.0.0}
gRPCPort: ${SW_RECEIVER_JAEGER_PORT:14250}
```
```
NOTICE, Jaeger receiver is only provided in `apache-skywalking-apm-x.y.z.tar.gz` tar.
......
......@@ -260,6 +260,7 @@ core|default|role|Option values, `Mixed/Receiver/Aggregator`. **Receiver** mode
| exporter | grpc | targetHost | The host of target grpc server for receiving export data. | SW_EXPORTER_GRPC_HOST | 127.0.0.1 |
| - | - | targetPort | The port of target grpc server for receiving export data. | SW_EXPORTER_GRPC_PORT | 9870 |
| health-checker | default | checkIntervalSeconds | The period of check OAP internal health status. Unit is second. | SW_HEALTH_CHECKER_INTERVAL_SECONDS | 5 |
| configuration-discovery | default | disableMessageDigest | If true, agent receives the latest configuration every time even without change. In default, OAP uses SHA512 message digest mechanism to detect changes of configuration. | SW_DISABLE_MESSAGE_DIGEST | false
## Notice
¹ System Environment Variable name could be declared and changed in the application.yml. The names listed here,
......
......@@ -13,6 +13,7 @@ Right now, SkyWalking supports following dynamic configurations.
|core.default.endpoint-name-grouping| The endpoint name grouping setting, will override `endpoint-name-grouping.yml`. | same as [`endpoint-name-grouping.yml`](endpoint-grouping-rules.md) |
|agent-analyzer.default.sampleRate| Trace sampling , override `receiver-trace/default/sampleRate` of `application.yml`. | 10000 |
|agent-analyzer.default.slowTraceSegmentThreshold| Setting this threshold about the latency would make the slow trace segments sampled if they cost more time, even the sampling mechanism activated. The default value is `-1`, which means would not sample slow traces. Unit, millisecond. override `receiver-trace/default/slowTraceSegmentThreshold` of `application.yml`. | -1 |
|configuration-discovery.default.agentConfigurations| The ConfigurationDiscovery settings | look at [`configuration-discovery.md`](../service-agent/java-agent/configuration-discovery.md) |
This feature depends on upstream service, so it is **DISABLED** by default.
......
......@@ -164,6 +164,9 @@ property key | Description | Default |
`plugin.toolkit.log.grpc.reporter.max_message_size` | Specify the maximum size of log data for grpc client to report to. | `10485760` |
`plugin.toolkit.log.grpc.reporter.upstream_timeout` | How long grpc client will timeout in sending data to upstream. Unit is second.|`30` seconds|
## Dynamic Configurations
All configurations above are static, if you need to change some agent settings at runtime, please read [CDS - Configuration Discovery Service document](configuration-discovery.md) for more details.
## Optional Plugins
Java agent plugins are all pluggable. Optional plugins could be provided in `optional-plugins` folder under agent or 3rd party repositories.
For using these plugins, you need to put the target plugin jar file into `/plugins`.
......
# CDS - Configuration Discovery Service
CDS - Configuration Discovery Service provides the dynamic configuration for the agent, defined in [gRPC](https://github.com/apache/skywalking-data-collect-protocol/blob/master/language-agent/ConfigurationDiscoveryService.proto).
## Configuration Format
The configuration content includes the service name and their configs. The
```yml
configurations:
//service name
serviceA:
// Configurations of service A
// Key and Value are determined by the agent side.
// Check the agent setup doc for all available configurations.
key1: value1
key2: value2
...
serviceB:
...
```
## Available key(s) and value(s) in Java Agent.
Java agent supports the following dynamic configurations.
| Config Key | Value Description | Value Format Example | Required Plugin(s) |
| :-----------------------: | :----------------------------------------------------------: | :-------------------: | :----------------: |
| agent.sample_n_per_3_secs | The number of sampled traces per 3 seconds | -1 | - |
* `Required plugin(s)`, the configuration affects only when the required plugins activated.
......@@ -131,6 +131,11 @@
<artifactId>skywalking-log-recevier-plugin</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>configuration-discovery-receiver-plugin</artifactId>
<version>${project.version}</version>
</dependency>
<!-- receiver module -->
<!-- fetcher module -->
......
......@@ -413,3 +413,8 @@ health-checker:
selector: ${SW_HEALTH_CHECKER:-}
default:
checkIntervalSeconds: ${SW_HEALTH_CHECKER_INTERVAL_SECONDS:5}
configuration-discovery:
selector: ${SW_CONFIGURATION_DISCOVERY:-}
default:
disableMessageDigest: ${SW_DISABLE_MESSAGE_DIGEST:false}
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>server-receiver-plugin</artifactId>
<groupId>org.apache.skywalking</groupId>
<version>8.4.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>configuration-discovery-receiver-plugin</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>skywalking-sharing-server-plugin</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.recevier.configuration.discovery;
import java.util.Map;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
/**
* Dynamic configuration items, save the dynamic configuration of the agent corresponding to the service.
*/
@Setter
@Getter
@ToString
public class AgentConfigurations {
private String service;
private Map<String, String> configuration;
/**
* The uuid is calculated by the dynamic configuration of the service.
*/
private volatile String uuid;
public AgentConfigurations(final String service, final Map<String, String> configuration, final String uuid) {
this.service = service;
this.configuration = configuration;
this.uuid = uuid;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.recevier.configuration.discovery;
import java.io.InputStream;
import java.io.Reader;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.codec.digest.DigestUtils;
import org.yaml.snakeyaml.Yaml;
import org.yaml.snakeyaml.constructor.SafeConstructor;
/**
* Used to parse the String configuration to AgentConfigurations.
*/
@Slf4j
public class AgentConfigurationsReader {
private Map yamlData;
public AgentConfigurationsReader(InputStream inputStream) {
Yaml yaml = new Yaml(new SafeConstructor());
yamlData = (Map) yaml.load(inputStream);
}
public AgentConfigurationsReader(Reader io) {
Yaml yaml = new Yaml(new SafeConstructor());
yamlData = (Map) yaml.load(io);
}
public AgentConfigurationsTable readAgentConfigurationsTable() {
AgentConfigurationsTable agentConfigurationsTable = new AgentConfigurationsTable();
try {
if (Objects.nonNull(yamlData)) {
Map configurationsData = (Map) yamlData.get("configurations");
if (configurationsData != null) {
configurationsData.forEach((k, v) -> {
Map map = (Map) v;
StringBuilder serviceConfigStr = new StringBuilder();
Map<String, String> config = new HashMap<>(map.size());
map.forEach((key, value) -> {
config.put(key.toString(), value.toString());
serviceConfigStr.append(key.toString()).append(":").append(value.toString());
});
AgentConfigurations agentConfigurations = new AgentConfigurations(
k.toString(), config, DigestUtils.sha512Hex(serviceConfigStr.toString()));
agentConfigurationsTable.getAgentConfigurationsCache()
.put(agentConfigurations.getService(), agentConfigurations);
});
}
}
} catch (Exception e) {
log.error("Read ConfigurationDiscovery configurations error.", e);
}
return agentConfigurationsTable;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.recevier.configuration.discovery;
import java.util.HashMap;
import java.util.Map;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
/**
* Dynamic configuration items, save the dynamic configuration of the agent corresponding to the service.
*/
@Setter
@Getter
@ToString
public class AgentConfigurationsTable {
private Map<String, AgentConfigurations> agentConfigurationsCache;
public AgentConfigurationsTable() {
this.agentConfigurationsCache = new HashMap<>();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.recevier.configuration.discovery;
import java.io.StringReader;
import org.apache.skywalking.oap.server.configuration.api.ConfigChangeWatcher;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
/**
* AgentConfigurationsWatcher used to handle dynamic configuration changes.
*/
public class AgentConfigurationsWatcher extends ConfigChangeWatcher {
private volatile String settingsString;
private volatile AgentConfigurationsTable agentConfigurationsTable;
public AgentConfigurationsWatcher(ModuleProvider provider) {
super(ConfigurationDiscoveryModule.NAME, provider, "agentConfigurations");
this.settingsString = Const.EMPTY_STRING;
this.agentConfigurationsTable = new AgentConfigurationsTable();
}
@Override
public void notify(ConfigChangeEvent value) {
if (value.getEventType().equals(EventType.DELETE)) {
settingsString = Const.EMPTY_STRING;
this.agentConfigurationsTable = new AgentConfigurationsTable();
} else {
settingsString = value.getNewValue();
AgentConfigurationsReader agentConfigurationsReader =
new AgentConfigurationsReader(new StringReader(value.getNewValue()));
this.agentConfigurationsTable = agentConfigurationsReader.readAgentConfigurationsTable();
}
}
@Override
public String value() {
return settingsString;
}
public AgentConfigurations getAgentConfigurations(String service) {
return agentConfigurationsTable.getAgentConfigurationsCache().get(service);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.recevier.configuration.discovery;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
public class ConfigurationDiscoveryModule extends ModuleDefine {
public static final String NAME = "configuration-discovery";
public ConfigurationDiscoveryModule() {
super(NAME);
}
@Override
public Class[] services() {
return new Class[0];
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.recevier.configuration.discovery;
import lombok.Getter;
import lombok.Setter;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
public class ConfigurationDiscoveryModuleConfig extends ModuleConfig {
/**
* If true, agent receives the latest configuration every time even without change.
* In default, OAP uses SHA512 message digest mechanism to detect changes of configuration.
*/
@Setter
@Getter
private boolean disableMessageDigest = false;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.recevier.configuration.discovery;
import org.apache.skywalking.oap.server.configuration.api.ConfigurationModule;
import org.apache.skywalking.oap.server.configuration.api.DynamicConfigurationService;
import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
import org.apache.skywalking.oap.server.receiver.sharing.server.SharingServerModule;
import org.apache.skywalking.oap.server.recevier.configuration.discovery.handler.grpc.ConfigurationDiscoveryServiceHandler;
public class ConfigurationDiscoveryProvider extends ModuleProvider {
private AgentConfigurationsWatcher agentConfigurationsWatcher;
private ConfigurationDiscoveryModuleConfig configurationDiscoveryModuleConfig;
@Override
public String name() {
return "default";
}
@Override
public Class<? extends ModuleDefine> module() {
return ConfigurationDiscoveryModule.class;
}
public ConfigurationDiscoveryProvider() {
configurationDiscoveryModuleConfig = new ConfigurationDiscoveryModuleConfig();
}
@Override
public ModuleConfig createConfigBeanIfAbsent() {
return configurationDiscoveryModuleConfig;
}
@Override
public void prepare() throws ServiceNotProvidedException, ModuleStartException {
agentConfigurationsWatcher = new AgentConfigurationsWatcher(this);
}
@Override
public void start() throws ServiceNotProvidedException, ModuleStartException {
DynamicConfigurationService dynamicConfigurationService = getManager().find(ConfigurationModule.NAME)
.provider()
.getService(
DynamicConfigurationService.class);
dynamicConfigurationService.registerConfigChangeWatcher(agentConfigurationsWatcher);
/*
* Register ConfigurationDiscoveryServiceHandler to process gRPC requests for ConfigurationDiscovery.
*/
GRPCHandlerRegister grpcHandlerRegister = getManager().find(SharingServerModule.NAME)
.provider()
.getService(GRPCHandlerRegister.class);
grpcHandlerRegister.addHandler(new ConfigurationDiscoveryServiceHandler(
agentConfigurationsWatcher,
configurationDiscoveryModuleConfig.isDisableMessageDigest()
));
}
@Override
public void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException {
}
@Override
public String[] requiredModules() {
return new String[] {
ConfigurationModule.NAME,
SharingServerModule.NAME
};
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.recevier.configuration.discovery.handler.grpc;
import com.google.common.collect.Lists;
import io.grpc.stub.StreamObserver;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.apm.network.agent.dynamic.configuration.v3.ConfigurationDiscoveryServiceGrpc;
import org.apache.skywalking.apm.network.agent.dynamic.configuration.v3.ConfigurationSyncRequest;
import org.apache.skywalking.apm.network.common.v3.Commands;
import org.apache.skywalking.apm.network.common.v3.KeyStringValuePair;
import org.apache.skywalking.apm.network.trace.component.command.ConfigurationDiscoveryCommand;
import org.apache.skywalking.oap.server.library.server.grpc.GRPCHandler;
import org.apache.skywalking.oap.server.recevier.configuration.discovery.AgentConfigurations;
import org.apache.skywalking.oap.server.recevier.configuration.discovery.AgentConfigurationsWatcher;
/**
* Provide query agent dynamic configuration, through the gRPC protocol,
*/
@Slf4j
public class ConfigurationDiscoveryServiceHandler extends ConfigurationDiscoveryServiceGrpc.ConfigurationDiscoveryServiceImplBase implements GRPCHandler {
private final AgentConfigurationsWatcher agentConfigurationsWatcher;
/**
* If the current configuration is true, the requestId and uuid will not be judged, and the dynamic configuration of
* the service corresponding to the agent will be returned directly
*/
private boolean disableMessageDigest = false;
public ConfigurationDiscoveryServiceHandler(AgentConfigurationsWatcher agentConfigurationsWatcher,
boolean disableMessageDigest) {
this.agentConfigurationsWatcher = agentConfigurationsWatcher;
this.disableMessageDigest = disableMessageDigest;
}
/*
* Process the request for querying the dynamic configuration of the agent.
* If there is agent dynamic configuration information corresponding to the service,
* the ConfigurationDiscoveryCommand is returned to represent the dynamic configuration information.
*/
@Override
public void fetchConfigurations(final ConfigurationSyncRequest request,
final StreamObserver<Commands> responseObserver) {
Commands.Builder commandsBuilder = Commands.newBuilder();
AgentConfigurations agentConfigurations = agentConfigurationsWatcher.getAgentConfigurations(
request.getService());
if (null != agentConfigurations) {
if (disableMessageDigest || !Objects.equals(agentConfigurations.getUuid(), request.getUuid())) {
ConfigurationDiscoveryCommand configurationDiscoveryCommand =
newAgentDynamicConfigCommand(agentConfigurations);
commandsBuilder.addCommands(configurationDiscoveryCommand.serialize().build());
}
}
responseObserver.onNext(commandsBuilder.build());
responseObserver.onCompleted();
}
public ConfigurationDiscoveryCommand newAgentDynamicConfigCommand(AgentConfigurations agentConfigurations) {
List<KeyStringValuePair> configurationList = Lists.newArrayList();
agentConfigurations.getConfiguration().forEach((k, v) -> {
KeyStringValuePair.Builder builder = KeyStringValuePair.newBuilder().setKey(k).setValue(v);
configurationList.add(builder.build());
});
return new ConfigurationDiscoveryCommand(
UUID.randomUUID().toString(), agentConfigurations.getUuid(), configurationList);
}
}
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
org.apache.skywalking.oap.server.recevier.configuration.discovery.ConfigurationDiscoveryModule
\ No newline at end of file
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
org.apache.skywalking.oap.server.recevier.configuration.discovery.ConfigurationDiscoveryProvider
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.recevier.configuration.discovery;
import java.util.Map;
import org.junit.Assert;
import org.junit.Test;
public class AgentConfigurationsReaderTest {
@Test
public void testReadAgentConfigurations() {
AgentConfigurationsReader reader = new AgentConfigurationsReader(
this.getClass().getClassLoader().getResourceAsStream("agent-dynamic-configuration.yml"));
Map<String, AgentConfigurations> configurationCache = reader.readAgentConfigurationsTable()
.getAgentConfigurationsCache();
Assert.assertEquals(2, configurationCache.size());
AgentConfigurations agentConfigurations0 = configurationCache.get("serviceA");
Assert.assertEquals("serviceA", agentConfigurations0.getService());
Assert.assertEquals(2, agentConfigurations0.getConfiguration().size());
Assert.assertEquals("1000", agentConfigurations0.getConfiguration().get("trace.sample_rate"));
Assert.assertEquals(
"/api/seller/seller/*", agentConfigurations0.getConfiguration().get("trace.ignore_path"));
Assert.assertEquals(
"92670f1ccbdee60e14ffc054d70a5cf3f93f6b5fb1adb83b10bea4fec79b96e7bc5e7b188e231428853721ded42ec756663947316065617f3cfdf51d6dfc8da6",
agentConfigurations0.getUuid()
);
AgentConfigurations agentConfigurations1 = configurationCache.get("serviceB");
Assert.assertEquals("serviceB", agentConfigurations1.getService());
Assert.assertEquals(2, agentConfigurations1.getConfiguration().size());
Assert.assertEquals("1000", agentConfigurations1.getConfiguration().get("trace.sample_rate"));
Assert.assertEquals(
"/api/seller/seller/*", agentConfigurations1.getConfiguration().get("trace.ignore_path"));
Assert.assertEquals(
"92670f1ccbdee60e14ffc054d70a5cf3f93f6b5fb1adb83b10bea4fec79b96e7bc5e7b188e231428853721ded42ec756663947316065617f3cfdf51d6dfc8da6",
agentConfigurations0.getUuid()
);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.recevier.configuration.discovery;
import java.io.IOException;
import java.io.Reader;
import java.util.Map;
import org.apache.skywalking.oap.server.configuration.api.ConfigChangeWatcher;
import org.apache.skywalking.oap.server.library.util.ResourceUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.MockitoAnnotations;
import org.mockito.Spy;
import org.powermock.reflect.Whitebox;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.spy;
public class AgentConfigurationsWatcherTest {
@Spy
private AgentConfigurationsWatcher agentConfigurationsWatcher = new AgentConfigurationsWatcher(null);
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
}
@Test
public void testConfigModifyEvent() throws IOException {
AgentConfigurationsTable agentConfigurationsTable = Whitebox.getInternalState(
agentConfigurationsWatcher, "agentConfigurationsTable");
assertTrue(agentConfigurationsTable.getAgentConfigurationsCache().isEmpty());
Reader reader = ResourceUtils.read("agent-dynamic-configuration.yml");
char[] chars = new char[1024 * 1024];
int length = reader.read(chars);
agentConfigurationsWatcher.notify(new ConfigChangeWatcher.ConfigChangeEvent(
new String(chars, 0, length),
ConfigChangeWatcher.EventType.MODIFY
));
AgentConfigurationsTable modifyAgentConfigurationsTable = Whitebox.getInternalState(
agentConfigurationsWatcher, "agentConfigurationsTable");
Map<String, AgentConfigurations> configurationCache = modifyAgentConfigurationsTable.getAgentConfigurationsCache();
Assert.assertEquals(2, configurationCache.size());
AgentConfigurations agentConfigurations0 = configurationCache.get("serviceA");
Assert.assertEquals("serviceA", agentConfigurations0.getService());
Assert.assertEquals(2, agentConfigurations0.getConfiguration().size());
Assert.assertEquals("1000", agentConfigurations0.getConfiguration().get("trace.sample_rate"));
Assert.assertEquals(
"/api/seller/seller/*", agentConfigurations0.getConfiguration().get("trace.ignore_path"));
Assert.assertEquals(
"92670f1ccbdee60e14ffc054d70a5cf3f93f6b5fb1adb83b10bea4fec79b96e7bc5e7b188e231428853721ded42ec756663947316065617f3cfdf51d6dfc8da6",
agentConfigurations0.getUuid()
);
AgentConfigurations agentConfigurations1 = configurationCache.get("serviceB");
Assert.assertEquals("serviceB", agentConfigurations1.getService());
Assert.assertEquals(2, agentConfigurations1.getConfiguration().size());
Assert.assertEquals("1000", agentConfigurations1.getConfiguration().get("trace.sample_rate"));
Assert.assertEquals(
"/api/seller/seller/*", agentConfigurations1.getConfiguration().get("trace.ignore_path"));
Assert.assertEquals(
"92670f1ccbdee60e14ffc054d70a5cf3f93f6b5fb1adb83b10bea4fec79b96e7bc5e7b188e231428853721ded42ec756663947316065617f3cfdf51d6dfc8da6",
agentConfigurations0.getUuid()
);
}
@Test
public void testConfigDeleteEvent() throws IOException {
Reader reader = ResourceUtils.read("agent-dynamic-configuration.yml");
agentConfigurationsWatcher = spy(new AgentConfigurationsWatcher(null));
Whitebox.setInternalState(
agentConfigurationsWatcher, "agentConfigurationsTable",
new AgentConfigurationsReader(reader).readAgentConfigurationsTable()
);
agentConfigurationsWatcher.notify(
new ConfigChangeWatcher.ConfigChangeEvent("whatever", ConfigChangeWatcher.EventType.DELETE));
AgentConfigurationsTable agentConfigurationsTable = Whitebox.getInternalState(
agentConfigurationsWatcher, "agentConfigurationsTable");
Map<String, AgentConfigurations> configurationCache = agentConfigurationsTable.getAgentConfigurationsCache();
Assert.assertEquals(0, configurationCache.size());
AgentConfigurations agentConfigurations0 = configurationCache.get("serviceA");
AgentConfigurations agentConfigurations1 = configurationCache.get("serviceB");
Assert.assertNull(null, agentConfigurations0);
Assert.assertNull(null, agentConfigurations1);
}
}
\ No newline at end of file
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
configurations:
serviceA:
trace.sample_rate: 1000
trace.ignore_path: /api/seller/seller/*
serviceB:
trace.sample_rate: 1000
trace.ignore_path: /api/seller/seller/*
\ No newline at end of file
......@@ -43,6 +43,7 @@
<module>skywalking-meter-receiver-plugin</module>
<module>skywalking-browser-receiver-plugin</module>
<module>skywalking-log-recevier-plugin</module>
<module>configuration-discovery-receiver-plugin</module>
</modules>
<dependencies>
......
Subproject commit ea906c1ace2b5eaf19b1c36ead0fd6e1489feaeb
Subproject commit 101dc50429c98147b1109cb15c8a6c623e751759
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册