提交 721f8648 编写于 作者: K kezhenxu94 提交者: wu-sheng

Add TTL E2E test (#3437)

* Add TTL E2E test

* Add to Jenkins stage and minor bugfix

* Upgrade e2e container version

* Polish and minor fix

* Polish
上级 ce20fe10
......@@ -7,3 +7,6 @@
[submodule "skywalking-ui"]
path = skywalking-ui
url = https://github.com/apache/skywalking-rocketbot-ui.git
[submodule "test/e2e/e2e-ttl/e2e-ttl-es/src/main/proto"]
path = test/e2e/e2e-ttl/e2e-ttl-es/src/main/proto
url = https://github.com/apache/skywalking-data-collect-protocol.git
......@@ -44,12 +44,13 @@ pipeline {
// we're using them as a barrier here to filter out some invalid PRs (fast-fail)
// thus save unnecessary E2E builds(which is expensive)
sh './mvnw checkstyle:check apache-rat:check'
sh './mvnw -Dcheckstyle.skip -Drat.skip -T2 -Dmaven.compile.fork -Dmaven.compiler.maxmem=3072 -DskipTests clean package'
sh './mvnw -Dcheckstyle.skip -Drat.skip -T2 -Dmaven.compile.fork -Dmaven.compiler.maxmem=3072 -DskipTests clean install'
// Some of the tests will modify files in the distribution folder, e.g. cluster test will modify the application.yml
// so we give each test a separate distribution folder here
sh 'mkdir -p dist-for-single-node-service && tar -zxf dist/apache-skywalking-apm-bin.tar.gz -C dist-for-single-node-service'
sh 'mkdir -p dist-for-cluster && tar -zxf dist/apache-skywalking-apm-bin.tar.gz -C dist-for-cluster'
sh 'mkdir -p dist-for-agent-reboot && tar -zxf dist/apache-skywalking-apm-bin.tar.gz -C dist-for-agent-reboot'
sh 'mkdir -p dist-for-ttl-es && tar -zxf dist/apache-skywalking-apm-bin.tar.gz -C dist-for-ttl-es'
}
}
......@@ -78,6 +79,12 @@ pipeline {
sh './mvnw -Dbuild.id=${BUILD_ID} -f test/e2e/pom.xml -pl e2e-agent-reboot -am verify'
}
}
stage('Run TTL Tests') {
steps {
sh './mvnw -Dbuild.id=${BUILD_ID} -f test/e2e/pom.xml -pl e2e-ttl/e2e-ttl-es -am verify'
}
}
}
}
}
......@@ -95,7 +102,7 @@ pipeline {
// inside the container
//
// Delete all distribution folder
sh 'docker run -v $(pwd):/sw alpine sleep 10 && rm -rf /sw/dist-for-cluster/* /sw/dist-for-single-node-service/* /sw/dist-for-agent-reboot/*'
sh 'docker run -v $(pwd):/sw alpine sh -c "sleep 10 && rm -rf /sw/dist-for-cluster/* /sw/dist-for-single-node-service/* /sw/dist-for-agent-reboot/* /sw/dist-for-ttl-es/*"'
deleteDir()
}
}
......
......@@ -259,6 +259,7 @@ core:
- Month
# Set a timeout on metrics data. After the timeout has expired, the metrics data will automatically be deleted.
enableDataKeeperExecutor: \${SW_CORE_ENABLE_DATA_KEEPER_EXECUTOR:true} # Turn it off then automatically metrics data delete will be close.
dataKeeperExecutePeriod: \${SW_CORE_DATA_KEEPER_EXECUTE_PERIOD:5} # How often the data keeper executor runs periodically, unit is minute
recordDataTTL: \${SW_CORE_RECORD_DATA_TTL:90} # Unit is minute
minuteMetricsDataTTL: \${SW_CORE_MINUTE_METRIC_DATA_TTL:90} # Unit is minute
hourMetricsDataTTL: \${SW_CORE_HOUR_METRIC_DATA_TTL:36} # Unit is hour
......
......@@ -8,6 +8,7 @@ You have following settings for different types.
```yaml
# Set a timeout on metrics data. After the timeout has expired, the metrics data will automatically be deleted.
enableDataKeeperExecutor: ${SW_CORE_ENABLE_DATA_KEEPER_EXECUTOR:true} # Turn it off then automatically metrics data delete will be close.
dataKeeperExecutePeriod: ${SW_CORE_DATA_KEEPER_EXECUTE_PERIOD:5} # How often the data keeper executor runs periodically, unit is minute
recordDataTTL: ${SW_CORE_RECORD_DATA_TTL:90} # Unit is minute
minuteMetricsDataTTL: ${SW_CORE_MINUTE_METRIC_DATA_TTL:90} # Unit is minute
hourMetricsDataTTL: ${SW_CORE_HOUR_METRIC_DATA_TTL:36} # Unit is hour
......
......@@ -45,6 +45,7 @@ public class CoreModuleConfig extends ModuleConfig {
*/
@Setter private long persistentPeriod = 3;
@Setter private boolean enableDataKeeperExecutor = true;
@Setter private int dataKeeperExecutePeriod = 5;
@Setter private int recordDataTTL;
@Setter private int minuteMetricsDataTTL;
@Setter private int hourMetricsDataTTL;
......
......@@ -203,7 +203,7 @@ public class CoreModuleProvider extends ModuleProvider {
PersistenceTimer.INSTANCE.start(getManager(), moduleConfig);
if (moduleConfig.isEnableDataKeeperExecutor()) {
DataTTLKeeperTimer.INSTANCE.start(getManager());
DataTTLKeeperTimer.INSTANCE.start(getManager(), moduleConfig);
}
CacheUpdateTimer.INSTANCE.start(getManager());
......
......@@ -18,18 +18,26 @@
package org.apache.skywalking.oap.server.core.storage.ttl;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.*;
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.CoreModuleConfig;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.cluster.*;
import org.apache.skywalking.oap.server.core.storage.*;
import org.apache.skywalking.oap.server.core.storage.model.*;
import org.apache.skywalking.oap.server.core.cluster.ClusterModule;
import org.apache.skywalking.oap.server.core.cluster.ClusterNodesQuery;
import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
import org.apache.skywalking.oap.server.core.storage.model.IModelGetter;
import org.apache.skywalking.oap.server.core.storage.model.Model;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.slf4j.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* @author peng-yongsheng
......@@ -42,13 +50,13 @@ public enum DataTTLKeeperTimer {
private ModuleManager moduleManager;
private ClusterNodesQuery clusterNodesQuery;
public void start(ModuleManager moduleManager) {
public void start(ModuleManager moduleManager, CoreModuleConfig moduleConfig) {
this.moduleManager = moduleManager;
this.clusterNodesQuery = moduleManager.find(ClusterModule.NAME).provider().getService(ClusterNodesQuery.class);
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(
new RunnableWithExceptionProtection(this::delete,
t -> logger.error("Remove data in background failure.", t)), 5, 5, TimeUnit.MINUTES);
new RunnableWithExceptionProtection(this::delete, t -> logger.error("Remove data in background failure.", t)),
moduleConfig.getDataKeeperExecutePeriod(), moduleConfig.getDataKeeperExecutePeriod(), TimeUnit.MINUTES);
}
private void delete() {
......
......@@ -61,6 +61,7 @@ core:
- Month
# Set a timeout on metrics data. After the timeout has expired, the metrics data will automatically be deleted.
enableDataKeeperExecutor: ${SW_CORE_ENABLE_DATA_KEEPER_EXECUTOR:true} # Turn it off then automatically metrics data delete will be close.
dataKeeperExecutePeriod: ${SW_CORE_DATA_KEEPER_EXECUTE_PERIOD:5} # How often the data keeper executor runs periodically, unit is minute
recordDataTTL: ${SW_CORE_RECORD_DATA_TTL:90} # Unit is minute
minuteMetricsDataTTL: ${SW_CORE_MINUTE_METRIC_DATA_TTL:90} # Unit is minute
hourMetricsDataTTL: ${SW_CORE_HOUR_METRIC_DATA_TTL:36} # Unit is hour
......
......@@ -60,6 +60,7 @@ core:
- Month
# Set a timeout on metrics data. After the timeout has expired, the metrics data will automatically be deleted.
enableDataKeeperExecutor: ${SW_CORE_ENABLE_DATA_KEEPER_EXECUTOR:true} # Turn it off then automatically metrics data delete will be close.
dataKeeperExecutePeriod: ${SW_CORE_DATA_KEEPER_EXECUTE_PERIOD:5} # How often the data keeper executor runs periodically, unit is minute
recordDataTTL: ${SW_CORE_RECORD_DATA_TTL:90} # Unit is minute
minuteMetricsDataTTL: ${SW_CORE_MINUTE_METRIC_DATA_TTL:90} # Unit is minute
hourMetricsDataTTL: ${SW_CORE_HOUR_METRIC_DATA_TTL:36} # Unit is hour
......
......@@ -28,6 +28,8 @@ import java.time.format.DateTimeFormatter;
public abstract class AbstractQuery<T extends AbstractQuery<?>> {
private static final DateTimeFormatter TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HHmmss");
private static final DateTimeFormatter MINUTE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HHmm");
private static final DateTimeFormatter DAY_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd");
private static final DateTimeFormatter MONTH_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM");
private String start;
private String end;
......@@ -37,9 +39,19 @@ public abstract class AbstractQuery<T extends AbstractQuery<?>> {
if (start != null) {
return start;
}
return "SECOND".equals(step())
? LocalDateTime.now(ZoneOffset.UTC).minusMinutes(15).format(TIME_FORMATTER)
: LocalDateTime.now(ZoneOffset.UTC).minusMinutes(15).format(MINUTE_TIME_FORMATTER);
if ("SECOND".equals(step())) {
return LocalDateTime.now(ZoneOffset.UTC).minusMinutes(15).format(TIME_FORMATTER);
}
if ("MINUTE".equals(step())) {
return LocalDateTime.now(ZoneOffset.UTC).minusMinutes(15).format(MINUTE_TIME_FORMATTER);
}
if ("DAY".equals(step())) {
return LocalDateTime.now(ZoneOffset.UTC).minusMinutes(15).format(DAY_TIME_FORMATTER);
}
if ("MONTH".equals(step())) {
return LocalDateTime.now(ZoneOffset.UTC).minusMinutes(15).format(MONTH_TIME_FORMATTER);
}
return null;
}
public T start(String start) {
......@@ -52,6 +64,10 @@ public abstract class AbstractQuery<T extends AbstractQuery<?>> {
this.start = start.format(MINUTE_TIME_FORMATTER);
} else if ("SECOND".equals(step())) {
this.start = start.format(TIME_FORMATTER);
} else if ("DAY".equals(step())) {
this.start = start.format(DAY_TIME_FORMATTER);
} else if ("MONTH".equals(step())) {
this.start = start.format(MONTH_TIME_FORMATTER);
}
return (T) this;
}
......@@ -60,9 +76,19 @@ public abstract class AbstractQuery<T extends AbstractQuery<?>> {
if (end != null) {
return end;
}
return "SECOND".equals(step())
? LocalDateTime.now(ZoneOffset.UTC).format(TIME_FORMATTER)
: LocalDateTime.now(ZoneOffset.UTC).format(MINUTE_TIME_FORMATTER);
if ("SECOND".equals(step())) {
return LocalDateTime.now(ZoneOffset.UTC).format(TIME_FORMATTER);
}
if ("MINUTE".equals(step())) {
return LocalDateTime.now(ZoneOffset.UTC).format(MINUTE_TIME_FORMATTER);
}
if ("DAY".equals(step())) {
return LocalDateTime.now(ZoneOffset.UTC).format(DAY_TIME_FORMATTER);
}
if ("MONTH".equals(step())) {
return LocalDateTime.now(ZoneOffset.UTC).format(MONTH_TIME_FORMATTER);
}
return null;
}
public AbstractQuery end(String end) {
......@@ -75,6 +101,10 @@ public abstract class AbstractQuery<T extends AbstractQuery<?>> {
this.end = end.format(MINUTE_TIME_FORMATTER);
} else if ("SECOND".equals(step())) {
this.end = end.format(TIME_FORMATTER);
} else if ("DAY".equals(step())) {
this.end = end.format(DAY_TIME_FORMATTER);
} else if ("MONTH".equals(step())) {
this.end = end.format(MONTH_TIME_FORMATTER);
}
return (T) this;
}
......@@ -88,6 +118,16 @@ public abstract class AbstractQuery<T extends AbstractQuery<?>> {
return (T) this;
}
public T stepByMonth() {
this.step = "MONTH";
return (T) this;
}
public T stepByDay() {
this.step = "DAY";
return (T) this;
}
public T stepByMinute() {
this.step = "MINUTE";
return (T) this;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.skywalking.e2e.metrics;
import org.apache.skywalking.e2e.verification.AbstractMatcher;
import org.assertj.core.api.Condition;
import java.util.function.Predicate;
import static org.assertj.core.api.Assertions.assertThat;
/**
* @author kezhenxu94
*/
public class AllOfMetricsMatcher extends AbstractMatcher<Metrics> {
private MetricsValueMatcher value;
@Override
public void verify(Metrics metrics) {
assertThat(metrics.getValues()).isNotEmpty();
assertThat(metrics.getValues()).allMatch(value -> {
try {
AllOfMetricsMatcher.this.getValue().verify(value);
return true;
} catch (Throwable t) {
return false;
}
});
}
public MetricsValueMatcher getValue() {
return value;
}
public void setValue(MetricsValueMatcher value) {
this.value = value;
}
@Override
public String toString() {
return "AllOfMetricsMatcher{" +
"value=" + value +
'}';
}
}
......@@ -28,6 +28,7 @@ import static org.assertj.core.api.Assertions.assertThat;
*/
public abstract class AbstractMatcher<T> {
private static final Pattern NE_MATCHER = Pattern.compile("ne\\s+(?<val>.+)");
private static final Pattern EQ_MATCHER = Pattern.compile("eq\\s+(?<val>.+)");
private static final Pattern GT_MATCHER = Pattern.compile("gt\\s+(?<val>.+)");
private static final Pattern GE_MATCHER = Pattern.compile("ge\\s+(?<val>.+)");
private static final Pattern NN_MATCHER = Pattern.compile("^not null$");
......@@ -65,6 +66,15 @@ public abstract class AbstractMatcher<T> {
return;
}
matcher = EQ_MATCHER.matcher(expected);
if (matcher.find()) {
String val = matcher.group("val");
assertThat(val).isNotBlank();
assertThat(Double.parseDouble(actual)).isEqualTo(Double.parseDouble(val));
return;
}
assertThat(actual).isEqualTo(expected);
}
......
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>e2e-ttl</artifactId>
<groupId>org.apache.skywalking</groupId>
<version>1.0.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>e2e-ttl-es</artifactId>
<properties>
<e2e.dist.directory>dist-for-ttl-es</e2e.dist.directory>
<e2e.container.version>1.2</e2e.container.version>
<e2e.container.name.prefix>skywalking-e2e-container-${build.id}-ttl-es</e2e.container.name.prefix>
<grpc.version>1.14.0</grpc.version>
<os-maven-plugin.version>1.5.0.Final</os-maven-plugin.version>
<protobuf-maven-plugin.version>0.5.0</protobuf-maven-plugin.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>e2e-base</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty</artifactId>
<version>${grpc.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>io.fabric8</groupId>
<artifactId>docker-maven-plugin</artifactId>
<configuration>
<containerNamePattern>%a-%t-%i</containerNamePattern>
<imagePullPolicy>Always</imagePullPolicy>
<images>
<image>
<name>elastic/elasticsearch:${elasticsearch.version}</name>
<alias>${e2e.container.name.prefix}-elasticsearch</alias>
<run>
<ports>
<port>es.port:9200</port>
</ports>
<wait>
<http>
<url>http://localhost:${es.port}</url>
<method>GET</method>
<status>200</status>
</http>
<time>120000</time>
</wait>
<env>
<discovery.type>single-node</discovery.type>
<thread_pool.index.queue_size>500</thread_pool.index.queue_size>
<thread_pool.write.queue_size>500</thread_pool.write.queue_size>
</env>
</run>
</image>
<image>
<name>skyapm/e2e-container:${e2e.container.version}</name>
<alias>${e2e.container.name.prefix}</alias>
<run>
<env>
<MODE>standalone</MODE>
<SW_STORAGE_ES_CLUSTER_NODES>
${e2e.container.name.prefix}-elasticsearch:9200
</SW_STORAGE_ES_CLUSTER_NODES>
</env>
<dependsOn>
<container>${e2e.container.name.prefix}-elasticsearch</container>
</dependsOn>
<ports>
<port>+webapp.host:webapp.port:8081</port>
<port>+oap.host:oap.port:11800</port>
</ports>
<links>
<link>${e2e.container.name.prefix}-elasticsearch</link>
</links>
<volumes>
<bind>
<volume>
../../../../${e2e.dist.directory}/apache-skywalking-apm-bin:/sw
</volume>
<volume>
${project.basedir}/src/docker/rc.d:/rc.d:ro
</volume>
<volume>
${project.basedir}/src/docker/es_storage.awk:/es_storage.awk
</volume>
</bind>
</volumes>
<wait>
<log>SkyWalking e2e container is ready for tests</log>
<time>3000000</time>
</wait>
</run>
</image>
</images>
</configuration>
</plugin>
<!-- set the system properties that can be used in test codes -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<configuration>
<systemPropertyVariables>
<sw.webapp.host>${webapp.host}</sw.webapp.host>
<sw.webapp.port>${webapp.port}</sw.webapp.port>
<oap.host>${oap.host}</oap.host>
<oap.port>${oap.port}</oap.port>
</systemPropertyVariables>
</configuration>
<executions>
<execution>
<goals>
<goal>verify</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>${protobuf-maven-plugin.version}</version>
<configuration>
<!--
The version of protoc must match protobuf-java. If you don't depend on
protobuf-java directly, you will be transitively depending on the
protobuf-java version that grpc depends on.
-->
<protocArtifact>com.google.protobuf:protoc:3.3.0:exe:${os.detected.classifier}
</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:1.8.0:exe:${os.detected.classifier}
</pluginArtifact>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>${os-maven-plugin.version}</version>
</extension>
</extensions>
</build>
</project>
\ No newline at end of file
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#!/usr/bin/awk -f
BEGIN {
in_storage_section=0;
in_storage_es_section=0;
in_storage_h2_section=0;
}
{
if (in_storage_section == 0) {
in_storage_section=$0 ~ /^storage:$/
} else {
in_storage_section=$0 ~ /^(#|\s{2})/
}
if (in_storage_section == 1) {
# in the storage: section now
# disable h2 module
if (in_storage_es_section == 0) {
in_storage_es_section=$0 ~ /^#?\s+elasticsearch:$/
} else {
in_storage_es_section=$0 ~ /^#?\s{4}/
}
if (in_storage_h2_section == 0) {
in_storage_h2_section=$0 ~ /^#?\s+h2:$/
} else {
in_storage_h2_section=$0 ~ /^#?\s{4}/
}
if (in_storage_es_section == 1) {
# in the storage.elasticsearch section now
# uncomment es config
gsub("^#", "", $0)
print
} else if (in_storage_h2_section == 1) {
# comment out h2 config
if ($0 !~ /^#/) {
print "#" $0
} else {
print
}
} else {
print
}
} else {
print
}
}
# Licensed to the SkyAPM under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#!/usr/bin/env bash
original_wd=$(pwd)
# substitute application.yml to be capable of es mode
cd ${SW_HOME}/config \
&& awk -f /es_storage.awk application.yml > es_storage_app.yml \
&& mv es_storage_app.yml application.yml \
&& sed '/<Loggers>/a<logger name="org.apache.skywalking.oap.server.storage" level="DEBUG"/>' log4j2.xml > log4j2debuggable.xml \
&& mv log4j2debuggable.xml log4j2.xml
cd ${original_wd}
\ No newline at end of file
# Licensed to the SkyAPM under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#!/usr/bin/env bash
echo 'starting OAP server...' \
&& SW_STORAGE_ES_BULK_ACTIONS=1 \
SW_CORE_DATA_KEEPER_EXECUTE_PERIOD=1 \
SW_STORAGE_ES_MONTH_METRIC_DATA_TTL=4 \
SW_STORAGE_ES_OTHER_METRIC_DATA_TTL=5 \
SW_STORAGE_ES_FLUSH_INTERVAL=1 \
SW_RECEIVER_BUFFER_PATH=/tmp/oap/trace_buffer1 \
SW_SERVICE_MESH_BUFFER_PATH=/tmp/oap/mesh_buffer1 \
start_oap 'init'
echo 'starting Web app...' \
&& start_webapp '0.0.0.0' 8081
echo "SkyWalking e2e container is ready for tests"
tail -f ${OAP_LOG_DIR}/* \
${WEBAPP_LOG_DIR}/* \
${ES_HOME}/logs/elasticsearch.log \
${ES_HOME}/logs/stdout.log
Subproject commit 7b244ff7ec350910295eee85633e02d92a6f6b1c
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.skywalking.e2e;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.internal.DnsNameResolverProvider;
import io.grpc.netty.NettyChannelBuilder;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.apm.network.common.DetectPoint;
import org.apache.skywalking.apm.network.servicemesh.MeshProbeDownstream;
import org.apache.skywalking.apm.network.servicemesh.Protocol;
import org.apache.skywalking.apm.network.servicemesh.ServiceMeshMetric;
import org.apache.skywalking.apm.network.servicemesh.ServiceMeshMetricServiceGrpc;
import org.apache.skywalking.e2e.metrics.AllOfMetricsMatcher;
import org.apache.skywalking.e2e.metrics.AtLeastOneOfMetricsMatcher;
import org.apache.skywalking.e2e.metrics.Metrics;
import org.apache.skywalking.e2e.metrics.MetricsQuery;
import org.apache.skywalking.e2e.metrics.MetricsValueMatcher;
import org.apache.skywalking.e2e.service.Service;
import org.apache.skywalking.e2e.service.ServicesQuery;
import org.junit.Before;
import org.junit.Test;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import static org.apache.skywalking.e2e.metrics.MetricsQuery.SERVICE_P99;
import static org.assertj.core.api.Assertions.assertThat;
/**
* @author kezhenxu94
*/
@Slf4j
public class StorageTTLITCase {
private static final int SW_STORAGE_ES_MONTH_METRIC_DATA_TTL = 4;
private static final int SW_STORAGE_ES_OTHER_METRIC_DATA_TTL = 5;
private static final int MAX_INBOUND_MESSAGE_SIZE = 1024 * 1024 * 50;
private static final boolean USE_PLAIN_TEXT = true;
private static final boolean SUCCESS = true;
private SimpleQueryClient queryClient;
private ServiceMeshMetricServiceGrpc.ServiceMeshMetricServiceStub grpcStub;
@Before
public void setUp() {
final String swWebappHost = System.getProperty("sw.webapp.host", "127.0.0.1");
final String swWebappPort = System.getProperty("sw.webapp.port", "32789");
final String oapPort = System.getProperty("oap.port", "32788");
queryClient = new SimpleQueryClient(swWebappHost, swWebappPort);
final ManagedChannelBuilder builder =
NettyChannelBuilder.forAddress("127.0.0.1", Integer.parseInt(oapPort))
.nameResolverFactory(new DnsNameResolverProvider())
.maxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE)
.usePlaintext(USE_PLAIN_TEXT);
final ManagedChannel channel = builder.build();
grpcStub = ServiceMeshMetricServiceGrpc.newStub(channel);
}
@Test(timeout = 360000)
public void dayMetricsDataShouldBeRemovedAfterTTL() throws Exception {
final ServiceMeshMetric.Builder builder = ServiceMeshMetric
.newBuilder()
.setSourceServiceName("e2e-test-source-service")
.setSourceServiceInstance("e2e-test-source-service-instance")
.setDestServiceName("e2e-test-dest-service")
.setDestServiceInstance("e2e-test-dest-service-instance")
.setEndpoint("e2e/test")
.setLatency(2000)
.setResponseCode(200)
.setStatus(SUCCESS)
.setProtocol(Protocol.HTTP)
.setDetectPoint(DetectPoint.server);
final LocalDateTime now = LocalDateTime.now();
final LocalDateTime startTime = now.minusDays(SW_STORAGE_ES_OTHER_METRIC_DATA_TTL + 1);
final LocalDateTime endTime = startTime.plusMinutes(1);
final LocalDateTime queryStart = startTime;
final LocalDateTime queryEnd = now.minusDays(SW_STORAGE_ES_OTHER_METRIC_DATA_TTL);
ensureSendingMetricsWorks(
builder,
startTime.toEpochSecond(ZoneOffset.UTC) * 1000,
endTime.toEpochSecond(ZoneOffset.UTC) * 1000,
queryStart,
queryEnd,
"DAY"
);
shouldBeEmptyBetweenTimeRange(queryStart, queryEnd, "DAY");
}
@Test(timeout = 360000)
public void monthMetricsDataShouldBeRemovedAfterTTL() throws Exception {
final ServiceMeshMetric.Builder builder = ServiceMeshMetric
.newBuilder()
.setSourceServiceName("e2e-test-source-service")
.setSourceServiceInstance("e2e-test-source-service-instance")
.setDestServiceName("e2e-test-dest-service")
.setDestServiceInstance("e2e-test-dest-service-instance")
.setEndpoint("e2e/test")
.setLatency(2000)
.setResponseCode(200)
.setStatus(SUCCESS)
.setProtocol(Protocol.HTTP)
.setDetectPoint(DetectPoint.server);
final LocalDateTime now = LocalDateTime.now();
final LocalDateTime startTime = now.minusMonths(SW_STORAGE_ES_MONTH_METRIC_DATA_TTL + 1);
final LocalDateTime endTime = startTime.plusMinutes(1);
final LocalDateTime queryStart = startTime;
final LocalDateTime queryEnd = now.minusMonths(SW_STORAGE_ES_MONTH_METRIC_DATA_TTL);
ensureSendingMetricsWorks(
builder,
startTime.toEpochSecond(ZoneOffset.UTC) * 1000,
endTime.toEpochSecond(ZoneOffset.UTC) * 1000,
queryStart,
queryEnd,
"MONTH"
);
shouldBeEmptyBetweenTimeRange(queryStart, queryEnd, "MONTH");
}
private void shouldBeEmptyBetweenTimeRange(
final LocalDateTime queryStart,
final LocalDateTime queryEnd,
final String step
) throws InterruptedException {
boolean valid = false;
for (int i = 0; i < 10 && !valid; i++) {
try {
final Metrics serviceMetrics = queryMetrics(queryStart, queryEnd, step);
log.info("ServiceMetrics: {}", serviceMetrics);
AllOfMetricsMatcher instanceRespTimeMatcher = new AllOfMetricsMatcher();
MetricsValueMatcher equalsZero = new MetricsValueMatcher();
equalsZero.setValue("eq 0");
instanceRespTimeMatcher.setValue(equalsZero);
try {
instanceRespTimeMatcher.verify(serviceMetrics);
valid = true;
} catch (Throwable t) {
log.warn("History metrics data are not deleted yet, {}", t.getMessage());
Thread.sleep(60000);
}
} catch (Throwable t) {
log.warn("History metrics data are not deleted yet", t);
Thread.sleep(60000);
}
}
}
private void ensureSendingMetricsWorks(
final ServiceMeshMetric.Builder builder,
final long startTime,
final long endTime,
final LocalDateTime queryStart,
final LocalDateTime queryEnd,
final String step
) throws Exception {
boolean prepared = false;
while (!prepared) {
sendMetrics(
builder
.setStartTime(startTime)
.setEndTime(endTime)
.build()
);
final Metrics serviceMetrics = queryMetrics(queryStart, queryEnd, step);
final AtLeastOneOfMetricsMatcher instanceRespTimeMatcher = new AtLeastOneOfMetricsMatcher();
final MetricsValueMatcher greaterThanZero = new MetricsValueMatcher();
greaterThanZero.setValue("gt 0");
instanceRespTimeMatcher.setValue(greaterThanZero);
try {
instanceRespTimeMatcher.verify(serviceMetrics);
prepared = true;
} catch (Throwable ignored) {
sendMetrics(
builder
.setStartTime(startTime)
.setEndTime(endTime)
.build()
);
Thread.sleep(10000);
}
}
}
private Metrics queryMetrics(
final LocalDateTime queryStart,
final LocalDateTime queryEnd,
final String step
) throws Exception {
for (int i = 0; i < 10; i++) {
try {
final List<Service> services = queryClient.services(
new ServicesQuery()
.start(LocalDateTime.now().minusDays(SW_STORAGE_ES_OTHER_METRIC_DATA_TTL))
.end(LocalDateTime.now())
);
log.info("Services: {}", services);
assertThat(services).isNotEmpty();
String serviceId = services.stream().filter(it -> "e2e-test-dest-service".equals(it.getLabel())).findFirst().get().getKey();
return queryClient.metrics(
new MetricsQuery()
.id(serviceId)
.metricsName(SERVICE_P99)
.step(step)
.start(queryStart)
.end(queryEnd)
);
} catch (Throwable ignored) {
Thread.sleep(10000);
}
}
return null;
}
private void sendMetrics(final ServiceMeshMetric metrics) throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);
StreamObserver<ServiceMeshMetric> collect = grpcStub.collect(new StreamObserver<MeshProbeDownstream>() {
@Override
public void onNext(final MeshProbeDownstream meshProbeDownstream) {
}
@Override
public void onError(final Throwable throwable) {
throwable.printStackTrace();
latch.countDown();
}
@Override
public void onCompleted() {
latch.countDown();
}
});
collect.onNext(metrics);
collect.onCompleted();
latch.await();
}
}
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apache-skywalking-e2e</artifactId>
<groupId>org.apache.skywalking</groupId>
<version>1.0.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>e2e-ttl</artifactId>
<packaging>pom</packaging>
<modules>
<module>e2e-ttl-es</module>
</modules>
</project>
\ No newline at end of file
......@@ -36,6 +36,7 @@
<module>e2e-single-service</module>
<module>e2e-cluster</module>
<module>e2e-agent-reboot</module>
<module>e2e-ttl</module>
</modules>
<properties>
......@@ -50,7 +51,7 @@
<faster.jackson.version>2.9.7</faster.jackson.version>
<junit.version>4.12</junit.version>
<jackson.version>2.9.7</jackson.version>
<guava.version>28.0-jre</guava.version>
<guava.version>20.0</guava.version>
<snake.version>1.23</snake.version>
<gson.version>2.8.5</gson.version>
<h2.version>1.4.199</h2.version>
......@@ -61,7 +62,7 @@
<!-- build.id is an available environment variable in Jenkins to
distinguish the different build jobs, once Jenkins job is aborted,
we will use this build.id to stop all containers that are started
during this specific build, see Jenkins-E2E (post stage) for detail
during this specific build, see Jenkins-E2E (cleanup stage) for detail
-->
<build.id>local</build.id>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册