Unverified commit ed507b78, authored by 彭勇升 pengys, committed by GitHub

Dirty-Read in concurrency (#3117)

* Fixed the dirty-read problem in the Elasticsearch storage implementation.

* Fixed a mistake.
Parent: d2f52f28
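In short, this change splits persistence into two paths: records (trace segments, top-N samples) keep going through the buffered, asynchronous BulkProcessor, while metrics, which the OAP reads back and merges in the next persistence cycle, are flushed with a synchronous bulk request that waits for the index refresh, so the following read can no longer observe stale data. A minimal sketch of that idea, using simplified hypothetical types rather than the actual SkyWalking classes:

```java
import java.util.List;

// Simplified stand-in for org.apache.skywalking.oap.server.core.storage.IBatchDAO (hypothetical).
interface BatchDAO {
    void asynchronous(List<?> collection); // buffered bulk; data becomes searchable eventually
    void synchronous(List<?> collection);  // blocking bulk; returns only once the data is searchable
}

// Simplified stand-in for the flush step PersistenceTimer performs after this change.
final class FlushCycle {
    private final BatchDAO batchDAO;

    FlushCycle(BatchDAO batchDAO) {
        this.batchDAO = batchDAO;
    }

    void flush(List<?> records, List<?> metrics) {
        if (!records.isEmpty()) {
            batchDAO.asynchronous(records); // write-once data; a delayed read is harmless
        }
        if (!metrics.isEmpty()) {
            batchDAO.synchronous(metrics);  // read-merge-write data; must see the latest values
        }
    }
}
```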
@@ -74,7 +74,7 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics, MergeDat
     }
 
     @Override void onWork(Metrics metrics) {
-        super.onWork(metrics);
+        cacheData(metrics);
     }
 
     @Override public void in(Metrics metrics) {
...
@@ -48,7 +48,7 @@ public abstract class PersistenceWorker<INPUT extends StorageData, CACHE extends
             getCache().switchPointer();
             List<?> collection = buildBatchCollection();
-            batchDAO.batchPersistence(collection);
+            batchDAO.asynchronous(collection);
         }
     } finally {
         getCache().trySwitchPointerFinally();
...
@@ -25,5 +25,7 @@ import java.util.List;
  */
 public interface IBatchDAO extends DAO {
-    void batchPersistence(List<?> batchCollection);
+    void asynchronous(List<?> collection);
+
+    void synchronous(List<?> collection);
 }
@@ -24,6 +24,7 @@ import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
 import org.apache.skywalking.oap.server.core.CoreModuleConfig;
 import org.apache.skywalking.oap.server.core.analysis.worker.*;
 import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.library.util.CollectionUtils;
 import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
 import org.apache.skywalking.oap.server.telemetry.api.*;
 import org.slf4j.*;
@@ -77,7 +78,8 @@ public enum PersistenceTimer {
         try {
             HistogramMetrics.Timer timer = prepareLatency.createTimer();
-            List batchAllCollection = new LinkedList();
+            List records = new LinkedList();
+            List metrics = new LinkedList();
             try {
                 List<PersistenceWorker> persistenceWorkers = new ArrayList<>();
                 persistenceWorkers.addAll(MetricsStreamProcessor.getInstance().getPersistentWorkers());
@@ -95,7 +97,16 @@ public enum PersistenceTimer {
                     if (logger.isDebugEnabled()) {
                         logger.debug("extract {} worker data size: {}", worker.getClass().getName(), batchCollection.size());
                     }
-                    batchAllCollection.addAll(batchCollection);
+
+                    if (worker instanceof RecordPersistentWorker) {
+                        records.addAll(batchCollection);
+                    } else if (worker instanceof MetricsPersistentWorker) {
+                        metrics.addAll(batchCollection);
+                    } else if (worker instanceof TopNWorker) {
+                        records.addAll(batchCollection);
+                    } else {
+                        logger.error("Missing the worker {}", worker.getClass().getSimpleName());
+                    }
                 }
             });
@@ -108,7 +119,12 @@ public enum PersistenceTimer {
             HistogramMetrics.Timer executeLatencyTimer = executeLatency.createTimer();
             try {
-                batchDAO.batchPersistence(batchAllCollection);
+                if (CollectionUtils.isNotEmpty(records)) {
+                    batchDAO.asynchronous(records);
+                }
+                if (CollectionUtils.isNotEmpty(metrics)) {
+                    batchDAO.synchronous(metrics);
+                }
             } finally {
                 executeLatencyTimer.finish();
             }
@@ -117,12 +133,12 @@ public enum PersistenceTimer {
             logger.error(e.getMessage(), e);
         } finally {
             if (logger.isDebugEnabled()) {
-                logger.debug("persistence data save finish");
+                logger.debug("Persistence data save finish");
             }
         }
 
         if (debug) {
-            logger.info("batch persistence duration: {} ms", System.currentTimeMillis() - startTime);
+            logger.info("Batch persistence duration: {} ms", System.currentTimeMillis() - startTime);
         }
     }
 }
@@ -21,7 +21,6 @@ package org.apache.skywalking.oap.server.library.client.elasticsearch;
 import com.google.gson.*;
 import java.io.*;
 import java.util.*;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.http.*;
 import org.apache.http.auth.*;
@@ -38,7 +37,7 @@ import org.elasticsearch.action.bulk.*;
 import org.elasticsearch.action.get.*;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.search.*;
-import org.elasticsearch.action.support.WriteRequest;
+import org.elasticsearch.action.support.*;
 import org.elasticsearch.action.update.UpdateRequest;
 import org.elasticsearch.client.*;
 import org.elasticsearch.common.unit.*;
@@ -60,7 +59,7 @@ public class ElasticSearchClient implements Client {
     private final String namespace;
     private final String user;
     private final String password;
-    private RestHighLevelClient client;
+    protected RestHighLevelClient client;
 
     public ElasticSearchClient(String clusterNodes, String namespace, String user, String password) {
         this.clusterNodes = clusterNodes;
@@ -130,7 +129,7 @@ public class ElasticSearchClient implements Client {
             logger.debug("retrieval indexes by aliases {}, response is {}", aliases, responseJson);
             return new ArrayList<>(responseJson.keySet());
         }
-        return Collections.EMPTY_LIST;
+        return Collections.emptyList();
     }
 
     /**
@@ -138,7 +137,6 @@ public class ElasticSearchClient implements Client {
      * Then you should delete the index by this method, this method will no longer concatenate namespace.
      *
      * https://github.com/apache/skywalking/pull/3017
-     *
      */
    public boolean deleteByIndexName(String indexName) throws IOException {
        return deleteIndex(indexName, false);
@@ -149,7 +147,6 @@ public class ElasticSearchClient implements Client {
      * Then you should delete the index by this method, this method automatically concatenates namespace.
      *
      * https://github.com/apache/skywalking/pull/3017
-     *
      */
    public boolean deleteByModelName(String modelName) throws IOException {
        return deleteIndex(modelName, true);
@@ -302,11 +299,17 @@ public class ElasticSearchClient implements Client {
         return response.getStatusLine().getStatusCode();
     }
 
-    public String formatIndexName(String indexName) {
-        if (StringUtils.isNotEmpty(namespace)) {
-            return namespace + "_" + indexName;
+    public void synchronousBulk(BulkRequest request) {
+        request.timeout(TimeValue.timeValueMinutes(2));
+        request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
+        request.waitForActiveShards(ActiveShardCount.ONE);
+        try {
+            int size = request.requests().size();
+            BulkResponse responses = client.bulk(request);
+            logger.info("Synchronous bulk took time: {} millis, size: {}", responses.getTook().getMillis(), size);
+        } catch (IOException e) {
+            logger.error(e.getMessage(), e);
         }
-        return indexName;
     }
 
     public BulkProcessor createBulkProcessor(int bulkActions, int bulkSize, int flushInterval, int concurrentRequests) {
@@ -340,4 +343,11 @@ public class ElasticSearchClient implements Client {
             .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
             .build();
     }
+
+    public String formatIndexName(String indexName) {
+        if (StringUtils.isNotEmpty(namespace)) {
+            return namespace + "_" + indexName;
+        }
+        return indexName;
+    }
 }
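For context, a rough usage sketch of the new synchronousBulk path (the index name, type, and document below are made up for illustration; only the call shape follows this change). Because the request carries RefreshPolicy.WAIT_UNTIL, the method returns only after the written documents are visible to search, which is exactly the guarantee the metrics read-merge-write cycle needs:

```java
import java.io.IOException;

import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;

public class SynchronousBulkSketch {

    // "client" is assumed to be already connected; index/type/id are hypothetical.
    static void writeMetric(ElasticSearchClient client) throws IOException {
        XContentBuilder source = XContentFactory.jsonBuilder()
            .startObject()
            .field("value", 42L)
            .endObject();

        BulkRequest request = new BulkRequest();
        request.add(new IndexRequest("service_cpm-20190728", "type", "1").source(source));

        // Blocks until the bulk is applied and the documents are searchable (WAIT_UNTIL).
        client.synchronousBulk(request);
    }
}
```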
@@ -28,13 +28,14 @@ import org.apache.skywalking.oap.server.library.module.ModuleConfig;
 public class StorageModuleElasticsearchConfig extends ModuleConfig {
     @Setter private String nameSpace;
     @Setter private String clusterNodes;
-    @Setter private int indexShardsNumber;
-    @Setter private int indexReplicasNumber;
-    @Setter private boolean highPerformanceMode;
+    @Setter private int indexShardsNumber = 2;
+    @Setter private int indexReplicasNumber = 0;
+    @Setter private int indexRefreshInterval = 2;
     @Setter private int bulkActions = 2000;
     @Setter private int bulkSize = 20;
     @Setter private int flushInterval = 10;
     @Setter private int concurrentRequests = 2;
+    @Setter private int syncBulkActions = 3;
     @Setter private String user;
     @Setter private String password;
     @Setter private int metadataQueryMaxSize = 5000;
...
@@ -95,7 +95,7 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider {
         try {
             elasticSearchClient.connect();
 
-            StorageEsInstaller installer = new StorageEsInstaller(getManager(), config.getIndexShardsNumber(), config.getIndexReplicasNumber());
+            StorageEsInstaller installer = new StorageEsInstaller(getManager(), config.getIndexShardsNumber(), config.getIndexReplicasNumber(), config.getIndexRefreshInterval());
             installer.install(elasticSearchClient);
 
             RegisterLockInstaller lockInstaller = new RegisterLockInstaller(elasticSearchClient);
...
@@ -22,7 +22,7 @@ import java.util.List;
 import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
 import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.oap.server.library.util.CollectionUtils;
-import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.bulk.*;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.update.UpdateRequest;
 import org.slf4j.*;
@@ -49,17 +49,17 @@ public class BatchProcessEsDAO extends EsDAO implements IBatchDAO {
         this.concurrentRequests = concurrentRequests;
     }
 
-    @Override public void batchPersistence(List<?> batchCollection) {
+    @Override public void asynchronous(List<?> collection) {
         if (bulkProcessor == null) {
             this.bulkProcessor = getClient().createBulkProcessor(bulkActions, bulkSize, flushInterval, concurrentRequests);
         }
 
         if (logger.isDebugEnabled()) {
-            logger.debug("bulk data size: {}", batchCollection.size());
+            logger.debug("Asynchronous batch persistent data collection size: {}", collection.size());
         }
 
-        if (CollectionUtils.isNotEmpty(batchCollection)) {
-            batchCollection.forEach(builder -> {
+        if (CollectionUtils.isNotEmpty(collection)) {
+            collection.forEach(builder -> {
                 if (builder instanceof IndexRequest) {
                     this.bulkProcessor.add((IndexRequest)builder);
                 }
@@ -67,8 +67,23 @@ public class BatchProcessEsDAO extends EsDAO implements IBatchDAO {
                     this.bulkProcessor.add((UpdateRequest)builder);
                 }
             });
+            this.bulkProcessor.flush();
+        }
     }
 
-        this.bulkProcessor.flush();
+    @Override public void synchronous(List<?> collection) {
+        if (CollectionUtils.isNotEmpty(collection)) {
+            BulkRequest request = new BulkRequest();
+
+            for (Object builder : collection) {
+                if (builder instanceof IndexRequest) {
+                    request.add((IndexRequest)builder);
+                }
+                if (builder instanceof UpdateRequest) {
+                    request.add((UpdateRequest)builder);
+                }
+            }
+
+            getClient().synchronousBulk(request);
+        }
     }
 }
@@ -58,7 +58,7 @@ public abstract class EsDAO extends AbstractDAO<ElasticSearchClient> {
         sourceBuilder.size(0);
     }
 
-    XContentBuilder map2builder(Map<String, Object> objectMap) throws IOException {
+    protected XContentBuilder map2builder(Map<String, Object> objectMap) throws IOException {
         XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
         for (String key : objectMap.keySet()) {
             Object value = objectMap.get(key);
...
@@ -25,6 +25,7 @@ import org.apache.skywalking.oap.server.core.storage.model.*;
 import org.apache.skywalking.oap.server.library.client.Client;
 import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.elasticsearch.common.unit.TimeValue;
 import org.slf4j.*;
 
 /**
@@ -36,12 +37,14 @@ public class StorageEsInstaller extends ModelInstaller {
     private final int indexShardsNumber;
     private final int indexReplicasNumber;
+    private final int indexRefreshInterval;
     private final ColumnTypeEsMapping columnTypeEsMapping;
 
-    public StorageEsInstaller(ModuleManager moduleManager, int indexShardsNumber, int indexReplicasNumber) {
+    public StorageEsInstaller(ModuleManager moduleManager, int indexShardsNumber, int indexReplicasNumber, int indexRefreshInterval) {
         super(moduleManager);
         this.indexShardsNumber = indexShardsNumber;
         this.indexReplicasNumber = indexReplicasNumber;
+        this.indexRefreshInterval = indexRefreshInterval;
         this.columnTypeEsMapping = new ColumnTypeEsMapping();
     }
@@ -98,8 +101,9 @@ public class StorageEsInstaller extends ModelInstaller {
         JsonObject setting = new JsonObject();
         setting.addProperty("index.number_of_shards", indexShardsNumber);
         setting.addProperty("index.number_of_replicas", indexReplicasNumber);
-        setting.addProperty("index.refresh_interval", "3s");
+        setting.addProperty("index.refresh_interval", TimeValue.timeValueSeconds(indexRefreshInterval).toString());
         setting.addProperty("analysis.analyzer.oap_analyzer.type", "stop");
+        TimeValue.timeValueSeconds(3);
         return setting;
     }
...
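The hard-coded "3s" refresh interval becomes configurable (indexRefreshInterval, default 2 in the new config). TimeValue simply renders the seconds as an Elasticsearch duration string, so with the default the index setting ends up as "2s". A tiny check of that formatting, assuming Elasticsearch's TimeValue class is on the classpath:

```java
import org.elasticsearch.common.unit.TimeValue;

public class RefreshIntervalFormat {
    public static void main(String[] args) {
        int indexRefreshInterval = 2; // default from StorageModuleElasticsearchConfig
        // Prints "2s" — the value written to index.refresh_interval by StorageEsInstaller.
        System.out.println(TimeValue.timeValueSeconds(indexRefreshInterval).toString());
    }
}
```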
@@ -18,20 +18,20 @@
 package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao;
 
-import java.sql.Connection;
-import java.sql.SQLException;
+import java.sql.*;
 import java.util.List;
 import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
 import org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException;
 import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
+import org.apache.skywalking.oap.server.library.util.CollectionUtils;
 import org.apache.skywalking.oap.server.storage.plugin.jdbc.SQLExecutor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.slf4j.*;
 
 /**
- * @author wusheng
+ * @author wusheng, peng-yongsheng
  */
 public class H2BatchDAO implements IBatchDAO {
 
     private static final Logger logger = LoggerFactory.getLogger(H2BatchDAO.class);
 
     private JDBCHikariCPClient h2Client;
@@ -40,24 +40,26 @@ public class H2BatchDAO implements IBatchDAO {
         this.h2Client = h2Client;
     }
 
-    @Override public void batchPersistence(List<?> batchCollection) {
-        if (batchCollection.size() == 0) {
+    @Override public void synchronous(List<?> collection) {
+        if (CollectionUtils.isEmpty(collection)) {
             return;
         }
 
         if (logger.isDebugEnabled()) {
-            logger.debug("batch sql statements execute, data size: {}", batchCollection.size());
+            logger.debug("batch sql statements execute, data size: {}", collection.size());
         }
 
         try (Connection connection = h2Client.getConnection()) {
-            for (Object exe : batchCollection) {
+            for (Object exe : collection) {
                 SQLExecutor sqlExecutor = (SQLExecutor)exe;
                 sqlExecutor.invoke(connection);
             }
-        } catch (SQLException e) {
-            logger.error(e.getMessage(), e);
-        } catch (JDBCClientException e) {
+        } catch (SQLException | JDBCClientException e) {
             logger.error(e.getMessage(), e);
         }
     }
+
+    @Override public void asynchronous(List<?> collection) {
+        synchronous(collection);
+    }
 }