未验证 提交 b588b5f6 编写于 作者: D Daming 提交者: GitHub

Fix spring-kafka test unstable (#5310)

* fix spring-kafka test unstable

* fix spring-kafka test unstable
上级 522e0951
...@@ -18,6 +18,8 @@ ...@@ -18,6 +18,8 @@
package test.org.apache.skywalking.apm.testcase.spring.kafka.controller; package test.org.apache.skywalking.apm.testcase.spring.kafka.controller;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import okhttp3.OkHttpClient; import okhttp3.OkHttpClient;
import okhttp3.Request; import okhttp3.Request;
import okhttp3.Response; import okhttp3.Response;
...@@ -57,6 +59,8 @@ public class CaseController { ...@@ -57,6 +59,8 @@ public class CaseController {
private String topicName; private String topicName;
private KafkaTemplate<String, String> kafkaTemplate; private KafkaTemplate<String, String> kafkaTemplate;
private CountDownLatch latch = new CountDownLatch(1);
@PostConstruct @PostConstruct
private void setUp() { private void setUp() {
topicName = "spring_test"; topicName = "spring_test";
...@@ -83,7 +87,6 @@ public class CaseController { ...@@ -83,7 +87,6 @@ public class CaseController {
props.setMessageListener(new AcknowledgingMessageListener<String, String>() { props.setMessageListener(new AcknowledgingMessageListener<String, String>() {
@Override @Override
public void onMessage(ConsumerRecord<String, String> data, Acknowledgment acknowledgment) { public void onMessage(ConsumerRecord<String, String> data, Acknowledgment acknowledgment) {
System.out.println(data);
OkHttpClient client = new OkHttpClient.Builder().build(); OkHttpClient client = new OkHttpClient.Builder().build();
Request request = new Request.Builder().url("http://localhost:8080/spring-kafka-2.2.x-scenario/case/spring-kafka-consumer-ping").build(); Request request = new Request.Builder().url("http://localhost:8080/spring-kafka-2.2.x-scenario/case/spring-kafka-consumer-ping").build();
Response response = null; Response response = null;
...@@ -93,6 +96,7 @@ public class CaseController { ...@@ -93,6 +96,7 @@ public class CaseController {
} }
response.body().close(); response.body().close();
acknowledgment.acknowledge(); acknowledgment.acknowledge();
latch.countDown();
} }
}); });
KafkaMessageListenerContainer<String, String> container = new KafkaMessageListenerContainer<>(factory, props); KafkaMessageListenerContainer<String, String> container = new KafkaMessageListenerContainer<>(factory, props);
...@@ -105,10 +109,14 @@ public class CaseController { ...@@ -105,10 +109,14 @@ public class CaseController {
public String springKafkaCase() { public String springKafkaCase() {
try { try {
kafkaTemplate.send(topicName, "key", "helloWorld").get(); kafkaTemplate.send(topicName, "key", "helloWorld").get();
Thread.sleep(2000L); kafkaTemplate.flush();
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
} }
try {
latch.await(2, TimeUnit.SECONDS);
} catch (InterruptedException e) {
}
return SUCCESS; return SUCCESS;
} }
......
...@@ -18,6 +18,8 @@ ...@@ -18,6 +18,8 @@
package test.org.apache.skywalking.apm.testcase.spring.kafka.controller; package test.org.apache.skywalking.apm.testcase.spring.kafka.controller;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import okhttp3.OkHttpClient; import okhttp3.OkHttpClient;
import okhttp3.Request; import okhttp3.Request;
import okhttp3.Response; import okhttp3.Response;
...@@ -57,6 +59,8 @@ public class CaseController { ...@@ -57,6 +59,8 @@ public class CaseController {
private String topicName; private String topicName;
private KafkaTemplate<String, String> kafkaTemplate; private KafkaTemplate<String, String> kafkaTemplate;
private CountDownLatch latch = new CountDownLatch(1);
@PostConstruct @PostConstruct
private void setUp() { private void setUp() {
topicName = "spring_test"; topicName = "spring_test";
...@@ -83,7 +87,6 @@ public class CaseController { ...@@ -83,7 +87,6 @@ public class CaseController {
props.setMessageListener(new AcknowledgingMessageListener<String, String>() { props.setMessageListener(new AcknowledgingMessageListener<String, String>() {
@Override @Override
public void onMessage(ConsumerRecord<String, String> data, Acknowledgment acknowledgment) { public void onMessage(ConsumerRecord<String, String> data, Acknowledgment acknowledgment) {
System.out.println(data);
OkHttpClient client = new OkHttpClient.Builder().build(); OkHttpClient client = new OkHttpClient.Builder().build();
Request request = new Request.Builder().url("http://localhost:8080/spring-kafka-2.3.x-scenario/case/spring-kafka-consumer-ping").build(); Request request = new Request.Builder().url("http://localhost:8080/spring-kafka-2.3.x-scenario/case/spring-kafka-consumer-ping").build();
Response response = null; Response response = null;
...@@ -93,6 +96,7 @@ public class CaseController { ...@@ -93,6 +96,7 @@ public class CaseController {
} }
response.body().close(); response.body().close();
acknowledgment.acknowledge(); acknowledgment.acknowledge();
latch.countDown();
} }
}); });
KafkaMessageListenerContainer<String, String> container = new KafkaMessageListenerContainer<>(factory, props); KafkaMessageListenerContainer<String, String> container = new KafkaMessageListenerContainer<>(factory, props);
...@@ -105,10 +109,14 @@ public class CaseController { ...@@ -105,10 +109,14 @@ public class CaseController {
public String springKafkaCase() { public String springKafkaCase() {
try { try {
kafkaTemplate.send(topicName, "key", "helloWorld").get(); kafkaTemplate.send(topicName, "key", "helloWorld").get();
Thread.sleep(2000L); kafkaTemplate.flush();
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
} }
try {
latch.await(2, TimeUnit.SECONDS);
} catch (InterruptedException e) {
}
return SUCCESS; return SUCCESS;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册