diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java index c54399aa5ed24346fc801e4d9b15050ee7a2b997..74d6f3455f3494e6feed2db66433d62b24735b5d 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java @@ -535,7 +535,9 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon private void setTraceDispatcher() { if (isEnableMsgTrace()) { try { - this.traceDispatcher = new AsyncTraceDispatcher(consumerGroup, TraceDispatcher.Type.CONSUME, customizedTraceTopic, null); + AsyncTraceDispatcher traceDispatcher = new AsyncTraceDispatcher(consumerGroup, TraceDispatcher.Type.CONSUME, customizedTraceTopic, null); + traceDispatcher.getTraceProducer().setUseTLS(this.isUseTLS()); + this.traceDispatcher = traceDispatcher; this.defaultLitePullConsumerImpl.registerConsumeMessageHook( new ConsumeMessageTraceHookImpl(traceDispatcher)); } catch (Throwable e) { diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java index caf166de47d5e78d88321bdb57516219ff3efa24..58cf1346ffa6d67d1fc1bf74135440be3115ac3b 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java @@ -412,7 +412,15 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume public void createTopic(String key, String newTopic, int queueNum) throws MQClientException { createTopic(key, withNamespace(newTopic), queueNum, 0); } - + + @Override + public void setUseTLS(boolean useTLS) { + super.setUseTLS(useTLS); + if (traceDispatcher != null && traceDispatcher instanceof AsyncTraceDispatcher) { + ((AsyncTraceDispatcher) traceDispatcher).getTraceProducer().setUseTLS(useTLS); + } + } + /** * This method will be removed in a certain version after April 5, 2020, so please do not use this method. */ diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java index 1c4a9315a8cd95c6c2aa0b1fd62c25091fa4213c..0705935efacc8544349ec3c78088d62ea7e89f78 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java @@ -263,6 +263,14 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { } } + @Override + public void setUseTLS(boolean useTLS) { + super.setUseTLS(useTLS); + if (traceDispatcher != null && traceDispatcher instanceof AsyncTraceDispatcher) { + ((AsyncTraceDispatcher) traceDispatcher).getTraceProducer().setUseTLS(useTLS); + } + } + /** * Start this producer instance.

* diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java index aec7d2cb0e28e57884bb468b41d831ac23ece99b..976380b27234e862cd66b9d29c4058fecc0ef6f7 100644 --- a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java @@ -69,6 +69,7 @@ import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.exception.RemotingException; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -235,6 +236,14 @@ public class DefaultMQConsumerWithTraceTest { assertThat(msg.getTopic()).isEqualTo(topic); assertThat(msg.getBody()).isEqualTo(new byte[] {'a'}); } + + @Test + public void testPushConsumerWithTraceTLS() { + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup", true); + consumer.setUseTLS(true); + AsyncTraceDispatcher asyncTraceDispatcher = (AsyncTraceDispatcher) consumer.getTraceDispatcher(); + Assert.assertTrue(asyncTraceDispatcher.getTraceProducer().isUseTLS()); + } private PullRequest createPullRequest() { PullRequest pullRequest = new PullRequest(); diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQLitePullConsumerWithTraceTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQLitePullConsumerWithTraceTest.java index 67ae194b880ce5a497b7d0aaac72566276fb1ec5..ce3b832b006f905a72848c5b34e95d9f67be035e 100644 --- a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQLitePullConsumerWithTraceTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQLitePullConsumerWithTraceTest.java @@ -52,6 +52,7 @@ import org.apache.rocketmq.common.protocol.route.QueueData; import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.remoting.RPCHook; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -146,6 +147,15 @@ public class DefaultMQLitePullConsumerWithTraceTest { } } + @Test + public void testLitePullConsumerWithTraceTLS() throws Exception { + DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("consumerGroup"); + consumer.setUseTLS(true); + consumer.setEnableMsgTrace(true); + consumer.start(); + AsyncTraceDispatcher asyncTraceDispatcher = (AsyncTraceDispatcher) consumer.getTraceDispatcher(); + Assert.assertTrue(asyncTraceDispatcher.getTraceProducer().isUseTLS()); + } private DefaultLitePullConsumer createLitePullConsumerWithDefaultTraceTopic() throws Exception { DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer(consumerGroup + System.currentTimeMillis()); @@ -302,4 +312,4 @@ public class DefaultMQLitePullConsumerWithTraceTest { doReturn(false).when(mQClientFactory).updateTopicRouteInfoFromNameServer(anyString()); } -} \ No newline at end of file +} diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java index 62b34175aa2dbf5fad30076a16ad2b1696addca2..234e32e6807e67fedea7e3177b9dac72cdd2d0bf 100644 --- a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java @@ -39,6 +39,7 @@ import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.remoting.exception.RemotingException; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -143,6 +144,15 @@ public class DefaultMQProducerWithTraceTest { } + + @Test + public void testProducerWithTraceTLS() { + DefaultMQProducer producer = new DefaultMQProducer(producerGroupTemp, true); + producer.setUseTLS(true); + AsyncTraceDispatcher asyncTraceDispatcher = (AsyncTraceDispatcher) producer.getTraceDispatcher(); + Assert.assertTrue(asyncTraceDispatcher.getTraceProducer().isUseTLS()); + } + @After public void terminate() { producer.shutdown();