未验证 提交 73ee3103 编写于 作者: K killGC 提交者: GitHub

support sampleRate as Dynamic Configuration (#4968) (#4987)

上级 79be376a
......@@ -11,7 +11,7 @@ Right now, SkyWalking supports following dynamic configurations.
|alarm.default.alarm-settings| The alarm settings, will override `alarm-settings.yml`. | same as [`alarm-settings.yml`](backend-alarm.md) |
|core.default.apdexThreshold| The apdex threshold settings, will override `service-apdex-threshold.yml`. | same as [`service-apdex-threshold.yml`](apdex-threshold.md) |
|core.default.endpoint-name-grouping| The endpoint name grouping setting, will override `endpoint-name-grouping.yml`. | same as [`endpoint-name-grouping.yml`](endpoint-grouping-rules.md) |
|receiver-trace.default.sampleRate| Trace sampling , override `receiver-trace/default/sampleRate` of `applciation.yml`. | 10000 |
This feature depends on upstream service, so it is **DISABLED** by default.
......
......@@ -18,11 +18,12 @@
package org.apache.skywalking.oap.server.receiver.trace.provider;
import lombok.Getter;
import org.apache.skywalking.oap.server.configuration.api.ConfigurationModule;
import org.apache.skywalking.oap.server.configuration.api.DynamicConfigurationService;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.oal.rt.OALEngineLoaderService;
import org.apache.skywalking.oap.server.core.oal.rt.CoreOALDefine;
import org.apache.skywalking.oap.server.core.oal.rt.OALEngineLoaderService;
import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister;
import org.apache.skywalking.oap.server.core.server.JettyHandlerRegister;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
......@@ -44,11 +45,12 @@ import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
public class TraceModuleProvider extends ModuleProvider {
@Getter
private final TraceServiceModuleConfig moduleConfig;
private DBLatencyThresholdsAndWatcher thresholds;
private UninstrumentedGatewaysConfig uninstrumentedGatewaysConfig;
private SegmentParserServiceImpl segmentParserService;
private TraceSampleRateWatcher traceSampleRateWatcher;
public TraceModuleProvider() {
this.moduleConfig = new TraceServiceModuleConfig();
......@@ -75,8 +77,11 @@ public class TraceModuleProvider extends ModuleProvider {
uninstrumentedGatewaysConfig = new UninstrumentedGatewaysConfig(this);
traceSampleRateWatcher = new TraceSampleRateWatcher(this);
moduleConfig.setDbLatencyThresholdsAndWatcher(thresholds);
moduleConfig.setUninstrumentedGatewaysConfig(uninstrumentedGatewaysConfig);
moduleConfig.setTraceSampleRateWatcher(traceSampleRateWatcher);
segmentParserService = new SegmentParserServiceImpl(getManager(), moduleConfig);
this.registerServiceImplementation(ISegmentParserService.class, segmentParserService);
......@@ -102,6 +107,7 @@ public class TraceModuleProvider extends ModuleProvider {
.getService(JettyHandlerRegister.class);
dynamicConfigurationService.registerConfigChangeWatcher(thresholds);
dynamicConfigurationService.registerConfigChangeWatcher(uninstrumentedGatewaysConfig);
dynamicConfigurationService.registerConfigChangeWatcher(traceSampleRateWatcher);
segmentParserService.setListenerManager(listenerManager());
grpcHandlerRegister.addHandler(
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.trace.provider;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.configuration.api.ConfigChangeWatcher;
import org.apache.skywalking.oap.server.receiver.trace.module.TraceModule;
import java.util.concurrent.atomic.AtomicReference;
@Slf4j
public class TraceSampleRateWatcher extends ConfigChangeWatcher {
private AtomicReference<Integer> sampleRate;
public TraceSampleRateWatcher(TraceModuleProvider provider) {
super(TraceModule.NAME, provider, "sampleRate");
sampleRate = new AtomicReference<>();
sampleRate.set(getDefaultValue());
}
private void activeSetting(String config) {
if (log.isDebugEnabled()) {
log.debug("Updating using new static config: {}", config);
}
try {
sampleRate.set(Integer.parseInt(config));
} catch (NumberFormatException ex) {
log.error("Cannot load sampleRate from: {}", config, ex);
}
}
@Override
public void notify(ConfigChangeEvent value) {
if (EventType.DELETE.equals(value.getEventType())) {
activeSetting(String.valueOf(getDefaultValue()));
} else {
activeSetting(value.getNewValue());
}
}
@Override
public String value() {
return String.valueOf(sampleRate.get());
}
private int getDefaultValue() {
return ((TraceModuleProvider) this.getProvider()).getModuleConfig().getSampleRate();
}
public int getSampleRate() {
return sampleRate.get();
}
}
......@@ -51,6 +51,9 @@ public class TraceServiceModuleConfig extends ModuleConfig {
@Setter
@Getter
private UninstrumentedGatewaysConfig uninstrumentedGatewaysConfig;
@Setter
@Getter
private TraceSampleRateWatcher traceSampleRateWatcher;
/**
* Analysis trace status.
* <p>
......
......@@ -176,7 +176,7 @@ public class SegmentAnalysisListener implements FirstAnalysisListener, EntryAnal
public Factory(ModuleManager moduleManager, TraceServiceModuleConfig config) {
this.sourceReceiver = moduleManager.find(CoreModule.NAME).provider().getService(SourceReceiver.class);
this.sampler = new TraceSegmentSampler(config.getSampleRate());
this.sampler = new TraceSegmentSampler(config.getTraceSampleRateWatcher());
this.namingControl = moduleManager.find(CoreModule.NAME)
.provider()
.getService(NamingControl.class);
......
......@@ -18,18 +18,20 @@
package org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener;
import org.apache.skywalking.oap.server.receiver.trace.provider.TraceSampleRateWatcher;
/**
* The sampler makes the sampling mechanism works at backend side. Sample result: [0,sampleRate) sampled, (sampleRate,~)
* ignored
*/
public class TraceSegmentSampler {
private int sampleRate = 10000;
private TraceSampleRateWatcher traceSampleRateWatcher;
public TraceSegmentSampler(int sampleRate) {
this.sampleRate = sampleRate;
public TraceSegmentSampler(TraceSampleRateWatcher traceSampleRateWatcher) {
this.traceSampleRateWatcher = traceSampleRateWatcher;
}
public boolean shouldSample(String segmentId) {
return segmentId.hashCode() % 10000 < sampleRate;
return segmentId.hashCode() % 10000 < traceSampleRateWatcher.getSampleRate();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.trace.provider;
import org.apache.skywalking.oap.server.configuration.api.ConfigChangeWatcher;
import org.apache.skywalking.oap.server.configuration.api.ConfigTable;
import org.apache.skywalking.oap.server.configuration.api.ConfigWatcherRegister;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.runners.MockitoJUnitRunner;
import java.util.Optional;
import java.util.Set;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
@RunWith(MockitoJUnitRunner.class)
public class TraceSampleRateWatcherTest {
private TraceModuleProvider traceModuleProvider;
@Before
public void init() {
traceModuleProvider = new TraceModuleProvider();
}
@Test
public void testInit() {
TraceSampleRateWatcher traceSampleRateWatcher = new TraceSampleRateWatcher(traceModuleProvider);
Assert.assertEquals(traceSampleRateWatcher.getSampleRate(), 10000);
Assert.assertEquals(traceSampleRateWatcher.value(), "10000");
}
@Test(timeout = 20000)
public void testDynamicUpdate() throws InterruptedException {
ConfigWatcherRegister register = new MockConfigWatcherRegister(3);
TraceSampleRateWatcher watcher = new TraceSampleRateWatcher(traceModuleProvider);
register.registerConfigChangeWatcher(watcher);
register.start();
while (watcher.getSampleRate() == 10000) {
Thread.sleep(2000);
}
assertThat(watcher.getSampleRate(), is(9000));
assertThat(traceModuleProvider.getModuleConfig().getSampleRate(), is(10000));
}
@Test
public void testNotify() {
TraceSampleRateWatcher traceSampleRateWatcher = new TraceSampleRateWatcher(traceModuleProvider);
ConfigChangeWatcher.ConfigChangeEvent value1 = new ConfigChangeWatcher.ConfigChangeEvent("8000", ConfigChangeWatcher.EventType.MODIFY);
traceSampleRateWatcher.notify(value1);
Assert.assertEquals(traceSampleRateWatcher.getSampleRate(), 8000);
Assert.assertEquals(traceSampleRateWatcher.value(), "8000");
ConfigChangeWatcher.ConfigChangeEvent value2 = new ConfigChangeWatcher.ConfigChangeEvent("8000", ConfigChangeWatcher.EventType.DELETE);
traceSampleRateWatcher.notify(value2);
Assert.assertEquals(traceSampleRateWatcher.getSampleRate(), 10000);
Assert.assertEquals(traceSampleRateWatcher.value(), "10000");
ConfigChangeWatcher.ConfigChangeEvent value3 = new ConfigChangeWatcher.ConfigChangeEvent("500", ConfigChangeWatcher.EventType.ADD);
traceSampleRateWatcher.notify(value3);
Assert.assertEquals(traceSampleRateWatcher.getSampleRate(), 500);
Assert.assertEquals(traceSampleRateWatcher.value(), "500");
ConfigChangeWatcher.ConfigChangeEvent value4 = new ConfigChangeWatcher.ConfigChangeEvent("abc", ConfigChangeWatcher.EventType.MODIFY);
traceSampleRateWatcher.notify(value4);
Assert.assertEquals(traceSampleRateWatcher.getSampleRate(), 500);
Assert.assertEquals(traceSampleRateWatcher.value(), "500");
}
public static class MockConfigWatcherRegister extends ConfigWatcherRegister {
public MockConfigWatcherRegister(long syncPeriod) {
super(syncPeriod);
}
@Override
public Optional<ConfigTable> readConfig(Set<String> keys) {
ConfigTable table = new ConfigTable();
table.add(new ConfigTable.ConfigItem("receiver-trace.default.sampleRate", "9000"));
return Optional.of(table);
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册