提交 5d064ced 编写于 作者: J Jared Tan 提交者: wu-sheng

make topN worker report period configurable. (#3892)

上级 a2dce716
...@@ -38,6 +38,7 @@ public class CoreModuleConfig extends ModuleConfig { ...@@ -38,6 +38,7 @@ public class CoreModuleConfig extends ModuleConfig {
@Setter private int maxConcurrentCallsPerConnection; @Setter private int maxConcurrentCallsPerConnection;
@Setter private int maxMessageSize; @Setter private int maxMessageSize;
@Setter private boolean enableDatabaseSession; @Setter private boolean enableDatabaseSession;
@Setter private int topNReportPeriod;
private final List<String> downsampling; private final List<String> downsampling;
/** /**
* The period of doing data persistence. * The period of doing data persistence.
......
...@@ -22,6 +22,7 @@ import java.io.IOException; ...@@ -22,6 +22,7 @@ import java.io.IOException;
import org.apache.skywalking.oap.server.configuration.api.ConfigurationModule; import org.apache.skywalking.oap.server.configuration.api.ConfigurationModule;
import org.apache.skywalking.oap.server.core.analysis.*; import org.apache.skywalking.oap.server.core.analysis.*;
import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor; import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
import org.apache.skywalking.oap.server.core.analysis.worker.TopNStreamProcessor;
import org.apache.skywalking.oap.server.core.annotation.AnnotationScan; import org.apache.skywalking.oap.server.core.annotation.AnnotationScan;
import org.apache.skywalking.oap.server.core.cache.*; import org.apache.skywalking.oap.server.core.cache.*;
import org.apache.skywalking.oap.server.core.cluster.*; import org.apache.skywalking.oap.server.core.cluster.*;
...@@ -170,6 +171,7 @@ public class CoreModuleProvider extends ModuleProvider { ...@@ -170,6 +171,7 @@ public class CoreModuleProvider extends ModuleProvider {
this.registerServiceImplementation(RemoteClientManager.class, remoteClientManager); this.registerServiceImplementation(RemoteClientManager.class, remoteClientManager);
MetricsStreamProcessor.getInstance().setEnableDatabaseSession(moduleConfig.isEnableDatabaseSession()); MetricsStreamProcessor.getInstance().setEnableDatabaseSession(moduleConfig.isEnableDatabaseSession());
TopNStreamProcessor.getInstance().setTopNWorkerReportCycle(moduleConfig.getTopNReportPeriod());
} }
@Override public void start() throws ModuleStartException { @Override public void start() throws ModuleStartException {
......
...@@ -20,6 +20,7 @@ package org.apache.skywalking.oap.server.core.analysis.worker; ...@@ -20,6 +20,7 @@ package org.apache.skywalking.oap.server.core.analysis.worker;
import java.util.*; import java.util.*;
import lombok.Getter; import lombok.Getter;
import lombok.Setter;
import org.apache.skywalking.oap.server.core.*; import org.apache.skywalking.oap.server.core.*;
import org.apache.skywalking.oap.server.core.analysis.*; import org.apache.skywalking.oap.server.core.analysis.*;
import org.apache.skywalking.oap.server.core.analysis.record.Record; import org.apache.skywalking.oap.server.core.analysis.record.Record;
...@@ -41,6 +42,8 @@ public class TopNStreamProcessor implements StreamProcessor<TopN> { ...@@ -41,6 +42,8 @@ public class TopNStreamProcessor implements StreamProcessor<TopN> {
@Getter private List<TopNWorker> persistentWorkers = new ArrayList<>(); @Getter private List<TopNWorker> persistentWorkers = new ArrayList<>();
private Map<Class<? extends Record>, TopNWorker> workers = new HashMap<>(); private Map<Class<? extends Record>, TopNWorker> workers = new HashMap<>();
@Setter @Getter private int topNWorkerReportCycle = 10;
@Setter @Getter private int topSize = 50;
public static TopNStreamProcessor getInstance() { public static TopNStreamProcessor getInstance() {
return PROCESSOR; return PROCESSOR;
...@@ -63,7 +66,7 @@ public class TopNStreamProcessor implements StreamProcessor<TopN> { ...@@ -63,7 +66,7 @@ public class TopNStreamProcessor implements StreamProcessor<TopN> {
IModelSetter modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class); IModelSetter modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class);
Model model = modelSetter.putIfAbsent(topNClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Second), true); Model model = modelSetter.putIfAbsent(topNClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Second), true);
TopNWorker persistentWorker = new TopNWorker(moduleDefineHolder, model, 50, recordDAO); TopNWorker persistentWorker = new TopNWorker(moduleDefineHolder, model, topSize, topNWorkerReportCycle * 60 * 1000L, recordDAO);
persistentWorkers.add(persistentWorker); persistentWorkers.add(persistentWorker);
workers.put(topNClass, persistentWorker); workers.put(topNClass, persistentWorker);
} }
......
...@@ -46,7 +46,7 @@ public class TopNWorker extends PersistenceWorker<TopN, LimitedSizeDataCache<Top ...@@ -46,7 +46,7 @@ public class TopNWorker extends PersistenceWorker<TopN, LimitedSizeDataCache<Top
private volatile long lastReportTimestamp; private volatile long lastReportTimestamp;
TopNWorker(ModuleDefineHolder moduleDefineHolder, Model model, TopNWorker(ModuleDefineHolder moduleDefineHolder, Model model,
int topNSize, IRecordDAO recordDAO) { int topNSize, long reportCycle, IRecordDAO recordDAO) {
super(moduleDefineHolder); super(moduleDefineHolder);
this.limitedSizeDataCache = new LimitedSizeDataCache<>(topNSize); this.limitedSizeDataCache = new LimitedSizeDataCache<>(topNSize);
this.recordDAO = recordDAO; this.recordDAO = recordDAO;
...@@ -54,8 +54,8 @@ public class TopNWorker extends PersistenceWorker<TopN, LimitedSizeDataCache<Top ...@@ -54,8 +54,8 @@ public class TopNWorker extends PersistenceWorker<TopN, LimitedSizeDataCache<Top
this.dataCarrier = new DataCarrier<>("TopNWorker", 1, 1000); this.dataCarrier = new DataCarrier<>("TopNWorker", 1, 1000);
this.dataCarrier.consume(new TopNWorker.TopNConsumer(), 1); this.dataCarrier.consume(new TopNWorker.TopNConsumer(), 1);
this.lastReportTimestamp = System.currentTimeMillis(); this.lastReportTimestamp = System.currentTimeMillis();
// Top N persistent only works per 10 minutes. // Top N persistent works per 10 minutes default.
this.reportCycle = 10 * 60 * 1000L; this.reportCycle = reportCycle;
} }
@Override public void cacheData(TopN data) { @Override public void cacheData(TopN data) {
......
...@@ -72,6 +72,7 @@ core: ...@@ -72,6 +72,7 @@ core:
# Cache metric data for 1 minute to reduce database queries, and if the OAP cluster changes within that minute, # Cache metric data for 1 minute to reduce database queries, and if the OAP cluster changes within that minute,
# the metrics may not be accurate within that minute. # the metrics may not be accurate within that minute.
enableDatabaseSession: ${SW_CORE_ENABLE_DATABASE_SESSION:true} enableDatabaseSession: ${SW_CORE_ENABLE_DATABASE_SESSION:true}
topNReportPeriod: ${SW_CORE_TOPN_REPORT_PERIOD:10} # top_n record worker report cycle, unit is minute
storage: storage:
# elasticsearch: # elasticsearch:
# nameSpace: ${SW_NAMESPACE:""} # nameSpace: ${SW_NAMESPACE:""}
......
...@@ -71,6 +71,7 @@ core: ...@@ -71,6 +71,7 @@ core:
# Cache metric data for 1 minute to reduce database queries, and if the OAP cluster changes within that minute, # Cache metric data for 1 minute to reduce database queries, and if the OAP cluster changes within that minute,
# the metrics may not be accurate within that minute. # the metrics may not be accurate within that minute.
enableDatabaseSession: ${SW_CORE_ENABLE_DATABASE_SESSION:true} enableDatabaseSession: ${SW_CORE_ENABLE_DATABASE_SESSION:true}
topNReportPeriod: ${SW_CORE_TOPN_REPORT_PERIOD:10} # top_n record worker report cycle, unit is minute
storage: storage:
elasticsearch: elasticsearch:
nameSpace: ${SW_NAMESPACE:""} nameSpace: ${SW_NAMESPACE:""}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册