Unverified commit 224b6c6b authored by nicolchen, committed by GitHub

Improve the speed of writing TiDB by batching the SQL execution (#7691)

Parent 92f0ebf1
......@@ -66,6 +66,7 @@ Release Notes.
* Support gRPC sync grouped dynamic configurations.
* Fix `H2EventQueryDAO` not sorting data by Event.START_TIME and using a wrong pagination query.
* Fix `LogHandler` of `kafka-fetcher-plugin` not recognizing the namespace.
* Improve the speed of writing TiDB by batching the SQL execution.
#### UI
......
......@@ -30,6 +30,8 @@ storage:
driver: org.h2.jdbcx.JdbcDataSource
url: jdbc:h2:mem:skywalking-oap-db
user: sa
maxSizeOfBatchSql: ${SW_STORAGE_MAX_SIZE_OF_BATCH_SQL:100}
asyncBatchPersistentPoolSize: ${SW_STORAGE_ASYNC_BATCH_PERSISTENT_POOL_SIZE:1}
```
## OpenSearch
......@@ -194,7 +196,7 @@ storage:
selector: ${SW_STORAGE:mysql}
mysql:
properties:
jdbcUrl: ${SW_JDBC_URL:"jdbc:mysql://localhost:3306/swtest"}
jdbcUrl: ${SW_JDBC_URL:"jdbc:mysql://localhost:3306/swtest?rewriteBatchedStatements=true"}
dataSource.user: ${SW_DATA_SOURCE_USER:root}
dataSource.password: ${SW_DATA_SOURCE_PASSWORD:root@1234}
dataSource.cachePrepStmts: ${SW_DATA_SOURCE_CACHE_PREP_STMTS:true}
......@@ -202,9 +204,12 @@ storage:
dataSource.prepStmtCacheSqlLimit: ${SW_DATA_SOURCE_PREP_STMT_CACHE_SQL_LIMIT:2048}
dataSource.useServerPrepStmts: ${SW_DATA_SOURCE_USE_SERVER_PREP_STMTS:true}
metadataQueryMaxSize: ${SW_STORAGE_MYSQL_QUERY_MAX_SIZE:5000}
maxSizeOfBatchSql: ${SW_STORAGE_MAX_SIZE_OF_BATCH_SQL:2000}
asyncBatchPersistentPoolSize: ${SW_STORAGE_ASYNC_BATCH_PERSISTENT_POOL_SIZE:4}
```
All connection-related settings, including the URL, username, and password, are found in `application.yml`.
Only some of the settings are listed here. See the [HikariCP](https://github.com/brettwooldridge/HikariCP) connection pool document for full settings.
To understand the function of the parameter `rewriteBatchedStatements=true` in MySQL, see the [MySQL official document](https://dev.mysql.com/doc/connector-j/8.0/en/connector-j-connp-props-performance-extensions.html#cj-conn-prop_rewriteBatchedStatements).
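Illustrative only: the sketch below is not SkyWalking code; the table `segment_demo` and the credentials are hypothetical placeholders echoing the defaults above. It shows the JDBC batching pattern that `rewriteBatchedStatements=true` accelerates: with the flag on, the MySQL driver can rewrite the queued single-row inserts into one multi-row `INSERT`, cutting round trips.
```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class BatchInsertDemo {
    public static void main(String[] args) throws SQLException {
        // Same URL shape as the jdbcUrl above; rewriteBatchedStatements enables multi-row rewriting.
        String url = "jdbc:mysql://localhost:3306/swtest?rewriteBatchedStatements=true";
        try (Connection conn = DriverManager.getConnection(url, "root", "root@1234");
             PreparedStatement ps = conn.prepareStatement(
                 "INSERT INTO segment_demo (segment_id, latency) VALUES (?, ?)")) {
            for (int i = 0; i < 2000; i++) {      // 2000 mirrors maxSizeOfBatchSql above
                ps.setString(1, "segment-" + i);
                ps.setInt(2, i);
                ps.addBatch();                    // queue the row client-side
            }
            ps.executeBatch();                    // one flush; the driver may send a single multi-row INSERT
        }
    }
}
```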
## TiDB
TiDB Server 4.0.8 and MySQL Client driver 8.0.13 are currently tested and available.
......@@ -215,7 +220,7 @@ storage:
selector: ${SW_STORAGE:tidb}
tidb:
properties:
jdbcUrl: ${SW_JDBC_URL:"jdbc:mysql://localhost:4000/swtest"}
jdbcUrl: ${SW_JDBC_URL:"jdbc:mysql://localhost:4000/swtest?rewriteBatchedStatements=true"}
dataSource.user: ${SW_DATA_SOURCE_USER:root}
dataSource.password: ${SW_DATA_SOURCE_PASSWORD:""}
dataSource.cachePrepStmts: ${SW_DATA_SOURCE_CACHE_PREP_STMTS:true}
......@@ -226,9 +231,12 @@ storage:
metadataQueryMaxSize: ${SW_STORAGE_MYSQL_QUERY_MAX_SIZE:5000}
maxSizeOfArrayColumn: ${SW_STORAGE_MAX_SIZE_OF_ARRAY_COLUMN:20}
numOfSearchableValuesPerTag: ${SW_STORAGE_NUM_OF_SEARCHABLE_VALUES_PER_TAG:2}
maxSizeOfBatchSql: ${SW_STORAGE_MAX_SIZE_OF_BATCH_SQL:2000}
asyncBatchPersistentPoolSize: ${SW_STORAGE_ASYNC_BATCH_PERSISTENT_POOL_SIZE:4}
```
All connection-related settings, including the URL, username, and password, are found in `application.yml`.
For details on these settings, refer to the configuration of *MySQL* above.
To understand the function of the parameter `rewriteBatchedStatements=true` in TiDB, see the [TiDB best practices](https://docs.pingcap.com/tidb/stable/java-app-best-practices#use-batch-api) document.
## InfluxDB
InfluxDB storage provides a time-series database as a new storage option.
......@@ -266,6 +274,8 @@ storage:
metadataQueryMaxSize: ${SW_STORAGE_MYSQL_QUERY_MAX_SIZE:5000}
maxSizeOfArrayColumn: ${SW_STORAGE_MAX_SIZE_OF_ARRAY_COLUMN:20}
numOfSearchableValuesPerTag: ${SW_STORAGE_NUM_OF_SEARCHABLE_VALUES_PER_TAG:2}
maxSizeOfBatchSql: ${SW_STORAGE_MAX_SIZE_OF_BATCH_SQL:2000}
asyncBatchPersistentPoolSize: ${SW_STORAGE_ASYNC_BATCH_PERSISTENT_POOL_SIZE:4}
```
All connection-related settings, including the URL, username, and password, are found in `application.yml`.
Only some of the settings are listed here. See the [HikariCP](https://github.com/brettwooldridge/HikariCP) connection pool document for full settings.
......
......@@ -114,16 +114,22 @@ core|default|role|Option values: `Mixed/Receiver/Aggregator`. **Receiver** mode
| - | - | metadataQueryMaxSize | The maximum size of metadata per query. | SW_STORAGE_H2_QUERY_MAX_SIZE | 5000 |
| - | - | maxSizeOfArrayColumn | Some entities (e.g. trace segments) include a logical column with multiple values. In H2, we use multiple physical columns to host the values: e.g. change column_a with values [1,2,3,4,5] to `column_a_0 = 1, column_a_1 = 2, column_a_2 = 3, column_a_3 = 4, column_a_4 = 5`. | SW_STORAGE_MAX_SIZE_OF_ARRAY_COLUMN | 20 |
| - | - | numOfSearchableValuesPerTag | A trace segment includes multiple spans, each with multiple tags. Different spans may have the same tag key, e.g. multiple HTTP exit spans all have their own `http.method` tags. This configuration sets the limit on the maximum number of values for the same tag key. | SW_STORAGE_NUM_OF_SEARCHABLE_VALUES_PER_TAG | 2 |
| - | - | maxSizeOfBatchSql | The maximum number of SQL statements executed in one batch. | SW_STORAGE_MAX_SIZE_OF_BATCH_SQL | 100 |
| - | - | asyncBatchPersistentPoolSize | The number of threads that asynchronously flush data into the database. | SW_STORAGE_ASYNC_BATCH_PERSISTENT_POOL_SIZE | 1 |
| - |mysql| - | MySQL Storage. The MySQL JDBC Driver is not in the dist. Please copy it into the oap-lib folder manually. | - | - |
| - | - | properties | Hikari connection pool configurations. | - | Listed in the `application.yaml`. |
| - | - | metadataQueryMaxSize | The maximum size of metadata per query. | SW_STORAGE_MYSQL_QUERY_MAX_SIZE | 5000 |
| - | - | maxSizeOfArrayColumn | Some entities (e.g. trace segments) include a logical column with multiple values. In MySQL, we use multiple physical columns to host the values, e.g. change column_a with values [1,2,3,4,5] to `column_a_0 = 1, column_a_1 = 2, column_a_2 = 3, column_a_3 = 4, column_a_4 = 5`. | SW_STORAGE_MAX_SIZE_OF_ARRAY_COLUMN | 20 |
| - | - | numOfSearchableValuesPerTag | A trace segment includes multiple spans, each with multiple tags. Different spans may have the same tag key, e.g. multiple HTTP exit spans all have their own `http.method` tags. This configuration sets the limit on the maximum number of values for the same tag key. | SW_STORAGE_NUM_OF_SEARCHABLE_VALUES_PER_TAG | 2 |
| - | - | maxSizeOfBatchSql | The maximum number of SQL statements executed in one batch. | SW_STORAGE_MAX_SIZE_OF_BATCH_SQL | 2000 |
| - | - | asyncBatchPersistentPoolSize | The number of threads that asynchronously flush data into the database. | SW_STORAGE_ASYNC_BATCH_PERSISTENT_POOL_SIZE | 4 |
| - |postgresql| - | PostgreSQL storage. | - | - |
| - | - | properties | Hikari connection pool configurations. | - | Listed in the `application.yaml`. |
| - | - | metadataQueryMaxSize | The maximum size of metadata per query. | SW_STORAGE_MYSQL_QUERY_MAX_SIZE | 5000 |
| - | - | maxSizeOfArrayColumn | Some entities (e.g. trace segments) include a logical column with multiple values. In PostgreSQL, we use multiple physical columns to host the values, e.g. change column_a with values [1,2,3,4,5] to `column_a_0 = 1, column_a_1 = 2, column_a_2 = 3, column_a_3 = 4, column_a_4 = 5` (a short sketch of this flattening follows the table). | SW_STORAGE_MAX_SIZE_OF_ARRAY_COLUMN | 20 |
| - | - | numOfSearchableValuesPerTag | A trace segment includes multiple spans, each with multiple tags. Different spans may have the same tag key, e.g. multiple HTTP exit spans all have their own `http.method` tags. This configuration sets the limit on the maximum number of values for the same tag key. | SW_STORAGE_NUM_OF_SEARCHABLE_VALUES_PER_TAG | 2 |
| - | - | maxSizeOfBatchSql | The maximum number of SQL statements executed in one batch. | SW_STORAGE_MAX_SIZE_OF_BATCH_SQL | 2000 |
| - | - | asyncBatchPersistentPoolSize | The number of threads that asynchronously flush data into the database. | SW_STORAGE_ASYNC_BATCH_PERSISTENT_POOL_SIZE | 4 |
| - |influxdb| - | InfluxDB storage. |- | - |
| - | - | url| InfluxDB connection URL. | SW_STORAGE_INFLUXDB_URL | http://localhost:8086|
| - | - | user | User name of InfluxDB. | SW_STORAGE_INFLUXDB_USER | root|
......
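The array-column flattening described in the table rows above can be pictured with a small self-contained sketch; this is a hypothetical illustration, not SkyWalking's actual implementation.
```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ArrayColumnFlattener {
    // Flatten a logical column into at most maxSizeOfArrayColumn physical columns.
    static Map<String, Object> flatten(String column, List<?> values, int maxSizeOfArrayColumn) {
        Map<String, Object> physicalColumns = new LinkedHashMap<>();
        int size = Math.min(values.size(), maxSizeOfArrayColumn);
        for (int i = 0; i < size; i++) {
            physicalColumns.put(column + "_" + i, values.get(i));
        }
        return physicalColumns;
    }

    public static void main(String[] args) {
        // Prints {column_a_0=1, column_a_1=2, column_a_2=3, column_a_3=4, column_a_4=5}
        System.out.println(flatten("column_a", List.of(1, 2, 3, 4, 5), 20));
    }
}
```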
......@@ -158,9 +158,11 @@ storage:
metadataQueryMaxSize: ${SW_STORAGE_H2_QUERY_MAX_SIZE:5000}
maxSizeOfArrayColumn: ${SW_STORAGE_MAX_SIZE_OF_ARRAY_COLUMN:20}
numOfSearchableValuesPerTag: ${SW_STORAGE_NUM_OF_SEARCHABLE_VALUES_PER_TAG:2}
maxSizeOfBatchSql: ${SW_STORAGE_MAX_SIZE_OF_BATCH_SQL:100}
asyncBatchPersistentPoolSize: ${SW_STORAGE_ASYNC_BATCH_PERSISTENT_POOL_SIZE:1}
mysql:
properties:
jdbcUrl: ${SW_JDBC_URL:"jdbc:mysql://localhost:3306/swtest"}
jdbcUrl: ${SW_JDBC_URL:"jdbc:mysql://localhost:3306/swtest?rewriteBatchedStatements=true"}
dataSource.user: ${SW_DATA_SOURCE_USER:root}
dataSource.password: ${SW_DATA_SOURCE_PASSWORD:root@1234}
dataSource.cachePrepStmts: ${SW_DATA_SOURCE_CACHE_PREP_STMTS:true}
......@@ -170,9 +172,11 @@ storage:
metadataQueryMaxSize: ${SW_STORAGE_MYSQL_QUERY_MAX_SIZE:5000}
maxSizeOfArrayColumn: ${SW_STORAGE_MAX_SIZE_OF_ARRAY_COLUMN:20}
numOfSearchableValuesPerTag: ${SW_STORAGE_NUM_OF_SEARCHABLE_VALUES_PER_TAG:2}
maxSizeOfBatchSql: ${SW_STORAGE_MAX_SIZE_OF_BATCH_SQL:2000}
asyncBatchPersistentPoolSize: ${SW_STORAGE_ASYNC_BATCH_PERSISTENT_POOL_SIZE:4}
tidb:
properties:
jdbcUrl: ${SW_JDBC_URL:"jdbc:mysql://localhost:4000/tidbswtest"}
jdbcUrl: ${SW_JDBC_URL:"jdbc:mysql://localhost:4000/tidbswtest?rewriteBatchedStatements=true"}
dataSource.user: ${SW_DATA_SOURCE_USER:root}
dataSource.password: ${SW_DATA_SOURCE_PASSWORD:""}
dataSource.cachePrepStmts: ${SW_DATA_SOURCE_CACHE_PREP_STMTS:true}
......@@ -183,6 +187,8 @@ storage:
metadataQueryMaxSize: ${SW_STORAGE_MYSQL_QUERY_MAX_SIZE:5000}
maxSizeOfArrayColumn: ${SW_STORAGE_MAX_SIZE_OF_ARRAY_COLUMN:20}
numOfSearchableValuesPerTag: ${SW_STORAGE_NUM_OF_SEARCHABLE_VALUES_PER_TAG:2}
maxSizeOfBatchSql: ${SW_STORAGE_MAX_SIZE_OF_BATCH_SQL:2000}
asyncBatchPersistentPoolSize: ${SW_STORAGE_ASYNC_BATCH_PERSISTENT_POOL_SIZE:4}
influxdb:
# InfluxDB configuration
url: ${SW_STORAGE_INFLUXDB_URL:http://localhost:8086}
......@@ -206,6 +212,8 @@ storage:
metadataQueryMaxSize: ${SW_STORAGE_MYSQL_QUERY_MAX_SIZE:5000}
maxSizeOfArrayColumn: ${SW_STORAGE_MAX_SIZE_OF_ARRAY_COLUMN:20}
numOfSearchableValuesPerTag: ${SW_STORAGE_NUM_OF_SEARCHABLE_VALUES_PER_TAG:2}
maxSizeOfBatchSql: ${SW_STORAGE_MAX_SIZE_OF_BATCH_SQL:2000}
asyncBatchPersistentPoolSize: ${SW_STORAGE_ASYNC_BATCH_PERSISTENT_POOL_SIZE:4}
zipkin-elasticsearch:
namespace: ${SW_NAMESPACE:""}
clusterNodes: ${SW_STORAGE_ES_CLUSTER_NODES:localhost:9200}
......
......@@ -49,7 +49,7 @@ public class ApplicationConfigLoaderTestCase {
assertThat(providerConfig.get("metadataQueryMaxSize"), is(5000));
assertThat(providerConfig.get("properties"), instanceOf(Properties.class));
Properties properties = (Properties) providerConfig.get("properties");
assertThat(properties.get("jdbcUrl"), is("jdbc:mysql://localhost:3306/swtest"));
assertThat(properties.get("jdbcUrl"), is("jdbc:mysql://localhost:3306/swtest?rewriteBatchedStatements=true"));
}
@Test
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.jdbc;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;
/**
* A Batch SQL executor.
*/
@Slf4j
@RequiredArgsConstructor
public class BatchSQLExecutor implements InsertRequest, UpdateRequest {
private final List<PrepareRequest> prepareRequests;
public void invoke(Connection connection, int maxBatchSqlSize) throws SQLException {
if (prepareRequests.isEmpty()) {
return;
}
if (log.isDebugEnabled()) {
log.debug("execute sql batch, request size: {}", prepareRequests.size());
}
// Every request in this executor shares the same SQL text (see SQLExecutor#toString),
// so a single PreparedStatement can replay all parameter sets.
String sql = prepareRequests.get(0).toString();
try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
int pendingCount = 0;
for (PrepareRequest prepareRequest : prepareRequests) {
SQLExecutor sqlExecutor = (SQLExecutor) prepareRequest;
sqlExecutor.setParameters(preparedStatement);
preparedStatement.addBatch();
pendingCount++;
// Flush once the pending statements reach the configured batch size.
if (pendingCount == maxBatchSqlSize) {
executeBatch(preparedStatement, pendingCount, sql);
pendingCount = 0;
}
}
// Flush the tail that did not fill a whole batch.
if (pendingCount > 0) {
executeBatch(preparedStatement, pendingCount, sql);
}
}
}
private void executeBatch(PreparedStatement preparedStatement, int batchSize, String sql) throws SQLException {
long start = System.currentTimeMillis();
preparedStatement.executeBatch();
if (log.isDebugEnabled()) {
long cost = System.currentTimeMillis() - start;
log.debug("execute batch sql, batch size: {}, cost: {}ms, sql: {}", batchSize, cost, sql);
}
}
}
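To make the flush cadence of `BatchSQLExecutor` concrete, here is a tiny self-contained sketch (illustrative only, no database involved): one batch is flushed each time `maxBatchSqlSize` statements have been queued, and the tail is flushed at the end.
```java
import java.util.ArrayList;
import java.util.List;

public class BatchCadenceDemo {
    public static void main(String[] args) {
        int maxBatchSqlSize = 3;
        List<Integer> pending = new ArrayList<>();
        for (int item = 1; item <= 8; item++) {
            pending.add(item);                                 // stands in for addBatch()
            if (pending.size() == maxBatchSqlSize) {
                System.out.println("executeBatch " + pending); // flush a full batch
                pending.clear();
            }
        }
        if (!pending.isEmpty()) {
            System.out.println("executeBatch " + pending);     // flush the tail
        }
        // Output: executeBatch [1, 2, 3] / executeBatch [4, 5, 6] / executeBatch [7, 8]
    }
}
```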
......@@ -22,6 +22,7 @@ import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;
import lombok.EqualsAndHashCode;
import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
import org.slf4j.Logger;
......@@ -30,6 +31,7 @@ import org.slf4j.LoggerFactory;
/**
* A SQL executor.
*/
@EqualsAndHashCode(of = "sql")
public class SQLExecutor implements InsertRequest, UpdateRequest {
private static final Logger LOGGER = LoggerFactory.getLogger(SQLExecutor.class);
......@@ -44,14 +46,21 @@ public class SQLExecutor implements InsertRequest, UpdateRequest {
public void invoke(Connection connection) throws SQLException {
PreparedStatement preparedStatement = connection.prepareStatement(sql);
-for (int i = 0; i < param.size(); i++) {
-preparedStatement.setObject(i + 1, param.get(i));
-}
+setParameters(preparedStatement);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("execute sql in batch: {}, parameters: {}", sql, param);
}
preparedStatement.execute();
}
+public void setParameters(PreparedStatement preparedStatement) throws SQLException {
+for (int i = 0; i < param.size(); i++) {
+preparedStatement.setObject(i + 1, param.get(i));
+}
+}
+@Override
+public String toString() {
+return sql;
+}
}
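The `@EqualsAndHashCode(of = "sql")` annotation is what lets `H2BatchDAO` (below) group queued requests with `Collectors.groupingBy(Function.identity())`: two executors compare equal whenever their SQL text matches, regardless of their parameters. A self-contained sketch with a hypothetical stand-in class:
```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class GroupBySqlDemo {
    // Stand-in for SQLExecutor: equality is based on the sql text only,
    // mirroring Lombok's @EqualsAndHashCode(of = "sql").
    record Executor(String sql, Object param) {
        @Override
        public boolean equals(Object o) {
            return o instanceof Executor e && sql.equals(e.sql);
        }

        @Override
        public int hashCode() {
            return sql.hashCode();
        }
    }

    public static void main(String[] args) {
        List<Executor> requests = List.of(
            new Executor("INSERT INTO table_a VALUES (?)", 1),
            new Executor("INSERT INTO table_b VALUES (?)", 2),
            new Executor("INSERT INTO table_a VALUES (?)", 3));
        Map<Executor, List<Executor>> bySql =
            requests.stream().collect(Collectors.groupingBy(Function.identity()));
        // Iteration order may vary; table_a groups 2 requests, table_b groups 1.
        bySql.forEach((key, group) -> System.out.println(key.sql() + " -> " + group.size()));
    }
}
```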
......@@ -59,4 +59,16 @@ public class H2StorageConfig extends ModuleConfig {
* @since 8.2.0
*/
private int numOfSearchableValuesPerTag = 2;
/**
* The maximum number of SQL statements executed in one batch.
*
* @since 8.8.0
*/
private int maxSizeOfBatchSql = 100;
/**
* The thread pool size for asynchronous batch persistence.
*
* @since 8.8.0
*/
private int asyncBatchPersistentPoolSize = 1;
}
......@@ -117,7 +117,7 @@ public class H2StorageProvider extends ModuleProvider {
settings.setProperty("dataSource.password", config.getPassword());
h2Client = new JDBCHikariCPClient(settings);
-this.registerServiceImplementation(IBatchDAO.class, new H2BatchDAO(h2Client));
+this.registerServiceImplementation(IBatchDAO.class, new H2BatchDAO(h2Client, config.getMaxSizeOfBatchSql(), config.getAsyncBatchPersistentPoolSize()));
this.registerServiceImplementation(
StorageDAO.class,
new H2StorageDAO(
......
......@@ -22,38 +22,35 @@ import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;
import java.util.Properties;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.consumer.BulkConsumePool;
import org.apache.skywalking.apm.commons.datacarrier.consumer.ConsumerPoolFactory;
import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
import org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.SQLExecutor;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.BatchSQLExecutor;
@Slf4j
public class H2BatchDAO implements IBatchDAO {
private JDBCHikariCPClient h2Client;
private final DataCarrier<PrepareRequest> dataCarrier;
private final int maxBatchSqlSize;
-public H2BatchDAO(JDBCHikariCPClient h2Client) {
+public H2BatchDAO(JDBCHikariCPClient h2Client, int maxBatchSqlSize, int asyncBatchPersistentPoolSize) {
this.h2Client = h2Client;
String name = "H2_ASYNCHRONOUS_BATCH_PERSISTENT";
-BulkConsumePool.Creator creator = new BulkConsumePool.Creator(name, 1, 20);
-try {
-ConsumerPoolFactory.INSTANCE.createIfAbsent(name, creator);
-} catch (Exception e) {
-throw new UnexpectedException(e.getMessage(), e);
-}
-this.dataCarrier = new DataCarrier<>(1, 10000);
-this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new H2BatchDAO.H2BatchConsumer(this));
+if (log.isDebugEnabled()) {
+log.debug("H2_ASYNCHRONOUS_BATCH_PERSISTENT poolSize: {}, maxBatchSqlSize: {}", asyncBatchPersistentPoolSize, maxBatchSqlSize);
+}
+this.maxBatchSqlSize = maxBatchSqlSize;
+this.dataCarrier = new DataCarrier<>(name, asyncBatchPersistentPoolSize, 10000);
+this.dataCarrier.consume(new H2BatchDAO.H2BatchConsumer(this), asyncBatchPersistentPoolSize, 20);
}
@Override
......@@ -61,23 +58,27 @@ public class H2BatchDAO implements IBatchDAO {
if (CollectionUtils.isEmpty(prepareRequests)) {
return;
}
if (log.isDebugEnabled()) {
-log.debug("batch sql statements execute, data size: {}", prepareRequests.size());
+log.debug("to execute sql statements, data size: {}, maxBatchSqlSize: {}", prepareRequests.size(), maxBatchSqlSize);
}
+// Group the pending requests by their SQL text (SQLExecutor equality is based on the sql field),
+// so each group can be executed through a single PreparedStatement batch.
+final Map<PrepareRequest, List<PrepareRequest>> batchRequestMap =
+prepareRequests.stream().collect(Collectors.groupingBy(Function.identity()));
try (Connection connection = h2Client.getConnection()) {
-for (PrepareRequest prepareRequest : prepareRequests) {
+batchRequestMap.forEach((key, requests) -> {
try {
-SQLExecutor sqlExecutor = (SQLExecutor) prepareRequest;
-sqlExecutor.invoke(connection);
+BatchSQLExecutor batchSQLExecutor = new BatchSQLExecutor(requests);
+batchSQLExecutor.invoke(connection, maxBatchSqlSize);
} catch (SQLException e) {
// Avoid one execution failure failing the rest of the batch.
log.error(e.getMessage(), e);
}
-}
+});
} catch (SQLException | JDBCClientException e) {
-log.error(e.getMessage(), e);
+log.warn("execute sql failed, discard data size: {}", prepareRequests.size(), e);
}
+if (log.isDebugEnabled()) {
+log.debug("execute sql statements done, data size: {}, maxBatchSqlSize: {}", prepareRequests.size(), maxBatchSqlSize);
+}
}
......
......@@ -39,5 +39,17 @@ public class MySQLStorageConfig extends ModuleConfig {
* @since 8.2.0
*/
private int numOfSearchableValuesPerTag = 2;
/**
* The maximum number of SQL statements executed in one batch.
*
* @since 8.8.0
*/
private int maxSizeOfBatchSql = 2000;
/**
* The thread pool size for asynchronous batch persistence.
*
* @since 8.8.0
*/
private int asyncBatchPersistentPoolSize = 4;
private Properties properties;
}
......@@ -103,7 +103,7 @@ public class MySQLStorageProvider extends ModuleProvider {
mysqlClient = new JDBCHikariCPClient(config.getProperties());
-this.registerServiceImplementation(IBatchDAO.class, new H2BatchDAO(mysqlClient));
+this.registerServiceImplementation(IBatchDAO.class, new H2BatchDAO(mysqlClient, config.getMaxSizeOfBatchSql(), config.getAsyncBatchPersistentPoolSize()));
this.registerServiceImplementation(
StorageDAO.class,
new H2StorageDAO(
......
......@@ -25,5 +25,4 @@ import org.apache.skywalking.oap.server.storage.plugin.jdbc.mysql.MySQLStorageCo
@Setter
@Getter
public class PostgreSQLStorageConfig extends MySQLStorageConfig {
}
......@@ -103,7 +103,7 @@ public class PostgreSQLStorageProvider extends ModuleProvider {
postgresqlClient = new JDBCHikariCPClient(config.getProperties());
-this.registerServiceImplementation(IBatchDAO.class, new H2BatchDAO(postgresqlClient));
+this.registerServiceImplementation(IBatchDAO.class, new H2BatchDAO(postgresqlClient, config.getMaxSizeOfBatchSql(), config.getAsyncBatchPersistentPoolSize()));
this.registerServiceImplementation(
StorageDAO.class,
new H2StorageDAO(
......
......@@ -106,7 +106,7 @@ public class TiDBStorageProvider extends ModuleProvider {
mysqlClient = new JDBCHikariCPClient(config.getProperties());
-this.registerServiceImplementation(IBatchDAO.class, new H2BatchDAO(mysqlClient));
+this.registerServiceImplementation(IBatchDAO.class, new H2BatchDAO(mysqlClient, config.getMaxSizeOfBatchSql(), config.getAsyncBatchPersistentPoolSize()));
this.registerServiceImplementation(
StorageDAO.class,
new H2StorageDAO(
......