提交 9b408e82 编写于 作者: K kezhenxu94 提交者: wu-sheng

[Feature] Support ElasticSearch 7 as backend storage (#3870)

* [Feature] Officially support ElasticSearch 7 as backend storage

* Extract bootstrap module to be shared by multiple starters

* Add missing configuration file and distinguish different version in test

* Update known-oap-backend-dependencies-es7.txt

Update the dependency lib

* Fix missing adaptation to ElasticSearch 7

* Rename ElasticSearch 7 specific class with Es7 infix

* Fix miss-adapted ElasticSearch DAO API

* Add ES7 specific configuration and polish documentations

* Polish documentations

* Polish documentations
上级 6c4cca82
......@@ -72,7 +72,6 @@ pipeline {
}
}
steps {
sh 'tar -zxf dist/apache-skywalking-apm-bin.tar.gz -C dist'
sh 'tools/dependencies/check-LICENSE.sh'
}
}
......
......@@ -113,6 +113,12 @@ pipeline {
sh 'E2E_VERSION=jdk8-1.3 bash -x test/e2e/run.sh e2e-agent-reboot'
}
}
stage('Cluster Tests (ES7/ZK/JDK8)') {
steps {
sh 'E2E_VERSION=jdk8-1.3 DIST_PACKAGE=apache-skywalking-apm-bin-es7.tar.gz ES_VERSION=7.0.0 bash -x test/e2e/run.sh e2e-cluster/test-runner'
}
}
}
}
......@@ -129,6 +135,12 @@ pipeline {
sh 'E2E_VERSION=jdk8-1.3 bash -x test/e2e/run.sh e2e-ttl/e2e-ttl-es'
}
}
stage('TTL ES7 Tests(JDK8)') {
steps {
sh 'E2E_VERSION=jdk8-1.3 DIST_PACKAGE=apache-skywalking-apm-bin-es7.tar.gz ES_VERSION=7.0.0 bash -x test/e2e/run.sh e2e-ttl/e2e-ttl-es'
}
}
}
}
}
......
......@@ -71,7 +71,6 @@
</profiles>
<build>
<finalName>apache-skywalking-apm-bin</finalName>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
......@@ -82,15 +81,31 @@
<goals>
<goal>single</goal>
</goals>
<configuration>
<finalName>apache-skywalking-apm-bin</finalName>
<descriptors>
<descriptor>${project.basedir}/src/main/assembly/binary.xml</descriptor>
</descriptors>
</configuration>
</execution>
<execution>
<id>dist-es7</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
<configuration>
<finalName>apache-skywalking-apm-bin-es7</finalName>
<descriptors>
<descriptor>${project.basedir}/src/main/assembly/binary-es7.xml</descriptor>
</descriptors>
</configuration>
</execution>
</executions>
<configuration>
<attach>true</attach>
<tarLongFileMode>posix</tarLongFileMode>
<runOnlyAtExecutionRoot>false</runOnlyAtExecutionRoot>
<descriptors>
<descriptor>${project.basedir}/src/main/assembly/binary.xml</descriptor>
</descriptors>
<appendAssemblyId>false</appendAssemblyId>
</configuration>
</plugin>
......@@ -98,6 +113,7 @@
<artifactId>maven-antrun-plugin</artifactId>
<executions>
<execution>
<id>dist</id>
<phase>package</phase>
<goals>
<goal>run</goal>
......@@ -109,6 +125,19 @@
</tasks>
</configuration>
</execution>
<execution>
<id>dist-es7</id>
<phase>package</phase>
<goals>
<goal>run</goal>
</goals>
<configuration>
<tasks>
<copy file="${project.build.directory}/apache-skywalking-apm-bin-es7.tar.gz" tofile="${project.basedir}/../dist/apache-skywalking-apm-bin-es7.tar.gz" overwrite="true" />
<copy file="${project.build.directory}/apache-skywalking-apm-bin-es7.zip" tofile="${project.basedir}/../dist/apache-skywalking-apm-bin-es7.zip" overwrite="true" />
</tasks>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
......
......@@ -238,6 +238,7 @@ The text of each license is the standard Apache 2.0 license.
Google: proto-google-common-protos 0.1.9: https://github.com/googleapis/googleapis , Apache 2.0
Google: jsr305 3.0.0: http://central.maven.org/maven2/com/google/code/findbugs/jsr305/3.0.0/jsr305-3.0.0.pom , Apache 2.0
Elasticsearch BV (Elasticsearch) 6.3.2: https://www.elastic.co/products/elasticsearch , Apache 2.0
Elasticsearch BV (Elasticsearch) 7.0.0: https://www.elastic.co/products/elasticsearch , Apache 2.0
lang-mustache-client 5.5.0: https://github.com/elastic/elasticsearch/tree/master/modules/lang-mustache , Apache 2.0
parent-join-client 5.5.0: https://github.com/elastic/elasticsearch/tree/master/modules/parent-join , Apache 2.0
reindex-client 5.5.0: https://github.com/elastic/elasticsearch/tree/master/modules/reindex , Apache 2.0
......
......@@ -95,6 +95,28 @@ storage:
# resultWindowMaxSize: ${SW_STORAGE_ES_QUERY_MAX_WINDOW_SIZE:10000}
# metadataQueryMaxSize: ${SW_STORAGE_ES_QUERY_MAX_SIZE:5000}
# segmentQueryMaxSize: ${SW_STORAGE_ES_QUERY_SEGMENT_SIZE:200}
# elasticsearch7:
# nameSpace: ${SW_NAMESPACE:""}
# clusterNodes: ${SW_STORAGE_ES_CLUSTER_NODES:localhost:9200}
# protocol: ${SW_STORAGE_ES_HTTP_PROTOCOL:"http"}
# trustStorePath: ${SW_SW_STORAGE_ES_SSL_JKS_PATH:"../es_keystore.jks"}
# trustStorePass: ${SW_SW_STORAGE_ES_SSL_JKS_PASS:""}
# user: ${SW_ES_USER:""}
# password: ${SW_ES_PASSWORD:""}
# indexShardsNumber: ${SW_STORAGE_ES_INDEX_SHARDS_NUMBER:2}
# indexReplicasNumber: ${SW_STORAGE_ES_INDEX_REPLICAS_NUMBER:0}
# # Those data TTL settings will override the same settings in core module.
# recordDataTTL: ${SW_STORAGE_ES_RECORD_DATA_TTL:7} # Unit is day
# otherMetricsDataTTL: ${SW_STORAGE_ES_OTHER_METRIC_DATA_TTL:45} # Unit is day
# monthMetricsDataTTL: ${SW_STORAGE_ES_MONTH_METRIC_DATA_TTL:18} # Unit is month
# # Batch process setting, refer to https://www.elastic.co/guide/en/elasticsearch/client/java-api/5.5/java-docs-bulk-processor.html
# bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:1000} # Execute the bulk every 1000 requests
# flushInterval: ${SW_STORAGE_ES_FLUSH_INTERVAL:10} # flush the bulk every 10 seconds whatever the number of requests
# concurrentRequests: ${SW_STORAGE_ES_CONCURRENT_REQUESTS:2} # the number of concurrent requests
# resultWindowMaxSize: ${SW_STORAGE_ES_QUERY_MAX_WINDOW_SIZE:10000}
# metadataQueryMaxSize: ${SW_STORAGE_ES_QUERY_MAX_SIZE:5000}
# segmentQueryMaxSize: ${SW_STORAGE_ES_QUERY_SEGMENT_SIZE:200}
# indexMaxResultWindow: ${SW_STORAGE_ES_INDEX_MAX_RESULT_WINDOW:5000}
h2:
driver: ${SW_STORAGE_H2_DRIVER:org.h2.jdbcx.JdbcDataSource}
url: ${SW_STORAGE_H2_URL:jdbc:h2:mem:skywalking-oap-db}
......
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->
<assembly
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
<id>dist</id>
<formats>
<format>zip</format>
<format>tar.gz</format>
</formats>
<fileSets>
<fileSet>
<directory>${project.basedir}/bin</directory>
<outputDirectory>/bin</outputDirectory>
<includes>
<include>*.sh</include>
<include>*.bat</include>
</includes>
<fileMode>0755</fileMode>
</fileSet>
<fileSet>
<directory>src/main/assembly</directory>
<outputDirectory>/config</outputDirectory>
<includes>
<include>log4j2.xml</include>
<include>application.yml</include>
<include>alarm-settings.yml</include>
<include>alarm-settings-sample.yml</include>
</includes>
</fileSet>
<fileSet>
<directory>${project.basedir}/../oap-server/server-bootstrap/src/main/resources</directory>
<includes>
<include>component-libraries.yml</include>
<include>official_analysis.oal</include>
<include>gateways.yml</include>
</includes>
<outputDirectory>/config</outputDirectory>
</fileSet>
<fileSet>
<directory>${project.basedir}/../oap-server/server-starter-es7/target/skywalking-oap-assembly/skywalking-oap/config</directory>
<outputDirectory>/config</outputDirectory>
<includes>
<include>*.yml</include>
<include>*.xml</include>
<include>*.properties</include>
<include>*.oal</include>
</includes>
</fileSet>
<fileSet>
<directory>${project.basedir}/../oap-server/server-starter-es7/target/skywalking-oap-assembly/skywalking-oap/libs</directory>
<outputDirectory>/oap-libs</outputDirectory>
</fileSet>
<!-- Agent repackage into the dist -->
<fileSet>
<directory>${project.basedir}/../skywalking-agent</directory>
<outputDirectory>/agent</outputDirectory>
</fileSet>
<!-- Release docs and licenses -->
<fileSet>
<directory>${project.basedir}/../</directory>
<outputDirectory>/</outputDirectory>
<includes>
<include>DISCLAIMER</include>
<include>CHANGES</include>
</includes>
</fileSet>
<fileSet>
<directory>${project.basedir}/release-docs</directory>
<outputDirectory>/</outputDirectory>
</fileSet>
</fileSets>
<files>
<file>
<source>${project.basedir}/../apm-webapp/target/skywalking-webapp.jar</source>
<outputDirectory>/webapp</outputDirectory>
<fileMode>0644</fileMode>
</file>
<file>
<source>${project.basedir}/../apm-webapp/src/main/assembly/webapp.yml</source>
<outputDirectory>/webapp</outputDirectory>
<fileMode>0644</fileMode>
</file>
</files>
</assembly>
......@@ -35,6 +35,25 @@
</includes>
<fileMode>0755</fileMode>
</fileSet>
<fileSet>
<directory>src/main/assembly</directory>
<outputDirectory>/config</outputDirectory>
<includes>
<include>log4j2.xml</include>
<include>application.yml</include>
<include>alarm-settings.yml</include>
<include>alarm-settings-sample.yml</include>
</includes>
</fileSet>
<fileSet>
<directory>${project.basedir}/../oap-server/server-bootstrap/src/main/resources</directory>
<includes>
<include>component-libraries.yml</include>
<include>official_analysis.oal</include>
<include>gateways.yml</include>
</includes>
<outputDirectory>/config</outputDirectory>
</fileSet>
<fileSet>
<directory>${project.basedir}/../oap-server/server-starter/target/skywalking-oap-assembly/skywalking-oap/config</directory>
<outputDirectory>/config</outputDirectory>
......
......@@ -4,7 +4,7 @@ use is by changing the `application.yml`
Native supported storage
- H2
- ElasticSearch 6
- ElasticSearch 6, 7
- MySQL
- TiDB
......@@ -26,10 +26,16 @@ storage:
user: sa
```
## ElasticSearch 6
Active ElasticSearch 6 as storage, set storage provider to **elasticsearch**.
## ElasticSearch
- In order to activate ElasticSearch 6 as storage, set storage provider to **elasticsearch**
- In order to activate ElasticSearch 7 as storage, set storage provider to **elasticsearch7**
**Required ElasticSearch 6.3.2 or higher, excepted 7.0.0 or higher. HTTP RestHighLevelClient is used to connect server.**
**Required ElasticSearch 6.3.2 or higher. HTTP RestHighLevelClient is used to connect server.**
- For ElasticSearch 6.3.2 ~ 7.0.0 (excluded), please download the `apache-skywalking-bin.tar.gz` or `apache-skywalking-bin.zip`,
- For ElasticSearch 7.0.0 ~ 8.0.0 (excluded), please download the `apache-skywalking-bin-es7.tar.gz` or `apache-skywalking-bin-es7.zip`.
ElasticSearch 6 and ElasticSearch 7 share most of the configurations, as follows:
Setting fragment example
......@@ -54,6 +60,22 @@ storage:
bulkSize: ${SW_STORAGE_ES_BULK_SIZE:20} # flush the bulk every 20mb
flushInterval: ${SW_STORAGE_ES_FLUSH_INTERVAL:10} # flush the bulk every 10 seconds whatever the number of requests
concurrentRequests: ${SW_STORAGE_ES_CONCURRENT_REQUESTS:2} # the number of concurrent requests
resultWindowMaxSize: ${SW_STORAGE_ES_QUERY_MAX_WINDOW_SIZE:10000}
metadataQueryMaxSize: ${SW_STORAGE_ES_QUERY_MAX_SIZE:5000}
segmentQueryMaxSize: ${SW_STORAGE_ES_QUERY_SEGMENT_SIZE:200}
```
and there're also some configurations that are ES7 specific, as follows:
```yaml
storage:
elasticsearch7:
# ... the configurations shared with ES6 that are listed above ...
# Index max result window, for segment deep pagination, usually we don't recommend to scroll too many pages,
# instead, give more query criteria (e.g. service id or time range), to narrow the query results.
# see https://www.elastic.co/guide/en/elasticsearch/guide/current/pagination.html for more information
indexMaxResultWindow: ${SW_STORAGE_ES_INDEX_MAX_RESULT_WINDOW:5000}
```
### ElasticSearch 6 With Https SSL Encrypting communications.
......@@ -87,9 +109,19 @@ storage:
### Data TTL
TTL in ElasticSearch overrides the settings of core, read [ElasticSearch section in TTL document](ttl.md#elasticsearch-6-storage-ttl)
### ElasticSearch server settings
Read the [ElasticSearch storage FAQ](../../FAQ/ES-Server-FAQ.md) if you are new to ElasticSearch.
And recommend read more about these configuration from ElasticSearch official document.
### Recommended ElasticSearch server-side configurations
You could add following config to `elasticsearch.yml`, set the value based on your env.
```yml
# In tracing scenario, consider to set more than this at least.
thread_pool.index.queue_size: 1000 # Only suitable for ElasticSearch 6
thread_pool.write.queue_size: 1000 # Suitable for ElasticSearch 6 and 7
# When you face query error at trace page, remember to check this.
index.max_result_window: 1000000 # Only suitable for ElasticSearch 6. For ES 7, set `indexMaxResultWindow` under `storage`-`elasticsearch7` section in application.yml
```
We strongly advice you to read more about these configurations from ElasticSearch official document.
This effects the performance of ElasticSearch very much.
......@@ -194,7 +226,7 @@ These settings can refer to the configuration of *MySQL* above.
## ElasticSearch 5
ElasticSearch 5 is incompatible with ElasticSearch 6 Java client jar, so it could not be included in native distribution.
[OpenSkywalking/SkyWalking-With-Es5x-Storage](https://github.com/OpenSkywalking/SkyWalking-With-Es5x-Storage) repo includes the distribution version.
[OpenSkyWalking/SkyWalking-With-Es5x-Storage](https://github.com/OpenSkywalking/SkyWalking-With-Es5x-Storage) repo includes the distribution version.
## More storage solution extension
Follow [Storage extension development guide](../../guides/storage-extention.md)
......
......@@ -34,6 +34,7 @@
<module>server-storage-plugin</module>
<module>server-library</module>
<module>server-starter</module>
<module>server-starter-es7</module>
<module>server-query-plugin</module>
<module>server-alarm-plugin</module>
<module>server-testing</module>
......@@ -42,6 +43,7 @@
<module>oal-grammar</module>
<module>exporter</module>
<module>server-configuration</module>
<module>server-bootstrap</module>
</modules>
<properties>
......
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>oap-server</artifactId>
<groupId>org.apache.skywalking</groupId>
<version>6.6.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>server-bootstrap</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>server-core</artifactId>
<version>${project.version}</version>
</dependency>
<!-- OAL runtime core -->
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>oal-rt</artifactId>
<version>${project.version}</version>
</dependency>
<!-- OAL runtime core -->
<!-- cluster module -->
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>cluster-standalone-plugin</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>cluster-zookeeper-plugin</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>cluster-kubernetes-plugin</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>cluster-consul-plugin</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>cluster-nacos-plugin</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>cluster-etcd-plugin</artifactId>
<version>${project.version}</version>
</dependency>
<!-- cluster module -->
<!-- receiver module -->
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>skywalking-mesh-receiver-plugin</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>skywalking-istio-telemetry-receiver-plugin</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>skywalking-register-receiver-plugin</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>skywalking-jvm-receiver-plugin</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>skywalking-trace-receiver-plugin</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>envoy-metrics-receiver-plugin</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>skywalking-clr-receiver-plugin</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>skywalking-so11y-receiver-plugin</artifactId>
<version>${project.version}</version>
</dependency>
<!-- receiver module -->
<!-- storage module -->
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>storage-jdbc-hikaricp-plugin</artifactId>
<version>${project.version}</version>
</dependency>
<!-- storage module -->
<!-- queryBuild module -->
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>query-graphql-plugin</artifactId>
<version>${project.version}</version>
</dependency>
<!-- queryBuild module -->
<!-- alarm module -->
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>server-alarm-plugin</artifactId>
<version>${project.version}</version>
</dependency>
<!-- telemetry -->
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>telemetry-prometheus</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>telemetry-so11y</artifactId>
<version>${project.version}</version>
</dependency>
<!-- exporter -->
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>exporter</artifactId>
<version>${project.version}</version>
</dependency>
<!-- configuration -->
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>grpc-configuration-sync</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>configuration-apollo</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>configuration-nacos</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>configuration-zookeeper</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>configuration-etcd</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>configuration-consul</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>${compiler.version}</source>
<target>${compiler.version}</target>
<encoding>${project.build.sourceEncoding}</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
<configuration>
<encoding>${project.build.sourceEncoding}</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<excludes>
<exclude>application.yml</exclude>
<exclude>log4j2.xml</exclude>
<exclude>alarm-settings.yml</exclude>
<exclude>component-libraries.yml</exclude>
<exclude>endpoint_naming_rules.properties</exclude>
<exclude>official_analysis.oal</exclude>
<exclude>gateways.yml</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.starter;
import org.apache.skywalking.oap.server.core.RunningMode;
import org.apache.skywalking.oap.server.library.module.ApplicationConfiguration;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.starter.config.ApplicationConfigLoader;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
* @author kezhenxu94
*/
public class OAPServerBootstrap {
private static final Logger logger = LoggerFactory.getLogger(OAPServerBootstrap.class);
public static void start() {
String mode = System.getProperty("mode");
RunningMode.setMode(mode);
ApplicationConfigLoader configLoader = new ApplicationConfigLoader();
ModuleManager manager = new ModuleManager();
try {
ApplicationConfiguration applicationConfiguration = configLoader.load();
manager.init(applicationConfiguration);
manager.find(TelemetryModule.NAME).provider().getService(MetricsCreator.class).createGauge("uptime",
"oap server start up time", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE)
// Set uptime to second
.setValue(System.currentTimeMillis() / 1000d);
if (RunningMode.isInitMode()) {
logger.info("OAP starts up in init mode successfully, exit now...");
System.exit(0);
}
} catch (Throwable t) {
logger.error(t.getMessage(), t);
System.exit(1);
}
}
}
......@@ -73,7 +73,28 @@ core:
enableDatabaseSession: ${SW_CORE_ENABLE_DATABASE_SESSION:true}
topNReportPeriod: ${SW_CORE_TOPN_REPORT_PERIOD:10} # top_n record worker report cycle, unit is minute
storage:
elasticsearch:
# elasticsearch:
# nameSpace: ${SW_NAMESPACE:""}
# clusterNodes: ${SW_STORAGE_ES_CLUSTER_NODES:localhost:9200}
# protocol: ${SW_STORAGE_ES_HTTP_PROTOCOL:"http"}
# #trustStorePath: ${SW_SW_STORAGE_ES_SSL_JKS_PATH:"../es_keystore.jks"}
# #trustStorePass: ${SW_SW_STORAGE_ES_SSL_JKS_PASS:""}
# user: ${SW_ES_USER:""}
# password: ${SW_ES_PASSWORD:""}
# indexShardsNumber: ${SW_STORAGE_ES_INDEX_SHARDS_NUMBER:2}
# indexReplicasNumber: ${SW_STORAGE_ES_INDEX_REPLICAS_NUMBER:0}
# # Those data TTL settings will override the same settings in core module.
# recordDataTTL: ${SW_STORAGE_ES_RECORD_DATA_TTL:7} # Unit is day
# otherMetricsDataTTL: ${SW_STORAGE_ES_OTHER_METRIC_DATA_TTL:45} # Unit is day
# monthMetricsDataTTL: ${SW_STORAGE_ES_MONTH_METRIC_DATA_TTL:18} # Unit is month
# # Batch process setting, refer to https://www.elastic.co/guide/en/elasticsearch/client/java-api/5.5/java-docs-bulk-processor.html
# bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:1000} # Execute the bulk every 1000 requests
# flushInterval: ${SW_STORAGE_ES_FLUSH_INTERVAL:10} # flush the bulk every 10 seconds whatever the number of requests
# concurrentRequests: ${SW_STORAGE_ES_CONCURRENT_REQUESTS:2} # the number of concurrent requests
# resultWindowMaxSize: ${SW_STORAGE_ES_QUERY_MAX_WINDOW_SIZE:10000}
# metadataQueryMaxSize: ${SW_STORAGE_ES_QUERY_MAX_SIZE:5000}
# segmentQueryMaxSize: ${SW_STORAGE_ES_QUERY_SEGMENT_SIZE:200}
elasticsearch7:
nameSpace: ${SW_NAMESPACE:""}
clusterNodes: ${SW_STORAGE_ES_CLUSTER_NODES:localhost:9200}
protocol: ${SW_STORAGE_ES_HTTP_PROTOCOL:"http"}
......@@ -94,6 +115,10 @@ storage:
resultWindowMaxSize: ${SW_STORAGE_ES_QUERY_MAX_WINDOW_SIZE:10000}
metadataQueryMaxSize: ${SW_STORAGE_ES_QUERY_MAX_SIZE:5000}
segmentQueryMaxSize: ${SW_STORAGE_ES_QUERY_SEGMENT_SIZE:200}
# Index max result window, for segment deep pagination, usually we don't recommend to scroll too many pages,
# instead, give more query criteria (e.g. service id or time range), to narrow the query results.
# see https://www.elastic.co/guide/en/elasticsearch/guide/current/pagination.html for more information
indexMaxResultWindow: ${SW_STORAGE_ES_INDEX_MAX_RESULT_WINDOW:5000}
# h2:
# driver: ${SW_STORAGE_H2_DRIVER:org.h2.jdbcx.JdbcDataSource}
# url: ${SW_STORAGE_H2_URL:jdbc:h2:mem:skywalking-oap-db}
......
......@@ -20,7 +20,6 @@ package org.apache.skywalking.oap.server.library.client.elasticsearch;
import com.google.common.base.Splitter;
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import java.io.IOException;
import java.io.InputStream;
......@@ -34,6 +33,7 @@ import java.security.NoSuchAlgorithmException;
import java.security.cert.CertificateException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
......@@ -56,6 +56,8 @@ import org.apache.http.nio.entity.NStringEntity;
import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.ssl.SSLContexts;
import org.apache.skywalking.oap.server.library.client.Client;
import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
......@@ -72,7 +74,6 @@ import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
......@@ -93,8 +94,8 @@ public class ElasticSearchClient implements Client {
private static final Logger logger = LoggerFactory.getLogger(ElasticSearchClient.class);
public static final String TYPE = "type";
private final String clusterNodes;
private final String protocol;
protected final String clusterNodes;
protected final String protocol;
private final String trustStorePath;
private final String trustStorePass;
private final String namespace;
......@@ -115,7 +116,12 @@ public class ElasticSearchClient implements Client {
@Override
public void connect() throws IOException, KeyStoreException, NoSuchAlgorithmException, KeyManagementException, CertificateException {
List<HttpHost> pairsList = parseClusterNodes(clusterNodes);
List<HttpHost> hosts = parseClusterNodes(protocol, clusterNodes);
client = createClient(hosts);
client.ping();
}
protected RestHighLevelClient createClient(final List<HttpHost> pairsList) throws KeyStoreException, IOException, NoSuchAlgorithmException, CertificateException, KeyManagementException {
RestClientBuilder builder;
if (StringUtils.isNotBlank(user) && StringUtils.isNotBlank(password)) {
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
......@@ -139,15 +145,14 @@ public class ElasticSearchClient implements Client {
builder = RestClient.builder(pairsList.toArray(new HttpHost[0]));
}
client = new RestHighLevelClient(builder);
client.ping();
return new RestHighLevelClient(builder);
}
@Override public void shutdown() throws IOException {
client.close();
}
private List<HttpHost> parseClusterNodes(String nodes) {
public static List<HttpHost> parseClusterNodes(String protocol, String nodes) {
List<HttpHost> httpHosts = new LinkedList<>();
logger.info("elasticsearch cluster nodes: {}", nodes);
List<String> nodesSplit = Splitter.on(",").omitEmptyStrings().splitToList(nodes);
......@@ -170,11 +175,12 @@ public class ElasticSearchClient implements Client {
return response.isAcknowledged();
}
public boolean createIndex(String indexName, JsonObject settings, JsonObject mapping) throws IOException {
public boolean createIndex(String indexName, Map<String, Object> settings, Map<String, Object> mapping) throws IOException {
indexName = formatIndexName(indexName);
CreateIndexRequest request = new CreateIndexRequest(indexName);
request.settings(settings.toString(), XContentType.JSON);
request.mapping(TYPE, mapping.toString(), XContentType.JSON);
Gson gson = new Gson();
request.settings(gson.toJson(settings), XContentType.JSON);
request.mapping(TYPE, gson.toJson(mapping), XContentType.JSON);
CreateIndexResponse response = client.indices().create(request);
logger.debug("create {} index finished, isAcknowledged: {}", indexName, response.isAcknowledged());
return response.isAcknowledged();
......@@ -214,7 +220,7 @@ public class ElasticSearchClient implements Client {
return deleteIndex(modelName, true);
}
private boolean deleteIndex(String indexName, boolean formatIndexName) throws IOException {
protected boolean deleteIndex(String indexName, boolean formatIndexName) throws IOException {
if (formatIndexName) {
indexName = formatIndexName(indexName);
}
......@@ -247,22 +253,21 @@ public class ElasticSearchClient implements Client {
}
}
public boolean createTemplate(String indexName, JsonObject settings, JsonObject mapping) throws IOException {
public boolean createTemplate(String indexName, Map<String, Object> settings, Map<String, Object> mapping) throws IOException {
indexName = formatIndexName(indexName);
JsonArray patterns = new JsonArray();
patterns.add(indexName + "-*");
String[] patterns = new String[] {indexName + "-*"};
JsonObject aliases = new JsonObject();
aliases.add(indexName, new JsonObject());
Map<String, Object> aliases = new HashMap<>();
aliases.put(indexName, new JsonObject());
JsonObject template = new JsonObject();
template.add("index_patterns", patterns);
template.add("aliases", aliases);
template.add("settings", settings);
template.add("mappings", mapping);
Map<String, Object> template = new HashMap<>();
template.put("index_patterns", patterns);
template.put("aliases", aliases);
template.put("settings", settings);
template.put("mappings", mapping);
HttpEntity entity = new NStringEntity(template.toString(), ContentType.APPLICATION_JSON);
HttpEntity entity = new NStringEntity(new Gson().toJson(template), ContentType.APPLICATION_JSON);
Response response = client.getLowLevelClient().performRequest(HttpPut.METHOD_NAME, "/_template/" + indexName, Collections.emptyMap(), entity);
return response.getStatusLine().getStatusCode() == HttpStatus.SC_OK;
......@@ -299,30 +304,30 @@ public class ElasticSearchClient implements Client {
}
public void forceInsert(String indexName, String id, XContentBuilder source) throws IOException {
IndexRequest request = prepareInsert(indexName, id, source);
IndexRequest request = (IndexRequest) prepareInsert(indexName, id, source);
request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
client.index(request);
}
public void forceUpdate(String indexName, String id, XContentBuilder source, long version) throws IOException {
UpdateRequest request = prepareUpdate(indexName, id, source);
org.elasticsearch.action.update.UpdateRequest request = (org.elasticsearch.action.update.UpdateRequest) prepareUpdate(indexName, id, source);
request.version(version);
request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
client.update(request);
}
public void forceUpdate(String indexName, String id, XContentBuilder source) throws IOException {
UpdateRequest request = prepareUpdate(indexName, id, source);
org.elasticsearch.action.update.UpdateRequest request = (org.elasticsearch.action.update.UpdateRequest) prepareUpdate(indexName, id, source);
request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
client.update(request);
}
public ElasticSearchInsertRequest prepareInsert(String indexName, String id, XContentBuilder source) {
public InsertRequest prepareInsert(String indexName, String id, XContentBuilder source) {
indexName = formatIndexName(indexName);
return new ElasticSearchInsertRequest(indexName, TYPE, id).source(source);
}
public ElasticSearchUpdateRequest prepareUpdate(String indexName, String id, XContentBuilder source) {
public UpdateRequest prepareUpdate(String indexName, String id, XContentBuilder source) {
indexName = formatIndexName(indexName);
return new ElasticSearchUpdateRequest(indexName, TYPE, id).doc(source);
}
......@@ -359,27 +364,7 @@ public class ElasticSearchClient implements Client {
}
public BulkProcessor createBulkProcessor(int bulkActions, int flushInterval, int concurrentRequests) {
BulkProcessor.Listener listener = new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId, BulkRequest request) {
int numberOfActions = request.numberOfActions();
logger.debug("Executing bulk [{}] with {} requests", executionId, numberOfActions);
}
@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
if (response.hasFailures()) {
logger.warn("Bulk [{}] executed with failures", executionId);
} else {
logger.info("Bulk execution id [{}] completed in {} milliseconds, size: {}", executionId, response.getTook().getMillis(), request.requests().size());
}
}
@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
logger.error("Failed to execute bulk", failure);
}
};
BulkProcessor.Listener listener = createBulkListener();
return BulkProcessor.builder(client::bulkAsync, listener)
.setBulkActions(bulkActions)
......@@ -389,6 +374,30 @@ public class ElasticSearchClient implements Client {
.build();
}
protected BulkProcessor.Listener createBulkListener() {
return new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId, BulkRequest request) {
int numberOfActions = request.numberOfActions();
logger.debug("Executing bulk [{}] with {} requests", executionId, numberOfActions);
}
@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
if (response.hasFailures()) {
logger.warn("Bulk [{}] executed with failures", executionId);
} else {
logger.info("Bulk execution id [{}] completed in {} milliseconds, size: {}", executionId, response.getTook().getMillis(), request.requests().size());
}
}
@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
logger.error("Failed to execute bulk", failure);
}
};
}
public String formatIndexName(String indexName) {
if (StringUtils.isNotEmpty(namespace)) {
return namespace + "_" + indexName;
......
......@@ -81,17 +81,14 @@ public class ITElasticSearchClient {
@Test
public void indexOperate() throws IOException {
JsonObject settings = new JsonObject();
settings.addProperty("number_of_shards", 2);
settings.addProperty("number_of_replicas", 2);
Map<String, Object> settings = new HashMap<>();
settings.put("number_of_shards", 2);
settings.put("number_of_replicas", 2);
JsonObject mapping = new JsonObject();
mapping.add("_doc", new JsonObject());
JsonObject doc = mapping.getAsJsonObject("_doc");
Map<String, Object> doc = new HashMap<>();
JsonObject properties = new JsonObject();
doc.add("properties", properties);
doc.put("properties", properties);
JsonObject column = new JsonObject();
column.addProperty("type", "text");
......@@ -148,18 +145,18 @@ public class ITElasticSearchClient {
@Test
public void templateOperate() throws IOException {
JsonObject settings = new JsonObject();
settings.addProperty("number_of_shards", 1);
settings.addProperty("number_of_replicas", 0);
settings.addProperty("index.refresh_interval", "3s");
settings.addProperty("analysis.analyzer.oap_analyzer.type", "stop");
Map<String, Object> settings = new HashMap<>();
settings.put("number_of_shards", 1);
settings.put("number_of_replicas", 0);
settings.put("index.refresh_interval", "3s");
settings.put("analysis.analyzer.oap_analyzer.type", "stop");
JsonObject mapping = new JsonObject();
mapping.add("type", new JsonObject());
JsonObject doc = mapping.getAsJsonObject("type");
Map<String, Object> mapping = new HashMap<>();
Map<String, Object> doc = new HashMap<>();
mapping.put("type", doc);
JsonObject properties = new JsonObject();
doc.add("properties", properties);
doc.put("properties", properties);
JsonObject column = new JsonObject();
column.addProperty("type", "text");
......@@ -209,18 +206,18 @@ public class ITElasticSearchClient {
String indexName = "test_time_series_operate";
String timeSeriesIndexName = indexName + "-2019";
JsonObject mapping = new JsonObject();
mapping.add("type", new JsonObject());
JsonObject doc = mapping.getAsJsonObject("type");
Map<String, Object> mapping = new HashMap<>();
Map<String, Object> doc = new HashMap<>();
mapping.put("type", doc);
JsonObject properties = new JsonObject();
doc.add("properties", properties);
doc.put("properties", properties);
JsonObject column = new JsonObject();
column.addProperty("type", "text");
properties.add("name", column);
client.createTemplate(indexName, new JsonObject(), mapping);
client.createTemplate(indexName, new HashMap<>(), mapping);
XContentBuilder builder = XContentFactory.jsonBuilder().startObject()
.field("name", "pengys")
......
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>oap-server</artifactId>
<groupId>org.apache.skywalking</groupId>
<version>6.6.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>server-starter-es7</artifactId>
<description>A backend starter specially for ElasticSearch 7 storage</description>
<packaging>jar</packaging>
<properties>
<elasticsearch.version>7.0.0</elasticsearch.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>server-bootstrap</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>storage-elasticsearch7-plugin</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
<build>
<finalName>skywalking-oap</finalName>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<executions>
<execution>
<id>assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
<configuration>
<descriptors>
<descriptor>src/main/assembly/assembly.xml</descriptor>
</descriptors>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->
<assembly
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
<id>assembly</id>
<formats>
<format>dir</format>
</formats>
<dependencySets>
<dependencySet>
<outputDirectory>/libs</outputDirectory>
<scope>runtime</scope>
</dependencySet>
</dependencySets>
</assembly>
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.starter;
/**
* @author peng-yongsheng
* @author kezhenxu94
*/
public class OAPServerStartUp {
public static void main(String[] args) {
OAPServerBootstrap.start();
}
}
......@@ -26,108 +26,13 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>server-starter</artifactId>
<description>A backend starter specially for ElasticSearch 6 storage</description>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>server-core</artifactId>
<version>${project.version}</version>
</dependency>
<!-- OAL runtime core -->
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>oal-rt</artifactId>
<version>${project.version}</version>
</dependency>
<!-- OAL runtime core -->
<!-- cluster module -->
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>cluster-standalone-plugin</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>cluster-zookeeper-plugin</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>cluster-kubernetes-plugin</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>cluster-consul-plugin</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>cluster-nacos-plugin</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>cluster-etcd-plugin</artifactId>
<version>${project.version}</version>
</dependency>
<!-- cluster module -->
<!-- receiver module -->
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>skywalking-mesh-receiver-plugin</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>skywalking-istio-telemetry-receiver-plugin</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>skywalking-register-receiver-plugin</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>skywalking-jvm-receiver-plugin</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>skywalking-trace-receiver-plugin</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>envoy-metrics-receiver-plugin</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>zipkin-receiver-plugin</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>skywalking-clr-receiver-plugin</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>skywalking-so11y-receiver-plugin</artifactId>
<version>${project.version}</version>
</dependency>
<!-- receiver module -->
<!-- storage module -->
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>storage-jdbc-hikaricp-plugin</artifactId>
<artifactId>server-bootstrap</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
......@@ -145,109 +50,16 @@
<artifactId>jaeger-receiver-plugin</artifactId>
<version>${project.version}</version>
</dependency>
<!-- storage module -->
<!-- queryBuild module -->
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>query-graphql-plugin</artifactId>
<version>${project.version}</version>
</dependency>
<!-- queryBuild module -->
<!-- alarm module -->
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>server-alarm-plugin</artifactId>
<version>${project.version}</version>
</dependency>
<!-- telemetry -->
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>telemetry-prometheus</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>telemetry-so11y</artifactId>
<version>${project.version}</version>
</dependency>
<!-- exporter -->
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>exporter</artifactId>
<version>${project.version}</version>
</dependency>
<!-- configuration -->
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>grpc-configuration-sync</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>configuration-apollo</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>configuration-nacos</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>configuration-zookeeper</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>configuration-etcd</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>configuration-consul</artifactId>
<artifactId>zipkin-receiver-plugin</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
<build>
<finalName>skywalking-oap</finalName>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>${compiler.version}</source>
<target>${compiler.version}</target>
<encoding>${project.build.sourceEncoding}</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
<configuration>
<encoding>${project.build.sourceEncoding}</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<excludes>
<exclude>application.yml</exclude>
<exclude>log4j2.xml</exclude>
<exclude>alarm-settings.yml</exclude>
<exclude>component-libraries.yml</exclude>
<exclude>endpoint_naming_rules.properties</exclude>
<exclude>official_analysis.oal</exclude>
<exclude>gateways.yml</exclude>
</excludes>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
......
......@@ -30,25 +30,4 @@
<scope>runtime</scope>
</dependencySet>
</dependencySets>
<fileSets>
<fileSet>
<directory>src/main/assembly</directory>
<outputDirectory>/config</outputDirectory>
<includes>
<include>log4j2.xml</include>
<include>application.yml</include>
<include>alarm-settings.yml</include>
<include>alarm-settings-sample.yml</include>
<include>gateways.yml</include>
</includes>
</fileSet>
<fileSet>
<directory>src/main/resources</directory>
<includes>
<include>component-libraries.yml</include>
<include>official_analysis.oal</include>
</includes>
<outputDirectory>/config</outputDirectory>
</fileSet>
</fileSets>
</assembly>
......@@ -18,42 +18,13 @@
package org.apache.skywalking.oap.server.starter;
import org.apache.skywalking.oap.server.core.RunningMode;
import org.apache.skywalking.oap.server.library.module.*;
import org.apache.skywalking.oap.server.starter.config.ApplicationConfigLoader;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.*;
import org.slf4j.*;
/**
* @author peng-yongsheng
* @author kezhenxu94
*/
public class OAPServerStartUp {
private static final Logger logger = LoggerFactory.getLogger(OAPServerStartUp.class);
public static void main(String[] args) {
String mode = System.getProperty("mode");
RunningMode.setMode(mode);
ApplicationConfigLoader configLoader = new ApplicationConfigLoader();
ModuleManager manager = new ModuleManager();
try {
ApplicationConfiguration applicationConfiguration = configLoader.load();
manager.init(applicationConfiguration);
manager.find(TelemetryModule.NAME).provider().getService(MetricsCreator.class).createGauge("uptime",
"oap server start up time", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE)
// Set uptime to second
.setValue(System.currentTimeMillis() / 1000d);
if (RunningMode.isInitMode()) {
logger.info("OAP starts up in init mode successfully, exit now...");
System.exit(0);
}
} catch (Throwable t) {
logger.error(t.getMessage(), t);
System.exit(1);
}
OAPServerBootstrap.start();
}
}
......@@ -30,6 +30,7 @@
<modules>
<module>storage-jdbc-hikaricp-plugin</module>
<module>storage-elasticsearch-plugin</module>
<module>storage-elasticsearch7-plugin</module>
<module>storage-zipkin-plugin</module>
<module>storage-jaeger-plugin</module>
</modules>
......
......@@ -24,6 +24,8 @@ import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.storage.*;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.library.client.elasticsearch.*;
import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.xcontent.XContentBuilder;
......@@ -32,9 +34,9 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
*/
public class MetricsEsDAO extends EsDAO implements IMetricsDAO {
private final StorageBuilder<Metrics> storageBuilder;
protected final StorageBuilder<Metrics> storageBuilder;
MetricsEsDAO(ElasticSearchClient client, StorageBuilder<Metrics> storageBuilder) {
protected MetricsEsDAO(ElasticSearchClient client, StorageBuilder<Metrics> storageBuilder) {
super(client);
this.storageBuilder = storageBuilder;
}
......@@ -50,13 +52,13 @@ public class MetricsEsDAO extends EsDAO implements IMetricsDAO {
return result;
}
@Override public ElasticSearchInsertRequest prepareBatchInsert(Model model, Metrics metrics) throws IOException {
@Override public InsertRequest prepareBatchInsert(Model model, Metrics metrics) throws IOException {
XContentBuilder builder = map2builder(storageBuilder.data2Map(metrics));
String modelName = TimeSeriesUtils.timeSeries(model, metrics.getTimeBucket());
return getClient().prepareInsert(modelName, metrics.id(), builder);
}
@Override public ElasticSearchUpdateRequest prepareBatchUpdate(Model model, Metrics metrics) throws IOException {
@Override public UpdateRequest prepareBatchUpdate(Model model, Metrics metrics) throws IOException {
XContentBuilder builder = map2builder(storageBuilder.data2Map(metrics));
String modelName = TimeSeriesUtils.timeSeries(model, metrics.getTimeBucket());
return getClient().prepareUpdate(modelName, metrics.id(), builder);
......
......@@ -33,7 +33,7 @@ public class RecordEsDAO extends EsDAO implements IRecordDAO {
private final StorageBuilder<Record> storageBuilder;
RecordEsDAO(ElasticSearchClient client, StorageBuilder<Record> storageBuilder) {
public RecordEsDAO(ElasticSearchClient client, StorageBuilder<Record> storageBuilder) {
super(client);
this.storageBuilder = storageBuilder;
}
......
......@@ -32,7 +32,7 @@ public class RegisterEsDAO extends EsDAO implements IRegisterDAO {
private final StorageBuilder<RegisterSource> storageBuilder;
RegisterEsDAO(ElasticSearchClient client, StorageBuilder<RegisterSource> storageBuilder) {
public RegisterEsDAO(ElasticSearchClient client, StorageBuilder<RegisterSource> storageBuilder) {
super(client);
this.storageBuilder = storageBuilder;
}
......
......@@ -18,15 +18,20 @@
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
import com.google.gson.JsonObject;
import java.io.IOException;
import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.core.storage.model.*;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.core.storage.model.ModelColumn;
import org.apache.skywalking.oap.server.core.storage.model.ModelInstaller;
import org.apache.skywalking.oap.server.library.client.Client;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.elasticsearch.common.unit.TimeValue;
import org.slf4j.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
/**
* @author peng-yongsheng
......@@ -35,10 +40,10 @@ public class StorageEsInstaller extends ModelInstaller {
private static final Logger logger = LoggerFactory.getLogger(StorageEsInstaller.class);
private final int indexShardsNumber;
private final int indexReplicasNumber;
private final int indexRefreshInterval;
private final ColumnTypeEsMapping columnTypeEsMapping;
protected final int indexShardsNumber;
protected final int indexReplicasNumber;
protected final int indexRefreshInterval;
protected final ColumnTypeEsMapping columnTypeEsMapping;
public StorageEsInstaller(ModuleManager moduleManager, int indexShardsNumber, int indexReplicasNumber, int indexRefreshInterval) {
super(moduleManager);
......@@ -64,8 +69,8 @@ public class StorageEsInstaller extends ModelInstaller {
@Override protected void createTable(Client client, Model model) throws StorageException {
ElasticSearchClient esClient = (ElasticSearchClient)client;
JsonObject settings = createSetting(model.isRecord());
JsonObject mapping = createMapping(model);
Map<String, Object> settings = createSetting(model.isRecord());
Map<String, Object> mapping = createMapping(model);
logger.info("index {}'s columnTypeEsMapping builder str: {}", esClient.formatIndexName(model.getName()), mapping.toString());
try {
......@@ -97,46 +102,46 @@ public class StorageEsInstaller extends ModelInstaller {
}
}
private JsonObject createSetting(boolean record) {
JsonObject setting = new JsonObject();
setting.addProperty("index.number_of_shards", indexShardsNumber);
setting.addProperty("index.number_of_replicas", indexReplicasNumber);
setting.addProperty("index.refresh_interval", record ? TimeValue.timeValueSeconds(10).toString() : TimeValue.timeValueSeconds(indexRefreshInterval).toString());
setting.addProperty("analysis.analyzer.oap_analyzer.type", "stop");
protected Map<String, Object> createSetting(boolean record) {
Map<String, Object> setting = new HashMap<>();
setting.put("index.number_of_shards", indexShardsNumber);
setting.put("index.number_of_replicas", indexReplicasNumber);
setting.put("index.refresh_interval", record ? TimeValue.timeValueSeconds(10).toString() : TimeValue.timeValueSeconds(indexRefreshInterval).toString());
setting.put("analysis.analyzer.oap_analyzer.type", "stop");
return setting;
}
private JsonObject createMapping(Model model) {
JsonObject mapping = new JsonObject();
mapping.add(ElasticSearchClient.TYPE, new JsonObject());
protected Map<String, Object> createMapping(Model model) {
Map<String, Object> mapping = new HashMap<>();
Map<String, Object> type = new HashMap<>();
JsonObject type = mapping.get(ElasticSearchClient.TYPE).getAsJsonObject();
mapping.put(ElasticSearchClient.TYPE, type);
JsonObject properties = new JsonObject();
type.add("properties", properties);
Map<String, Object> properties = new HashMap<>();
type.put("properties", properties);
for (ModelColumn columnDefine : model.getColumns()) {
if (columnDefine.isMatchQuery()) {
String matchCName = MatchCNameBuilder.INSTANCE.build(columnDefine.getColumnName().getName());
JsonObject originalColumn = new JsonObject();
originalColumn.addProperty("type", columnTypeEsMapping.transform(columnDefine.getType()));
originalColumn.addProperty("copy_to", matchCName);
properties.add(columnDefine.getColumnName().getName(), originalColumn);
Map<String, Object> originalColumn = new HashMap<>();
originalColumn.put("type", columnTypeEsMapping.transform(columnDefine.getType()));
originalColumn.put("copy_to", matchCName);
properties.put(columnDefine.getColumnName().getName(), originalColumn);
JsonObject matchColumn = new JsonObject();
matchColumn.addProperty("type", "text");
matchColumn.addProperty("analyzer", "oap_analyzer");
properties.add(matchCName, matchColumn);
Map<String, Object> matchColumn = new HashMap<>();
matchColumn.put("type", "text");
matchColumn.put("analyzer", "oap_analyzer");
properties.put(matchCName, matchColumn);
} else if (columnDefine.isContent()) {
JsonObject column = new JsonObject();
column.addProperty("type", "text");
column.addProperty("index", false);
properties.add(columnDefine.getColumnName().getName(), column);
Map<String, Object> column = new HashMap<>();
column.put("type", "text");
column.put("index", false);
properties.put(columnDefine.getColumnName().getName(), column);
} else {
JsonObject column = new JsonObject();
column.addProperty("type", columnTypeEsMapping.transform(columnDefine.getType()));
properties.add(columnDefine.getColumnName().getName(), column);
Map<String, Object> column = new HashMap<>();
column.put("type", columnTypeEsMapping.transform(columnDefine.getType()));
properties.put(columnDefine.getColumnName().getName(), column);
}
}
......
......@@ -26,7 +26,7 @@ import org.apache.skywalking.oap.server.core.storage.model.Model;
*/
public class TimeSeriesUtils {
static String timeSeries(Model model) {
public static String timeSeries(Model model) {
long timeBucket = TimeBucket.getTimeBucket(System.currentTimeMillis(), model.getDownsampling());
return timeSeries(model, timeBucket);
}
......
......@@ -37,7 +37,7 @@ public class EndpointInventoryCacheEsDAO extends EsDAO implements IEndpointInven
private static final Logger logger = LoggerFactory.getLogger(EndpointInventoryCacheEsDAO.class);
private final EndpointInventory.Builder builder = new EndpointInventory.Builder();
protected final EndpointInventory.Builder builder = new EndpointInventory.Builder();
public EndpointInventoryCacheEsDAO(ElasticSearchClient client) {
super(client);
......
......@@ -38,8 +38,8 @@ public class NetworkAddressInventoryCacheEsDAO extends EsDAO implements INetwork
private static final Logger logger = LoggerFactory.getLogger(NetworkAddressInventoryCacheEsDAO.class);
private final NetworkAddressInventory.Builder builder = new NetworkAddressInventory.Builder();
private final int resultWindowMaxSize;
protected final NetworkAddressInventory.Builder builder = new NetworkAddressInventory.Builder();
protected final int resultWindowMaxSize;
public NetworkAddressInventoryCacheEsDAO(ElasticSearchClient client, int resultWindowMaxSize) {
super(client);
......
......@@ -37,7 +37,7 @@ public class ServiceInstanceInventoryCacheDAO extends EsDAO implements IServiceI
private static final Logger logger = LoggerFactory.getLogger(ServiceInstanceInventoryCacheDAO.class);
private final ServiceInstanceInventory.Builder builder = new ServiceInstanceInventory.Builder();
protected final ServiceInstanceInventory.Builder builder = new ServiceInstanceInventory.Builder();
public ServiceInstanceInventoryCacheDAO(ElasticSearchClient client) {
super(client);
......
......@@ -39,8 +39,8 @@ public class ServiceInventoryCacheEsDAO extends EsDAO implements IServiceInvento
private static final Logger logger = LoggerFactory.getLogger(ServiceInventoryCacheEsDAO.class);
private final ServiceInventory.Builder builder = new ServiceInventory.Builder();
private final int resultWindowMaxSize;
protected final ServiceInventory.Builder builder = new ServiceInventory.Builder();
protected final int resultWindowMaxSize;
public ServiceInventoryCacheEsDAO(ElasticSearchClient client, int resultWindowMaxSize) {
super(client);
......
......@@ -18,15 +18,19 @@
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.lock;
import com.google.gson.JsonObject;
import java.io.IOException;
import org.apache.skywalking.oap.server.core.analysis.Stream;
import org.apache.skywalking.oap.server.core.register.worker.InventoryStreamProcessor;
import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.common.xcontent.*;
import org.slf4j.*;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
/**
* @author peng-yongsheng
......@@ -35,7 +39,7 @@ public class RegisterLockInstaller {
private static final Logger logger = LoggerFactory.getLogger(RegisterLockInstaller.class);
private final ElasticSearchClient client;
protected final ElasticSearchClient client;
public RegisterLockInstaller(ElasticSearchClient client) {
this.client = client;
......@@ -67,23 +71,23 @@ public class RegisterLockInstaller {
client.deleteByModelName(RegisterLockIndex.NAME);
}
private void createIndex() throws IOException {
JsonObject settings = new JsonObject();
settings.addProperty("index.number_of_shards", 1);
settings.addProperty("index.number_of_replicas", 0);
settings.addProperty("index.refresh_interval", "1s");
protected void createIndex() throws IOException {
Map<String, Object> settings = new HashMap<>();
settings.put("index.number_of_shards", 1);
settings.put("index.number_of_replicas", 0);
settings.put("index.refresh_interval", "1s");
JsonObject mapping = new JsonObject();
mapping.add(ElasticSearchClient.TYPE, new JsonObject());
Map<String, Object> mapping = new HashMap<>();
Map<String, Object> type = new HashMap<>();
JsonObject type = mapping.get(ElasticSearchClient.TYPE).getAsJsonObject();
mapping.put(ElasticSearchClient.TYPE, type);
JsonObject properties = new JsonObject();
type.add("properties", properties);
Map<String, Object> properties = new HashMap<>();
type.put("properties", properties);
JsonObject column = new JsonObject();
column.addProperty("type", "integer");
properties.add(RegisterLockIndex.COLUMN_SEQUENCE, column);
Map<String, Object> column = new HashMap<>();
column.put("type", "integer");
properties.put(RegisterLockIndex.COLUMN_SEQUENCE, column);
client.createIndex(RegisterLockIndex.NAME, settings, mapping);
}
......
......@@ -104,21 +104,15 @@ public class AggregationQueryEsDAO extends EsDAO implements IAggregationQueryDAO
return aggregation(indexName, valueCName, sourceBuilder, topN, order);
}
private List<TopNEntity> aggregation(String indexName, String valueCName, SearchSourceBuilder sourceBuilder,
protected List<TopNEntity> aggregation(String indexName, String valueCName, SearchSourceBuilder sourceBuilder,
int topN, Order order) throws IOException {
boolean asc = false;
if (order.equals(Order.ASC)) {
asc = true;
}
TermsAggregationBuilder aggregationBuilder = AggregationBuilders
.terms(Metrics.ENTITY_ID)
.field(Metrics.ENTITY_ID)
.order(BucketOrder.aggregation(valueCName, asc))
.size(topN)
.subAggregation(
AggregationBuilders.avg(valueCName).field(valueCName)
);
TermsAggregationBuilder aggregationBuilder = aggregationBuilder(valueCName, topN, asc);
sourceBuilder.aggregation(aggregationBuilder);
SearchResponse response = getClient().search(indexName, sourceBuilder);
......@@ -135,4 +129,15 @@ public class AggregationQueryEsDAO extends EsDAO implements IAggregationQueryDAO
return topNEntities;
}
protected TermsAggregationBuilder aggregationBuilder(final String valueCName, final int topN, final boolean asc) {
return AggregationBuilders
.terms(Metrics.ENTITY_ID)
.field(Metrics.ENTITY_ID)
.order(BucketOrder.aggregation(valueCName, asc))
.size(topN)
.subAggregation(
AggregationBuilders.avg(valueCName).field(valueCName)
);
}
}
......@@ -292,7 +292,7 @@ public class MetadataQueryEsDAO extends EsDAO implements IMetadataQueryDAO {
return services;
}
private BoolQueryBuilder timeRangeQueryBuild(long startTimestamp, long endTimestamp) {
protected BoolQueryBuilder timeRangeQueryBuild(long startTimestamp, long endTimestamp) {
BoolQueryBuilder boolQuery1 = QueryBuilders.boolQuery();
boolQuery1.must().add(QueryBuilders.rangeQuery(RegisterSource.HEARTBEAT_TIME).gte(endTimestamp));
boolQuery1.must().add(QueryBuilders.rangeQuery(RegisterSource.REGISTER_TIME).lte(endTimestamp));
......
......@@ -86,7 +86,7 @@ public class MetricsQueryEsDAO extends EsDAO implements IMetricsQueryDAO {
return intValues;
}
private void functionAggregation(Function function, TermsAggregationBuilder parentAggBuilder, String valueCName) {
protected void functionAggregation(Function function, TermsAggregationBuilder parentAggBuilder, String valueCName) {
switch (function) {
case Avg:
parentAggBuilder.subAggregation(AggregationBuilders.avg(valueCName).field(valueCName));
......
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm</artifactId>
<groupId>org.apache.skywalking</groupId>
<version>6.6.0-SNAPSHOT</version>
<relativePath>../../../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>storage-elasticsearch7-plugin</artifactId>
<properties>
<elasticsearch.version>7.0.0</elasticsearch.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>${elasticsearch.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>storage-elasticsearch-plugin</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch7;
import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch7.client.ElasticSearch7Client;
/**
* @author kezhenxu94
*/
public class Es7DAO extends AbstractDAO<ElasticSearch7Client> {
public Es7DAO(final ElasticSearch7Client client) {
super(client);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch7;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.StorageModuleElasticsearchConfig;
/**
* @author kezhenxu94
*/
public class StorageModuleElasticsearch7Config extends StorageModuleElasticsearchConfig {
private int indexMaxResultWindow;
public int getIndexMaxResultWindow() {
return indexMaxResultWindow;
}
public void setIndexMaxResultWindow(final int indexMaxResultWindow) {
this.indexMaxResultWindow = indexMaxResultWindow;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch7;
import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.config.ConfigService;
import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO;
import org.apache.skywalking.oap.server.core.storage.IRegisterLockDAO;
import org.apache.skywalking.oap.server.core.storage.StorageDAO;
import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
import org.apache.skywalking.oap.server.core.storage.cache.IEndpointInventoryCacheDAO;
import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressInventoryCacheDAO;
import org.apache.skywalking.oap.server.core.storage.cache.IServiceInstanceInventoryCacheDAO;
import org.apache.skywalking.oap.server.core.storage.cache.IServiceInventoryCacheDAO;
import org.apache.skywalking.oap.server.core.storage.query.IAggregationQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IAlarmQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ILogQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IMetadataQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IMetricsQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ITopNRecordsQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.BatchProcessEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.HistoryDeleteEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.TopNRecordsQueryEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.TopologyQueryEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.ttl.ElasticsearchStorageTTL;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch7.cache.EndpointInventoryCacheEs7DAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch7.cache.NetworkAddressInventoryCacheEs7DAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch7.cache.ServiceInstanceInventoryCacheEs7DAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch7.cache.ServiceInventoryCacheEs7DAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch7.client.ElasticSearch7Client;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch7.dao.StorageEs7DAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch7.base.StorageEs7Installer;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch7.lock.RegisterLockEs77DAOImpl;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch7.lock.RegisterLockEs7Installer;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch7.query.AggregationQueryEs7DAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch7.query.AlarmQueryEs7DAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch7.query.LogQueryEs7DAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch7.query.MetadataQueryEs7DAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch7.query.MetricsQueryEs7DAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch7.query.TraceQueryEs7DAO;
import java.io.IOException;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.CertificateException;
/**
* @author peng-yongsheng, jian.tan
* @author kezhenxu94
*/
public class StorageModuleElasticsearch7Provider extends ModuleProvider {
protected final StorageModuleElasticsearch7Config config;
protected ElasticSearch7Client elasticSearch7Client;
public StorageModuleElasticsearch7Provider() {
super();
this.config = new StorageModuleElasticsearch7Config();
}
@Override
public String name() {
return "elasticsearch7";
}
@Override
public Class<? extends ModuleDefine> module() {
return StorageModule.class;
}
@Override
public ModuleConfig createConfigBeanIfAbsent() {
return config;
}
@Override
public void prepare() throws ServiceNotProvidedException {
if (!StringUtil.isEmpty(config.getNameSpace())) {
config.setNameSpace(config.getNameSpace().toLowerCase());
}
elasticSearch7Client = new ElasticSearch7Client(config.getClusterNodes(), config.getProtocol(), config.getTrustStorePath(), config.getTrustStorePass(), config.getNameSpace(), config.getUser(), config.getPassword());
this.registerServiceImplementation(IBatchDAO.class, new BatchProcessEsDAO(elasticSearch7Client, config.getBulkActions(), config.getFlushInterval(), config.getConcurrentRequests()));
this.registerServiceImplementation(StorageDAO.class, new StorageEs7DAO(elasticSearch7Client));
this.registerServiceImplementation(IRegisterLockDAO.class, new RegisterLockEs77DAOImpl(elasticSearch7Client));
this.registerServiceImplementation(IHistoryDeleteDAO.class, new HistoryDeleteEsDAO(getManager(), elasticSearch7Client, new ElasticsearchStorageTTL()));
this.registerServiceImplementation(IServiceInventoryCacheDAO.class, new ServiceInventoryCacheEs7DAO(elasticSearch7Client, config.getResultWindowMaxSize()));
this.registerServiceImplementation(IServiceInstanceInventoryCacheDAO.class, new ServiceInstanceInventoryCacheEs7DAO(elasticSearch7Client));
this.registerServiceImplementation(IEndpointInventoryCacheDAO.class, new EndpointInventoryCacheEs7DAO(elasticSearch7Client));
this.registerServiceImplementation(INetworkAddressInventoryCacheDAO.class, new NetworkAddressInventoryCacheEs7DAO(elasticSearch7Client, config.getResultWindowMaxSize()));
this.registerServiceImplementation(ITopologyQueryDAO.class, new TopologyQueryEsDAO(elasticSearch7Client));
this.registerServiceImplementation(IMetricsQueryDAO.class, new MetricsQueryEs7DAO(elasticSearch7Client));
this.registerServiceImplementation(ITraceQueryDAO.class, new TraceQueryEs7DAO(elasticSearch7Client, config.getSegmentQueryMaxSize()));
this.registerServiceImplementation(IMetadataQueryDAO.class, new MetadataQueryEs7DAO(elasticSearch7Client, config.getMetadataQueryMaxSize()));
this.registerServiceImplementation(IAggregationQueryDAO.class, new AggregationQueryEs7DAO(elasticSearch7Client));
this.registerServiceImplementation(IAlarmQueryDAO.class, new AlarmQueryEs7DAO(elasticSearch7Client));
this.registerServiceImplementation(ITopNRecordsQueryDAO.class, new TopNRecordsQueryEsDAO(elasticSearch7Client));
this.registerServiceImplementation(ILogQueryDAO.class, new LogQueryEs7DAO(elasticSearch7Client));
}
@Override
public void start() throws ModuleStartException {
overrideCoreModuleTTLConfig();
try {
elasticSearch7Client.connect();
StorageEs7Installer installer = new StorageEs7Installer(getManager(), config);
installer.install(elasticSearch7Client);
RegisterLockEs7Installer lockInstaller = new RegisterLockEs7Installer(elasticSearch7Client);
lockInstaller.install();
} catch (StorageException | IOException | KeyStoreException | NoSuchAlgorithmException | KeyManagementException | CertificateException e) {
throw new ModuleStartException(e.getMessage(), e);
}
}
@Override
public void notifyAfterCompleted() {
}
@Override
public String[] requiredModules() {
return new String[]{CoreModule.NAME};
}
private void overrideCoreModuleTTLConfig() {
ConfigService configService = getManager().find(CoreModule.NAME).provider().getService(ConfigService.class);
configService.getDataTTLConfig().setRecordDataTTL(config.getRecordDataTTL());
configService.getDataTTLConfig().setMinuteMetricsDataTTL(config.getMinuteMetricsDataTTL());
configService.getDataTTLConfig().setHourMetricsDataTTL(config.getHourMetricsDataTTL());
configService.getDataTTLConfig().setDayMetricsDataTTL(config.getDayMetricsDataTTL());
configService.getDataTTLConfig().setMonthMetricsDataTTL(config.getMonthMetricsDataTTL());
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch7.base;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.StorageEsInstaller;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch7.StorageModuleElasticsearch7Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
/**
* @author kezhenxu94
*/
public class StorageEs7Installer extends StorageEsInstaller {
private static final Logger logger = LoggerFactory.getLogger(StorageEs7Installer.class);
private final StorageModuleElasticsearch7Config config;
public StorageEs7Installer(final ModuleManager moduleManager,
final StorageModuleElasticsearch7Config config) {
super(moduleManager, config.getIndexShardsNumber(), config.getIndexReplicasNumber(), config.getIndexRefreshInterval());
this.config = config;
}
@SuppressWarnings("unchecked")
protected Map<String, Object> createMapping(Model model) {
Map<String, Object> mapping = super.createMapping(model);
Map<String, Object> type = (Map<String, Object>) mapping.remove(ElasticSearchClient.TYPE);
mapping.put("properties", type.get("properties"));
logger.debug("elasticsearch index template setting: {}", mapping.toString());
return mapping;
}
protected Map<String, Object> createSetting(boolean record) {
Map<String, Object> setting = super.createSetting(record);
if (config.getIndexMaxResultWindow() > 0) {
setting.put("index.max_result_window", config.getIndexMaxResultWindow());
}
return setting;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch7.cache;
import org.apache.skywalking.oap.server.core.register.EndpointInventory;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.cache.EndpointInventoryCacheEsDAO;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
* @author kezhenxu94
*/
public class EndpointInventoryCacheEs7DAO extends EndpointInventoryCacheEsDAO {
private static final Logger logger = LoggerFactory.getLogger(EndpointInventoryCacheEs7DAO.class);
public EndpointInventoryCacheEs7DAO(ElasticSearchClient client) {
super(client);
}
@Override
public EndpointInventory get(int endpointId) {
try {
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(QueryBuilders.termQuery(EndpointInventory.SEQUENCE, endpointId));
searchSourceBuilder.size(1);
SearchResponse response = getClient().search(EndpointInventory.INDEX_NAME, searchSourceBuilder);
if (response.getHits().getTotalHits().value == 1) {
SearchHit searchHit = response.getHits().getAt(0);
return builder.map2Data(searchHit.getSourceAsMap());
} else {
return null;
}
} catch (Throwable e) {
logger.error(e.getMessage(), e);
return null;
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch7.cache;
import org.apache.skywalking.oap.server.core.register.NetworkAddressInventory;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.cache.NetworkAddressInventoryCacheEsDAO;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng, jian.tan
* @author kezhenxu94
*/
public class NetworkAddressInventoryCacheEs7DAO extends NetworkAddressInventoryCacheEsDAO {
private static final Logger logger = LoggerFactory.getLogger(NetworkAddressInventoryCacheEs7DAO.class);
public NetworkAddressInventoryCacheEs7DAO(ElasticSearchClient client, int resultWindowMaxSize) {
super(client, resultWindowMaxSize);
}
@Override public NetworkAddressInventory get(int addressId) {
try {
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(QueryBuilders.termQuery(NetworkAddressInventory.SEQUENCE, addressId));
searchSourceBuilder.size(1);
SearchResponse response = getClient().search(NetworkAddressInventory.INDEX_NAME, searchSourceBuilder);
if (response.getHits().getTotalHits().value == 1) {
SearchHit searchHit = response.getHits().getAt(0);
return builder.map2Data(searchHit.getSourceAsMap());
} else {
return null;
}
} catch (Throwable t) {
logger.error(t.getMessage(), t);
return null;
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch7.cache;
import org.apache.skywalking.oap.server.core.register.ServiceInstanceInventory;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.cache.ServiceInstanceInventoryCacheDAO;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
* @author kezhenxu94
*/
public class ServiceInstanceInventoryCacheEs7DAO extends ServiceInstanceInventoryCacheDAO {
private static final Logger logger = LoggerFactory.getLogger(ServiceInstanceInventoryCacheEs7DAO.class);
public ServiceInstanceInventoryCacheEs7DAO(ElasticSearchClient client) {
super(client);
}
@Override
public ServiceInstanceInventory get(int serviceInstanceId) {
try {
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(QueryBuilders.termQuery(ServiceInstanceInventory.SEQUENCE, serviceInstanceId));
searchSourceBuilder.size(1);
SearchResponse response = getClient().search(ServiceInstanceInventory.INDEX_NAME, searchSourceBuilder);
if (response.getHits().getTotalHits().value == 1) {
SearchHit searchHit = response.getHits().getAt(0);
return builder.map2Data(searchHit.getSourceAsMap());
} else {
return null;
}
} catch (Throwable t) {
logger.error(t.getMessage(), t);
return null;
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch7.cache;
import org.apache.skywalking.oap.server.core.register.ServiceInventory;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.cache.ServiceInventoryCacheEsDAO;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng, jian.tan
* @author kezhenxu94
*/
public class ServiceInventoryCacheEs7DAO extends ServiceInventoryCacheEsDAO {
private static final Logger logger = LoggerFactory.getLogger(ServiceInventoryCacheEs7DAO.class);
public ServiceInventoryCacheEs7DAO(final ElasticSearchClient client, final int resultWindowMaxSize) {
super(client, resultWindowMaxSize);
}
@Override public ServiceInventory get(int serviceId) {
try {
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(QueryBuilders.termQuery(ServiceInventory.SEQUENCE, serviceId));
searchSourceBuilder.size(1);
SearchResponse response = getClient().search(ServiceInventory.INDEX_NAME, searchSourceBuilder);
if (response.getHits().getTotalHits().value == 1) {
SearchHit searchHit = response.getHits().getAt(0);
return builder.map2Data(searchHit.getSourceAsMap());
} else {
return null;
}
} catch (Throwable t) {
logger.error(t.getMessage(), t);
return null;
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch7.client;
import org.apache.http.HttpHost;
import org.apache.http.HttpStatus;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateRequest;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.GetAliasesResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.client.indices.IndexTemplatesExistRequest;
import org.elasticsearch.client.indices.PutIndexTemplateRequest;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.CertificateException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
/**
* @author peng-yongsheng
* @author kezhenxu94
*/
public class ElasticSearch7Client extends ElasticSearchClient {
private static final Logger logger = LoggerFactory.getLogger(ElasticSearch7Client.class);
public ElasticSearch7Client(final String clusterNodes,
final String protocol,
final String trustStorePath,
final String trustStorePass,
final String namespace, final String user, final String password) {
super(clusterNodes, protocol, trustStorePath, trustStorePass, namespace, user, password);
}
@Override
public void connect() throws IOException, KeyStoreException, NoSuchAlgorithmException, KeyManagementException, CertificateException {
List<HttpHost> hosts = parseClusterNodes(protocol, clusterNodes);
client = createClient(hosts);
client.ping(RequestOptions.DEFAULT);
}
public boolean createIndex(String indexName) throws IOException {
indexName = formatIndexName(indexName);
CreateIndexRequest request = new CreateIndexRequest(indexName);
CreateIndexResponse response = client.indices().create(request, RequestOptions.DEFAULT);
logger.debug("create {} index finished, isAcknowledged: {}", indexName, response.isAcknowledged());
return response.isAcknowledged();
}
public boolean createIndex(String indexName,
Map<String, Object> settings,
Map<String, Object> mapping) throws IOException {
indexName = formatIndexName(indexName);
CreateIndexRequest request = new CreateIndexRequest(indexName);
request.settings(settings);
request.mapping(mapping);
CreateIndexResponse response = client.indices().create(request, RequestOptions.DEFAULT);
logger.debug("create {} index finished, isAcknowledged: {}", indexName, response.isAcknowledged());
return response.isAcknowledged();
}
/**
* {@inheritDoc}
*/
@Override
public List<String> retrievalIndexByAliases(String aliases) throws IOException {
aliases = formatIndexName(aliases);
GetAliasesRequest getAliasesRequest = new GetAliasesRequest(aliases);
GetAliasesResponse alias = client.indices().getAlias(getAliasesRequest, RequestOptions.DEFAULT);
return new ArrayList<>(alias.getAliases().keySet());
}
protected boolean deleteIndex(String indexName, boolean formatIndexName) throws IOException {
if (formatIndexName) {
indexName = formatIndexName(indexName);
}
DeleteIndexRequest request = new DeleteIndexRequest(indexName);
AcknowledgedResponse response = client.indices().delete(request, RequestOptions.DEFAULT);
logger.debug("delete {} index finished, isAcknowledged: {}", indexName, response.isAcknowledged());
return response.isAcknowledged();
}
public boolean isExistsIndex(String indexName) throws IOException {
indexName = formatIndexName(indexName);
GetIndexRequest request = new GetIndexRequest(indexName);
return client.indices().exists(request, RequestOptions.DEFAULT);
}
public boolean isExistsTemplate(String indexName) throws IOException {
indexName = formatIndexName(indexName);
IndexTemplatesExistRequest indexTemplatesExistRequest = new IndexTemplatesExistRequest(indexName);
return client.indices().existsTemplate(indexTemplatesExistRequest, RequestOptions.DEFAULT);
}
public boolean createTemplate(String indexName,
Map<String, Object> settings,
Map<String, Object> mapping) throws IOException {
indexName = formatIndexName(indexName);
PutIndexTemplateRequest putIndexTemplateRequest =
new PutIndexTemplateRequest(indexName)
.patterns(Collections.singletonList(indexName + "-*"))
.alias(new Alias(indexName))
.settings(settings)
.mapping(mapping);
AcknowledgedResponse acknowledgedResponse =
client.indices().putTemplate(putIndexTemplateRequest, RequestOptions.DEFAULT);
return acknowledgedResponse.isAcknowledged();
}
public boolean deleteTemplate(String indexName) throws IOException {
indexName = formatIndexName(indexName);
DeleteIndexTemplateRequest deleteIndexTemplateRequest = new DeleteIndexTemplateRequest(indexName);
AcknowledgedResponse acknowledgedResponse = client.indices().deleteTemplate(deleteIndexTemplateRequest, RequestOptions.DEFAULT);
return acknowledgedResponse.isAcknowledged();
}
public SearchResponse search(
String indexName,
SearchSourceBuilder searchSourceBuilder) throws IOException {
indexName = formatIndexName(indexName);
SearchRequest searchRequest = new SearchRequest(indexName);
searchRequest.source(searchSourceBuilder);
return client.search(searchRequest, RequestOptions.DEFAULT);
}
public GetResponse get(String indexName, String id) throws IOException {
indexName = formatIndexName(indexName);
GetRequest request = new GetRequest(indexName, id);
return client.get(request, RequestOptions.DEFAULT);
}
public SearchResponse ids(
String indexName,
String[] ids) throws IOException {
indexName = formatIndexName(indexName);
SearchRequest searchRequest = new SearchRequest(indexName);
searchRequest.source().query(QueryBuilders.idsQuery().addIds(ids)).size(ids.length);
return client.search(searchRequest, RequestOptions.DEFAULT);
}
public void forceInsert(String indexName, String id, XContentBuilder source) throws IOException {
IndexRequest request = (IndexRequest) prepareInsert(indexName, id, source);
request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
client.index(request, RequestOptions.DEFAULT);
}
public void forceUpdate(String indexName,
String id,
XContentBuilder source,
long seqNo,
long primaryTerm) throws IOException {
org.elasticsearch.action.update.UpdateRequest request = (org.elasticsearch.action.update.UpdateRequest) prepareUpdate(indexName, id, source);
request.setIfSeqNo(seqNo);
request.setIfPrimaryTerm(primaryTerm);
request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
client.update(request, RequestOptions.DEFAULT);
}
public void forceUpdate(String indexName, String id, XContentBuilder source) throws IOException {
org.elasticsearch.action.update.UpdateRequest request = (org.elasticsearch.action.update.UpdateRequest) prepareUpdate(indexName, id, source);
request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
client.update(request, RequestOptions.DEFAULT);
}
public InsertRequest prepareInsert(String indexName, String id, XContentBuilder source) {
indexName = formatIndexName(indexName);
return new ElasticSearch7InsertRequest(indexName, id).source(source);
}
public UpdateRequest prepareUpdate(String indexName, String id, XContentBuilder source) {
indexName = formatIndexName(indexName);
return new ElasticSearch7UpdateRequest(indexName, id).doc(source);
}
public int delete(
String indexName,
String timeBucketColumnName,
long endTimeBucket) throws IOException {
indexName = formatIndexName(indexName);
DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(indexName);
deleteByQueryRequest.setAbortOnVersionConflict(false);
deleteByQueryRequest.setQuery(
QueryBuilders.rangeQuery(timeBucketColumnName).lte(endTimeBucket)
);
BulkByScrollResponse bulkByScrollResponse = client.deleteByQuery(deleteByQueryRequest, RequestOptions.DEFAULT);
logger.debug(
"delete indexName: {}, by query request: {}, response: {}",
indexName, deleteByQueryRequest, bulkByScrollResponse
);
return HttpStatus.SC_OK;
}
public void synchronousBulk(BulkRequest request) {
request.timeout(TimeValue.timeValueMinutes(2));
request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
request.waitForActiveShards(ActiveShardCount.ONE);
try {
int size = request.requests().size();
BulkResponse responses = client.bulk(request, RequestOptions.DEFAULT);
logger.info("Synchronous bulk took time: {} millis, size: {}", responses.getTook().getMillis(), size);
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
}
public BulkProcessor createBulkProcessor(int bulkActions, int flushInterval, int concurrentRequests) {
BulkProcessor.Listener listener = createBulkListener();
return BulkProcessor.builder(
(bulkRequest, bulkResponseActionListener) -> client.bulkAsync(bulkRequest, RequestOptions.DEFAULT, bulkResponseActionListener), listener)
.setBulkActions(bulkActions)
.setFlushInterval(TimeValue.timeValueSeconds(flushInterval))
.setConcurrentRequests(concurrentRequests)
.setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
.build();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch7.client;
import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.xcontent.XContentBuilder;
/**
* @author peng-yongsheng
* @author kezhenxu94
*/
public class ElasticSearch7InsertRequest extends IndexRequest implements InsertRequest {
public ElasticSearch7InsertRequest(String index, String id) {
super(index);
id(id);
}
@Override
public ElasticSearch7InsertRequest source(XContentBuilder sourceBuilder) {
super.source(sourceBuilder);
return this;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch7.client;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.xcontent.XContentBuilder;
/**
* @author peng-yongsheng
* @author kezhenxu94
*/
public class ElasticSearch7UpdateRequest extends UpdateRequest implements org.apache.skywalking.oap.server.library.client.request.UpdateRequest {
public ElasticSearch7UpdateRequest(String index, String id) {
super(index, id);
}
@Override
public ElasticSearch7UpdateRequest doc(XContentBuilder source) {
super.doc(source);
return this;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch7.dao;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.MetricsEsDAO;
import org.elasticsearch.action.search.SearchResponse;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
* @author peng-yongsheng
* @author kezhenxu94
*/
public class MetricsEs7DAO extends MetricsEsDAO {
MetricsEs7DAO(final ElasticSearchClient client,
final StorageBuilder<Metrics> storageBuilder) {
super(client, storageBuilder);
}
@Override
public List<Metrics> multiGet(Model model, List<String> ids) throws IOException {
SearchResponse response = getClient().ids(model.getName(), ids.toArray(new String[0]));
List<Metrics> result = new ArrayList<>((int) response.getHits().getTotalHits().value);
for (int i = 0; i < response.getHits().getTotalHits().value; i++) {
Metrics source = storageBuilder.map2Data(response.getHits().getAt(i).getSourceAsMap());
result.add(source);
}
return result;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch7.dao;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.register.RegisterSource;
import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
import org.apache.skywalking.oap.server.core.storage.IRecordDAO;
import org.apache.skywalking.oap.server.core.storage.IRegisterDAO;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.StorageDAO;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.RecordEsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.RegisterEsDAO;
/**
* @author peng-yongsheng
*/
public class StorageEs7DAO extends EsDAO implements StorageDAO {
public StorageEs7DAO(ElasticSearchClient client) {
super(client);
}
@Override public IMetricsDAO newMetricsDao(StorageBuilder<Metrics> storageBuilder) {
return new MetricsEs7DAO(getClient(), storageBuilder);
}
@Override public IRegisterDAO newRegisterDao(StorageBuilder<RegisterSource> storageBuilder) {
return new RegisterEsDAO(getClient(), storageBuilder);
}
@Override public IRecordDAO newRecordDao(StorageBuilder<Record> storageBuilder) {
return new RecordEsDAO(getClient(), storageBuilder);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch7.lock;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.register.RegisterSource;
import org.apache.skywalking.oap.server.core.storage.IRegisterLockDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.lock.RegisterLockIndex;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch7.Es7DAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch7.client.ElasticSearch7Client;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Map;
/**
* @author kezhenxu94
*/
public class RegisterLockEs77DAOImpl extends Es7DAO implements IRegisterLockDAO {
private static final Logger logger = LoggerFactory.getLogger(RegisterLockEs77DAOImpl.class);
public RegisterLockEs77DAOImpl(ElasticSearch7Client client) {
super(client);
}
@Override
public int getId(int scopeId, RegisterSource registerSource) {
String id = String.valueOf(scopeId);
int sequence = Const.NONE;
try {
GetResponse response = getClient().get(RegisterLockIndex.NAME, id);
if (response.isExists()) {
Map<String, Object> source = response.getSource();
sequence = ((Number) source.get(RegisterLockIndex.COLUMN_SEQUENCE)).intValue();
sequence++;
lock(id, sequence, response.getSeqNo(), response.getPrimaryTerm());
}
} catch (Throwable t) {
logger.warn("Try to lock the row with the id {} failure, error message: {}", id, t.getMessage(), t);
return Const.NONE;
}
return sequence;
}
private void lock(String id, int sequence, final long seqNo, long primaryTerm) throws IOException {
XContentBuilder source = XContentFactory.jsonBuilder().startObject();
source.field(RegisterLockIndex.COLUMN_SEQUENCE, sequence);
source.endObject();
getClient().forceUpdate(RegisterLockIndex.NAME, id, source, seqNo, primaryTerm);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch7.lock;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.lock.RegisterLockIndex;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.lock.RegisterLockInstaller;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
/**
* @author peng-yongsheng
* @author kezhenxu94
*/
public class RegisterLockEs7Installer extends RegisterLockInstaller {
public RegisterLockEs7Installer(final ElasticSearchClient client) {
super(client);
}
@Override
protected void createIndex() throws IOException {
Map<String, Object> settings = new HashMap<>();
settings.put("index.number_of_shards", 1);
settings.put("index.number_of_replicas", 0);
settings.put("index.refresh_interval", "1s");
Map<String, Object> mapping = new HashMap<>();
Map<String, Object> properties = new HashMap<>();
mapping.put("properties", properties);
Map<String, Object> column = new HashMap<>();
column.put("type", "integer");
properties.put(RegisterLockIndex.COLUMN_SEQUENCE, column);
client.createIndex(RegisterLockIndex.NAME, settings, mapping);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch7.query;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.query.entity.Order;
import org.apache.skywalking.oap.server.core.query.entity.TopNEntity;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.AggregationQueryEsDAO;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.Avg;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
* @author peng-yongsheng
* @author kezhenxu94
*/
public class AggregationQueryEs7DAO extends AggregationQueryEsDAO {
public AggregationQueryEs7DAO(ElasticSearchClient client) {
super(client);
}
protected List<TopNEntity> aggregation(
String indexName,
String valueCName,
SearchSourceBuilder sourceBuilder,
int topN,
Order order) throws IOException {
boolean asc = false;
if (order.equals(Order.ASC)) {
asc = true;
}
TermsAggregationBuilder aggregationBuilder = aggregationBuilder(valueCName, topN, asc);
sourceBuilder.aggregation(aggregationBuilder);
SearchResponse response = getClient().search(indexName, sourceBuilder);
List<TopNEntity> topNEntities = new ArrayList<>();
Terms idTerms = response.getAggregations().get(Metrics.ENTITY_ID);
for (Terms.Bucket termsBucket : idTerms.getBuckets()) {
TopNEntity topNEntity = new TopNEntity();
topNEntity.setId(termsBucket.getKeyAsString());
Avg value = termsBucket.getAggregations().get(valueCName);
topNEntity.setValue((long) value.getValue());
topNEntities.add(topNEntity);
}
return topNEntities;
}
protected TermsAggregationBuilder aggregationBuilder(final String valueCName, final int topN, final boolean asc) {
return AggregationBuilders
.terms(Metrics.ENTITY_ID)
.field(Metrics.ENTITY_ID)
.order(BucketOrder.aggregation(valueCName, asc))
.size(topN)
.subAggregation(
AggregationBuilders.avg(valueCName).field(valueCName)
);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch7.query;
import com.google.common.base.Strings;
import org.apache.skywalking.oap.server.core.alarm.AlarmRecord;
import org.apache.skywalking.oap.server.core.query.entity.AlarmMessage;
import org.apache.skywalking.oap.server.core.query.entity.Alarms;
import org.apache.skywalking.oap.server.core.query.entity.Scope;
import org.apache.skywalking.oap.server.core.storage.query.IAlarmQueryDAO;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.MatchCNameBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import java.io.IOException;
import java.util.Objects;
/**
* @author peng-yongsheng
* @author kezhenxu94
*/
public class AlarmQueryEs7DAO extends EsDAO implements IAlarmQueryDAO {
public AlarmQueryEs7DAO(ElasticSearchClient client) {
super(client);
}
public Alarms getAlarm(final Integer scopeId,
final String keyword,
final int limit,
final int from,
final long startTB,
final long endTB) throws IOException {
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
boolQueryBuilder.must().add(QueryBuilders.rangeQuery(AlarmRecord.TIME_BUCKET).gte(startTB).lte(endTB));
if (Objects.nonNull(scopeId)) {
boolQueryBuilder.must().add(QueryBuilders.termQuery(AlarmRecord.SCOPE, scopeId.intValue()));
}
if (!Strings.isNullOrEmpty(keyword)) {
String matchCName = MatchCNameBuilder.INSTANCE.build(AlarmRecord.ALARM_MESSAGE);
boolQueryBuilder.must().add(QueryBuilders.matchPhraseQuery(matchCName, keyword));
}
sourceBuilder.query(boolQueryBuilder).sort(AlarmRecord.START_TIME, SortOrder.DESC);
sourceBuilder.size(limit);
sourceBuilder.from(from);
SearchResponse response = getClient().search(AlarmRecord.INDEX_NAME, sourceBuilder);
Alarms alarms = new Alarms();
alarms.setTotal((int) response.getHits().getTotalHits().value);
for (SearchHit searchHit : response.getHits().getHits()) {
AlarmRecord.Builder builder = new AlarmRecord.Builder();
AlarmRecord alarmRecord = builder.map2Data(searchHit.getSourceAsMap());
AlarmMessage message = new AlarmMessage();
message.setId(String.valueOf(alarmRecord.getId0()));
message.setMessage(alarmRecord.getAlarmMessage());
message.setStartTime(alarmRecord.getStartTime());
message.setScope(Scope.Finder.valueOf(alarmRecord.getScope()));
message.setScopeId(alarmRecord.getScope());
alarms.getMsgs().add(message);
}
return alarms;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch7.query;
import com.google.common.base.Strings;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.analysis.manual.log.AbstractLogRecord;
import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.query.entity.ContentType;
import org.apache.skywalking.oap.server.core.query.entity.Log;
import org.apache.skywalking.oap.server.core.query.entity.LogState;
import org.apache.skywalking.oap.server.core.query.entity.Logs;
import org.apache.skywalking.oap.server.core.query.entity.Pagination;
import org.apache.skywalking.oap.server.core.storage.query.ILogQueryDAO;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.library.util.BooleanUtils;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import java.io.IOException;
import java.util.List;
import static org.apache.skywalking.oap.server.core.analysis.manual.log.AbstractLogRecord.TRACE_ID;
/**
* @author wusheng
* @author kezhenxu94
*/
public class LogQueryEs7DAO extends EsDAO implements ILogQueryDAO {
public LogQueryEs7DAO(ElasticSearchClient client) {
super(client);
}
@Override
public Logs queryLogs(String metricName,
int serviceId,
int serviceInstanceId,
int endpointId,
String traceId,
LogState state,
String stateCode,
Pagination paging,
int from,
int limit,
long startSecondTB,
long endSecondTB) throws IOException {
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
sourceBuilder.query(boolQueryBuilder);
List<QueryBuilder> mustQueryList = boolQueryBuilder.must();
if (startSecondTB != 0 && endSecondTB != 0) {
mustQueryList.add(QueryBuilders.rangeQuery(Record.TIME_BUCKET).gte(startSecondTB).lte(endSecondTB));
}
if (serviceId != Const.NONE) {
boolQueryBuilder.must().add(QueryBuilders.termQuery(AbstractLogRecord.SERVICE_ID, serviceId));
}
if (serviceInstanceId != Const.NONE) {
boolQueryBuilder.must().add(QueryBuilders.termQuery(AbstractLogRecord.SERVICE_INSTANCE_ID, serviceInstanceId));
}
if (endpointId != Const.NONE) {
boolQueryBuilder.must().add(QueryBuilders.termQuery(AbstractLogRecord.ENDPOINT_ID, endpointId));
}
if (!Strings.isNullOrEmpty(stateCode)) {
boolQueryBuilder.must().add(QueryBuilders.termQuery(AbstractLogRecord.STATUS_CODE, stateCode));
}
if (!Strings.isNullOrEmpty(traceId)) {
boolQueryBuilder.must().add(QueryBuilders.termQuery(TRACE_ID, traceId));
}
if (LogState.ERROR.equals(state)) {
boolQueryBuilder.must().add(QueryBuilders.termQuery(AbstractLogRecord.IS_ERROR, BooleanUtils.booleanToValue(true)));
} else if (LogState.SUCCESS.equals(state)) {
boolQueryBuilder.must().add(QueryBuilders.termQuery(AbstractLogRecord.IS_ERROR, BooleanUtils.booleanToValue(false)));
}
sourceBuilder.size(limit);
sourceBuilder.from(from);
SearchResponse response = getClient().search(metricName, sourceBuilder);
Logs logs = new Logs();
logs.setTotal((int) response.getHits().getTotalHits().value);
for (SearchHit searchHit : response.getHits().getHits()) {
Log log = new Log();
log.setServiceId(((Number) searchHit.getSourceAsMap().get(AbstractLogRecord.SERVICE_ID)).intValue());
log.setServiceInstanceId(((Number) searchHit.getSourceAsMap().get(AbstractLogRecord.SERVICE_INSTANCE_ID)).intValue());
log.setEndpointId(((Number) searchHit.getSourceAsMap().get(AbstractLogRecord.ENDPOINT_ID)).intValue());
log.setError(BooleanUtils.valueToBoolean(((Number) searchHit.getSourceAsMap().get(AbstractLogRecord.IS_ERROR)).intValue()));
log.setStatusCode((String) searchHit.getSourceAsMap().get(AbstractLogRecord.STATUS_CODE));
log.setContentType(ContentType.instanceOf(((Number) searchHit.getSourceAsMap().get(AbstractLogRecord.CONTENT_TYPE)).intValue()));
log.setContent((String) searchHit.getSourceAsMap().get(AbstractLogRecord.CONTENT));
logs.getLogs().add(log);
}
return logs;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch7.query;
import org.apache.skywalking.oap.server.core.register.EndpointInventory;
import org.apache.skywalking.oap.server.core.register.ServiceInventory;
import org.apache.skywalking.oap.server.core.source.DetectPoint;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.library.util.BooleanUtils;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.MetadataQueryEsDAO;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import java.io.IOException;
/**
* @author peng-yongsheng
*/
public class MetadataQueryEs7DAO extends MetadataQueryEsDAO {
public MetadataQueryEs7DAO(final ElasticSearchClient client, final int queryMaxSize) {
super(client, queryMaxSize);
}
@Override
public int numOfService(long startTimestamp, long endTimestamp) throws IOException {
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
boolQueryBuilder.must().add(timeRangeQueryBuild(startTimestamp, endTimestamp));
boolQueryBuilder.must().add(QueryBuilders.termQuery(ServiceInventory.IS_ADDRESS, BooleanUtils.FALSE));
sourceBuilder.query(boolQueryBuilder);
sourceBuilder.size(0);
SearchResponse response = getClient().search(ServiceInventory.INDEX_NAME, sourceBuilder);
return (int) response.getHits().getTotalHits().value;
}
@Override
public int numOfEndpoint() throws IOException {
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
boolQueryBuilder.must().add(QueryBuilders.termQuery(EndpointInventory.DETECT_POINT, DetectPoint.SERVER.ordinal()));
sourceBuilder.query(boolQueryBuilder);
sourceBuilder.size(0);
SearchResponse response = getClient().search(EndpointInventory.INDEX_NAME, sourceBuilder);
return (int) response.getHits().getTotalHits().value;
}
@Override
public int numOfConjectural(int nodeTypeValue) throws IOException {
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
sourceBuilder.query(QueryBuilders.termQuery(ServiceInventory.NODE_TYPE, nodeTypeValue));
sourceBuilder.size(0);
SearchResponse response = getClient().search(ServiceInventory.INDEX_NAME, sourceBuilder);
return (int) response.getHits().getTotalHits().value;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch7.query;
import org.apache.skywalking.oap.server.core.analysis.Downsampling;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.query.entity.IntValues;
import org.apache.skywalking.oap.server.core.query.entity.KVInt;
import org.apache.skywalking.oap.server.core.query.sql.Function;
import org.apache.skywalking.oap.server.core.query.sql.Where;
import org.apache.skywalking.oap.server.core.storage.model.ModelName;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.MetricsQueryEsDAO;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.Avg;
import org.elasticsearch.search.aggregations.metrics.Sum;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import java.io.IOException;
/**
* @author peng-yongsheng
* @author kezhenxu94
*/
public class MetricsQueryEs7DAO extends MetricsQueryEsDAO {
public MetricsQueryEs7DAO(ElasticSearchClient client) {
super(client);
}
@Override
public IntValues getValues(
String indName,
Downsampling downsampling,
long startTB,
long endTB,
Where where,
String valueCName,
Function function) throws IOException {
String indexName = ModelName.build(downsampling, indName);
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
queryBuild(sourceBuilder, where, startTB, endTB);
TermsAggregationBuilder entityIdAggregation = AggregationBuilders.terms(Metrics.ENTITY_ID).field(Metrics.ENTITY_ID).size(1000);
functionAggregation(function, entityIdAggregation, valueCName);
sourceBuilder.aggregation(entityIdAggregation);
SearchResponse response = getClient().search(indexName, sourceBuilder);
IntValues intValues = new IntValues();
Terms idTerms = response.getAggregations().get(Metrics.ENTITY_ID);
for (Terms.Bucket idBucket : idTerms.getBuckets()) {
long value;
switch (function) {
case Sum:
Sum sum = idBucket.getAggregations().get(valueCName);
value = (long) sum.getValue();
break;
case Avg:
Avg avg = idBucket.getAggregations().get(valueCName);
value = (long) avg.getValue();
break;
default:
avg = idBucket.getAggregations().get(valueCName);
value = (long) avg.getValue();
break;
}
KVInt kvInt = new KVInt();
kvInt.setId(idBucket.getKeyAsString());
kvInt.setValue(value);
intValues.getValues().add(kvInt);
}
return intValues;
}
protected void functionAggregation(Function function, TermsAggregationBuilder parentAggBuilder, String valueCName) {
switch (function) {
case Avg:
parentAggBuilder.subAggregation(AggregationBuilders.avg(valueCName).field(valueCName));
break;
case Sum:
parentAggBuilder.subAggregation(AggregationBuilders.sum(valueCName).field(valueCName));
break;
default:
parentAggBuilder.subAggregation(AggregationBuilders.avg(valueCName).field(valueCName));
break;
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch7.query;
import com.google.common.base.Strings;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
import org.apache.skywalking.oap.server.core.query.entity.BasicTrace;
import org.apache.skywalking.oap.server.core.query.entity.QueryOrder;
import org.apache.skywalking.oap.server.core.query.entity.TraceBrief;
import org.apache.skywalking.oap.server.core.query.entity.TraceState;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.library.util.BooleanUtils;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.MatchCNameBuilder;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.TraceQueryEsDAO;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import java.io.IOException;
import java.util.List;
/**
* @author peng-yongsheng
*/
public class TraceQueryEs7DAO extends TraceQueryEsDAO {
public TraceQueryEs7DAO(ElasticSearchClient client, int segmentQueryMaxSize) {
super(client, segmentQueryMaxSize);
}
@Override
public TraceBrief queryBasicTraces(long startSecondTB,
long endSecondTB,
long minDuration,
long maxDuration,
String endpointName,
int serviceId,
int serviceInstanceId,
int endpointId,
String traceId,
int limit,
int from,
TraceState traceState,
QueryOrder queryOrder) throws IOException {
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
sourceBuilder.query(boolQueryBuilder);
List<QueryBuilder> mustQueryList = boolQueryBuilder.must();
if (startSecondTB != 0 && endSecondTB != 0) {
mustQueryList.add(QueryBuilders.rangeQuery(SegmentRecord.TIME_BUCKET).gte(startSecondTB).lte(endSecondTB));
}
if (minDuration != 0 || maxDuration != 0) {
RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery(SegmentRecord.LATENCY);
if (minDuration != 0) {
rangeQueryBuilder.gte(minDuration);
}
if (maxDuration != 0) {
rangeQueryBuilder.lte(maxDuration);
}
boolQueryBuilder.must().add(rangeQueryBuilder);
}
if (!Strings.isNullOrEmpty(endpointName)) {
String matchCName = MatchCNameBuilder.INSTANCE.build(SegmentRecord.ENDPOINT_NAME);
mustQueryList.add(QueryBuilders.matchPhraseQuery(matchCName, endpointName));
}
if (serviceId != 0) {
boolQueryBuilder.must().add(QueryBuilders.termQuery(SegmentRecord.SERVICE_ID, serviceId));
}
if (serviceInstanceId != 0) {
boolQueryBuilder.must().add(QueryBuilders.termQuery(SegmentRecord.SERVICE_INSTANCE_ID, serviceInstanceId));
}
if (endpointId != 0) {
boolQueryBuilder.must().add(QueryBuilders.termQuery(SegmentRecord.ENDPOINT_ID, endpointId));
}
if (!Strings.isNullOrEmpty(traceId)) {
boolQueryBuilder.must().add(QueryBuilders.termQuery(SegmentRecord.TRACE_ID, traceId));
}
switch (traceState) {
case ERROR:
mustQueryList.add(QueryBuilders.matchQuery(SegmentRecord.IS_ERROR, BooleanUtils.TRUE));
break;
case SUCCESS:
mustQueryList.add(QueryBuilders.matchQuery(SegmentRecord.IS_ERROR, BooleanUtils.FALSE));
break;
}
switch (queryOrder) {
case BY_START_TIME:
sourceBuilder.sort(SegmentRecord.START_TIME, SortOrder.DESC);
break;
case BY_DURATION:
sourceBuilder.sort(SegmentRecord.LATENCY, SortOrder.DESC);
break;
}
sourceBuilder.size(limit);
sourceBuilder.from(from);
SearchResponse response = getClient().search(SegmentRecord.INDEX_NAME, sourceBuilder);
TraceBrief traceBrief = new TraceBrief();
traceBrief.setTotal((int) response.getHits().getTotalHits().value);
for (SearchHit searchHit : response.getHits().getHits()) {
BasicTrace basicTrace = new BasicTrace();
basicTrace.setSegmentId((String) searchHit.getSourceAsMap().get(SegmentRecord.SEGMENT_ID));
basicTrace.setStart(String.valueOf(searchHit.getSourceAsMap().get(SegmentRecord.START_TIME)));
basicTrace.getEndpointNames().add((String) searchHit.getSourceAsMap().get(SegmentRecord.ENDPOINT_NAME));
basicTrace.setDuration(((Number) searchHit.getSourceAsMap().get(SegmentRecord.LATENCY)).intValue());
basicTrace.setError(BooleanUtils.valueToBoolean(((Number) searchHit.getSourceAsMap().get(SegmentRecord.IS_ERROR)).intValue()));
basicTrace.getTraceIds().add((String) searchHit.getSourceAsMap().get(SegmentRecord.TRACE_ID));
traceBrief.getTraces().add(basicTrace);
}
return traceBrief;
}
}
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
org.apache.skywalking.oap.server.storage.plugin.elasticsearch7.StorageModuleElasticsearch7Provider
\ No newline at end of file
......@@ -558,6 +558,7 @@
<exclude>org/apache/skywalking/oal/rt/grammar/*.class</exclude>
<exclude>org/apache/skywalking/oap/server/exporter/grpc/*.class</exclude>
<exclude>org/apache/skywalking/oap/server/configuration/service/*.class</exclude>
<exclude>org/apache/skywalking/oap/server/starter/OAPServerStartUp.class</exclude>
<exclude>org/apache/skywalking/apm/toolkit/**/*Activation.class</exclude>
<exclude>org/apache/skywalking/apm/plugin/**/*Instrumentation.class</exclude>
......
......@@ -92,8 +92,6 @@
</wait>
<env>
<discovery.type>single-node</discovery.type>
<thread_pool.index.queue_size>500</thread_pool.index.queue_size>
<thread_pool.write.queue_size>500</thread_pool.write.queue_size>
</env>
</run>
</image>
......@@ -116,6 +114,7 @@
<run>
<env>
<MODE>cluster</MODE>
<ES_VERSION>${elasticsearch.version}</ES_VERSION>
<SW_STORAGE_ES_CLUSTER_NODES>
${e2e.container.name.prefix}-elasticsearch:9200
</SW_STORAGE_ES_CLUSTER_NODES>
......
......@@ -61,7 +61,11 @@ BEGIN {
# in the storage: section now
# disable h2 module
if (in_storage_es_section == 0) {
in_storage_es_section=$0 ~ /^#?\s+elasticsearch:$/
if (ENVIRON["ES_VERSION"] ~ /^6.+/) {
in_storage_es_section=$0 ~ /^#?\s+elasticsearch:$/
} else if (ENVIRON["ES_VERSION"] ~ /^7.+/) {
in_storage_es_section=$0 ~ /^#?\s+elasticsearch7:$/
}
} else {
in_storage_es_section=$0 ~ /^#?\s{4}/
}
......
......@@ -87,8 +87,6 @@
</wait>
<env>
<discovery.type>single-node</discovery.type>
<thread_pool.index.queue_size>500</thread_pool.index.queue_size>
<thread_pool.write.queue_size>500</thread_pool.write.queue_size>
</env>
</run>
</image>
......@@ -98,6 +96,7 @@
<run>
<env>
<MODE>standalone</MODE>
<ES_VERSION>${elasticsearch.version}</ES_VERSION>
<SW_STORAGE_ES_CLUSTER_NODES>
${e2e.container.name.prefix}-elasticsearch:9200
</SW_STORAGE_ES_CLUSTER_NODES>
......
......@@ -33,7 +33,11 @@ BEGIN {
# in the storage: section now
# disable h2 module
if (in_storage_es_section == 0) {
in_storage_es_section=$0 ~ /^#?\s+elasticsearch:$/
if (ENVIRON["ES_VERSION"] ~ /^6.+/) {
in_storage_es_section=$0 ~ /^#?\s+elasticsearch:$/
} else if (ENVIRON["ES_VERSION"] ~ /^7.+/) {
in_storage_es_section=$0 ~ /^#?\s+elasticsearch7:$/
}
} else {
in_storage_es_section=$0 ~ /^#?\s{4}/
}
......
......@@ -22,6 +22,9 @@ base_dir=$(pwd)
build=0
cases=()
DIST_PACKAGE=${DIST_PACKAGE:-apache-skywalking-apm-bin.tar.gz}
ES_VERSION=${ES_VERSION:-6.3.2}
# Parse the arguments
# --build-dist: build the distribution package ignoring the existance of `dist` folder, useful when running e2e locally
......@@ -30,6 +33,9 @@ while [[ $# -gt 0 ]]; do
--build)
build=1
;;
--profiles=*)
profiles=${1#*=}
;;
*)
cases+=($1)
esac
......@@ -46,7 +52,7 @@ done
[[ ${build} -eq 1 ]] \
&& echo 'Building distribution package...' \
&& ./mvnw -q -Dcheckstyle.skip -Drat.skip -T2 -Dmaven.compile.fork -DskipTests clean install
&& ./mvnw --activate-profiles "${profiles}" -q -Dcheckstyle.skip -Drat.skip -T2 -Dmaven.compile.fork -DskipTests -am clean install
echo "Running cases: $(IFS=$' '; echo "${cases[*]}")"
......@@ -58,9 +64,13 @@ do
# Some of the tests will modify files in the distribution folder, e.g. cluster test will modify the application.yml
# so we give each test a separate distribution folder here
mkdir -p "$test_case" && tar -zxf dist/apache-skywalking-apm-bin.tar.gz -C "$test_case"
./mvnw -Dbuild.id="${BUILD_ID:-local}" -De2e.container.version="${E2E_VERSION}" -Dsw.home="${base_dir}/$test_case/apache-skywalking-apm-bin" -f test/e2e/pom.xml -pl "$test_case" -am verify
mkdir -p "$test_case" && tar -zxf dist/${DIST_PACKAGE} -C "$test_case"
./mvnw -Dbuild.id="${BUILD_ID:-local}" \
-De2e.container.version="${E2E_VERSION}" \
-Delasticsearch.version="${ES_VERSION}" \
-Dsw.home="${base_dir}/$test_case/${DIST_PACKAGE//.tar.gz/}" \
-f test/e2e/pom.xml -pl "$test_case" -am verify
status_code=$?
......
......@@ -17,6 +17,8 @@
# limitations under the License.
#
tar -zxf dist/apache-skywalking-apm-bin.tar.gz -C dist
# List all modules(jars) that belong to the SkyWalking itself, these will be ignored
# when checking the dependency licenses
./mvnw -Pbackend -Dexec.executable='echo' -Dexec.args='${project.artifactId}-${project.version}.jar' exec:exec -q > self-modules.txt
......@@ -33,3 +35,15 @@ grep -vf self-modules.txt all-dependencies.txt > third-party-dependencies.txt
# used to sort the file `known-oap-backend-dependencies.txt`,
# i.e. "sort the two file using the same command (and default arguments)"
diff -w -B -U0 <(cat tools/dependencies/known-oap-backend-dependencies.txt | sort) <(cat third-party-dependencies.txt | sort)
[[ $? -ne 0 ]] && exit $?
# Check ES7 distribution package
tar -zxf dist/apache-skywalking-apm-bin-es7.tar.gz -C dist
ls dist/apache-skywalking-apm-bin-es7/oap-libs > all-dependencies-es7.txt
grep -vf self-modules.txt all-dependencies-es7.txt > third-party-dependencies-es7.txt
diff -w -B -U0 <(cat tools/dependencies/known-oap-backend-dependencies-es7.txt | sort) <(cat third-party-dependencies-es7.txt | sort)
aggs-matrix-stats-client-7.0.0.jar
animal-sniffer-annotations-1.14.jar
annotations-13.0.jar
antlr4-runtime-4.7.1.jar
aopalliance-1.0.jar
apollo-client-1.4.0.jar
apollo-core-1.4.0.jar
bcpkix-jdk15on-1.59.jar
bcprov-ext-jdk15on-1.59.jar
bcprov-jdk15on-1.59.jar
builder-annotations-0.9.2.jar
client-java-4.0.0.jar
client-java-api-4.0.0.jar
client-java-proto-4.0.0.jar
commons-codec-1.11.jar
commons-compress-1.18.jar
commons-dbcp-1.4.jar
commons-io-2.6.jar
commons-lang3-3.7.jar
commons-pool-1.5.4.jar
commons-text-1.4.jar
consul-client-1.2.6.jar
converter-jackson-2.3.0.jar
compiler-0.9.3.jar
curator-client-4.0.1.jar
curator-framework-4.0.1.jar
curator-recipes-4.0.1.jar
curator-x-discovery-4.0.1.jar
elasticsearch-7.0.0.jar
elasticsearch-cli-7.0.0.jar
elasticsearch-core-7.0.0.jar
elasticsearch-geo-7.0.0.jar
elasticsearch-rest-client-7.0.0.jar
elasticsearch-rest-high-level-client-7.0.0.jar
elasticsearch-secure-sm-7.0.0.jar
elasticsearch-x-content-7.0.0.jar
error_prone_annotations-2.0.18.jar
etcd4j-2.17.0.jar
fastjson-1.2.47.jar
freemarker-2.3.28.jar
graphql-java-8.0.jar
graphql-java-tools-5.2.3.jar
groovy-2.4.5-indy.jar
grpc-context-1.15.1.jar
grpc-core-1.15.1.jar
grpc-netty-1.15.1.jar
grpc-protobuf-1.15.1.jar
grpc-protobuf-lite-1.15.1.jar
grpc-stub-1.15.1.jar
gson-2.8.1.jar
guava-23.1-jre.jar
guice-4.1.0.jar
h2-1.4.196.jar
HdrHistogram-2.1.9.jar
HikariCP-3.1.0.jar
hppc-0.7.1.jar
httpasyncclient-4.1.4.jar
httpclient-4.5.7.jar
httpcore-4.4.11.jar
httpcore-nio-4.4.11.jar
jackson-annotations-2.9.5.jar
jackson-core-2.9.5.jar
jackson-core-asl-1.9.13.jar
jackson-databind-2.9.5.jar
jackson-dataformat-cbor-2.8.11.jar
jackson-dataformat-smile-2.8.11.jar
jackson-dataformat-yaml-2.8.11.jar
jackson-datatype-guava-2.9.5.jar
jackson-datatype-jdk8-2.9.5.jar
jackson-mapper-asl-1.9.13.jar
jackson-module-afterburner-2.9.5.jar
jackson-module-kotlin-2.8.8.jar
java-dataloader-2.0.2.jar
javassist-3.25.0-GA.jar
javax.inject-1.jar
javax.servlet-api-3.1.0.jar
jcl-over-slf4j-1.7.25.jar
jetty-http-9.4.2.v20170220.jar
jetty-io-9.4.2.v20170220.jar
jetty-security-9.4.2.v20170220.jar
jetty-server-9.4.2.v20170220.jar
jetty-servlet-9.4.2.v20170220.jar
jetty-util-9.4.2.v20170220.jar
jline-0.9.94.jar
jna-4.5.1.jar
joda-convert-1.2.jar
joda-time-2.10.5.jar
jopt-simple-4.6.jar
json-flattener-0.6.0.jar
jsr305-1.3.9.jar
kotlin-reflect-1.1.1.jar
kotlin-stdlib-1.1.60.jar
lang-mustache-client-7.0.0.jar
log4j-1.2.16.jar
log4j-api-2.9.0.jar
log4j-core-2.9.0.jar
log4j-over-slf4j-1.7.25.jar
log4j-slf4j-impl-2.9.0.jar
logging-interceptor-2.7.5.jar
lucene-analyzers-common-8.0.0.jar
lucene-backward-codecs-8.0.0.jar
lucene-core-8.0.0.jar
lucene-grouping-8.0.0.jar
lucene-highlighter-8.0.0.jar
lucene-join-8.0.0.jar
lucene-memory-8.0.0.jar
lucene-misc-8.0.0.jar
lucene-queries-8.0.0.jar
lucene-queryparser-8.0.0.jar
lucene-sandbox-8.0.0.jar
lucene-spatial-8.0.0.jar
lucene-spatial-extras-8.0.0.jar
lucene-spatial3d-8.0.0.jar
lucene-suggest-8.0.0.jar
minimal-json-0.9.5.jar
nacos-api-1.0.0.jar
nacos-client-1.0.0.jar
nacos-common-1.0.0.jar
netty-3.10.5.Final.jar
netty-buffer-4.1.27.Final.jar
netty-codec-4.1.27.Final.jar
netty-codec-dns-4.1.27.Final.jar
netty-codec-http-4.1.27.Final.jar
netty-codec-http2-4.1.27.Final.jar
netty-codec-socks-4.1.27.Final.jar
netty-common-4.1.27.Final.jar
netty-handler-4.1.27.Final.jar
netty-handler-proxy-4.1.27.Final.jar
netty-resolver-4.1.27.Final.jar
netty-resolver-dns-4.1.27.Final.jar
netty-tcnative-boringssl-static-2.0.7.Final.jar
netty-transport-4.1.27.Final.jar
okhttp-2.7.5.jar
okhttp-3.9.0.jar
okhttp-ws-2.7.5.jar
okio-1.13.0.jar
opencensus-api-0.12.3.jar
opencensus-contrib-grpc-metrics-0.12.3.jar
parent-join-client-7.0.0.jar
proto-google-common-protos-1.0.0.jar
protobuf-java-3.4.0.jar
rank-eval-client-7.0.0.jar
reactive-streams-1.0.2.jar
reflectasm-1.11.7.jar
resourcecify-annotations-0.9.2.jar
retrofit-2.3.0.jar
sharding-jdbc-core-2.0.3.jar
simpleclient-0.6.0.jar
simpleclient_common-0.6.0.jar
simpleclient_hotspot-0.6.0.jar
simpleclient_httpserver-0.6.0.jar
slf4j-api-1.7.25.jar
snakeyaml-1.18.jar
sundr-codegen-0.9.2.jar
sundr-core-0.9.2.jar
swagger-annotations-1.5.12.jar
t-digest-3.2.jar
zookeeper-3.4.10.jar
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册