From 06165a0359c38c655e71ca8c29c9c06ccd4aec53 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=BD=AD=E5=8B=87=E5=8D=87=20pengys?= <8082209@qq.com>
Date: Fri, 3 Aug 2018 12:01:53 +0800
Subject: [PATCH] Feature/oap/storage (#1516)
* Storage and Persistence.
* Storage config.
* Fixed the CI failure.
---
.../collector/storage/base/dao/IBatchDAO.java | 1 -
oap-server/pom.xml | 4 +-
.../oap/server/core/CoreModuleProvider.java | 4 +-
.../oap/server/core/UnexpectedException.java | 28 +++
.../EndpointLatencyAvgAggregateWorker.java | 10 +-
.../endpoint/EndpointLatencyAvgIndicator.java | 56 ++++-
.../EndpointLatencyAvgPersistentWorker.java | 4 +
.../core/analysis/indicator/AvgIndicator.java | 12 +-
.../core/analysis/indicator/Indicator.java | 14 +-
.../indicator/annotation/IndicatorType.java | 2 +
.../indicator/define/IndicatorMapper.java | 4 +
.../worker/AbstractAggregatorWorker.java | 15 +-
.../worker/AbstractPersistentWorker.java | 101 +++++++++
.../analysis/worker/define/WorkerMapper.java | 6 +-
.../oap/server/core/receiver/Endpoint.java | 4 +
.../core/receiver/annotation/SourceType.java | 29 +++
.../oap/server/core/storage/AbstractDAO.java | 36 ++++
.../oap/server/core/storage/DAO.java | 27 +++
.../oap/server/core/storage/IBatchDAO.java | 29 +++
.../server/core/storage/IPersistenceDAO.java | 36 ++++
.../server/core/storage/StorageException.java | 29 +++
.../server/core/storage/StorageInstaller.java | 81 ++++++++
.../server/core/storage/StorageModule.java | 2 +-
.../core/storage/annotation/Column.java | 30 +++
.../annotation/ColumnAnnotationRetrieval.java | 60 ++++++
.../core/storage/define/ColumnDefine.java | 40 ++++
.../core/storage/define/ColumnName.java | 41 ++++
.../storage/define/ColumnTypeMapping.java | 27 +++
.../core/storage/define/TableDefine.java | 42 ++++
.../indicator/define/TestAvgIndicator.java | 19 +-
.../storage/StorageInstallerTestCase.java | 77 +++++++
.../server-library/library-client/pom.xml | 2 +-
.../elasticsearch/ElasticSearchClient.java | 194 ++++++++++++++++++
.../ElasticSearchClientException.java | 35 ++++
.../ElasticSearchClientTestCase.java | 62 ++++++
.../server/library/module/ModuleDefine.java | 8 +-
.../src/main/resources/application.yml | 14 ++
.../storage-elasticsearch-plugin/pom.xml | 5 +
.../StorageModuleElasticsearchConfig.java | 3 +
.../StorageModuleElasticsearchProvider.java | 29 ++-
.../elasticsearch/base/BatchProcessEsDAO.java | 72 +++++++
.../base/ColumnTypeEsMapping.java | 41 ++++
.../plugin/elasticsearch/base/EsDAO.java | 71 +++++++
.../elasticsearch/base/PersistenceEsDAO.java | 79 +++++++
.../base/StorageEsInstaller.java | 133 ++++++++++++
...lasticSearchColumnTypeMappingTestCase.java | 43 ++++
46 files changed, 1622 insertions(+), 39 deletions(-)
create mode 100644 oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/UnexpectedException.java
create mode 100644 oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/receiver/annotation/SourceType.java
create mode 100644 oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/AbstractDAO.java
create mode 100644 oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/DAO.java
create mode 100644 oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IBatchDAO.java
create mode 100644 oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IPersistenceDAO.java
create mode 100644 oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageException.java
create mode 100644 oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageInstaller.java
create mode 100644 oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/Column.java
create mode 100644 oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/ColumnAnnotationRetrieval.java
create mode 100644 oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/define/ColumnDefine.java
create mode 100644 oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/define/ColumnName.java
create mode 100644 oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/define/ColumnTypeMapping.java
create mode 100644 oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/define/TableDefine.java
create mode 100644 oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/StorageInstallerTestCase.java
create mode 100644 oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
create mode 100644 oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClientException.java
create mode 100644 oap-server/server-library/library-client/src/test/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClientTestCase.java
create mode 100644 oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java
create mode 100644 oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ColumnTypeEsMapping.java
create mode 100644 oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/EsDAO.java
create mode 100644 oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/PersistenceEsDAO.java
create mode 100644 oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java
create mode 100644 oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ElasticSearchColumnTypeMappingTestCase.java
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/base/dao/IBatchDAO.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/base/dao/IBatchDAO.java
index 7dcf14f6f..2b1e335e3 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/base/dao/IBatchDAO.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/base/dao/IBatchDAO.java
@@ -16,7 +16,6 @@
*
*/
-
package org.apache.skywalking.apm.collector.storage.base.dao;
import java.util.List;
diff --git a/oap-server/pom.xml b/oap-server/pom.xml
index 65360c14c..c34718b9f 100644
--- a/oap-server/pom.xml
+++ b/oap-server/pom.xml
@@ -55,7 +55,7 @@
1.4.196
2.0.3
1.4
- 6.3.1
+ 6.3.2
2.9.9
2.0.0
@@ -142,7 +142,7 @@
org.elasticsearch.client
- elasticsearch-rest-client
+ elasticsearch-rest-high-level-client
${elasticsearch.version}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
index 757c34a76..d116ae835 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
@@ -48,7 +48,7 @@ public class CoreModuleProvider extends ModuleProvider {
super();
this.moduleConfig = new CoreModuleConfig();
this.indicatorMapper = new IndicatorMapper();
- this.workerMapper = new WorkerMapper(getManager());
+ this.workerMapper = new WorkerMapper();
}
@Override public String name() {
@@ -87,7 +87,7 @@ public class CoreModuleProvider extends ModuleProvider {
try {
indicatorMapper.load();
- workerMapper.load();
+ workerMapper.load(getManager());
} catch (IndicatorDefineLoadException | WorkerDefineLoadException e) {
throw new ModuleStartException(e.getMessage(), e);
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/UnexpectedException.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/UnexpectedException.java
new file mode 100644
index 000000000..f290fd059
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/UnexpectedException.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core;
+
+/**
+ * @author wu-sheng
+ */
+public class UnexpectedException extends RuntimeException {
+ public UnexpectedException(String message) {
+ super(message);
+ }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgAggregateWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgAggregateWorker.java
index 62d51341e..f9177fe57 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgAggregateWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgAggregateWorker.java
@@ -20,23 +20,17 @@ package org.apache.skywalking.oap.server.core.analysis.endpoint;
import org.apache.skywalking.oap.server.core.analysis.worker.AbstractAggregatorWorker;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
-import org.slf4j.*;
/**
* @author peng-yongsheng
*/
public class EndpointLatencyAvgAggregateWorker extends AbstractAggregatorWorker {
- private static final Logger logger = LoggerFactory.getLogger(EndpointLatencyAvgAggregateWorker.class);
-
- private final EndpointLatencyAvgRemoteWorker remoter;
-
public EndpointLatencyAvgAggregateWorker(ModuleManager moduleManager) {
super(moduleManager);
- this.remoter = new EndpointLatencyAvgRemoteWorker(moduleManager);
}
- @Override protected void onNext(EndpointLatencyAvgIndicator data) {
- remoter.in(data);
+ @Override public Class nextWorkerClass() {
+ return EndpointLatencyAvgRemoteWorker.class;
}
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgIndicator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgIndicator.java
index e339afa73..7054f3d8b 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgIndicator.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgIndicator.java
@@ -18,16 +18,33 @@
package org.apache.skywalking.oap.server.core.analysis.endpoint;
+import java.util.*;
import lombok.*;
-import org.apache.skywalking.oap.server.core.analysis.indicator.AvgIndicator;
+import org.apache.skywalking.oap.server.core.analysis.indicator.*;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
+import org.apache.skywalking.oap.server.core.storage.annotation.Column;
/**
* @author peng-yongsheng
*/
public class EndpointLatencyAvgIndicator extends AvgIndicator {
- @Setter @Getter private int id;
+ private static final String NAME = "endpoint_latency_avg";
+ private static final String ID = "id";
+ private static final String SERVICE_ID = "service_id";
+ private static final String SERVICE_INSTANCE_ID = "service_instance_id";
+
+ @Setter @Getter @Column(columnName = ID) private int id;
+ @Setter @Getter @Column(columnName = SERVICE_ID) private int serviceId;
+ @Setter @Getter @Column(columnName = SERVICE_INSTANCE_ID) private int serviceInstanceId;
+
+ @Override public String name() {
+ return NAME;
+ }
+
+ @Override public String id() {
+ return String.valueOf(id);
+ }
@Override public int hashCode() {
int result = 17;
@@ -56,18 +73,49 @@ public class EndpointLatencyAvgIndicator extends AvgIndicator {
@Override public RemoteData.Builder serialize() {
RemoteData.Builder remoteBuilder = RemoteData.newBuilder();
remoteBuilder.setDataIntegers(0, getId());
- remoteBuilder.setDataIntegers(1, getCount());
+ remoteBuilder.setDataIntegers(1, getServiceId());
+ remoteBuilder.setDataIntegers(2, getServiceInstanceId());
+ remoteBuilder.setDataIntegers(3, getCount());
remoteBuilder.setDataLongs(0, getTimeBucket());
remoteBuilder.setDataLongs(1, getSummation());
+ remoteBuilder.setDataLongs(2, getValue());
+
return remoteBuilder;
}
@Override public void deserialize(RemoteData remoteData) {
setId(remoteData.getDataIntegers(0));
- setCount(remoteData.getDataIntegers(1));
+ setServiceId(remoteData.getDataIntegers(1));
+ setServiceInstanceId(remoteData.getDataIntegers(2));
+ setCount(remoteData.getDataIntegers(3));
setTimeBucket(remoteData.getDataLongs(0));
setSummation(remoteData.getDataLongs(1));
+ setValue(remoteData.getDataLongs(2));
+ }
+
+ @Override public Map toMap() {
+ Map map = new HashMap<>();
+ map.put(ID, id);
+ map.put(SERVICE_ID, serviceId);
+ map.put(SERVICE_INSTANCE_ID, serviceInstanceId);
+ map.put(COUNT, getCount());
+ map.put(SUMMATION, getSummation());
+ map.put(VALUE, getValue());
+ map.put(TIME_BUCKET, getTimeBucket());
+ return map;
+ }
+
+ @Override public Indicator newOne(Map dbMap) {
+ EndpointLatencyAvgIndicator indicator = new EndpointLatencyAvgIndicator();
+ indicator.setId((Integer)dbMap.get(ID));
+ indicator.setServiceId((Integer)dbMap.get(SERVICE_ID));
+ indicator.setServiceInstanceId((Integer)dbMap.get(SERVICE_INSTANCE_ID));
+ indicator.setCount((Integer)dbMap.get(COUNT));
+ indicator.setSummation((Long)dbMap.get(SUMMATION));
+ indicator.setValue((Long)dbMap.get(VALUE));
+ indicator.setTimeBucket((Long)dbMap.get(TIME_BUCKET));
+ return indicator;
}
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgPersistentWorker.java
index e3c5c2306..288d568fc 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgPersistentWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgPersistentWorker.java
@@ -29,4 +29,8 @@ public class EndpointLatencyAvgPersistentWorker extends AbstractPersistentWorker
public EndpointLatencyAvgPersistentWorker(ModuleManager moduleManager) {
super(moduleManager);
}
+
+ @Override protected boolean needMergeDBData() {
+ return true;
+ }
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/AvgIndicator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/AvgIndicator.java
index da065f37f..1bae8ec81 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/AvgIndicator.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/AvgIndicator.java
@@ -21,15 +21,21 @@ package org.apache.skywalking.oap.server.core.analysis.indicator;
import lombok.*;
import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.*;
import org.apache.skywalking.oap.server.core.remote.selector.Selector;
+import org.apache.skywalking.oap.server.core.storage.annotation.Column;
/**
* @author peng-yongsheng
*/
-@IndicatorType(selector = Selector.HashCode)
+@IndicatorType(selector = Selector.HashCode, needMerge = true)
public abstract class AvgIndicator extends Indicator {
- @Getter @Setter private long summation;
- @Getter @Setter private int count;
+ protected static final String SUMMATION = "summation";
+ protected static final String COUNT = "count";
+ protected static final String VALUE = "value";
+
+ @Getter @Setter @Column(columnName = SUMMATION) private long summation;
+ @Getter @Setter @Column(columnName = COUNT) private int count;
+ @Getter @Setter @Column(columnName = VALUE) private long value;
@Entrance
public final void combine(@SourceFrom long summation, @ConstOne int count) {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/Indicator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/Indicator.java
index 533379e7d..de932550e 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/Indicator.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/Indicator.java
@@ -18,15 +18,27 @@
package org.apache.skywalking.oap.server.core.analysis.indicator;
+import java.util.Map;
import lombok.*;
import org.apache.skywalking.oap.server.core.analysis.data.StreamData;
+import org.apache.skywalking.oap.server.core.storage.annotation.Column;
/**
* @author peng-yongsheng
*/
public abstract class Indicator extends StreamData {
- @Getter @Setter private long timeBucket;
+ protected static final String TIME_BUCKET = "time_bucket";
+
+ @Getter @Setter @Column(columnName = TIME_BUCKET) private long timeBucket;
+
+ public abstract String id();
public abstract void combine(Indicator indicator);
+
+ public abstract String name();
+
+ public abstract Map toMap();
+
+ public abstract Indicator newOne(Map dbMap);
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorType.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorType.java
index 46f03450f..d1ad273e4 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorType.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorType.java
@@ -28,4 +28,6 @@ import org.apache.skywalking.oap.server.core.remote.selector.Selector;
@Retention(RetentionPolicy.SOURCE)
public @interface IndicatorType {
Selector selector();
+
+ boolean needMerge();
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/IndicatorMapper.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/IndicatorMapper.java
index 8513fa2cc..b60e3f779 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/IndicatorMapper.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/define/IndicatorMapper.java
@@ -79,4 +79,8 @@ public class IndicatorMapper implements Service {
public Class findClassById(int id) {
return idKeyMapping.get(id);
}
+
+ public Collection> indicatorClasses() {
+ return idKeyMapping.values();
+ }
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractAggregatorWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractAggregatorWorker.java
index 65d68b440..1fadf906d 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractAggregatorWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractAggregatorWorker.java
@@ -21,8 +21,10 @@ package org.apache.skywalking.oap.server.core.analysis.worker;
import java.util.*;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
+import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.analysis.data.*;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
+import org.apache.skywalking.oap.server.core.analysis.worker.define.WorkerMapper;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.slf4j.*;
@@ -33,11 +35,14 @@ public abstract class AbstractAggregatorWorker extends
private static final Logger logger = LoggerFactory.getLogger(AbstractAggregatorWorker.class);
+ private Worker worker;
+ private final ModuleManager moduleManager;
private final DataCarrier dataCarrier;
private final MergeDataCache mergeDataCache;
private int messageNum;
public AbstractAggregatorWorker(ModuleManager moduleManager) {
+ this.moduleManager = moduleManager;
this.mergeDataCache = new MergeDataCache<>();
this.dataCarrier = new DataCarrier<>(1, 10000);
this.dataCarrier.consume(new AggregatorConsumer(this), 1);
@@ -78,7 +83,15 @@ public abstract class AbstractAggregatorWorker extends
mergeDataCache.finishReadingLast();
}
- protected abstract void onNext(INPUT data);
+ private void onNext(INPUT data) {
+ if (worker == null) {
+ WorkerMapper workerMapper = moduleManager.find(CoreModule.NAME).getService(WorkerMapper.class);
+ worker = workerMapper.findInstanceByClass(nextWorkerClass());
+ }
+ worker.in(data);
+ }
+
+ public abstract Class nextWorkerClass();
private void aggregate(INPUT message) {
mergeDataCache.writing();
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractPersistentWorker.java
index bcead6792..59f15022a 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractPersistentWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/AbstractPersistentWorker.java
@@ -18,18 +18,119 @@
package org.apache.skywalking.oap.server.core.analysis.worker;
+import java.util.*;
+import org.apache.skywalking.oap.server.core.analysis.data.*;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
+import org.apache.skywalking.oap.server.core.storage.*;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.slf4j.*;
+
+import static java.util.Objects.nonNull;
/**
* @author peng-yongsheng
*/
public abstract class AbstractPersistentWorker extends Worker {
+ private static final Logger logger = LoggerFactory.getLogger(AbstractPersistentWorker.class);
+
+ private final MergeDataCache mergeDataCache;
+ private final IBatchDAO batchDAO;
+ private final IPersistenceDAO, ?, INPUT> persistenceDAO;
+ private final int blockBatchPersistenceSize = 1000;
+
public AbstractPersistentWorker(ModuleManager moduleManager) {
+ this.mergeDataCache = new MergeDataCache<>();
+ this.batchDAO = moduleManager.find(StorageModule.NAME).getService(IBatchDAO.class);
+ this.persistenceDAO = moduleManager.find(StorageModule.NAME).getService(IPersistenceDAO.class);
+ }
+
+ public final Window> getCache() {
+ return mergeDataCache;
}
@Override public final void in(INPUT input) {
+ if (getCache().currentCollectionSize() >= blockBatchPersistenceSize) {
+ try {
+ if (getCache().trySwitchPointer()) {
+ getCache().switchPointer();
+
+ List> collection = buildBatchCollection();
+ batchDAO.batchPersistence(collection);
+ }
+ } finally {
+ getCache().trySwitchPointerFinally();
+ }
+ }
+ cacheData(input);
+ }
+
+ public final List> buildBatchCollection() {
+ List> batchCollection = new LinkedList<>();
+ try {
+ while (getCache().getLast().isWriting()) {
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException e) {
+ logger.warn("thread wake up");
+ }
+ }
+ if (getCache().getLast().collection() != null) {
+ batchCollection = prepareBatch(getCache().getLast());
+ }
+ } finally {
+ getCache().finishReadingLast();
+ }
+ return batchCollection;
}
+
+ private List
org.elasticsearch.client
- elasticsearch-rest-client
+ elasticsearch-rest-high-level-client
\ No newline at end of file
diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
new file mode 100644
index 000000000..520e3f005
--- /dev/null
+++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClient.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.library.client.elasticsearch;
+
+import java.io.IOException;
+import java.util.*;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.*;
+import org.apache.http.entity.ContentType;
+import org.apache.http.nio.entity.NStringEntity;
+import org.apache.skywalking.oap.server.library.client.Client;
+import org.apache.skywalking.oap.server.library.client.*;
+import org.elasticsearch.action.admin.indices.create.*;
+import org.elasticsearch.action.admin.indices.delete.*;
+import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
+import org.elasticsearch.action.bulk.*;
+import org.elasticsearch.action.get.*;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.search.*;
+import org.elasticsearch.action.update.UpdateRequest;
+import org.elasticsearch.client.*;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.*;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.slf4j.*;
+
+/**
+ * @author peng-yongsheng
+ */
+public class ElasticSearchClient implements Client {
+
+ private static final Logger logger = LoggerFactory.getLogger(ElasticSearchClient.class);
+
+ private static final String TYPE = "type";
+ private final String clusterNodes;
+ private final NameSpace namespace;
+ private RestHighLevelClient client;
+
+ public ElasticSearchClient(String clusterNodes, NameSpace namespace) {
+ this.clusterNodes = clusterNodes;
+ this.namespace = namespace;
+ }
+
+ @Override public void initialize() {
+ List pairsList = parseClusterNodes(clusterNodes);
+
+ client = new RestHighLevelClient(
+ RestClient.builder(pairsList.toArray(new HttpHost[0])));
+ }
+
+ @Override public void shutdown() {
+ try {
+ client.close();
+ } catch (IOException e) {
+ logger.error(e.getMessage(), e);
+ }
+ }
+
+ private List parseClusterNodes(String nodes) {
+ List httpHosts = new LinkedList<>();
+ logger.info("elasticsearch cluster nodes: {}", nodes);
+ String[] nodesSplit = nodes.split(",");
+ for (String node : nodesSplit) {
+ String host = node.split(":")[0];
+ String port = node.split(":")[1];
+ httpHosts.add(new HttpHost(host, Integer.valueOf(port)));
+ }
+
+ return httpHosts;
+ }
+
+ public boolean createIndex(String indexName, Settings settings,
+ XContentBuilder mappingBuilder) throws IOException {
+ indexName = formatIndexName(indexName);
+ CreateIndexRequest request = new CreateIndexRequest(indexName);
+ request.settings(settings);
+ request.mapping(TYPE, mappingBuilder);
+ CreateIndexResponse response;
+ response = client.indices().create(request);
+ logger.info("create {} index finished, isAcknowledged: {}", indexName, response.isAcknowledged());
+ return response.isAcknowledged();
+ }
+
+ public boolean deleteIndex(String indexName) throws IOException {
+ indexName = formatIndexName(indexName);
+ DeleteIndexRequest request = new DeleteIndexRequest(indexName);
+ DeleteIndexResponse response;
+ response = client.indices().delete(request);
+ logger.info("delete {} index finished, isAcknowledged: {}", indexName, response.isAcknowledged());
+ return response.isAcknowledged();
+ }
+
+ public boolean isExistsIndex(String indexName) throws IOException {
+ indexName = formatIndexName(indexName);
+ GetIndexRequest request = new GetIndexRequest();
+ request.indices(indexName);
+ return client.indices().exists(request);
+ }
+
+ public SearchResponse search(String indexName, SearchSourceBuilder searchSourceBuilder) throws IOException {
+ indexName = formatIndexName(indexName);
+ SearchRequest searchRequest = new SearchRequest(indexName);
+ searchRequest.types(TYPE);
+ searchRequest.source(searchSourceBuilder);
+ return client.search(searchRequest);
+ }
+
+ public GetResponse get(String indexName, String id) throws IOException {
+ indexName = formatIndexName(indexName);
+ GetRequest request = new GetRequest(indexName, TYPE, id);
+ return client.get(request);
+ }
+
+ public IndexRequest prepareInsert(String indexName, String id, XContentBuilder source) {
+ indexName = formatIndexName(indexName);
+ return new IndexRequest(indexName, TYPE, id).source(source);
+ }
+
+ public UpdateRequest prepareUpdate(String indexName, String id, XContentBuilder source) {
+ indexName = formatIndexName(indexName);
+ return new UpdateRequest(indexName, TYPE, id).doc(source);
+ }
+
+ public void delete(String indexName, String timeBucketColumnName, long startTimeBucket,
+ long endTimeBucket) throws IOException {
+ indexName = formatIndexName(indexName);
+ Map params = Collections.singletonMap("pretty", "true");
+ String jsonString = "{" +
+ " \"query\": {" +
+ " \"range\": {" +
+ " \"" + timeBucketColumnName + "\": {" +
+ " \"gte\": " + startTimeBucket + "," +
+ " \"lte\": " + endTimeBucket + "" +
+ " }" +
+ " }" +
+ " }" +
+ "}";
+ HttpEntity entity = new NStringEntity(jsonString, ContentType.APPLICATION_JSON);
+ client.getLowLevelClient().performRequest("POST", "/" + indexName + "/_delete_by_query", params, entity);
+ }
+
+ private String formatIndexName(String indexName) {
+ if (Objects.nonNull(namespace) && StringUtils.isNotEmpty(namespace.getNameSpace())) {
+ return namespace.getNameSpace() + "_" + indexName;
+ }
+ return indexName;
+ }
+
+ public BulkProcessor createBulkProcessor(int bulkActions, int bulkSize, int flushInterval,
+ int concurrentRequests) {
+ BulkProcessor.Listener listener = new BulkProcessor.Listener() {
+ @Override
+ public void beforeBulk(long executionId, BulkRequest request) {
+
+ }
+
+ @Override
+ public void afterBulk(long executionId, BulkRequest request,
+ BulkResponse response) {
+
+ }
+
+ @Override
+ public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
+ logger.error("{} data bulk failed, reason: {}", request.numberOfActions(), failure);
+ }
+ };
+
+ return BulkProcessor.builder(client::bulkAsync, listener)
+ .setBulkActions(bulkActions)
+ .setBulkSize(new ByteSizeValue(bulkSize, ByteSizeUnit.MB))
+ .setFlushInterval(TimeValue.timeValueSeconds(flushInterval))
+ .setConcurrentRequests(concurrentRequests)
+ .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
+ .build();
+ }
+}
diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClientException.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClientException.java
new file mode 100644
index 000000000..bb71d07d5
--- /dev/null
+++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClientException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.library.client.elasticsearch;
+
+import org.apache.skywalking.oap.server.library.client.ClientException;
+
+/**
+ * @author peng-yongsheng
+ */
+public class ElasticSearchClientException extends ClientException {
+
+ public ElasticSearchClientException(String message) {
+ super(message);
+ }
+
+ public ElasticSearchClientException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
diff --git a/oap-server/server-library/library-client/src/test/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClientTestCase.java b/oap-server/server-library/library-client/src/test/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClientTestCase.java
new file mode 100644
index 000000000..f696202a3
--- /dev/null
+++ b/oap-server/server-library/library-client/src/test/java/org/apache/skywalking/oap/server/library/client/elasticsearch/ElasticSearchClientTestCase.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.library.client.elasticsearch;
+
+import java.io.IOException;
+import org.apache.skywalking.oap.server.library.client.ClientException;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.xcontent.*;
+import org.junit.Assert;
+
+/**
+ * @author peng-yongsheng
+ */
+public class ElasticSearchClientTestCase {
+
+ public static void main(String[] args) throws IOException, ClientException {
+ Settings settings = Settings.builder()
+ .put("number_of_shards", 2)
+ .put("number_of_replicas", 0)
+ .build();
+
+ XContentBuilder builder = XContentFactory.jsonBuilder();
+ builder.startObject()
+ .startObject("_all")
+ .field("enabled", false)
+ .endObject()
+ .startObject("properties")
+ .startObject("column1")
+ .field("type", "text")
+ .endObject()
+ .endObject();
+ builder.endObject();
+
+ ElasticSearchClient client = new ElasticSearchClient("localhost:9200", null);
+ client.initialize();
+
+ String indexName = "test";
+ client.createIndex(indexName, settings, builder);
+ Assert.assertTrue(client.isExistsIndex(indexName));
+ client.deleteIndex(indexName);
+ Assert.assertFalse(client.isExistsIndex(indexName));
+
+
+ client.shutdown();
+ }
+}
diff --git a/oap-server/server-library/library-module/src/main/java/org/apache/skywalking/oap/server/library/module/ModuleDefine.java b/oap-server/server-library/library-module/src/main/java/org/apache/skywalking/oap/server/library/module/ModuleDefine.java
index 74c908d2b..527821504 100644
--- a/oap-server/server-library/library-module/src/main/java/org/apache/skywalking/oap/server/library/module/ModuleDefine.java
+++ b/oap-server/server-library/library-module/src/main/java/org/apache/skywalking/oap/server/library/module/ModuleDefine.java
@@ -18,11 +18,9 @@
package org.apache.skywalking.oap.server.library.module;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.lang.reflect.Field;
import java.util.*;
+import org.slf4j.*;
/**
* A module definition.
@@ -31,7 +29,7 @@ import java.util.*;
*/
public abstract class ModuleDefine {
- private final Logger logger = LoggerFactory.getLogger(ModuleDefine.class);
+ private static final Logger logger = LoggerFactory.getLogger(ModuleDefine.class);
private LinkedList loadedProviders = new LinkedList<>();
@@ -128,7 +126,7 @@ public abstract class ModuleDefine {
return loadedProviders;
}
- final ModuleProvider provider() throws DuplicateProviderException {
+ public final ModuleProvider provider() throws DuplicateProviderException {
if (loadedProviders.size() > 1) {
throw new DuplicateProviderException(this.name() + " module exist " + loadedProviders.size() + " providers");
}
diff --git a/oap-server/server-starter/src/main/resources/application.yml b/oap-server/server-starter/src/main/resources/application.yml
index cabf8a2eb..46c35354c 100644
--- a/oap-server/server-starter/src/main/resources/application.yml
+++ b/oap-server/server-starter/src/main/resources/application.yml
@@ -35,6 +35,20 @@ core:
gRPCPort: 11800
storage:
elasticsearch:
+ clusterNodes: localhost:9200
+ indexShardsNumber: 2
+ indexReplicasNumber: 0
+ # Batch process setting, refer to https://www.elastic.co/guide/en/elasticsearch/client/java-api/5.5/java-docs-bulk-processor.html
+ bulkActions: 2000 # Execute the bulk every 2000 requests
+ bulkSize: 20 # flush the bulk every 20mb
+ flushInterval: 10 # flush the bulk every 10 seconds whatever the number of requests
+ concurrentRequests: 2 # the number of concurrent requests
+ # Set a timeout on metric data. After the timeout has expired, the metric data will automatically be deleted.
+ traceDataTTL: 90 # Unit is minute
+ minuteMetricDataTTL: 90 # Unit is minute
+ hourMetricDataTTL: 36 # Unit is hour
+ dayMetricDataTTL: 45 # Unit is day
+ monthMetricDataTTL: 18 # Unit is month
service-mesh:
default:
query:
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/pom.xml b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/pom.xml
index e0adc4370..6c93cb69e 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/pom.xml
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/pom.xml
@@ -36,5 +36,10 @@
server-core
${project.version}
+
+ org.apache.skywalking
+ library-client
+ ${project.version}
+
\ No newline at end of file
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java
index 92768ad20..09b0bb250 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchConfig.java
@@ -18,6 +18,7 @@
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch;
+import lombok.*;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
/**
@@ -25,6 +26,8 @@ import org.apache.skywalking.oap.server.library.module.ModuleConfig;
*/
public class StorageModuleElasticsearchConfig extends ModuleConfig {
+ @Setter @Getter private String nameSpace;
+ @Setter @Getter private String clusterNodes;
private int indexShardsNumber;
private int indexReplicasNumber;
private boolean highPerformanceMode;
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
index 23367a532..b4aa7e8eb 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
@@ -18,8 +18,11 @@
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch;
-import org.apache.skywalking.oap.server.core.storage.StorageModule;
+import org.apache.skywalking.oap.server.core.storage.*;
+import org.apache.skywalking.oap.server.library.client.NameSpace;
+import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.library.module.*;
+import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.*;
import org.slf4j.*;
/**
@@ -29,11 +32,14 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider {
private static final Logger logger = LoggerFactory.getLogger(StorageModuleElasticsearchProvider.class);
- private final StorageModuleElasticsearchConfig storageConfig;
+ private final StorageModuleElasticsearchConfig config;
+ private final NameSpace nameSpace;
+ private ElasticSearchClient elasticSearchClient;
public StorageModuleElasticsearchProvider() {
super();
- this.storageConfig = new StorageModuleElasticsearchConfig();
+ this.config = new StorageModuleElasticsearchConfig();
+ this.nameSpace = new NameSpace();
}
@Override
@@ -42,21 +48,34 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider {
}
@Override
- public Class module() {
+ public Class extends ModuleDefine> module() {
return StorageModule.class;
}
@Override
public ModuleConfig createConfigBeanIfAbsent() {
- return storageConfig;
+ return config;
}
@Override
public void prepare() throws ServiceNotProvidedException {
+ elasticSearchClient = new ElasticSearchClient(config.getClusterNodes(), nameSpace);
+
+ this.registerServiceImplementation(IBatchDAO.class, new BatchProcessEsDAO(elasticSearchClient, config.getBulkActions(), config.getBulkSize(), config.getFlushInterval(), config.getConcurrentRequests()));
+ this.registerServiceImplementation(IPersistenceDAO.class, new PersistenceEsDAO(elasticSearchClient, nameSpace));
}
@Override
public void start() throws ModuleStartException {
+ try {
+ nameSpace.setNameSpace(config.getNameSpace());
+ elasticSearchClient.initialize();
+
+ StorageEsInstaller installer = new StorageEsInstaller(getManager(), config.getIndexShardsNumber(), config.getIndexReplicasNumber());
+ installer.install(elasticSearchClient);
+ } catch (StorageException e) {
+ throw new ModuleStartException(e.getMessage(), e);
+ }
}
@Override
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java
new file mode 100644
index 000000000..0e9ff2fe7
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/BatchProcessEsDAO.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
+
+import java.util.List;
+import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
+import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
+import org.apache.skywalking.oap.server.library.util.CollectionUtils;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
+import org.slf4j.*;
+
+/**
+ * @author peng-yongsheng
+ */
+public class BatchProcessEsDAO extends EsDAO implements IBatchDAO {
+
+ private static final Logger logger = LoggerFactory.getLogger(BatchProcessEsDAO.class);
+
+ private BulkProcessor bulkProcessor;
+ private final int bulkActions;
+ private final int bulkSize;
+ private final int flushInterval;
+ private final int concurrentRequests;
+
+ public BatchProcessEsDAO(ElasticSearchClient client, int bulkActions, int bulkSize, int flushInterval,
+ int concurrentRequests) {
+ super(client);
+ this.bulkActions = bulkActions;
+ this.bulkSize = bulkSize;
+ this.flushInterval = flushInterval;
+ this.concurrentRequests = concurrentRequests;
+ }
+
+ @Override public void batchPersistence(List> batchCollection) {
+ if (bulkProcessor == null) {
+ this.bulkProcessor = getClient().createBulkProcessor(bulkActions, bulkSize, flushInterval, concurrentRequests);
+ }
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("bulk data size: {}", batchCollection.size());
+ }
+
+ if (CollectionUtils.isNotEmpty(batchCollection)) {
+ batchCollection.forEach(builder -> {
+ if (builder instanceof IndexRequest) {
+ this.bulkProcessor.add((IndexRequest)builder);
+ }
+ if (builder instanceof UpdateRequest) {
+ this.bulkProcessor.add((UpdateRequest)builder);
+ }
+ });
+ }
+ }
+}
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ColumnTypeEsMapping.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ColumnTypeEsMapping.java
new file mode 100644
index 000000000..8e268c1ce
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ColumnTypeEsMapping.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
+
+import org.apache.skywalking.oap.server.core.storage.define.ColumnTypeMapping;
+
+/**
+ * @author peng-yongsheng
+ */
+public class ColumnTypeEsMapping implements ColumnTypeMapping {
+
+ @Override public String transform(Class> type) {
+ if (Integer.class.equals(type) || int.class.equals(type)) {
+ return "integer";
+ } else if (Long.class.equals(type) || long.class.equals(type)) {
+ return "long";
+ } else if (Double.class.equals(type) || double.class.equals(type)) {
+ return "double";
+ } else if (String.class.equals(type)) {
+ return "text";
+ } else {
+ throw new IllegalArgumentException("Unsupported data type: " + type.getName());
+ }
+ }
+}
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/EsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/EsDAO.java
new file mode 100644
index 000000000..dd0a70bd8
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/EsDAO.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
+
+import java.io.IOException;
+import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
+import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.search.aggregations.AggregationBuilders;
+import org.elasticsearch.search.aggregations.metrics.max.Max;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.slf4j.*;
+
+/**
+ * @author peng-yongsheng
+ */
+public abstract class EsDAO extends AbstractDAO {
+
+ private static final Logger logger = LoggerFactory.getLogger(EsDAO.class);
+
+ public EsDAO(ElasticSearchClient client) {
+ super(client);
+ }
+
+ protected final int getMaxId(String indexName, String columnName) {
+ SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
+ searchSourceBuilder.aggregation(AggregationBuilders.max("agg").field(columnName));
+ searchSourceBuilder.size(0);
+ return getResponse(indexName, searchSourceBuilder);
+ }
+
+ protected final int getMinId(String indexName, String columnName) {
+ SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
+ searchSourceBuilder.aggregation(AggregationBuilders.min("agg").field(columnName));
+ searchSourceBuilder.size(0);
+ return getResponse(indexName, searchSourceBuilder);
+ }
+
+ private int getResponse(String indexName, SearchSourceBuilder searchSourceBuilder) {
+ try {
+ SearchResponse searchResponse = getClient().search(indexName, searchSourceBuilder);
+ Max agg = searchResponse.getAggregations().get("agg");
+
+ int id = (int)agg.getValue();
+ if (id == Integer.MAX_VALUE || id == Integer.MIN_VALUE) {
+ return 0;
+ } else {
+ return id;
+ }
+ } catch (IOException e) {
+ logger.error(e.getMessage(), e);
+ }
+ return 0;
+ }
+}
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/PersistenceEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/PersistenceEsDAO.java
new file mode 100644
index 000000000..1f83b633a
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/PersistenceEsDAO.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
+import org.apache.skywalking.oap.server.core.storage.IPersistenceDAO;
+import org.apache.skywalking.oap.server.library.client.NameSpace;
+import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
+import org.elasticsearch.action.get.GetResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
+import org.elasticsearch.common.xcontent.*;
+
+/**
+ * @author peng-yongsheng
+ */
+public class PersistenceEsDAO implements IPersistenceDAO {
+
+ private final ElasticSearchClient client;
+ private final NameSpace nameSpace;
+
+ public PersistenceEsDAO(ElasticSearchClient client, NameSpace nameSpace) {
+ this.client = client;
+ this.nameSpace = nameSpace;
+ }
+
+ @Override public Indicator get(Indicator input) throws IOException {
+ GetResponse response = client.get(nameSpace.getNameSpace() + "_" + input.name(), input.id());
+ if (response.isExists()) {
+ return input.newOne(response.getSource());
+ } else {
+ return null;
+ }
+ }
+
+ @Override public IndexRequest prepareBatchInsert(Indicator input) throws IOException {
+ Map objectMap = input.toMap();
+
+ XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
+ for (String key : objectMap.keySet()) {
+ builder.field(key, objectMap.get(key));
+ }
+ builder.endObject();
+ return client.prepareInsert(nameSpace.getNameSpace() + "_" + input.name(), input.id(), builder);
+ }
+
+ @Override public UpdateRequest prepareBatchUpdate(Indicator input) throws IOException {
+ Map objectMap = input.toMap();
+
+ XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
+ for (String key : objectMap.keySet()) {
+ builder.field(key, objectMap.get(key));
+ }
+ builder.endObject();
+ return client.prepareUpdate(nameSpace.getNameSpace() + "_" + input.name(), input.id(), builder);
+ }
+
+ @Override public void deleteHistory(Long timeBucketBefore) {
+
+ }
+}
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java
new file mode 100644
index 000000000..07868d808
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
+
+import java.io.IOException;
+import org.apache.skywalking.oap.server.core.storage.*;
+import org.apache.skywalking.oap.server.core.storage.define.*;
+import org.apache.skywalking.oap.server.library.client.Client;
+import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.xcontent.*;
+import org.slf4j.*;
+
+/**
+ * @author peng-yongsheng
+ */
+public class StorageEsInstaller extends StorageInstaller {
+
+ private static final Logger logger = LoggerFactory.getLogger(StorageEsInstaller.class);
+
+ private final int indexShardsNumber;
+ private final int indexReplicasNumber;
+ private final ColumnTypeEsMapping mapping;
+
+ public StorageEsInstaller(ModuleManager moduleManager, int indexShardsNumber, int indexReplicasNumber) {
+ super(moduleManager);
+ this.indexShardsNumber = indexShardsNumber;
+ this.indexReplicasNumber = indexReplicasNumber;
+ this.mapping = new ColumnTypeEsMapping();
+ }
+
+ @Override protected boolean isExists(Client client, TableDefine tableDefine) throws StorageException {
+ ElasticSearchClient esClient = (ElasticSearchClient)client;
+ try {
+ return esClient.isExistsIndex(tableDefine.getName());
+ } catch (IOException e) {
+ throw new StorageException(e.getMessage());
+ }
+ }
+
+ @Override protected void columnCheck(Client client, TableDefine tableDefine) throws StorageException {
+
+ }
+
+ @Override protected void deleteTable(Client client, TableDefine tableDefine) throws StorageException {
+ ElasticSearchClient esClient = (ElasticSearchClient)client;
+
+ try {
+ if (!esClient.deleteIndex(tableDefine.getName())) {
+ throw new StorageException(tableDefine.getName() + " index delete failure.");
+ }
+ } catch (IOException e) {
+ throw new StorageException(tableDefine.getName() + " index delete failure.");
+ }
+ }
+
+ @Override protected void createTable(Client client, TableDefine tableDefine) throws StorageException {
+ ElasticSearchClient esClient = (ElasticSearchClient)client;
+
+ // mapping
+ XContentBuilder mappingBuilder = null;
+
+ Settings settings = createSettingBuilder();
+ try {
+ mappingBuilder = createMappingBuilder(tableDefine);
+ logger.info("mapping builder str: {}", mappingBuilder.prettyPrint());
+ } catch (Exception e) {
+ logger.error("create {} index mapping builder error", tableDefine.getName());
+ }
+
+ boolean isAcknowledged;
+ try {
+ isAcknowledged = esClient.createIndex(tableDefine.getName(), settings, mappingBuilder);
+ } catch (IOException e) {
+ throw new StorageException(e.getMessage());
+ }
+ logger.info("create {} index finished, isAcknowledged: {}", tableDefine.getName(), isAcknowledged);
+
+ if (!isAcknowledged) {
+ throw new StorageException("create " + tableDefine.getName() + " index failure, ");
+ }
+ }
+
+ private Settings createSettingBuilder() {
+ return Settings.builder()
+ .put("index.number_of_shards", indexShardsNumber)
+ .put("index.number_of_replicas", indexReplicasNumber)
+ .put("index.refresh_interval", "3s")
+ .put("analysis.analyzer.collector_analyzer.type", "stop")
+ .build();
+ }
+
+ private XContentBuilder createMappingBuilder(TableDefine tableDefine) throws IOException {
+ XContentBuilder mappingBuilder = XContentFactory.jsonBuilder()
+ .startObject()
+ .startObject("_all")
+ .field("enabled", false)
+ .endObject()
+ .startObject("properties");
+
+ for (ColumnDefine columnDefine : tableDefine.getColumnDefines()) {
+ mappingBuilder
+ .startObject(columnDefine.getColumnName().getName())
+ .field("type", mapping.transform(columnDefine.getType()))
+ .endObject();
+ }
+
+ mappingBuilder
+ .endObject()
+ .endObject();
+
+ logger.debug("create elasticsearch index: {}", mappingBuilder.prettyPrint());
+
+ return mappingBuilder;
+ }
+}
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ElasticSearchColumnTypeMappingTestCase.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ElasticSearchColumnTypeMappingTestCase.java
new file mode 100644
index 000000000..2553ab7b9
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/test/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ElasticSearchColumnTypeMappingTestCase.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
+
+import org.junit.*;
+
+/**
+ * @author peng-yongsheng
+ */
+public class ElasticSearchColumnTypeMappingTestCase {
+
+ @Test
+ public void test() {
+ ColumnTypeEsMapping mapping = new ColumnTypeEsMapping();
+
+ Assert.assertEquals("integer", mapping.transform(int.class));
+ Assert.assertEquals("integer", mapping.transform(Integer.class));
+
+ Assert.assertEquals("long", mapping.transform(long.class));
+ Assert.assertEquals("long", mapping.transform(Long.class));
+
+ Assert.assertEquals("double", mapping.transform(double.class));
+ Assert.assertEquals("double", mapping.transform(Double.class));
+
+ Assert.assertEquals("text", mapping.transform(String.class));
+ }
+}
--
GitLab