Unverified · Commit 2fa821d9 authored by wu-sheng, committed by GitHub

Enhance cache mechanism in the metric persistent process (#10021)

Parent 9bdccc1d
......@@ -119,6 +119,13 @@
* Support dynamic config the sampling strategy in network profiling.
* Zipkin module support BanyanDB storage.
* Zipkin traces query API, sort the result set by start time by default.
* Enhance the cache mechanism in the metric persistent process (see the sketch after this changelog excerpt).
* Previously, this cache only worked when the metric was accessible (readable) from the database. Once the insert execution
was delayed due to scale, the cache lost efficacy. It effectively only helped the last update in each minute, considering the
default 25s persistent period.
* Fix ID conflicts for all JDBC storage implementations. Due to the insert delay, a JDBC storage implementation could
still generate another new insert statement for the same ID.
* [**Breaking Change**] Remove `core/default/enableDatabaseSession` config.
* [**Breaking Change**] Add `@BanyanDB.TimestampColumn` to identify `which column in Record` is providing the timestamp(milliseconds) for BanyanDB,
since BanyanDB stream requires a timestamp in milliseconds.
For SQL-Database: add new column `timestamp` for tables `profile_task_log/top_n_database_statement`,
......
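The changelog entry above describes the reworked session cache: a metric now enters the cache either when it is read back from the database or when its insert executes successfully, and it leaves the cache when it expires or when an update fails. The following minimal sketch illustrates that lifecycle; the class and member names are hypothetical stand-ins, not the actual SkyWalking types.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// A minimal sketch of the session-cache lifecycle described in the changelog.
// String ids and long timestamps stand in for the real Metrics objects.
class SessionCacheSketch {
    private final Map<String, Long> cache = new ConcurrentHashMap<>(); // id -> last update timestamp

    void onReadFromStorage(String id, long ts) {
        cache.put(id, ts);   // way 1 into the cache: the metric was read back from the database
    }

    void onInsertCompleted(String id, long ts) {
        cache.put(id, ts);   // way 2 into the cache: the built insert executed successfully
    }

    void onUpdateFailure(String id) {
        cache.remove(id);    // removal case 2: the update failed (DB error, or zero rows updated)
    }

    void endOfRound(long now, long sessionTimeoutMillis) {
        // removal case 1: the metric expired from the session
        cache.entrySet().removeIf(e -> now - e.getValue() > sessionTimeoutMillis);
    }
}
```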
......@@ -26,7 +26,6 @@ The Configuration Vocabulary lists all available configurations provided by `app
| - | - | l1FlushPeriod | The period of L1 aggregation flush to L2 aggregation (in milliseconds). | SW_CORE_L1_AGGREGATION_FLUSH_PERIOD | 500 |
| - | - | storageSessionTimeout | The threshold of session time (in milliseconds). Default value is 70000. | SW_CORE_STORAGE_SESSION_TIMEOUT | 70000 |
| - | - | persistentPeriod | The period of doing data persistence. Unit is second. Default value is 25s. | SW_CORE_PERSISTENT_PERIOD | 25 |
| - | - | enableDatabaseSession | Cache metrics data for 1 minute to reduce database queries; if the OAP cluster changes within that minute, the metrics may not be accurate within that minute. | SW_CORE_ENABLE_DATABASE_SESSION | true |
| - | - | topNReportPeriod | The execution period (in minutes) of top N sampler, which saves sampled data into the storage. | SW_CORE_TOPN_REPORT_PERIOD | 10 |
| - | - | activeExtraModelColumns | Appends entity names (e.g. service names) into metrics storage entities. | SW_CORE_ACTIVE_EXTRA_MODEL_COLUMNS | false |
| - | - | serviceNameMaxLength | Maximum length limit of service names. | SW_SERVICE_NAME_MAX_LENGTH | 70 |
......
......@@ -49,10 +49,6 @@ public class CoreModuleConfig extends ModuleConfig {
* The period of L1 aggregation flush. Unit is ms.
*/
private long l1FlushPeriod = 500;
/**
* Enable database flush session.
*/
private boolean enableDatabaseSession;
/**
* The threshold of session time. Unit is ms. Default value is 70s.
*/
......
......@@ -319,7 +319,6 @@ public class CoreModuleProvider extends ModuleProvider {
}
final MetricsStreamProcessor metricsStreamProcessor = MetricsStreamProcessor.getInstance();
metricsStreamProcessor.setEnableDatabaseSession(moduleConfig.isEnableDatabaseSession());
metricsStreamProcessor.setL1FlushPeriod(moduleConfig.getL1FlushPeriod());
metricsStreamProcessor.setStorageSessionTimeout(moduleConfig.getStorageSessionTimeout());
metricsStreamProcessor.setMetricsDataTTL(moduleConfig.getMetricsDataTTL());
......
......@@ -20,27 +20,30 @@ package org.apache.skywalking.oap.server.core.analysis.worker;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.library.datacarrier.DataCarrier;
import org.apache.skywalking.oap.server.library.datacarrier.consumer.BulkConsumePool;
import org.apache.skywalking.oap.server.library.datacarrier.consumer.ConsumerPoolFactory;
import org.apache.skywalking.oap.server.library.datacarrier.consumer.IConsumer;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.data.MergableBufferedData;
import org.apache.skywalking.oap.server.core.analysis.data.ReadWriteSafeCache;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.exporter.ExportEvent;
import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
import org.apache.skywalking.oap.server.core.storage.SessionCacheCallback;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
import org.apache.skywalking.oap.server.library.datacarrier.DataCarrier;
import org.apache.skywalking.oap.server.library.datacarrier.consumer.BulkConsumePool;
import org.apache.skywalking.oap.server.library.datacarrier.consumer.ConsumerPoolFactory;
import org.apache.skywalking.oap.server.library.datacarrier.consumer.IConsumer;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
......@@ -58,13 +61,23 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
private static long SESSION_TIMEOUT_OFFSITE_COUNTER = 0;
private final Model model;
private final Map<Metrics, Metrics> context;
/**
* The session cache holds the latest metrics in memory.
* There are two ways a metric enters the cache:
* 1. The metric is read from the database through {@link #loadFromStorage(List)}.
* 2. The built {@link InsertRequest} executed successfully.
*
* There are two cases in which a metric is removed from the cache:
* 1. The metric expired.
* 2. The built {@link UpdateRequest} failed to execute, which could be caused by
* (1) a database error, or (2) no data updated, such as the update counter of a JDBC statement being 0.
*/
private final Map<Metrics, Metrics> sessionCache;
private final IMetricsDAO metricsDAO;
private final Optional<AbstractWorker<Metrics>> nextAlarmWorker;
private final Optional<AbstractWorker<ExportEvent>> nextExportWorker;
private final DataCarrier<Metrics> dataCarrier;
private final Optional<MetricsTransWorker> transWorker;
private final boolean enableDatabaseSession;
private final boolean supportUpdate;
private long sessionTimeout;
private CounterMetrics aggregationCounter;
......@@ -85,12 +98,15 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
MetricsPersistentWorker(ModuleDefineHolder moduleDefineHolder, Model model, IMetricsDAO metricsDAO,
AbstractWorker<Metrics> nextAlarmWorker, AbstractWorker<ExportEvent> nextExportWorker,
MetricsTransWorker transWorker, boolean enableDatabaseSession, boolean supportUpdate,
MetricsTransWorker transWorker, boolean supportUpdate,
long storageSessionTimeout, int metricsDataTTL, MetricStreamKind kind) {
super(moduleDefineHolder, new ReadWriteSafeCache<>(new MergableBufferedData(), new MergableBufferedData()));
this.model = model;
this.context = new HashMap<>(100);
this.enableDatabaseSession = enableDatabaseSession;
// Because the cache is updated depending on the final storage implementation,
// the map/cache could be updated concurrently.
// Use ConcurrentHashMap to avoid a HashMap deadlock.
// Since 9.3.0
this.sessionCache = new ConcurrentHashMap<>(100);
this.metricsDAO = metricsDAO;
this.nextAlarmWorker = Optional.ofNullable(nextAlarmWorker);
this.nextExportWorker = Optional.ofNullable(nextExportWorker);
......@@ -140,14 +156,13 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
MetricsPersistentWorker(ModuleDefineHolder moduleDefineHolder,
Model model,
IMetricsDAO metricsDAO,
boolean enableDatabaseSession,
boolean supportUpdate,
long storageSessionTimeout,
int metricsDataTTL,
MetricStreamKind kind) {
this(moduleDefineHolder, model, metricsDAO,
null, null, null,
enableDatabaseSession, supportUpdate, storageSessionTimeout, metricsDataTTL, kind
supportUpdate, storageSessionTimeout, metricsDataTTL, kind
);
// For down-sampling metrics, we prolong the session timeout by 4 times, to nearly 5 minutes.
// And add an offset according to the worker creation sequence, to avoid the context clearing overlapping,
......@@ -192,12 +207,12 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
metricsList.add(data);
if (metricsList.size() == batchSize) {
flushDataToStorage(metricsList, prepareRequests);
prepareFlushDataToStorage(metricsList, prepareRequests);
}
}
if (metricsList.size() > 0) {
flushDataToStorage(metricsList, prepareRequests);
prepareFlushDataToStorage(metricsList, prepareRequests);
}
if (prepareRequests.size() > 0) {
......@@ -209,14 +224,20 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
return prepareRequests;
}
private void flushDataToStorage(List<Metrics> metricsList,
List<PrepareRequest> prepareRequests) {
/**
* Build the given prepareRequests for the database flush.
*
* @param metricsList the metrics from the last read of the in-memory aggregated cache.
* @param prepareRequests the results for the final execution.
*/
private void prepareFlushDataToStorage(List<Metrics> metricsList,
List<PrepareRequest> prepareRequests) {
try {
loadFromStorage(metricsList);
long timestamp = System.currentTimeMillis();
for (Metrics metrics : metricsList) {
Metrics cachedMetrics = context.get(metrics);
Metrics cachedMetrics = sessionCache.get(metrics);
if (cachedMetrics != null) {
/*
* If the metrics is not supportUpdate, defined through MetricsExtension#supportUpdate,
......@@ -233,12 +254,22 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
continue;
}
cachedMetrics.calculate();
prepareRequests.add(metricsDAO.prepareBatchUpdate(model, cachedMetrics));
prepareRequests.add(
metricsDAO.prepareBatchUpdate(
model,
cachedMetrics,
new SessionCacheCallback(sessionCache, cachedMetrics)
));
nextWorker(cachedMetrics);
cachedMetrics.setLastUpdateTimestamp(timestamp);
} else {
metrics.calculate();
prepareRequests.add(metricsDAO.prepareBatchInsert(model, metrics));
prepareRequests.add(
metricsDAO.prepareBatchInsert(
model,
metrics,
new SessionCacheCallback(sessionCache, metrics)
));
nextWorker(metrics);
metrics.setLastUpdateTimestamp(timestamp);
}
......@@ -263,7 +294,7 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
}
/**
* Load data from the storage, if {@link #enableDatabaseSession} == true, only load data when the id doesn't exist.
* Load data from the storage; only load data when the id doesn't exist in the session cache.
*/
private void loadFromStorage(List<Metrics> metrics) {
final long currentTimeMillis = System.currentTimeMillis();
......@@ -271,9 +302,9 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
List<Metrics> notInCacheMetrics =
metrics.stream()
.filter(m -> {
final Metrics cachedValue = context.get(m);
// Not cached or session disabled, the metric could be tagged `not in cache`.
if (cachedValue == null || !enableDatabaseSession) {
final Metrics cachedValue = sessionCache.get(m);
// Not cached; the metric is tagged `not in cache`.
if (cachedValue == null) {
return true;
}
// The metric is in the cache, but still we have to check
......@@ -286,7 +317,7 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
if (metricsDAO.isExpiredCache(model, cachedValue, currentTimeMillis, metricsDataTTL)) {
// The expired metrics should be removed from the session cache and tagged `not in cache` directly.
context.remove(m);
sessionCache.remove(m);
return true;
}
}
......@@ -298,12 +329,7 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
return;
}
final List<Metrics> dbMetrics = metricsDAO.multiGet(model, notInCacheMetrics);
if (!enableDatabaseSession) {
// Clear the cache only after results from DB are returned successfully.
context.clear();
}
dbMetrics.forEach(m -> context.put(m, m));
metricsDAO.multiGet(model, notInCacheMetrics).forEach(m -> sessionCache.put(m, m));
} catch (final Exception e) {
log.error("Failed to load metrics for merging", e);
}
......@@ -311,15 +337,13 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
@Override
public void endOfRound() {
if (enableDatabaseSession) {
Iterator<Metrics> iterator = context.values().iterator();
long timestamp = System.currentTimeMillis();
while (iterator.hasNext()) {
Metrics metrics = iterator.next();
Iterator<Metrics> iterator = sessionCache.values().iterator();
long timestamp = System.currentTimeMillis();
while (iterator.hasNext()) {
Metrics metrics = iterator.next();
if (metrics.isExpired(timestamp, sessionTimeout)) {
iterator.remove();
}
if (metrics.isExpired(timestamp, sessionTimeout)) {
iterator.remove();
}
}
}
......
......@@ -76,12 +76,6 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> {
@Setter
@Getter
private long l1FlushPeriod = 500;
/**
* Hold and forward CoreModuleConfig#enableDatabaseSession to the persistent worker.
*/
@Setter
@Getter
private boolean enableDatabaseSession;
/**
* The threshold of session time. Unit is ms. Default value is 70s.
*/
......@@ -219,7 +213,7 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> {
MetricsPersistentWorker minutePersistentWorker = new MetricsPersistentWorker(
moduleDefineHolder, model, metricsDAO, alarmNotifyWorker, exportWorker, transWorker,
enableDatabaseSession, supportUpdate, storageSessionTimeout, metricsDataTTL, kind
supportUpdate, storageSessionTimeout, metricsDataTTL, kind
);
persistentWorkers.add(minutePersistentWorker);
......@@ -233,7 +227,7 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> {
MetricStreamKind kind) {
MetricsPersistentWorker persistentWorker = new MetricsPersistentWorker(
moduleDefineHolder, model, metricsDAO,
enableDatabaseSession, supportUpdate, storageSessionTimeout, metricsDataTTL, kind
supportUpdate, storageSessionTimeout, metricsDataTTL, kind
);
persistentWorkers.add(persistentWorker);
......
......@@ -47,7 +47,7 @@ public interface IMetricsDAO extends DAO {
* @return InsertRequest should follow the database client driver datatype, in order to make sure it could be
* executed ASAP.
*/
InsertRequest prepareBatchInsert(Model model, Metrics metrics) throws IOException;
InsertRequest prepareBatchInsert(Model model, Metrics metrics, SessionCacheCallback callback) throws IOException;
/**
* Transfer the given metrics to an executable update statement.
......@@ -55,7 +55,7 @@ public interface IMetricsDAO extends DAO {
* @return UpdateRequest should follow the database client driver datatype, in order to make sure it could be
* executed ASAP.
*/
UpdateRequest prepareBatchUpdate(Model model, Metrics metrics) throws IOException;
UpdateRequest prepareBatchUpdate(Model model, Metrics metrics, SessionCacheCallback callback) throws IOException;
/**
* Calculate the expired status of the metric by given current timestamp, metric and TTL.
......@@ -72,4 +72,5 @@ public interface IMetricsDAO extends DAO {
// If the cached metric is older than the TTL indicated.
return currentTimeMillis - metricTimestamp > TimeUnit.DAYS.toMillis(ttl);
}
}
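For reference, the default `isExpiredCache` implementation above is plain timestamp arithmetic. A minimal usage sketch with made-up values (the helper below mirrors, but is not, the interface's default method):

```java
import java.util.concurrent.TimeUnit;

class TtlCheckExample {
    // Mirrors the default check above: a cached metric is expired once it is older than the TTL (in days).
    static boolean isExpiredCache(long currentTimeMillis, long metricTimestamp, int ttl) {
        return currentTimeMillis - metricTimestamp > TimeUnit.DAYS.toMillis(ttl);
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        long fourDaysAgo = now - TimeUnit.DAYS.toMillis(4);
        // With a 3-day TTL, a metric last updated 4 days ago is expired and must be reloaded from storage.
        System.out.println(isExpiredCache(now, fourDaysAgo, 3)); // prints: true
    }
}
```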
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.storage;
import java.util.Map;
import lombok.RequiredArgsConstructor;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
/**
* SessionCacheCallback provides a bridge between the storage implementations and the metrics session cache.
*/
@RequiredArgsConstructor
public class SessionCacheCallback {
private final Map<Metrics, Metrics> sessionCache;
private final Metrics metrics;
/**
* In some cases, this callback could be shared by multiple executions, such as SQLExecutor#additionalSQLs.
* This flag makes sure that once one of the generated executions fails, the whole metric is removed from
* the cache and is not added back, as those executions run in batch mode and their sequence is uncertain.
*/
private volatile boolean isFailed = false;
public void onInsertCompleted() {
if (isFailed) {
return;
}
sessionCache.put(metrics, metrics);
}
public void onUpdateFailure() {
isFailed = true;
sessionCache.remove(metrics);
}
}
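A storage implementation is expected to hand this callback to the request objects it builds, and invoke it once the batch outcome is known. A minimal sketch of that wiring, assuming only the `InsertRequest` interface shown later in this diff (`MyInsertRequest` is hypothetical; the BanyanDB, Elasticsearch, and JDBC classes below are the real implementations):

```java
import org.apache.skywalking.oap.server.core.storage.SessionCacheCallback;
import org.apache.skywalking.oap.server.library.client.request.InsertRequest;

// Hypothetical storage-plugin request that bridges the batch outcome to the session cache.
public class MyInsertRequest implements InsertRequest {
    private final SessionCacheCallback callback;

    public MyInsertRequest(SessionCacheCallback callback) {
        this.callback = callback;
    }

    @Override
    public void onInsertCompleted() {
        // Only a successful insert puts the metric into the session cache.
        callback.onInsertCompleted();
    }
}
```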
......@@ -24,7 +24,7 @@ import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
@Getter
public class IndexRequestWrapper implements InsertRequest {
private final IndexRequest request;
protected IndexRequest request;
public IndexRequestWrapper(String index, String type, String id,
Map<String, ?> source) {
......@@ -35,4 +35,15 @@ public class IndexRequestWrapper implements InsertRequest {
.doc(source)
.build();
}
/**
* Expose an empty constructor for lazy initialization.
*/
protected IndexRequestWrapper() {
}
@Override
public void onInsertCompleted() {
}
}
......@@ -23,7 +23,7 @@ import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
@Getter
public class UpdateRequestWrapper implements UpdateRequest {
private final org.apache.skywalking.library.elasticsearch.requests.UpdateRequest request;
protected org.apache.skywalking.library.elasticsearch.requests.UpdateRequest request;
public UpdateRequestWrapper(String index, String type, String id,
Map<String, Object> source) {
......@@ -34,4 +34,16 @@ public class UpdateRequestWrapper implements UpdateRequest {
.doc(source)
.build();
}
/**
* Expose an empty constructor for lazy initialization.
*/
protected UpdateRequestWrapper() {
}
@Override
public void onUpdateFailure() {
}
}
......@@ -18,4 +18,5 @@
package org.apache.skywalking.oap.server.library.client.request;
public interface InsertRequest extends PrepareRequest {
void onInsertCompleted();
}
......@@ -18,4 +18,5 @@
package org.apache.skywalking.oap.server.library.client.request;
public interface UpdateRequest extends PrepareRequest {
void onUpdateFailure();
}
......@@ -102,9 +102,6 @@ core:
storageSessionTimeout: ${SW_CORE_STORAGE_SESSION_TIMEOUT:70000}
# The period of doing data persistence. Unit is second.Default value is 25s
persistentPeriod: ${SW_CORE_PERSISTENT_PERIOD:25}
# Cache metrics data for 1 minute to reduce database queries, and if the OAP cluster changes within that minute,
# the metrics may not be accurate within that minute.
enableDatabaseSession: ${SW_CORE_ENABLE_DATABASE_SESSION:true}
topNReportPeriod: ${SW_CORE_TOPN_REPORT_PERIOD:10} # top_n record worker report cycle, unit is minute
# Extra model columns are the columns defined in the codes. These columns are not logically required in aggregation or further queries,
# and they cause more load on memory, the network of OAP, and storage.
......
......@@ -18,6 +18,9 @@
package org.apache.skywalking.oap.server.storage.plugin.banyandb;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import org.apache.skywalking.banyandb.v1.client.MeasureBulkWriteProcessor;
import org.apache.skywalking.banyandb.v1.client.StreamBulkWriteProcessor;
import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
......@@ -29,10 +32,6 @@ import org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDB
import org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDBMeasureUpdateRequest;
import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBStreamInsertRequest;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
public class BanyanDBBatchDAO extends AbstractDAO<BanyanDBStorageClient> implements IBatchDAO {
private static final Object STREAM_SYNCHRONIZER = new Object();
private static final Object MEASURE_SYNCHRONIZER = new Object();
......@@ -69,7 +68,13 @@ public class BanyanDBBatchDAO extends AbstractDAO<BanyanDBStorageClient> impleme
if (r instanceof BanyanDBStreamInsertRequest) {
return getStreamBulkWriteProcessor().add(((BanyanDBStreamInsertRequest) r).getStreamWrite());
} else if (r instanceof BanyanDBMeasureInsertRequest) {
return getMeasureBulkWriteProcessor().add(((BanyanDBMeasureInsertRequest) r).getMeasureWrite());
return getMeasureBulkWriteProcessor().add(((BanyanDBMeasureInsertRequest) r).getMeasureWrite())
.whenComplete((v, throwable) -> {
if (throwable == null) {
// Insert completed
((BanyanDBMeasureInsertRequest) r).onInsertCompleted();
}
});
} else if (r instanceof BanyanDBMeasureUpdateRequest) {
return getMeasureBulkWriteProcessor().add(((BanyanDBMeasureUpdateRequest) r).getMeasureWrite());
}
......
......@@ -21,10 +21,17 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.measure;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.apache.skywalking.banyandb.v1.client.MeasureWrite;
import org.apache.skywalking.oap.server.core.storage.SessionCacheCallback;
import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
@RequiredArgsConstructor
@Getter
public class BanyanDBMeasureInsertRequest implements InsertRequest {
private final MeasureWrite measureWrite;
private final SessionCacheCallback callback;
@Override
public void onInsertCompleted() {
callback.onInsertCompleted();
}
}
\ No newline at end of file
......@@ -21,10 +21,18 @@ package org.apache.skywalking.oap.server.storage.plugin.banyandb.measure;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.apache.skywalking.banyandb.v1.client.MeasureWrite;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
@RequiredArgsConstructor
@Getter
public class BanyanDBMeasureUpdateRequest implements UpdateRequest {
private final MeasureWrite measureWrite;
@Override
public void onUpdateFailure() {
// BanyanDB measure update is equivalent to insert.
// If something goes wrong, it is either a code bug or the server side is unavailable.
throw new UnexpectedException("Should not report onUpdateFailure when measure update.");
}
}
......@@ -26,6 +26,7 @@ import org.apache.skywalking.banyandb.v1.client.MeasureWrite;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
import org.apache.skywalking.oap.server.core.storage.SessionCacheCallback;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
......@@ -77,7 +78,7 @@ public class BanyanDBMetricsDAO extends AbstractBanyanDBDAO implements IMetricsD
}
@Override
public InsertRequest prepareBatchInsert(Model model, Metrics metrics) throws IOException {
public InsertRequest prepareBatchInsert(Model model, Metrics metrics, SessionCacheCallback callback) throws IOException {
log.info("prepare to insert {}", model.getName());
MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(model);
if (schema == null) {
......@@ -89,11 +90,11 @@ public class BanyanDBMetricsDAO extends AbstractBanyanDBDAO implements IMetricsD
final BanyanDBConverter.MeasureToStorage toStorage = new BanyanDBConverter.MeasureToStorage(schema, measureWrite);
storageBuilder.entity2Storage(metrics, toStorage);
toStorage.acceptID(metrics.id());
return new BanyanDBMeasureInsertRequest(toStorage.obtain());
return new BanyanDBMeasureInsertRequest(toStorage.obtain(), callback);
}
@Override
public UpdateRequest prepareBatchUpdate(Model model, Metrics metrics) throws IOException {
public UpdateRequest prepareBatchUpdate(Model model, Metrics metrics, SessionCacheCallback callback) throws IOException {
log.info("prepare to update {}", model.getName());
MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(model);
if (schema == null) {
......
......@@ -27,4 +27,9 @@ import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
@Getter
public class BanyanDBStreamInsertRequest implements InsertRequest {
private final StreamWrite streamWrite;
@Override
public void onInsertCompleted() {
}
}
\ No newline at end of file
......@@ -75,9 +75,21 @@ public class BatchProcessEsDAO extends EsDAO implements IBatchDAO {
if (CollectionUtils.isNotEmpty(prepareRequests)) {
return CompletableFuture.allOf(prepareRequests.stream().map(prepareRequest -> {
if (prepareRequest instanceof InsertRequest) {
return bulkProcessor.add(((IndexRequestWrapper) prepareRequest).getRequest());
return bulkProcessor.add(((IndexRequestWrapper) prepareRequest).getRequest())
.whenComplete((v, throwable) -> {
if (throwable == null) {
// Insert completed
((IndexRequestWrapper) prepareRequest).onInsertCompleted();
}
});
} else {
return bulkProcessor.add(((UpdateRequestWrapper) prepareRequest).getRequest());
return bulkProcessor.add(((UpdateRequestWrapper) prepareRequest).getRequest())
.whenComplete((v, throwable) -> {
if (throwable != null) {
// Update failure
((UpdateRequestWrapper) prepareRequest).onUpdateFailure();
}
});
}
}).toArray(CompletableFuture[]::new));
}
......
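Both the BanyanDB and Elasticsearch batch DAOs above follow the same pattern: attach a `whenComplete` stage to the bulk-processor future and translate its outcome into the session-cache callbacks (success on insert triggers `onInsertCompleted`, failure on update triggers `onUpdateFailure`). A generic sketch of that pattern, with hypothetical request interfaces in place of the concrete wrapper types:

```java
import java.util.concurrent.CompletableFuture;

// Generic sketch of the completion wiring used by the batch DAOs above.
class CompletionWiringSketch {
    interface InsertReq { void onInsertCompleted(); }
    interface UpdateReq { void onUpdateFailure(); }

    CompletableFuture<Void> add(Object request, CompletableFuture<Void> bulkFuture) {
        return bulkFuture.whenComplete((v, throwable) -> {
            if (request instanceof InsertReq && throwable == null) {
                ((InsertReq) request).onInsertCompleted();   // the insert reached the storage
            } else if (request instanceof UpdateReq && throwable != null) {
                ((UpdateReq) request).onUpdateFailure();     // the update failed; evict from the cache
            }
        });
    }
}
```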
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
import org.apache.skywalking.oap.server.core.storage.SessionCacheCallback;
import org.apache.skywalking.oap.server.library.client.elasticsearch.IndexRequestWrapper;
/**
* MetricIndexRequestWrapper wraps the built request wrapper with a new callback.
*/
public class MetricIndexRequestWrapper extends IndexRequestWrapper {
private final SessionCacheCallback callback;
public MetricIndexRequestWrapper(IndexRequestWrapper requestWrapper, SessionCacheCallback callback) {
this.request = requestWrapper.getRequest();
this.callback = callback;
}
@Override
public void onInsertCompleted() {
if (callback != null) {
callback.onInsertCompleted();
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
import org.apache.skywalking.oap.server.core.storage.SessionCacheCallback;
import org.apache.skywalking.oap.server.library.client.elasticsearch.UpdateRequestWrapper;
/**
* MetricIndexUpdateWrapper wraps the built request wrapper with a new callback.
*/
public class MetricIndexUpdateWrapper extends UpdateRequestWrapper {
private final SessionCacheCallback callback;
public MetricIndexUpdateWrapper(UpdateRequestWrapper requestWrapper, SessionCacheCallback callback) {
this.request = requestWrapper.getRequest();
this.callback = callback;
}
@Override
public void onUpdateFailure() {
if (callback != null) {
callback.onUpdateFailure();
}
}
}
......@@ -32,6 +32,7 @@ import org.apache.skywalking.oap.server.core.analysis.DownSampling;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
import org.apache.skywalking.oap.server.core.storage.SessionCacheCallback;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
......@@ -97,24 +98,24 @@ public class MetricsEsDAO extends EsDAO implements IMetricsDAO {
}
@Override
public InsertRequest prepareBatchInsert(Model model, Metrics metrics) {
public InsertRequest prepareBatchInsert(Model model, Metrics metrics, SessionCacheCallback callback) {
final ElasticSearchConverter.ToStorage toStorage = new ElasticSearchConverter.ToStorage(model.getName());
storageBuilder.entity2Storage(metrics, toStorage);
Map<String, Object> builder = IndexController.INSTANCE.appendTableColumn(model, toStorage.obtain());
String modelName = TimeSeriesUtils.writeIndexName(model, metrics.getTimeBucket());
String id = IndexController.INSTANCE.generateDocId(model, metrics.id());
return getClient().prepareInsert(modelName, id, builder);
return new MetricIndexRequestWrapper(getClient().prepareInsert(modelName, id, builder), callback);
}
@Override
public UpdateRequest prepareBatchUpdate(Model model, Metrics metrics) {
public UpdateRequest prepareBatchUpdate(Model model, Metrics metrics, SessionCacheCallback callback) {
final ElasticSearchConverter.ToStorage toStorage = new ElasticSearchConverter.ToStorage(model.getName());
storageBuilder.entity2Storage(metrics, toStorage);
Map<String, Object> builder =
IndexController.INSTANCE.appendTableColumn(model, toStorage.obtain());
String modelName = TimeSeriesUtils.writeIndexName(model, metrics.getTimeBucket());
String id = IndexController.INSTANCE.generateDocId(model, metrics.id());
return getClient().prepareUpdate(modelName, id, builder);
return new MetricIndexUpdateWrapper(getClient().prepareUpdate(modelName, id, builder), callback);
}
@Override
......
......@@ -18,15 +18,17 @@
package org.apache.skywalking.oap.server.storage.plugin.jdbc;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;
/**
* A Batch SQL executor.
......@@ -45,32 +47,59 @@ public class BatchSQLExecutor implements InsertRequest, UpdateRequest {
return;
}
String sql = prepareRequests.get(0).toString();
List<PrepareRequest> bulkRequest = new ArrayList<>(maxBatchSqlSize);
try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
int pendingCount = 0;
for (int k = 0; k < prepareRequests.size(); k++) {
SQLExecutor sqlExecutor = (SQLExecutor) prepareRequests.get(k);
sqlExecutor.setParameters(preparedStatement);
preparedStatement.addBatch();
bulkRequest.add(sqlExecutor);
if (k > 0 && k % maxBatchSqlSize == 0) {
executeBatch(preparedStatement, maxBatchSqlSize, sql);
executeBatch(preparedStatement, maxBatchSqlSize, sql, bulkRequest);
bulkRequest.clear();
pendingCount = 0;
} else {
pendingCount++;
}
}
if (pendingCount > 0) {
executeBatch(preparedStatement, pendingCount, sql);
executeBatch(preparedStatement, pendingCount, sql, bulkRequest);
bulkRequest.clear();
}
}
}
private void executeBatch(PreparedStatement preparedStatement, int pendingCount, String sql) throws SQLException {
private void executeBatch(PreparedStatement preparedStatement,
int pendingCount,
String sql,
List<PrepareRequest> bulkRequest) throws SQLException {
long start = System.currentTimeMillis();
preparedStatement.executeBatch();
final int[] executeBatchResults = preparedStatement.executeBatch();
boolean isInsert = bulkRequest.get(0) instanceof InsertRequest;
for (int i = 0; i < executeBatchResults.length; i++) {
if (executeBatchResults[i] == 1 && isInsert) {
// Inserted successfully.
((InsertRequest) bulkRequest.get(i)).onInsertCompleted();
} else if (executeBatchResults[i] == 0 && !isInsert) {
// Update failure.
((UpdateRequest) bulkRequest.get(i)).onUpdateFailure();
}
}
if (log.isDebugEnabled()) {
long end = System.currentTimeMillis();
long cost = end - start;
log.debug("execute batch sql, batch size: {}, cost:{}ms, sql: {}", pendingCount, cost, sql);
}
}
@Override
public void onInsertCompleted() {
throw new UnexpectedException("BatchSQLExecutor.onInsertCompleted should not be called");
}
@Override
public void onUpdateFailure() {
throw new UnexpectedException("BatchSQLExecutor.onUpdateFailure should not be called");
}
}
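The JDBC path above relies on the update counts returned by `PreparedStatement#executeBatch`: a count of 1 confirms an insert, while a count of 0 on an update means the row did not exist and the metric must leave the session cache. A self-contained sketch of that interpretation, assuming an in-memory H2 database on the classpath (the table and values are made up):

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class ExecuteBatchResultDemo {
    public static void main(String[] args) throws SQLException {
        try (Connection conn = DriverManager.getConnection("jdbc:h2:mem:demo")) {
            conn.createStatement().execute("CREATE TABLE metrics (id VARCHAR PRIMARY KEY, val INT)");
            conn.createStatement().execute("INSERT INTO metrics VALUES ('a', 1)");

            try (PreparedStatement ps = conn.prepareStatement("UPDATE metrics SET val = ? WHERE id = ?")) {
                ps.setInt(1, 2); ps.setString(2, "a"); ps.addBatch(); // row exists -> update count 1
                ps.setInt(1, 2); ps.setString(2, "b"); ps.addBatch(); // row missing -> update count 0
                for (int count : ps.executeBatch()) {
                    if (count == 0) {
                        // Equivalent to UpdateRequest#onUpdateFailure(): evict the metric from the cache.
                        System.out.println("update missed a row; would call onUpdateFailure()");
                    }
                }
            }
        }
    }
}
```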
......@@ -25,34 +25,30 @@ import java.util.ArrayList;
import java.util.List;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.storage.SessionCacheCallback;
import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A SQL executor.
*/
@EqualsAndHashCode(of = "sql")
@RequiredArgsConstructor
@Slf4j
public class SQLExecutor implements InsertRequest, UpdateRequest {
private static final Logger LOGGER = LoggerFactory.getLogger(SQLExecutor.class);
private String sql;
private List<Object> param;
private final String sql;
private final List<Object> param;
private final SessionCacheCallback callback;
@Getter
private List<SQLExecutor> additionalSQLs;
public SQLExecutor(String sql, List<Object> param) {
this.sql = sql;
this.param = param;
}
public void invoke(Connection connection) throws SQLException {
PreparedStatement preparedStatement = connection.prepareStatement(sql);
setParameters(preparedStatement);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("execute sql in batch: {}, parameters: {}", sql, param);
if (log.isDebugEnabled()) {
log.debug("execute sql in batch: {}, parameters: {}", sql, param);
}
preparedStatement.execute();
if (additionalSQLs != null) {
......@@ -79,4 +75,16 @@ public class SQLExecutor implements InsertRequest, UpdateRequest {
}
additionalSQLs.addAll(sqlExecutors);
}
@Override
public void onInsertCompleted() {
if (callback != null)
callback.onInsertCompleted();
}
@Override
public void onUpdateFailure() {
if (callback != null)
callback.onUpdateFailure();
}
}
......@@ -48,7 +48,7 @@ public class JDBCManagementDAO extends JDBCSQLExecutor implements IManagementDAO
}
SQLExecutor insertExecutor = getInsertExecutor(model.getName(), storageData, storageBuilder,
new HashMapConverter.ToStorage());
new HashMapConverter.ToStorage(), null);
insertExecutor.invoke(connection);
} catch (IOException | SQLException e) {
throw new IOException(e.getMessage(), e);
......
......@@ -24,6 +24,7 @@ import java.util.List;
import java.util.stream.Collectors;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
import org.apache.skywalking.oap.server.core.storage.SessionCacheCallback;
import org.apache.skywalking.oap.server.core.storage.StorageData;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.storage.type.HashMapConverter;
......@@ -49,12 +50,12 @@ public class JDBCMetricsDAO extends JDBCSQLExecutor implements IMetricsDAO {
}
@Override
public SQLExecutor prepareBatchInsert(Model model, Metrics metrics) throws IOException {
return getInsertExecutor(model.getName(), metrics, storageBuilder, new HashMapConverter.ToStorage());
public SQLExecutor prepareBatchInsert(Model model, Metrics metrics, SessionCacheCallback callback) throws IOException {
return getInsertExecutor(model.getName(), metrics, storageBuilder, new HashMapConverter.ToStorage(), callback);
}
@Override
public SQLExecutor prepareBatchUpdate(Model model, Metrics metrics) throws IOException {
return getUpdateExecutor(model.getName(), metrics, storageBuilder);
public SQLExecutor prepareBatchUpdate(Model model, Metrics metrics, SessionCacheCallback callback) throws IOException {
return getUpdateExecutor(model.getName(), metrics, storageBuilder, callback);
}
}
......@@ -41,7 +41,7 @@ public class JDBCNoneStreamDAO extends JDBCSQLExecutor implements INoneStreamDAO
@Override
public void insert(Model model, NoneStream noneStream) throws IOException {
try (Connection connection = jdbcClient.getConnection()) {
SQLExecutor insertExecutor = getInsertExecutor(model.getName(), noneStream, storageBuilder, new HashMapConverter.ToStorage());
SQLExecutor insertExecutor = getInsertExecutor(model.getName(), noneStream, storageBuilder, new HashMapConverter.ToStorage(), null);
insertExecutor.invoke(connection);
} catch (IOException | SQLException e) {
throw new IOException(e.getMessage(), e);
......
......@@ -35,6 +35,6 @@ public class JDBCRecordDAO extends JDBCSQLExecutor implements IRecordDAO {
@Override
public InsertRequest prepareBatchInsert(Model model, Record record) throws IOException {
return getInsertExecutor(model.getName(), record, storageBuilder, new HashMapConverter.ToStorage());
return getInsertExecutor(model.getName(), record, storageBuilder, new HashMapConverter.ToStorage(), null);
}
}
......@@ -28,6 +28,7 @@ import java.util.List;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.storage.SessionCacheCallback;
import org.apache.skywalking.oap.server.core.storage.StorageData;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.storage.model.ModelColumn;
......@@ -116,7 +117,8 @@ public class JDBCSQLExecutor {
protected <T extends StorageData> SQLExecutor getInsertExecutor(String modelName, T metrics,
StorageBuilder<T> storageBuilder,
Convert2Storage<Map<String, Object>> converter) throws IOException {
Convert2Storage<Map<String, Object>> converter,
SessionCacheCallback callback) throws IOException {
Model model = TableMetaInfo.get(modelName);
storageBuilder.entity2Storage(metrics, converter);
Map<String, Object> objectMap = converter.obtain();
......@@ -126,7 +128,7 @@ public class JDBCSQLExecutor {
mainEntity.put(column.getColumnName().getName(), objectMap.get(column.getColumnName().getName()));
});
SQLExecutor sqlExecutor = buildInsertExecutor(
modelName, model.getColumns(), metrics, mainEntity);
modelName, model.getColumns(), metrics, mainEntity, callback);
//build additional table sql
for (SQLDatabaseModelExtension.AdditionalTable additionalTable : model.getSqlDBModelExtension()
.getAdditionalTables()
......@@ -137,7 +139,7 @@ public class JDBCSQLExecutor {
});
List<SQLExecutor> additionalSQLExecutors = buildAdditionalInsertExecutor(
additionalTable.getName(), additionalTable.getColumns(), metrics, additionalEntity
additionalTable.getName(), additionalTable.getColumns(), metrics, additionalEntity, callback
);
sqlExecutor.appendAdditionalSQLs(additionalSQLExecutors);
}
......@@ -147,7 +149,8 @@ public class JDBCSQLExecutor {
private <T extends StorageData> SQLExecutor buildInsertExecutor(String tableName,
List<ModelColumn> columns,
T metrics,
Map<String, Object> objectMap) throws IOException {
Map<String, Object> objectMap,
SessionCacheCallback onCompleteCallback) throws IOException {
SQLBuilder sqlBuilder = new SQLBuilder("INSERT INTO " + tableName + " VALUES");
List<Object> param = new ArrayList<>();
sqlBuilder.append("(?,");
......@@ -169,13 +172,14 @@ public class JDBCSQLExecutor {
}
sqlBuilder.append(")");
return new SQLExecutor(sqlBuilder.toString(), param);
return new SQLExecutor(sqlBuilder.toString(), param, onCompleteCallback);
}
private <T extends StorageData> List<SQLExecutor> buildAdditionalInsertExecutor(String tableName,
List<ModelColumn> columns,
T metrics,
Map<String, Object> objectMap) throws IOException {
Map<String, Object> objectMap,
SessionCacheCallback callback) throws IOException {
List<SQLExecutor> sqlExecutors = new ArrayList<>();
SQLBuilder sqlBuilder = new SQLBuilder("INSERT INTO " + tableName + " VALUES");
......@@ -211,17 +215,18 @@ public class JDBCSQLExecutor {
for (Object object : valueList) {
List<Object> paramCopy = new ArrayList<>(param);
paramCopy.set(position, object);
sqlExecutors.add(new SQLExecutor(sql, paramCopy));
sqlExecutors.add(new SQLExecutor(sql, paramCopy, callback));
}
} else {
sqlExecutors.add(new SQLExecutor(sql, param));
sqlExecutors.add(new SQLExecutor(sql, param, callback));
}
return sqlExecutors;
}
protected <T extends StorageData> SQLExecutor getUpdateExecutor(String modelName, T metrics,
StorageBuilder<T> storageBuilder) throws IOException {
StorageBuilder<T> storageBuilder,
SessionCacheCallback callback) throws IOException {
final HashMapConverter.ToStorage toStorage = new HashMapConverter.ToStorage();
storageBuilder.entity2Storage(metrics, toStorage);
Map<String, Object> objectMap = toStorage.obtain();
......@@ -236,7 +241,8 @@ public class JDBCSQLExecutor {
if (model.getSqlDBModelExtension().isShardingTable()) {
SQLDatabaseModelExtension.Sharding sharding = model.getSqlDBModelExtension().getSharding().orElseThrow(
() -> new UnexpectedException("Sharding should not be empty."));
if (columnName.equals(sharding.getDataSourceShardingColumn()) || columnName.equals(sharding.getTableShardingColumn())) {
if (columnName.equals(sharding.getDataSourceShardingColumn()) || columnName.equals(
sharding.getTableShardingColumn())) {
continue;
}
}
......@@ -253,6 +259,6 @@ public class JDBCSQLExecutor {
sqlBuilder.append(" WHERE id = ?");
param.add(metrics.id());
return new SQLExecutor(sqlBuilder.toString(), param);
return new SQLExecutor(sqlBuilder.toString(), param, callback);
}
}
......@@ -102,7 +102,7 @@ public class JDBCUITemplateManagementDAO extends JDBCSQLExecutor implements UITe
public TemplateChangeStatus addTemplate(final DashboardSetting setting) throws IOException {
final UITemplate uiTemplate = setting.toEntity();
final SQLExecutor insertExecutor = getInsertExecutor(
UITemplate.INDEX_NAME, uiTemplate, new UITemplate.Builder(), new HashMapConverter.ToStorage());
UITemplate.INDEX_NAME, uiTemplate, new UITemplate.Builder(), new HashMapConverter.ToStorage(), null);
try (Connection connection = h2Client.getConnection()) {
insertExecutor.invoke(connection);
return TemplateChangeStatus.builder().status(true).id(setting.getId()).build();
......@@ -135,7 +135,7 @@ public class JDBCUITemplateManagementDAO extends JDBCSQLExecutor implements UITe
private TemplateChangeStatus executeUpdate(final UITemplate uiTemplate) throws IOException {
final SQLExecutor updateExecutor = getUpdateExecutor(
UITemplate.INDEX_NAME, uiTemplate, new UITemplate.Builder());
UITemplate.INDEX_NAME, uiTemplate, new UITemplate.Builder(), null);
try (Connection connection = h2Client.getConnection()) {
updateExecutor.invoke(connection);
return TemplateChangeStatus.builder().status(true).id(uiTemplate.getTemplateId()).build();
......
......@@ -642,7 +642,7 @@ public class ShardingIntegrationTest {
.builder()
.getDeclaredConstructor()
.newInstance());
jdbcMetricsDAO.prepareBatchInsert(model, metrics).invoke(conn);
jdbcMetricsDAO.prepareBatchInsert(model, metrics, null).invoke(conn);
}
}
}
......