From 4b7a90a32ba4153d7cd6996b260c59895e2a7731 Mon Sep 17 00:00:00 2001 From: yuz10 <845238369@qq.com> Date: Sat, 11 Sep 2021 12:39:19 +0800 Subject: [PATCH] [#3326] fix send trace fail if useTLS=true (#3325) --- .../client/consumer/DefaultLitePullConsumer.java | 4 +++- .../client/consumer/DefaultMQPushConsumer.java | 10 +++++++++- .../rocketmq/client/producer/DefaultMQProducer.java | 8 ++++++++ .../client/trace/DefaultMQConsumerWithTraceTest.java | 9 +++++++++ .../DefaultMQLitePullConsumerWithTraceTest.java | 12 +++++++++++- .../client/trace/DefaultMQProducerWithTraceTest.java | 10 ++++++++++ 6 files changed, 50 insertions(+), 3 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java index c54399aa..74d6f345 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java @@ -535,7 +535,9 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon private void setTraceDispatcher() { if (isEnableMsgTrace()) { try { - this.traceDispatcher = new AsyncTraceDispatcher(consumerGroup, TraceDispatcher.Type.CONSUME, customizedTraceTopic, null); + AsyncTraceDispatcher traceDispatcher = new AsyncTraceDispatcher(consumerGroup, TraceDispatcher.Type.CONSUME, customizedTraceTopic, null); + traceDispatcher.getTraceProducer().setUseTLS(this.isUseTLS()); + this.traceDispatcher = traceDispatcher; this.defaultLitePullConsumerImpl.registerConsumeMessageHook( new ConsumeMessageTraceHookImpl(traceDispatcher)); } catch (Throwable e) { diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java index caf166de..58cf1346 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java @@ -412,7 +412,15 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume public void createTopic(String key, String newTopic, int queueNum) throws MQClientException { createTopic(key, withNamespace(newTopic), queueNum, 0); } - + + @Override + public void setUseTLS(boolean useTLS) { + super.setUseTLS(useTLS); + if (traceDispatcher != null && traceDispatcher instanceof AsyncTraceDispatcher) { + ((AsyncTraceDispatcher) traceDispatcher).getTraceProducer().setUseTLS(useTLS); + } + } + /** * This method will be removed in a certain version after April 5, 2020, so please do not use this method. */ diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java index 1c4a9315..0705935e 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java @@ -263,6 +263,14 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { } } + @Override + public void setUseTLS(boolean useTLS) { + super.setUseTLS(useTLS); + if (traceDispatcher != null && traceDispatcher instanceof AsyncTraceDispatcher) { + ((AsyncTraceDispatcher) traceDispatcher).getTraceProducer().setUseTLS(useTLS); + } + } + /** * Start this producer instance.

* diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java index aec7d2cb..976380b2 100644 --- a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java @@ -69,6 +69,7 @@ import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.exception.RemotingException; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -235,6 +236,14 @@ public class DefaultMQConsumerWithTraceTest { assertThat(msg.getTopic()).isEqualTo(topic); assertThat(msg.getBody()).isEqualTo(new byte[] {'a'}); } + + @Test + public void testPushConsumerWithTraceTLS() { + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup", true); + consumer.setUseTLS(true); + AsyncTraceDispatcher asyncTraceDispatcher = (AsyncTraceDispatcher) consumer.getTraceDispatcher(); + Assert.assertTrue(asyncTraceDispatcher.getTraceProducer().isUseTLS()); + } private PullRequest createPullRequest() { PullRequest pullRequest = new PullRequest(); diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQLitePullConsumerWithTraceTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQLitePullConsumerWithTraceTest.java index 67ae194b..ce3b832b 100644 --- a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQLitePullConsumerWithTraceTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQLitePullConsumerWithTraceTest.java @@ -52,6 +52,7 @@ import org.apache.rocketmq.common.protocol.route.QueueData; import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.remoting.RPCHook; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -146,6 +147,15 @@ public class DefaultMQLitePullConsumerWithTraceTest { } } + @Test + public void testLitePullConsumerWithTraceTLS() throws Exception { + DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("consumerGroup"); + consumer.setUseTLS(true); + consumer.setEnableMsgTrace(true); + consumer.start(); + AsyncTraceDispatcher asyncTraceDispatcher = (AsyncTraceDispatcher) consumer.getTraceDispatcher(); + Assert.assertTrue(asyncTraceDispatcher.getTraceProducer().isUseTLS()); + } private DefaultLitePullConsumer createLitePullConsumerWithDefaultTraceTopic() throws Exception { DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer(consumerGroup + System.currentTimeMillis()); @@ -302,4 +312,4 @@ public class DefaultMQLitePullConsumerWithTraceTest { doReturn(false).when(mQClientFactory).updateTopicRouteInfoFromNameServer(anyString()); } -} \ No newline at end of file +} diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java index 62b34175..234e32e6 100644 --- a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java @@ -39,6 +39,7 @@ import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.remoting.exception.RemotingException; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -143,6 +144,15 @@ public class DefaultMQProducerWithTraceTest { } + + @Test + public void testProducerWithTraceTLS() { + DefaultMQProducer producer = new DefaultMQProducer(producerGroupTemp, true); + producer.setUseTLS(true); + AsyncTraceDispatcher asyncTraceDispatcher = (AsyncTraceDispatcher) producer.getTraceDispatcher(); + Assert.assertTrue(asyncTraceDispatcher.getTraceProducer().isUseTLS()); + } + @After public void terminate() { producer.shutdown(); -- GitLab