提交 758b085c 编写于 作者: K kezhenxu94 提交者: wu-sheng

Fix that e2e fails occasionally (#3125)

* Override ES settings in FAQ

* Resend traffic data when invalid

* Remove unsupported ES setting

* Use seperated buffer directory and remove unnecessary web app 2
上级 1a36c38b
...@@ -85,6 +85,8 @@ ...@@ -85,6 +85,8 @@
</wait> </wait>
<env> <env>
<discovery.type>single-node</discovery.type> <discovery.type>single-node</discovery.type>
<thread_pool.index.queue_size>500</thread_pool.index.queue_size>
<thread_pool.write.queue_size>500</thread_pool.write.queue_size>
</env> </env>
</run> </run>
</image> </image>
......
...@@ -25,7 +25,7 @@ if test "${MODE}" = "cluster"; then ...@@ -25,7 +25,7 @@ if test "${MODE}" = "cluster"; then
&& mv clusterized_app.yml application.yml && mv clusterized_app.yml application.yml
cd ${SW_HOME}/webapp \ cd ${SW_HOME}/webapp \
&& awk '/^\s+listOfServers/ {gsub("127.0.0.1:12800", "127.0.0.1:12800,127.0.0.1:12801", $0)} {print}' webapp.yml > clusterized_webapp.yml \ && awk '/^\s+listOfServers:/ {gsub("listOfServers:.*", "listOfServers: 127.0.0.1:12800,127.0.0.1:12801", $0)} {print}' webapp.yml > clusterized_webapp.yml \
&& mv clusterized_webapp.yml webapp.yml && mv clusterized_webapp.yml webapp.yml
cd ${original_wd} cd ${original_wd}
......
...@@ -19,6 +19,8 @@ ...@@ -19,6 +19,8 @@
echo 'starting OAP server...' \ echo 'starting OAP server...' \
&& SW_STORAGE_ES_BULK_ACTIONS=1 \ && SW_STORAGE_ES_BULK_ACTIONS=1 \
&& SW_STORAGE_ES_FLUSH_INTERVAL=1 \ && SW_STORAGE_ES_FLUSH_INTERVAL=1 \
&& SW_RECEIVER_BUFFER_PATH=/tmp/oap/trace_buffer1 \
&& SW_SERVICE_MESH_BUFFER_PATH=/tmp/oap/mesh_buffer1 \
&& start_oap 'init' && start_oap 'init'
echo 'starting Web app...' \ echo 'starting Web app...' \
...@@ -31,11 +33,9 @@ if test "${MODE}" = "cluster"; then ...@@ -31,11 +33,9 @@ if test "${MODE}" = "cluster"; then
&& SW_CORE_REST_PORT=12801 \ && SW_CORE_REST_PORT=12801 \
&& SW_STORAGE_ES_BULK_ACTIONS=1 \ && SW_STORAGE_ES_BULK_ACTIONS=1 \
&& SW_STORAGE_ES_FLUSH_INTERVAL=1 \ && SW_STORAGE_ES_FLUSH_INTERVAL=1 \
&& SW_RECEIVER_BUFFER_PATH=/tmp/oap/trace_buffer2 \
&& SW_SERVICE_MESH_BUFFER_PATH=/tmp/oap/mesh_buffer2 \
&& start_oap 'no-init' && start_oap 'no-init'
# start another WebApp server in a different port
echo 'starting Web app...' \
&& start_webapp '0.0.0.0' 8082
fi fi
echo 'starting instrumented services...' && start_instrumented_services echo 'starting instrumented services...' && start_instrumented_services
......
...@@ -95,33 +95,21 @@ public class ClusterVerificationITCase { ...@@ -95,33 +95,21 @@ public class ClusterVerificationITCase {
public void verify() throws Exception { public void verify() throws Exception {
LocalDateTime startTime = LocalDateTime.now(ZoneOffset.UTC); LocalDateTime startTime = LocalDateTime.now(ZoneOffset.UTC);
final Map<String, String> user = new HashMap<>(); // minimum guarantee that the instrumented services registered
user.put("name", "SkyWalking"); // which is the prerequisite of following verifications(service instance, service metrics, etc.)
List<Service> services = Collections.emptyList(); List<Service> services = Collections.emptyList();
while (services.size() < 2) { while (services.size() < 2) {
try { try {
restTemplate.postForEntity(
instrumentedServiceUrl + "/e2e/users",
user,
String.class
);
services = queryClient.services( services = queryClient.services(
new ServicesQuery() new ServicesQuery()
.start(startTime) .start(startTime)
.end(LocalDateTime.now(ZoneOffset.UTC).plusMinutes(1)) .end(LocalDateTime.now(ZoneOffset.UTC).plusMinutes(1))
); );
Thread.sleep(500); // take a nap to avoid high payload
} catch (Throwable ignored) { } catch (Throwable ignored) {
} }
} }
final ResponseEntity<String> responseEntity = restTemplate.postForEntity(
instrumentedServiceUrl + "/e2e/users",
user,
String.class
);
LOGGER.info("responseEntity: {}, {}", responseEntity.getStatusCode(), responseEntity.getBody());
assertThat(responseEntity.getStatusCode()).isEqualTo(HttpStatus.OK);
verifyTraces(startTime); verifyTraces(startTime);
verifyServices(startTime); verifyServices(startTime);
...@@ -155,7 +143,7 @@ public class ClusterVerificationITCase { ...@@ -155,7 +143,7 @@ public class ClusterVerificationITCase {
services = queryClient.services( services = queryClient.services(
new ServicesQuery() new ServicesQuery()
.start(minutesAgo) .start(minutesAgo)
.end(LocalDateTime.now(ZoneOffset.UTC).plusMinutes(1)) .end(LocalDateTime.now(ZoneOffset.UTC))
); );
Thread.sleep(retryInterval); Thread.sleep(retryInterval);
} }
...@@ -189,14 +177,15 @@ public class ClusterVerificationITCase { ...@@ -189,14 +177,15 @@ public class ClusterVerificationITCase {
.end(LocalDateTime.now(ZoneOffset.UTC).plusMinutes(1)) .end(LocalDateTime.now(ZoneOffset.UTC).plusMinutes(1))
); );
while (instances == null) { while (instances == null) {
LOGGER.warn("instances is null, will retry to query"); LOGGER.warn("instances is null, will send traffic data and retry to query");
generateTraffic();
Thread.sleep(retryInterval);
instances = queryClient.instances( instances = queryClient.instances(
new InstancesQuery() new InstancesQuery()
.serviceId(service.getKey()) .serviceId(service.getKey())
.start(minutesAgo) .start(minutesAgo)
.end(LocalDateTime.now(ZoneOffset.UTC).plusMinutes(1)) .end(LocalDateTime.now(ZoneOffset.UTC).plusMinutes(1))
); );
Thread.sleep(retryInterval);
} }
InputStream expectedInputStream = InputStream expectedInputStream =
new ClassPathResource("expected-data/org.apache.skywalking.e2e.ClusterVerificationITCase.instances.yml").getInputStream(); new ClassPathResource("expected-data/org.apache.skywalking.e2e.ClusterVerificationITCase.instances.yml").getInputStream();
...@@ -210,11 +199,12 @@ public class ClusterVerificationITCase { ...@@ -210,11 +199,12 @@ public class ClusterVerificationITCase {
new EndpointQuery().serviceId(service.getKey()) new EndpointQuery().serviceId(service.getKey())
); );
while (endpoints == null) { while (endpoints == null) {
LOGGER.warn("endpoints is null, will retry to query"); LOGGER.warn("endpoints is null, will send traffic data and retry to query");
generateTraffic();
Thread.sleep(retryInterval);
endpoints = queryClient.endpoints( endpoints = queryClient.endpoints(
new EndpointQuery().serviceId(service.getKey()) new EndpointQuery().serviceId(service.getKey())
); );
Thread.sleep(retryInterval);
} }
InputStream expectedInputStream = InputStream expectedInputStream =
new ClassPathResource("expected-data/org.apache.skywalking.e2e.ClusterVerificationITCase.endpoints.yml").getInputStream(); new ClassPathResource("expected-data/org.apache.skywalking.e2e.ClusterVerificationITCase.endpoints.yml").getInputStream();
...@@ -228,8 +218,8 @@ public class ClusterVerificationITCase { ...@@ -228,8 +218,8 @@ public class ClusterVerificationITCase {
for (String metricsName : ALL_INSTANCE_METRICS) { for (String metricsName : ALL_INSTANCE_METRICS) {
LOGGER.info("verifying service instance response time: {}", instance); LOGGER.info("verifying service instance response time: {}", instance);
boolean matched = false; boolean valid = false;
while (!matched) { while (!valid) {
LOGGER.warn("instanceRespTime is null, will retry to query"); LOGGER.warn("instanceRespTime is null, will retry to query");
Metrics instanceRespTime = queryClient.metrics( Metrics instanceRespTime = queryClient.metrics(
new MetricsQuery() new MetricsQuery()
...@@ -245,8 +235,9 @@ public class ClusterVerificationITCase { ...@@ -245,8 +235,9 @@ public class ClusterVerificationITCase {
instanceRespTimeMatcher.setValue(greaterThanZero); instanceRespTimeMatcher.setValue(greaterThanZero);
try { try {
instanceRespTimeMatcher.verify(instanceRespTime); instanceRespTimeMatcher.verify(instanceRespTime);
matched = true; valid = true;
} catch (Throwable ignored) { } catch (Throwable ignored) {
generateTraffic();
Thread.sleep(retryInterval); Thread.sleep(retryInterval);
} }
LOGGER.info("{}: {}", metricsName, instanceRespTime); LOGGER.info("{}: {}", metricsName, instanceRespTime);
...@@ -263,15 +254,14 @@ public class ClusterVerificationITCase { ...@@ -263,15 +254,14 @@ public class ClusterVerificationITCase {
for (String metricName : ALL_ENDPOINT_METRICS) { for (String metricName : ALL_ENDPOINT_METRICS) {
LOGGER.info("verifying endpoint {}, metrics: {}", endpoint, metricName); LOGGER.info("verifying endpoint {}, metrics: {}", endpoint, metricName);
boolean matched = false; boolean valid = false;
while (!matched) { while (!valid) {
LOGGER.warn("serviceMetrics is null, will retry to query"); Metrics endpointMetrics = queryClient.metrics(
Metrics metrics = queryClient.metrics(
new MetricsQuery() new MetricsQuery()
.stepByMinute() .stepByMinute()
.metricsName(metricName) .metricsName(metricName)
.start(minutesAgo) .start(minutesAgo)
.end(LocalDateTime.now(ZoneOffset.UTC).plusMinutes(1)) .end(LocalDateTime.now(ZoneOffset.UTC))
.id(endpoint.getKey()) .id(endpoint.getKey())
); );
AtLeastOneOfMetricsMatcher instanceRespTimeMatcher = new AtLeastOneOfMetricsMatcher(); AtLeastOneOfMetricsMatcher instanceRespTimeMatcher = new AtLeastOneOfMetricsMatcher();
...@@ -279,12 +269,13 @@ public class ClusterVerificationITCase { ...@@ -279,12 +269,13 @@ public class ClusterVerificationITCase {
greaterThanZero.setValue("gt 0"); greaterThanZero.setValue("gt 0");
instanceRespTimeMatcher.setValue(greaterThanZero); instanceRespTimeMatcher.setValue(greaterThanZero);
try { try {
instanceRespTimeMatcher.verify(metrics); instanceRespTimeMatcher.verify(endpointMetrics);
matched = true; valid = true;
} catch (Throwable ignored) { } catch (Throwable ignored) {
generateTraffic();
Thread.sleep(retryInterval); Thread.sleep(retryInterval);
} }
LOGGER.info("metrics: {}", metrics); LOGGER.info("{}: {}", metricName, endpointMetrics);
} }
} }
} }
...@@ -294,14 +285,14 @@ public class ClusterVerificationITCase { ...@@ -294,14 +285,14 @@ public class ClusterVerificationITCase {
for (String metricName : ALL_SERVICE_METRICS) { for (String metricName : ALL_SERVICE_METRICS) {
LOGGER.info("verifying service {}, metrics: {}", service, metricName); LOGGER.info("verifying service {}, metrics: {}", service, metricName);
boolean matched = false; boolean valid = false;
while (!matched) { while (!valid) {
Metrics serviceMetrics = queryClient.metrics( Metrics serviceMetrics = queryClient.metrics(
new MetricsQuery() new MetricsQuery()
.stepByMinute() .stepByMinute()
.metricsName(metricName) .metricsName(metricName)
.start(minutesAgo) .start(minutesAgo)
.end(LocalDateTime.now(ZoneOffset.UTC).plusMinutes(1)) .end(LocalDateTime.now(ZoneOffset.UTC))
.id(service.getKey()) .id(service.getKey())
); );
AtLeastOneOfMetricsMatcher instanceRespTimeMatcher = new AtLeastOneOfMetricsMatcher(); AtLeastOneOfMetricsMatcher instanceRespTimeMatcher = new AtLeastOneOfMetricsMatcher();
...@@ -310,33 +301,28 @@ public class ClusterVerificationITCase { ...@@ -310,33 +301,28 @@ public class ClusterVerificationITCase {
instanceRespTimeMatcher.setValue(greaterThanZero); instanceRespTimeMatcher.setValue(greaterThanZero);
try { try {
instanceRespTimeMatcher.verify(serviceMetrics); instanceRespTimeMatcher.verify(serviceMetrics);
matched = true; valid = true;
} catch (Throwable ignored) { } catch (Throwable ignored) {
generateTraffic();
Thread.sleep(retryInterval); Thread.sleep(retryInterval);
} }
LOGGER.info("serviceMetrics: {}", serviceMetrics); LOGGER.info("{}: {}", metricName, serviceMetrics);
} }
} }
} }
private void verifyTraces(LocalDateTime minutesAgo) throws Exception { private void verifyTraces(LocalDateTime minutesAgo) throws Exception {
List<Trace> traces = queryClient.traces( final TracesQuery query = new TracesQuery()
new TracesQuery() .stepBySecond()
.stepBySecond() .start(minutesAgo)
.start(minutesAgo) .orderByStartTime();
.end(LocalDateTime.now(ZoneOffset.UTC).plusMinutes(1))
.orderByStartTime() List<Trace> traces = queryClient.traces(query.end(LocalDateTime.now(ZoneOffset.UTC)));
);
while (traces.isEmpty()) { while (traces.isEmpty()) {
LOGGER.warn("traces is empty, will retry to query"); LOGGER.warn("traces is empty, will generate traffic data and retry");
traces = queryClient.traces( generateTraffic();
new TracesQuery()
.stepBySecond()
.start(minutesAgo)
.end(LocalDateTime.now(ZoneOffset.UTC).plusMinutes(1))
.orderByStartTime()
);
Thread.sleep(retryInterval); Thread.sleep(retryInterval);
traces = queryClient.traces(query.end(LocalDateTime.now(ZoneOffset.UTC)));
} }
InputStream expectedInputStream = InputStream expectedInputStream =
...@@ -345,4 +331,16 @@ public class ClusterVerificationITCase { ...@@ -345,4 +331,16 @@ public class ClusterVerificationITCase {
final TracesMatcher tracesMatcher = yaml.loadAs(expectedInputStream, TracesMatcher.class); final TracesMatcher tracesMatcher = yaml.loadAs(expectedInputStream, TracesMatcher.class);
tracesMatcher.verifyLoosely(traces); tracesMatcher.verifyLoosely(traces);
} }
private void generateTraffic() {
final Map<String, String> user = new HashMap<>();
user.put("name", "SkyWalking");
final ResponseEntity<String> responseEntity = restTemplate.postForEntity(
instrumentedServiceUrl + "/e2e/users",
user,
String.class
);
LOGGER.info("responseEntity: {}, {}", responseEntity.getStatusCode(), responseEntity.getBody());
assertThat(responseEntity.getStatusCode()).isEqualTo(HttpStatus.OK);
}
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册