Commit 8306ad45 authored by 彭勇升 pengys, committed by wu-sheng

Reduce the number of threads to improve performance. (#3133)

* Refactor the persistence worker.

* 1. Provide InsertRequest and UpdateRequest interfaces for preparing persistence.
2. Implement the IDs query for the H2 metrics DAO.

* Refactor the worker framework.

* Use a queue to receive asynchronous batch requests.

* Rename the DataCarrier thread name.

* Fixed some mistakes.

* Fixed a new mistake.
Parent ab9bc922
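
The gist of the change, sketched up front (a summary of the hunks below, not verbatim source): the thread reduction comes from retiring the per-worker consumer pools and untyped List<Object> batches in favor of typed requests and shared plumbing.

    // Per worker:   in(data) -> cacheData(data)               (no self-flush anymore)
    // Timer-driven: flushAndSwitch() -> buildBatchRequests(prepareRequests)
    //                                -> batchDAO.synchronous(prepareRequests)
    // Records:      recordDAO.prepareBatchInsert(...) -> batchDAO.asynchronous(insertRequest)
    //               (one queue per storage implementation, instead of one thread per worker)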
......@@ -28,6 +28,7 @@ import org.apache.skywalking.oap.server.core.exporter.ExportEvent;
import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.slf4j.*;
......@@ -45,10 +46,9 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics, MergeDat
private final AbstractWorker<ExportEvent> nextExportWorker;
private final DataCarrier<Metrics> dataCarrier;
MetricsPersistentWorker(ModuleDefineHolder moduleDefineHolder, Model model, int batchSize,
IMetricsDAO metricsDAO, AbstractWorker<Metrics> nextAlarmWorker,
MetricsPersistentWorker(ModuleDefineHolder moduleDefineHolder, Model model, IMetricsDAO metricsDAO, AbstractWorker<Metrics> nextAlarmWorker,
AbstractWorker<ExportEvent> nextExportWorker) {
super(moduleDefineHolder, batchSize);
super(moduleDefineHolder);
this.model = model;
this.mergeDataCache = new MergeDataCache<>();
this.metricsDAO = metricsDAO;
......@@ -76,7 +76,6 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics, MergeDat
}
@Override public void in(Metrics metrics) {
metrics.resetEndOfBatch();
dataCarrier.produce(metrics);
}
......@@ -84,23 +83,9 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics, MergeDat
return mergeDataCache;
}
public boolean flushAndSwitch() {
boolean isSwitch;
try {
if (isSwitch = getCache().trySwitchPointer()) {
getCache().switchPointer();
}
} finally {
getCache().trySwitchPointerFinally();
}
return isSwitch;
}
@Override public List<Object> prepareBatch(MergeDataCache<Metrics> cache) {
@Override public void prepareBatch(MergeDataCache<Metrics> cache, List<PrepareRequest> prepareRequests) {
long start = System.currentTimeMillis();
List<Object> batchCollection = new LinkedList<>();
Collection<Metrics> collection = cache.getLast().collection();
int i = 0;
......@@ -131,9 +116,9 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics, MergeDat
if (dbMetricsMap.containsKey(metric.id())) {
metric.combine(dbMetricsMap.get(metric.id()));
metric.calculate();
batchCollection.add(metricsDAO.prepareBatchUpdate(model, metric));
prepareRequests.add(metricsDAO.prepareBatchUpdate(model, metric));
} else {
batchCollection.add(metricsDAO.prepareBatchInsert(model, metric));
prepareRequests.add(metricsDAO.prepareBatchInsert(model, metric));
}
if (Objects.nonNull(nextAlarmWorker)) {
......@@ -152,11 +137,9 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics, MergeDat
i++;
}
if (batchCollection.size() > 0) {
logger.debug("prepareBatch model {}, took time: {}", model.getName(), System.currentTimeMillis() - start);
if (prepareRequests.size() > 0) {
logger.debug("prepare batch requests for model {}, took time: {}", model.getName(), System.currentTimeMillis() - start);
}
return batchCollection;
}
@Override public void cacheData(Metrics input) {
......@@ -186,17 +169,7 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics, MergeDat
}
@Override public void consume(List<Metrics> data) {
Iterator<Metrics> inputIterator = data.iterator();
int i = 0;
while (inputIterator.hasNext()) {
Metrics metrics = inputIterator.next();
i++;
if (i == data.size()) {
metrics.asEndOfBatch();
}
persistent.onWork(metrics);
}
data.forEach(persistent::onWork);
}
@Override public void onError(List<Metrics> data, Throwable t) {
......
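
A note while reading this worker: it no longer flushes itself on an end-of-batch marker, hence the removed resetEndOfBatch/asEndOfBatch calls. The calling convention, pieced from the PersistenceTimer hunk further down:

    List<PrepareRequest> prepareRequests = new LinkedList<>();
    if (worker.flushAndSwitch()) {                   // swap the cache's read/write pointers
        worker.buildBatchRequests(prepareRequests);  // delegates to prepareBatch(cache, list)
    }
    if (CollectionUtils.isNotEmpty(prepareRequests)) {
        batchDAO.synchronous(prepareRequests);       // one blocking bulk write per cycle
    }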
......@@ -18,25 +18,15 @@
package org.apache.skywalking.oap.server.core.analysis.worker;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
import lombok.Getter;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.DisableRegister;
import org.apache.skywalking.oap.server.core.analysis.Downsampling;
import org.apache.skywalking.oap.server.core.analysis.Stream;
import org.apache.skywalking.oap.server.core.analysis.StreamProcessor;
import org.apache.skywalking.oap.server.core.*;
import org.apache.skywalking.oap.server.core.analysis.*;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.config.DownsamplingConfigService;
import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
import org.apache.skywalking.oap.server.core.storage.StorageDAO;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
import org.apache.skywalking.oap.server.core.storage.*;
import org.apache.skywalking.oap.server.core.storage.annotation.Storage;
import org.apache.skywalking.oap.server.core.storage.model.IModelSetter;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.storage.model.*;
import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceSetter;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
......@@ -61,6 +51,7 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> {
}
}
@SuppressWarnings("unchecked")
public void create(ModuleDefineHolder moduleDefineHolder, Stream stream, Class<? extends Metrics> metricsClass) {
if (DisableRegister.INSTANCE.include(stream.name())) {
return;
......@@ -114,16 +105,14 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> {
AlarmNotifyWorker alarmNotifyWorker = new AlarmNotifyWorker(moduleDefineHolder);
ExportWorker exportWorker = new ExportWorker(moduleDefineHolder);
MetricsPersistentWorker minutePersistentWorker = new MetricsPersistentWorker(moduleDefineHolder, model,
1000, metricsDAO, alarmNotifyWorker, exportWorker);
MetricsPersistentWorker minutePersistentWorker = new MetricsPersistentWorker(moduleDefineHolder, model, metricsDAO, alarmNotifyWorker, exportWorker);
persistentWorkers.add(minutePersistentWorker);
return minutePersistentWorker;
}
private MetricsPersistentWorker worker(ModuleDefineHolder moduleDefineHolder, IMetricsDAO metricsDAO, Model model) {
MetricsPersistentWorker persistentWorker = new MetricsPersistentWorker(moduleDefineHolder, model,
1000, metricsDAO, null, null);
MetricsPersistentWorker persistentWorker = new MetricsPersistentWorker(moduleDefineHolder, model, metricsDAO, null, null);
persistentWorkers.add(persistentWorker);
return persistentWorker;
......
......@@ -18,10 +18,11 @@
package org.apache.skywalking.oap.server.core.analysis.worker;
import java.util.*;
import java.util.List;
import org.apache.skywalking.oap.server.core.analysis.data.Window;
import org.apache.skywalking.oap.server.core.storage.*;
import org.apache.skywalking.oap.server.core.storage.StorageData;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.slf4j.*;
......@@ -32,28 +33,11 @@ public abstract class PersistenceWorker<INPUT extends StorageData, CACHE extends
private static final Logger logger = LoggerFactory.getLogger(PersistenceWorker.class);
private final int batchSize;
private final IBatchDAO batchDAO;
PersistenceWorker(ModuleDefineHolder moduleDefineHolder, int batchSize) {
PersistenceWorker(ModuleDefineHolder moduleDefineHolder) {
super(moduleDefineHolder);
this.batchSize = batchSize;
this.batchDAO = moduleDefineHolder.find(StorageModule.NAME).provider().getService(IBatchDAO.class);
}
void onWork(INPUT input) {
if (getCache().currentCollectionSize() >= batchSize) {
try {
if (getCache().trySwitchPointer()) {
getCache().switchPointer();
List<?> collection = buildBatchCollection();
batchDAO.asynchronous(collection);
}
} finally {
getCache().trySwitchPointerFinally();
}
}
cacheData(input);
}
......@@ -73,10 +57,9 @@ public abstract class PersistenceWorker<INPUT extends StorageData, CACHE extends
return isSwitch;
}
public abstract List<Object> prepareBatch(CACHE cache);
public abstract void prepareBatch(CACHE cache, List<PrepareRequest> prepareRequests);
public final List<?> buildBatchCollection() {
List<?> batchCollection = new LinkedList<>();
public final void buildBatchRequests(List<PrepareRequest> prepareRequests) {
try {
while (getCache().getLast().isWriting()) {
try {
......@@ -87,11 +70,10 @@ public abstract class PersistenceWorker<INPUT extends StorageData, CACHE extends
}
if (getCache().getLast().collection() != null) {
batchCollection = prepareBatch(getCache());
prepareBatch(getCache(), prepareRequests);
}
} finally {
getCache().finishReadingLast();
}
return batchCollection;
}
}
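
After these edits the base-class contract is small enough to restate. A condensed sketch (generic bounds and logging trimmed):

    public abstract class PersistenceWorker<INPUT extends StorageData, CACHE>
            extends AbstractWorker<INPUT> {

        PersistenceWorker(ModuleDefineHolder moduleDefineHolder) {
            super(moduleDefineHolder);   // batchSize is gone from the contract
        }

        void onWork(INPUT input) {
            cacheData(input);            // cache only; flushing is the timer's job
        }

        public abstract void cacheData(INPUT input);

        public abstract CACHE getCache();

        /** Called by PersistenceTimer after flushAndSwitch(); fills the shared list. */
        public abstract void prepareBatch(CACHE cache, List<PrepareRequest> prepareRequests);
    }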
......@@ -18,97 +18,39 @@
package org.apache.skywalking.oap.server.core.analysis.worker;
import java.util.*;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.consumer.*;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.data.NonMergeDataCache;
import java.io.IOException;
import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.storage.IRecordDAO;
import org.apache.skywalking.oap.server.core.storage.*;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.slf4j.*;
/**
* @author peng-yongsheng
*/
public class RecordPersistentWorker extends PersistenceWorker<Record, NonMergeDataCache<Record>> {
public class RecordPersistentWorker extends AbstractWorker<Record> {
private static final Logger logger = LoggerFactory.getLogger(RecordPersistentWorker.class);
private final Model model;
private final NonMergeDataCache<Record> nonMergeDataCache;
private final IRecordDAO recordDAO;
private final DataCarrier<Record> dataCarrier;
private final IBatchDAO batchDAO;
RecordPersistentWorker(ModuleDefineHolder moduleDefineHolder, Model model, int batchSize,
IRecordDAO recordDAO) {
super(moduleDefineHolder, batchSize);
RecordPersistentWorker(ModuleDefineHolder moduleDefineHolder, Model model, IRecordDAO recordDAO) {
super(moduleDefineHolder);
this.model = model;
this.nonMergeDataCache = new NonMergeDataCache<>();
this.recordDAO = recordDAO;
String name = "RECORD_PERSISTENT";
BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, 1, 20);
try {
ConsumerPoolFactory.INSTANCE.createIfAbsent(name, creator);
} catch (Exception e) {
throw new UnexpectedException(e.getMessage(), e);
}
this.dataCarrier = new DataCarrier<>(1, 10000);
this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new RecordPersistentWorker.PersistentConsumer(this));
this.batchDAO = moduleDefineHolder.find(StorageModule.NAME).provider().getService(IBatchDAO.class);
}
@Override public void in(Record record) {
dataCarrier.produce(record);
}
@Override public NonMergeDataCache<Record> getCache() {
return nonMergeDataCache;
}
@Override public List<Object> prepareBatch(NonMergeDataCache<Record> cache) {
List<Object> batchCollection = new LinkedList<>();
cache.getLast().collection().forEach(record -> {
try {
batchCollection.add(recordDAO.prepareBatchInsert(model, record));
} catch (Throwable t) {
logger.error(t.getMessage(), t);
}
});
return batchCollection;
}
@Override public void cacheData(Record input) {
nonMergeDataCache.writing();
nonMergeDataCache.add(input);
nonMergeDataCache.finishWriting();
}
private class PersistentConsumer implements IConsumer<Record> {
private final RecordPersistentWorker persistent;
private PersistentConsumer(RecordPersistentWorker persistent) {
this.persistent = persistent;
}
@Override public void init() {
}
@Override public void consume(List<Record> data) {
for (Record record : data) {
persistent.onWork(record);
}
}
@Override public void onError(List<Record> data, Throwable t) {
logger.error(t.getMessage(), t);
}
@Override public void onExit() {
try {
InsertRequest insertRequest = recordDAO.prepareBatchInsert(model, record);
batchDAO.asynchronous(insertRequest);
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
}
}
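
Reassembled from the fragments above, the record worker's whole remaining write path is roughly the following; the cache, the private DataCarrier, and the inner consumer class are all gone:

    @Override public void in(Record record) {
        try {
            InsertRequest insertRequest = recordDAO.prepareBatchInsert(model, record);
            batchDAO.asynchronous(insertRequest);   // hand off to the storage module's queue
        } catch (IOException e) {
            logger.error(e.getMessage(), e);
        }
    }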
......@@ -19,7 +19,6 @@
package org.apache.skywalking.oap.server.core.analysis.worker;
import java.util.*;
import lombok.Getter;
import org.apache.skywalking.oap.server.core.*;
import org.apache.skywalking.oap.server.core.analysis.*;
import org.apache.skywalking.oap.server.core.analysis.record.Record;
......@@ -48,8 +47,6 @@ public class RecordStreamProcessor implements StreamProcessor<Record> {
}
}
@Getter private List<RecordPersistentWorker> persistentWorkers = new ArrayList<>();
@SuppressWarnings("unchecked")
public void create(ModuleDefineHolder moduleDefineHolder, Stream stream, Class<? extends Record> recordClass) {
if (DisableRegister.INSTANCE.include(stream.name())) {
......@@ -66,9 +63,8 @@ public class RecordStreamProcessor implements StreamProcessor<Record> {
IModelSetter modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IModelSetter.class);
Model model = modelSetter.putIfAbsent(recordClass, stream.scopeId(), new Storage(stream.name(), true, true, Downsampling.Second), true);
RecordPersistentWorker persistentWorker = new RecordPersistentWorker(moduleDefineHolder, model, 4000, recordDAO);
RecordPersistentWorker persistentWorker = new RecordPersistentWorker(moduleDefineHolder, model, recordDAO);
persistentWorkers.add(persistentWorker);
workers.put(recordClass, persistentWorker);
}
}
......@@ -18,20 +18,21 @@
package org.apache.skywalking.oap.server.core.analysis.worker;
import java.util.*;
import java.util.List;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
import org.apache.skywalking.oap.server.core.analysis.data.LimitedSizeDataCache;
import org.apache.skywalking.oap.server.core.analysis.topn.TopN;
import org.apache.skywalking.oap.server.core.storage.IRecordDAO;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.slf4j.*;
/**
* Top N worker is a persistence worker, but no
*
* @author wusheng
* @author wusheng, peng-yongsheng
*/
public class TopNWorker extends PersistenceWorker<TopN, LimitedSizeDataCache<TopN>> {
......@@ -44,9 +45,9 @@ public class TopNWorker extends PersistenceWorker<TopN, LimitedSizeDataCache<Top
private long reportCycle;
private volatile long lastReportTimestamp;
public TopNWorker(ModuleDefineHolder moduleDefineHolder, Model model,
TopNWorker(ModuleDefineHolder moduleDefineHolder, Model model,
int topNSize, IRecordDAO recordDAO) {
super(moduleDefineHolder, -1);
super(moduleDefineHolder);
this.limitedSizeDataCache = new LimitedSizeDataCache<>(topNSize);
this.recordDAO = recordDAO;
this.model = model;
......@@ -57,7 +58,7 @@ public class TopNWorker extends PersistenceWorker<TopN, LimitedSizeDataCache<Top
this.reportCycle = 10 * 60 * 1000L;
}
@Override void onWork(TopN data) {
@Override public void cacheData(TopN data) {
limitedSizeDataCache.writing();
try {
limitedSizeDataCache.add(data);
......@@ -66,15 +67,6 @@ public class TopNWorker extends PersistenceWorker<TopN, LimitedSizeDataCache<Top
}
}
/**
* TopN is not following the batch size trigger mode. The memory cost of this worker is limited always.
*
* `onWork` method has been override, so this method would never be executed. No need to implement this method,
*/
@Override public void cacheData(TopN data) {
}
@Override public LimitedSizeDataCache<TopN> getCache() {
return limitedSizeDataCache;
}
......@@ -84,8 +76,6 @@ public class TopNWorker extends PersistenceWorker<TopN, LimitedSizeDataCache<Top
* time windows.
*
* Switch and persistent attempt happens based on reportCycle.
*
* @return
*/
@Override public boolean flushAndSwitch() {
long now = System.currentTimeMillis();
......@@ -96,16 +86,14 @@ public class TopNWorker extends PersistenceWorker<TopN, LimitedSizeDataCache<Top
return super.flushAndSwitch();
}
@Override public List<Object> prepareBatch(LimitedSizeDataCache<TopN> cache) {
List<Object> batchCollection = new LinkedList<>();
@Override public void prepareBatch(LimitedSizeDataCache<TopN> cache, List<PrepareRequest> prepareRequests) {
cache.getLast().collection().forEach(record -> {
try {
batchCollection.add(recordDAO.prepareBatchInsert(model, record));
prepareRequests.add(recordDAO.prepareBatchInsert(model, record));
} catch (Throwable t) {
logger.error(t.getMessage(), t);
}
});
return batchCollection;
}
@Override public void in(TopN n) {
......@@ -113,16 +101,17 @@ public class TopNWorker extends PersistenceWorker<TopN, LimitedSizeDataCache<Top
}
private class TopNConsumer implements IConsumer<TopN> {
@Override public void init() {
}
@Override public void consume(List<TopN> data) {
/**
/*
* TopN is not following the batch size trigger mode.
* No need to implement this method, the memory size is limited always.
*/
data.forEach(row -> onWork(row));
data.forEach(TopNWorker.this::onWork);
}
@Override public void onError(List<TopN> data, Throwable t) {
......
......@@ -19,13 +19,14 @@
package org.apache.skywalking.oap.server.core.storage;
import java.util.List;
import org.apache.skywalking.oap.server.library.client.request.*;
/**
* @author peng-yongsheng
*/
public interface IBatchDAO extends DAO {
void asynchronous(List<?> collection);
void asynchronous(InsertRequest insertRequest);
void synchronous(List<?> collection);
void synchronous(List<PrepareRequest> prepareRequests);
}
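
The request abstractions imported throughout this diff live in the library-client module, which the diff itself does not show. From the casts in the storage plugins below (InsertRequest backed by an ES IndexRequest, both request kinds accepted as PrepareRequest), a minimal sketch of the assumed hierarchy:

    // Assumed marker-interface hierarchy; inferred, not verbatim source.
    public interface PrepareRequest {
    }

    public interface InsertRequest extends PrepareRequest {
    }

    public interface UpdateRequest extends PrepareRequest {
    }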
......@@ -21,11 +21,12 @@ package org.apache.skywalking.oap.server.core.storage;
import java.io.IOException;
import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
/**
* @author peng-yongsheng
*/
public interface IRecordDAO<INSERT> extends DAO {
public interface IRecordDAO extends DAO {
INSERT prepareBatchInsert(Model model, Record record) throws IOException;
InsertRequest prepareBatchInsert(Model model, Record record) throws IOException;
}
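
With the <INSERT> type parameter dropped, every implementation returns the same abstraction. The real ES and H2 versions appear later in this diff; a hypothetical no-op implementation, just to show the resulting shape:

    // Hypothetical, for illustration only (assumes InsertRequest is a pure marker).
    public class NoopRecordDAO implements IRecordDAO {
        @Override
        public InsertRequest prepareBatchInsert(Model model, Record record) throws IOException {
            return new InsertRequest() { };  // real DAOs return storage-native requests
        }
    }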
......@@ -23,6 +23,7 @@ import java.util.concurrent.*;
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
import org.apache.skywalking.oap.server.core.CoreModuleConfig;
import org.apache.skywalking.oap.server.core.analysis.worker.*;
import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
......@@ -61,79 +62,40 @@ public enum PersistenceTimer {
if (!isStarted) {
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(
new RunnableWithExceptionProtection(() -> extractDataAndSaveRecord(batchDAO),
t -> logger.error("Extract data and save record failure.", t)), 5, moduleConfig.getPersistentPeriod(), TimeUnit.SECONDS);
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(
new RunnableWithExceptionProtection(() -> extractDataAndSaveMetrics(batchDAO),
t -> logger.error("Extract data and save metrics failure.", t)), 5, moduleConfig.getPersistentPeriod(), TimeUnit.SECONDS);
new RunnableWithExceptionProtection(() -> extractDataAndSave(batchDAO),
t -> logger.error("Extract data and save failure.", t)), 5, moduleConfig.getPersistentPeriod(), TimeUnit.SECONDS);
this.isStarted = true;
}
}
private void extractDataAndSaveRecord(IBatchDAO batchDAO) {
private void extractDataAndSave(IBatchDAO batchDAO) {
if (logger.isDebugEnabled()) {
logger.debug("Extract data and save record");
logger.debug("Extract data and save");
}
long startTime = System.currentTimeMillis();
try {
HistogramMetrics.Timer timer = prepareLatency.createTimer();
List records = new LinkedList();
List<PrepareRequest> prepareRequests = new LinkedList<>();
try {
List<PersistenceWorker> persistenceWorkers = new ArrayList<>();
persistenceWorkers.addAll(RecordStreamProcessor.getInstance().getPersistentWorkers());
persistenceWorkers.addAll(TopNStreamProcessor.getInstance().getPersistentWorkers());
persistenceWorkers.addAll(MetricsStreamProcessor.getInstance().getPersistentWorkers());
buildBatchCollection(persistenceWorkers, records);
if (debug) {
logger.info("build batch persistence duration: {} ms", System.currentTimeMillis() - startTime);
}
} finally {
timer.finish();
}
persistenceWorkers.forEach(worker -> {
if (logger.isDebugEnabled()) {
logger.debug("extract {} worker data and save", worker.getClass().getName());
}
HistogramMetrics.Timer executeLatencyTimer = executeLatency.createTimer();
try {
if (CollectionUtils.isNotEmpty(records)) {
batchDAO.asynchronous(records);
}
} finally {
executeLatencyTimer.finish();
}
} catch (Throwable e) {
errorCounter.inc();
logger.error(e.getMessage(), e);
} finally {
if (logger.isDebugEnabled()) {
logger.debug("Persistence data save finish");
}
}
if (debug) {
logger.info("Batch persistence duration: {} ms", System.currentTimeMillis() - startTime);
}
}
private void extractDataAndSaveMetrics(IBatchDAO batchDAO) {
if (logger.isDebugEnabled()) {
logger.debug("Extract data and save metrics");
}
long startTime = System.currentTimeMillis();
try {
HistogramMetrics.Timer timer = prepareLatency.createTimer();
List metrics = new LinkedList();
try {
List<PersistenceWorker> persistenceWorkers = new ArrayList<>(MetricsStreamProcessor.getInstance().getPersistentWorkers());
buildBatchCollection(persistenceWorkers, metrics);
if (worker.flushAndSwitch()) {
worker.buildBatchRequests(prepareRequests);
}
});
if (debug) {
logger.info("build metrics batch persistence duration: {} ms", System.currentTimeMillis() - startTime);
logger.info("build batch persistence duration: {} ms", System.currentTimeMillis() - startTime);
}
} finally {
timer.finish();
......@@ -141,8 +103,8 @@ public enum PersistenceTimer {
HistogramMetrics.Timer executeLatencyTimer = executeLatency.createTimer();
try {
if (CollectionUtils.isNotEmpty(metrics)) {
batchDAO.synchronous(metrics);
if (CollectionUtils.isNotEmpty(prepareRequests)) {
batchDAO.synchronous(prepareRequests);
}
} finally {
executeLatencyTimer.finish();
......@@ -160,23 +122,4 @@ public enum PersistenceTimer {
logger.info("Batch persistence duration: {} ms", System.currentTimeMillis() - startTime);
}
}
@SuppressWarnings("unchecked")
private void buildBatchCollection(List<PersistenceWorker> persistenceWorkers, List collection) {
persistenceWorkers.forEach(worker -> {
if (logger.isDebugEnabled()) {
logger.debug("extract {} worker data and save", worker.getClass().getName());
}
if (worker.flushAndSwitch()) {
List<?> batchCollection = worker.buildBatchCollection();
if (logger.isDebugEnabled()) {
logger.debug("extract {} worker data size: {}", worker.getClass().getName(), batchCollection.size());
}
collection.addAll(batchCollection);
}
});
}
}
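
The two scheduled tasks above merge into one pass. End to end it reads roughly as below, assuming the worker list now collects only the TopN and metrics workers, which is consistent with RecordStreamProcessor losing its persistentWorkers getter earlier in this diff:

    List<PersistenceWorker> persistenceWorkers = new ArrayList<>();
    persistenceWorkers.addAll(TopNStreamProcessor.getInstance().getPersistentWorkers());
    persistenceWorkers.addAll(MetricsStreamProcessor.getInstance().getPersistentWorkers());

    List<PrepareRequest> prepareRequests = new LinkedList<>();
    persistenceWorkers.forEach(worker -> {
        if (worker.flushAndSwitch()) {
            worker.buildBatchRequests(prepareRequests);
        }
    });
    if (CollectionUtils.isNotEmpty(prepareRequests)) {
        batchDAO.synchronous(prepareRequests);   // records go through the async queue instead
    }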
......@@ -25,6 +25,7 @@ import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
* @author peng-yongsheng
*/
public abstract class AbstractWorker<INPUT> {
@Getter private final ModuleDefineHolder moduleDefineHolder;
public AbstractWorker(ModuleDefineHolder moduleDefineHolder) {
......
......@@ -21,6 +21,7 @@ package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
import java.util.List;
import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.library.client.request.*;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.elasticsearch.action.bulk.*;
import org.elasticsearch.action.index.IndexRequest;
......@@ -47,38 +48,23 @@ public class BatchProcessEsDAO extends EsDAO implements IBatchDAO {
this.concurrentRequests = concurrentRequests;
}
@Override public void asynchronous(List<?> collection) {
@Override public void asynchronous(InsertRequest insertRequest) {
if (bulkProcessor == null) {
this.bulkProcessor = getClient().createBulkProcessor(bulkActions, flushInterval, concurrentRequests);
}
if (logger.isDebugEnabled()) {
logger.debug("Asynchronous batch persistent data collection size: {}", collection.size());
}
if (CollectionUtils.isNotEmpty(collection)) {
collection.forEach(builder -> {
if (builder instanceof IndexRequest) {
this.bulkProcessor.add((IndexRequest)builder);
}
if (builder instanceof UpdateRequest) {
this.bulkProcessor.add((UpdateRequest)builder);
}
});
this.bulkProcessor.flush();
}
this.bulkProcessor.add((IndexRequest)insertRequest);
}
@Override public void synchronous(List<?> collection) {
if (CollectionUtils.isNotEmpty(collection)) {
@Override public void synchronous(List<PrepareRequest> prepareRequests) {
if (CollectionUtils.isNotEmpty(prepareRequests)) {
BulkRequest request = new BulkRequest();
for (Object builder : collection) {
if (builder instanceof IndexRequest) {
request.add((IndexRequest)builder);
}
if (builder instanceof UpdateRequest) {
request.add((UpdateRequest)builder);
for (PrepareRequest prepareRequest : prepareRequests) {
if (prepareRequest instanceof InsertRequest) {
request.add((IndexRequest)prepareRequest);
} else {
request.add((UpdateRequest)prepareRequest);
}
}
getClient().synchronousBulk(request);
......
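
A behavioral note on the new asynchronous path here: the removed manual flush() means delivery timing is now governed solely by the BulkProcessor thresholds (bulkActions, flushInterval, concurrentRequests) passed to the constructor above. Roughly:

    // Buffered write; the shared BulkProcessor flushes when bulkActions or
    // flushInterval is reached, instead of after every incoming collection.
    bulkProcessor.add((IndexRequest) insertRequest);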
......@@ -23,13 +23,13 @@ import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.storage.*;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.elasticsearch.action.index.IndexRequest;
import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
import org.elasticsearch.common.xcontent.XContentBuilder;
/**
* @author peng-yongsheng
*/
public class RecordEsDAO extends EsDAO implements IRecordDAO<IndexRequest> {
public class RecordEsDAO extends EsDAO implements IRecordDAO {
private final StorageBuilder<Record> storageBuilder;
......@@ -38,7 +38,7 @@ public class RecordEsDAO extends EsDAO implements IRecordDAO<IndexRequest> {
this.storageBuilder = storageBuilder;
}
@Override public IndexRequest prepareBatchInsert(Model model, Record record) throws IOException {
@Override public InsertRequest prepareBatchInsert(Model model, Record record) throws IOException {
XContentBuilder builder = map2builder(storageBuilder.data2Map(record));
String modelName = TimeSeriesUtils.timeSeries(model, record.getTimeBucket());
return getClient().prepareInsert(modelName, record.id(), builder);
......
......@@ -18,16 +18,18 @@
package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.*;
import java.util.List;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.consumer.*;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
import org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
import org.apache.skywalking.oap.server.library.client.request.*;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.SQLExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.*;
/**
* @author wusheng, peng-yongsheng
......@@ -37,24 +39,36 @@ public class H2BatchDAO implements IBatchDAO {
private static final Logger logger = LoggerFactory.getLogger(H2BatchDAO.class);
private JDBCHikariCPClient h2Client;
private final DataCarrier<PrepareRequest> dataCarrier;
public H2BatchDAO(JDBCHikariCPClient h2Client) {
this.h2Client = h2Client;
String name = "H2_ASYNCHRONOUS_BATCH_PERSISTENT";
BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, 1, 20);
try {
ConsumerPoolFactory.INSTANCE.createIfAbsent(name, creator);
} catch (Exception e) {
throw new UnexpectedException(e.getMessage(), e);
}
this.dataCarrier = new DataCarrier<>(1, 10000);
this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new H2BatchDAO.H2BatchConsumer(this));
}
@Override public void synchronous(List<?> collection) {
if (CollectionUtils.isEmpty(collection)) {
@Override public void synchronous(List<PrepareRequest> prepareRequests) {
if (CollectionUtils.isEmpty(prepareRequests)) {
return;
}
if (logger.isDebugEnabled()) {
logger.debug("batch sql statements execute, data size: {}", collection.size());
logger.debug("batch sql statements execute, data size: {}", prepareRequests.size());
}
try (Connection connection = h2Client.getConnection()) {
for (Object exe : collection) {
for (PrepareRequest prepareRequest : prepareRequests) {
try {
SQLExecutor sqlExecutor = (SQLExecutor)exe;
SQLExecutor sqlExecutor = (SQLExecutor)prepareRequest;
sqlExecutor.invoke(connection);
} catch (SQLException e) {
// Just avoid one execution failure makes the rest of batch failure.
......@@ -66,7 +80,31 @@ public class H2BatchDAO implements IBatchDAO {
}
}
@Override public void asynchronous(List<?> collection) {
synchronous(collection);
@Override public void asynchronous(InsertRequest insertRequest) {
this.dataCarrier.produce(insertRequest);
}
private class H2BatchConsumer implements IConsumer<PrepareRequest> {
private final H2BatchDAO h2BatchDAO;
private H2BatchConsumer(H2BatchDAO h2BatchDAO) {
this.h2BatchDAO = h2BatchDAO;
}
@Override public void init() {
}
@Override public void consume(List<PrepareRequest> prepareRequests) {
h2BatchDAO.synchronous(prepareRequests);
}
@Override public void onError(List<PrepareRequest> prepareRequests, Throwable t) {
logger.error(t.getMessage(), t);
}
@Override public void onExit() {
}
}
}
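
This is where the commit title pays off for the JDBC storage: the per-worker RECORD_PERSISTENT pools removed earlier are replaced by this single H2_ASYNCHRONOUS_BATCH_PERSISTENT queue, one consumer thread per storage instead of one per worker. Caller-side view (sketch):

    IBatchDAO batchDAO = moduleDefineHolder.find(StorageModule.NAME)
        .provider().getService(IBatchDAO.class);
    batchDAO.asynchronous(insertRequest);  // non-blocking enqueue into the DataCarrier
    // A single H2BatchConsumer later drains each batch through synchronous(...),
    // so asynchronous and synchronous writes share one SQL execution path.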
......@@ -23,12 +23,12 @@ import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.storage.*;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.SQLExecutor;
import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
/**
* @author wusheng
*/
public class H2RecordDAO extends H2SQLExecutor implements IRecordDAO<SQLExecutor> {
public class H2RecordDAO extends H2SQLExecutor implements IRecordDAO {
private JDBCHikariCPClient h2Client;
private StorageBuilder<Record> storageBuilder;
......@@ -38,7 +38,7 @@ public class H2RecordDAO extends H2SQLExecutor implements IRecordDAO<SQLExecutor
this.storageBuilder = storageBuilder;
}
@Override public SQLExecutor prepareBatchInsert(Model model, Record record) throws IOException {
@Override public InsertRequest prepareBatchInsert(Model model, Record record) throws IOException {
return getInsertExecutor(model.getName(), record, storageBuilder);
}
}
......@@ -19,32 +19,23 @@
package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao;
import java.io.IOException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.sql.*;
import java.util.*;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.register.ServiceInstanceInventory;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.StorageData;
import org.apache.skywalking.oap.server.core.storage.*;
import org.apache.skywalking.oap.server.core.storage.model.ModelColumn;
import org.apache.skywalking.oap.server.core.storage.type.StorageDataType;
import org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.ArrayParamBuilder;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.SQLBuilder;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.SQLExecutor;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.TableMetaInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.*;
import org.slf4j.*;
/**
* @author wusheng, peng-yongsheng
*/
public class H2SQLExecutor {
private static final Logger logger = LoggerFactory.getLogger(H2SQLExecutor.class);
protected List<StorageData> getByIDs(JDBCHikariCPClient h2Client, String modelName, String[] ids,
......@@ -67,6 +58,7 @@ public class H2SQLExecutor {
}
}
while (storageData != null);
return storageDataList;
}
} catch (SQLException | JDBCClientException e) {
......@@ -96,8 +88,7 @@ public class H2SQLExecutor {
}
}
protected StorageData toStorageData(ResultSet rs, String modelName,
StorageBuilder storageBuilder) throws SQLException {
protected StorageData toStorageData(ResultSet rs, String modelName, StorageBuilder storageBuilder) throws SQLException {
if (rs.next()) {
Map data = new HashMap();
List<ModelColumn> columns = TableMetaInfo.get(modelName).getColumns();
......@@ -122,8 +113,7 @@ public class H2SQLExecutor {
return Const.NONE;
}
protected SQLExecutor getInsertExecutor(String modelName, StorageData metrics,
StorageBuilder storageBuilder) throws IOException {
protected SQLExecutor getInsertExecutor(String modelName, StorageData metrics, StorageBuilder storageBuilder) throws IOException {
Map<String, Object> objectMap = storageBuilder.data2Map(metrics);
SQLBuilder sqlBuilder = new SQLBuilder("INSERT INTO " + modelName + " VALUES");
......@@ -150,8 +140,7 @@ public class H2SQLExecutor {
return new SQLExecutor(sqlBuilder.toString(), param);
}
protected SQLExecutor getUpdateExecutor(String modelName, StorageData metrics,
StorageBuilder storageBuilder) throws IOException {
protected SQLExecutor getUpdateExecutor(String modelName, StorageData metrics, StorageBuilder storageBuilder) throws IOException {
Map<String, Object> objectMap = storageBuilder.data2Map(metrics);
SQLBuilder sqlBuilder = new SQLBuilder("UPDATE " + modelName + " SET ");
......