未验证 提交 ed71efda 编写于 作者: 于玉桔 提交者: GitHub

Fix spring-kafka test unstable (#5313)

上级 a6e45d04
......@@ -18,8 +18,6 @@
package test.org.apache.skywalking.apm.testcase.spring.kafka.controller;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
......@@ -46,6 +44,7 @@ import javax.annotation.PostConstruct;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
@Controller
@RequestMapping("/case")
......@@ -60,6 +59,7 @@ public class CaseController {
private KafkaTemplate<String, String> kafkaTemplate;
private CountDownLatch latch = new CountDownLatch(1);
private String helloWorld = "helloWorld";
@PostConstruct
private void setUp() {
......@@ -74,6 +74,12 @@ public class CaseController {
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
kafkaTemplate = new KafkaTemplate<String, String>(new DefaultKafkaProducerFactory<>(props));
try {
kafkaTemplate.send(topicName, "key", "ping").get();
kafkaTemplate.flush();
} catch (Exception e) {
e.printStackTrace();
}
}
private void setUpConsumer() {
......@@ -87,16 +93,18 @@ public class CaseController {
props.setMessageListener(new AcknowledgingMessageListener<String, String>() {
@Override
public void onMessage(ConsumerRecord<String, String> data, Acknowledgment acknowledgment) {
OkHttpClient client = new OkHttpClient.Builder().build();
Request request = new Request.Builder().url("http://localhost:8080/spring-kafka-2.2.x-scenario/case/spring-kafka-consumer-ping").build();
Response response = null;
try {
response = client.newCall(request).execute();
} catch (IOException e) {
if (data.value().equals(helloWorld)) {
OkHttpClient client = new OkHttpClient.Builder().build();
Request request = new Request.Builder().url("http://localhost:8080/spring-kafka-2.2.x-scenario/case/spring-kafka-consumer-ping").build();
Response response = null;
try {
response = client.newCall(request).execute();
} catch (IOException e) {
}
response.body().close();
acknowledgment.acknowledge();
latch.countDown();
}
response.body().close();
acknowledgment.acknowledge();
latch.countDown();
}
});
KafkaMessageListenerContainer<String, String> container = new KafkaMessageListenerContainer<>(factory, props);
......@@ -106,17 +114,10 @@ public class CaseController {
@RequestMapping("/spring-kafka-case")
@ResponseBody
public String springKafkaCase() {
try {
kafkaTemplate.send(topicName, "key", "helloWorld").get();
kafkaTemplate.flush();
} catch (Exception e) {
e.printStackTrace();
}
try {
latch.await(2, TimeUnit.SECONDS);
} catch (InterruptedException e) {
}
public String springKafkaCase() throws Exception {
kafkaTemplate.send(topicName, "key", helloWorld).get();
latch.await();
kafkaTemplate.flush();
return SUCCESS;
}
......
......@@ -18,8 +18,6 @@
package test.org.apache.skywalking.apm.testcase.spring.kafka.controller;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
......@@ -46,6 +44,7 @@ import javax.annotation.PostConstruct;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
@Controller
@RequestMapping("/case")
......@@ -60,6 +59,7 @@ public class CaseController {
private KafkaTemplate<String, String> kafkaTemplate;
private CountDownLatch latch = new CountDownLatch(1);
private String helloWorld = "helloWorld";
@PostConstruct
private void setUp() {
......@@ -74,6 +74,12 @@ public class CaseController {
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
kafkaTemplate = new KafkaTemplate<String, String>(new DefaultKafkaProducerFactory<>(props));
try {
kafkaTemplate.send(topicName, "key", "ping").get();
kafkaTemplate.flush();
} catch (Exception e) {
e.printStackTrace();
}
}
private void setUpConsumer() {
......@@ -87,16 +93,18 @@ public class CaseController {
props.setMessageListener(new AcknowledgingMessageListener<String, String>() {
@Override
public void onMessage(ConsumerRecord<String, String> data, Acknowledgment acknowledgment) {
OkHttpClient client = new OkHttpClient.Builder().build();
Request request = new Request.Builder().url("http://localhost:8080/spring-kafka-2.3.x-scenario/case/spring-kafka-consumer-ping").build();
Response response = null;
try {
response = client.newCall(request).execute();
} catch (IOException e) {
if (data.value().equals(helloWorld)) {
OkHttpClient client = new OkHttpClient.Builder().build();
Request request = new Request.Builder().url("http://localhost:8080/spring-kafka-2.3.x-scenario/case/spring-kafka-consumer-ping").build();
Response response = null;
try {
response = client.newCall(request).execute();
} catch (IOException e) {
}
response.body().close();
acknowledgment.acknowledge();
latch.countDown();
}
response.body().close();
acknowledgment.acknowledge();
latch.countDown();
}
});
KafkaMessageListenerContainer<String, String> container = new KafkaMessageListenerContainer<>(factory, props);
......@@ -106,17 +114,10 @@ public class CaseController {
@RequestMapping("/spring-kafka-case")
@ResponseBody
public String springKafkaCase() {
try {
kafkaTemplate.send(topicName, "key", "helloWorld").get();
kafkaTemplate.flush();
} catch (Exception e) {
e.printStackTrace();
}
try {
latch.await(2, TimeUnit.SECONDS);
} catch (InterruptedException e) {
}
public String springKafkaCase() throws Exception {
kafkaTemplate.send(topicName, "key", helloWorld).get();
latch.await();
kafkaTemplate.flush();
return SUCCESS;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册