diff --git a/test/plugin/scenarios/spring-kafka-2.2.x-scenario/src/main/java/test/org/apache/skywalking/apm/testcase/spring/kafka/controller/CaseController.java b/test/plugin/scenarios/spring-kafka-2.2.x-scenario/src/main/java/test/org/apache/skywalking/apm/testcase/spring/kafka/controller/CaseController.java index e5dc9f08605450e236d3c0dfff8126c7f49fbfc3..82ae6a514aec390973073fe7dbcce1f87c1ba1b8 100644 --- a/test/plugin/scenarios/spring-kafka-2.2.x-scenario/src/main/java/test/org/apache/skywalking/apm/testcase/spring/kafka/controller/CaseController.java +++ b/test/plugin/scenarios/spring-kafka-2.2.x-scenario/src/main/java/test/org/apache/skywalking/apm/testcase/spring/kafka/controller/CaseController.java @@ -18,6 +18,8 @@ package test.org.apache.skywalking.apm.testcase.spring.kafka.controller; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import okhttp3.OkHttpClient; import okhttp3.Request; import okhttp3.Response; @@ -57,6 +59,8 @@ public class CaseController { private String topicName; private KafkaTemplate kafkaTemplate; + private CountDownLatch latch = new CountDownLatch(1); + @PostConstruct private void setUp() { topicName = "spring_test"; @@ -83,7 +87,6 @@ public class CaseController { props.setMessageListener(new AcknowledgingMessageListener() { @Override public void onMessage(ConsumerRecord data, Acknowledgment acknowledgment) { - System.out.println(data); OkHttpClient client = new OkHttpClient.Builder().build(); Request request = new Request.Builder().url("http://localhost:8080/spring-kafka-2.2.x-scenario/case/spring-kafka-consumer-ping").build(); Response response = null; @@ -93,6 +96,7 @@ public class CaseController { } response.body().close(); acknowledgment.acknowledge(); + latch.countDown(); } }); KafkaMessageListenerContainer container = new KafkaMessageListenerContainer<>(factory, props); @@ -105,10 +109,14 @@ public class CaseController { public String springKafkaCase() { try { kafkaTemplate.send(topicName, "key", "helloWorld").get(); - Thread.sleep(2000L); + kafkaTemplate.flush(); } catch (Exception e) { e.printStackTrace(); } + try { + latch.await(2, TimeUnit.SECONDS); + } catch (InterruptedException e) { + } return SUCCESS; } diff --git a/test/plugin/scenarios/spring-kafka-2.3.x-scenario/src/main/java/test/org/apache/skywalking/apm/testcase/spring/kafka/controller/CaseController.java b/test/plugin/scenarios/spring-kafka-2.3.x-scenario/src/main/java/test/org/apache/skywalking/apm/testcase/spring/kafka/controller/CaseController.java index 5b3f98722d782a17c1d1dbbffc54f4dd53eb9ee9..7bed076d71cd4376ca096b9614dc4459ccd339e3 100644 --- a/test/plugin/scenarios/spring-kafka-2.3.x-scenario/src/main/java/test/org/apache/skywalking/apm/testcase/spring/kafka/controller/CaseController.java +++ b/test/plugin/scenarios/spring-kafka-2.3.x-scenario/src/main/java/test/org/apache/skywalking/apm/testcase/spring/kafka/controller/CaseController.java @@ -18,6 +18,8 @@ package test.org.apache.skywalking.apm.testcase.spring.kafka.controller; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import okhttp3.OkHttpClient; import okhttp3.Request; import okhttp3.Response; @@ -57,6 +59,8 @@ public class CaseController { private String topicName; private KafkaTemplate kafkaTemplate; + private CountDownLatch latch = new CountDownLatch(1); + @PostConstruct private void setUp() { topicName = "spring_test"; @@ -83,7 +87,6 @@ public class CaseController { props.setMessageListener(new AcknowledgingMessageListener() { @Override public void onMessage(ConsumerRecord data, Acknowledgment acknowledgment) { - System.out.println(data); OkHttpClient client = new OkHttpClient.Builder().build(); Request request = new Request.Builder().url("http://localhost:8080/spring-kafka-2.3.x-scenario/case/spring-kafka-consumer-ping").build(); Response response = null; @@ -93,6 +96,7 @@ public class CaseController { } response.body().close(); acknowledgment.acknowledge(); + latch.countDown(); } }); KafkaMessageListenerContainer container = new KafkaMessageListenerContainer<>(factory, props); @@ -105,10 +109,14 @@ public class CaseController { public String springKafkaCase() { try { kafkaTemplate.send(topicName, "key", "helloWorld").get(); - Thread.sleep(2000L); + kafkaTemplate.flush(); } catch (Exception e) { e.printStackTrace(); } + try { + latch.await(2, TimeUnit.SECONDS); + } catch (InterruptedException e) { + } return SUCCESS; }