提交 58cf5936 编写于 作者: wu-sheng's avatar wu-sheng 提交者: Gao Hongtao

Improve OAP cluster performance (#2271)

* Refactor for cluster mode.

* Make send to L2 in certain cycle.
上级 dd574dd3
......@@ -12,4 +12,5 @@ packages/
/skywalking-agent/
/dist/
/docker/snapshot/*.gz
.mvn/wrapper/*.jar
\ No newline at end of file
.mvn/wrapper/*.jar
OALLexer.tokens
......@@ -33,13 +33,17 @@ public class DataCarrier<T> {
private String name;
public DataCarrier(int channelSize, int bufferSize) {
this("default", channelSize, bufferSize);
this("DEFAULT", channelSize, bufferSize);
}
public DataCarrier(String name, int channelSize, int bufferSize) {
this(name, name, channelSize, bufferSize);
}
public DataCarrier(String name, String envPrefix, int channelSize, int bufferSize) {
this.name = name;
this.bufferSize = bufferSize;
this.channelSize = channelSize;
this.bufferSize = EnvUtil.getInt(envPrefix + "_BUFFER_SIZE", bufferSize);
this.channelSize = EnvUtil.getInt(envPrefix + "_CHANNEL_SIZE", channelSize);
channels = new Channels<T>(channelSize, bufferSize, new SimpleRollingPartitioner<T>(), BufferStrategy.BLOCKING);
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.commons.datacarrier;
/**
* Read value from system env.
*
* @author wusheng
*/
public class EnvUtil {
public static int getInt(String envName, int defaultValue) {
int value = defaultValue;
String envValue = System.getenv(envName);
if (envValue != null) {
try {
value = Integer.parseInt(envValue);
} catch (NumberFormatException e) {
}
}
return value;
}
public static long getLong(String envName, long defaultValue) {
long value = defaultValue;
String envValue = System.getenv(envName);
if (envValue != null) {
try {
value = Integer.parseInt(envValue);
} catch (NumberFormatException e) {
}
}
return value;
}
}
......@@ -20,6 +20,7 @@ package org.apache.skywalking.apm.commons.datacarrier.consumer;
import java.util.*;
import java.util.concurrent.Callable;
import org.apache.skywalking.apm.commons.datacarrier.EnvUtil;
import org.apache.skywalking.apm.commons.datacarrier.buffer.Channels;
/**
......@@ -35,15 +36,8 @@ public class BulkConsumePool implements ConsumerPool {
private volatile boolean isStarted = false;
public BulkConsumePool(String name, int size, long consumeCycle) {
size = EnvUtil.getInt(name + "_THREAD", size);
allConsumers = new ArrayList<MultipleChannelsConsumer>(size);
String threadNum = System.getenv(name + "_THREAD");
if (threadNum != null) {
try {
size = Integer.parseInt(threadNum);
} catch (NumberFormatException e) {
}
}
for (int i = 0; i < size; i++) {
MultipleChannelsConsumer multipleChannelsConsumer = new MultipleChannelsConsumer("DataCarrier." + name + ".BulkConsumePool." + i + ".Thread", consumeCycle);
multipleChannelsConsumer.setDaemon(true);
......
......@@ -18,20 +18,17 @@
package org.apache.skywalking.oap.server.core.analysis.worker;
import java.util.Iterator;
import java.util.List;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import java.util.*;
import org.apache.skywalking.apm.commons.datacarrier.*;
import org.apache.skywalking.apm.commons.datacarrier.consumer.*;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.data.EndOfBatchContext;
import org.apache.skywalking.oap.server.core.analysis.data.MergeDataCache;
import org.apache.skywalking.oap.server.core.analysis.data.*;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.*;
/**
* @author peng-yongsheng
......@@ -43,9 +40,10 @@ public class IndicatorAggregateWorker extends AbstractWorker<Indicator> {
private AbstractWorker<Indicator> nextWorker;
private final DataCarrier<Indicator> dataCarrier;
private final MergeDataCache<Indicator> mergeDataCache;
private int messageNum;
private final String modelName;
private CounterMetric aggregationCounter;
private final long l2AggregationSendCycle;
private long lastSendTimestamp;
IndicatorAggregateWorker(ModuleManager moduleManager, int workerId, AbstractWorker<Indicator> nextWorker,
String modelName) {
......@@ -53,9 +51,9 @@ public class IndicatorAggregateWorker extends AbstractWorker<Indicator> {
this.modelName = modelName;
this.nextWorker = nextWorker;
this.mergeDataCache = new MergeDataCache<>();
this.dataCarrier = new DataCarrier<>("IndicatorAggregateWorker." + modelName, 1, 10000);
String name = "INDICATOR_L1_AGGREGATION";
this.dataCarrier = new DataCarrier<>("IndicatorAggregateWorker." + modelName, name, 2, 10000);
BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, BulkConsumePool.Creator.recommendMaxSize() * 2, 20);
try {
ConsumerPoolFactory.INSTANCE.createIfAbsent(name, creator);
......@@ -67,6 +65,9 @@ public class IndicatorAggregateWorker extends AbstractWorker<Indicator> {
MetricCreator metricCreator = moduleManager.find(TelemetryModule.NAME).provider().getService(MetricCreator.class);
aggregationCounter = metricCreator.createCounter("indicator_aggregation", "The number of rows in aggregation",
new MetricTag.Keys("metricName", "level", "dimensionality"), new MetricTag.Values(modelName, "1", "min"));
lastSendTimestamp = System.currentTimeMillis();
l2AggregationSendCycle = EnvUtil.getLong("INDICATOR_L1_AGGREGATION_SEND_CYCLE", 1000);
}
@Override public final void in(Indicator indicator) {
......@@ -76,13 +77,23 @@ public class IndicatorAggregateWorker extends AbstractWorker<Indicator> {
private void onWork(Indicator indicator) {
aggregationCounter.inc();
messageNum++;
aggregate(indicator);
if (messageNum >= 1000 || indicator.getEndOfBatchContext().isEndOfBatch()) {
sendToNext();
messageNum = 0;
if (indicator.getEndOfBatchContext().isEndOfBatch()) {
if (shouldSend()) {
sendToNext();
}
}
}
private boolean shouldSend() {
long now = System.currentTimeMillis();
// Continue L2 aggregation in certain cycle.
if (now - lastSendTimestamp > l2AggregationSendCycle) {
lastSendTimestamp = now;
return true;
}
return false;
}
private void sendToNext() {
......
......@@ -69,7 +69,7 @@ public class IndicatorPersistentWorker extends PersistenceWorker<Indicator, Merg
throw new UnexpectedException(e.getMessage(), e);
}
this.dataCarrier = new DataCarrier<>("IndicatorPersistentWorker." + modelName, 1, 2000);
this.dataCarrier = new DataCarrier<>("IndicatorPersistentWorker." + modelName, name, 1, 2000);
this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new PersistentConsumer(this));
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册