未验证 提交 4b7a90a3 编写于 作者: Y yuz10 提交者: GitHub

[#3326] fix send trace fail if useTLS=true (#3325)

上级 b2aa85d3
...@@ -535,7 +535,9 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon ...@@ -535,7 +535,9 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
private void setTraceDispatcher() { private void setTraceDispatcher() {
if (isEnableMsgTrace()) { if (isEnableMsgTrace()) {
try { try {
this.traceDispatcher = new AsyncTraceDispatcher(consumerGroup, TraceDispatcher.Type.CONSUME, customizedTraceTopic, null); AsyncTraceDispatcher traceDispatcher = new AsyncTraceDispatcher(consumerGroup, TraceDispatcher.Type.CONSUME, customizedTraceTopic, null);
traceDispatcher.getTraceProducer().setUseTLS(this.isUseTLS());
this.traceDispatcher = traceDispatcher;
this.defaultLitePullConsumerImpl.registerConsumeMessageHook( this.defaultLitePullConsumerImpl.registerConsumeMessageHook(
new ConsumeMessageTraceHookImpl(traceDispatcher)); new ConsumeMessageTraceHookImpl(traceDispatcher));
} catch (Throwable e) { } catch (Throwable e) {
......
...@@ -412,7 +412,15 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume ...@@ -412,7 +412,15 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
public void createTopic(String key, String newTopic, int queueNum) throws MQClientException { public void createTopic(String key, String newTopic, int queueNum) throws MQClientException {
createTopic(key, withNamespace(newTopic), queueNum, 0); createTopic(key, withNamespace(newTopic), queueNum, 0);
} }
@Override
public void setUseTLS(boolean useTLS) {
super.setUseTLS(useTLS);
if (traceDispatcher != null && traceDispatcher instanceof AsyncTraceDispatcher) {
((AsyncTraceDispatcher) traceDispatcher).getTraceProducer().setUseTLS(useTLS);
}
}
/** /**
* This method will be removed in a certain version after April 5, 2020, so please do not use this method. * This method will be removed in a certain version after April 5, 2020, so please do not use this method.
*/ */
......
...@@ -263,6 +263,14 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { ...@@ -263,6 +263,14 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
} }
} }
@Override
public void setUseTLS(boolean useTLS) {
super.setUseTLS(useTLS);
if (traceDispatcher != null && traceDispatcher instanceof AsyncTraceDispatcher) {
((AsyncTraceDispatcher) traceDispatcher).getTraceProducer().setUseTLS(useTLS);
}
}
/** /**
* Start this producer instance. </p> * Start this producer instance. </p>
* *
......
...@@ -69,6 +69,7 @@ import org.apache.rocketmq.common.topic.TopicValidator; ...@@ -69,6 +69,7 @@ import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.remoting.exception.RemotingException;
import org.junit.After; import org.junit.After;
import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
...@@ -235,6 +236,14 @@ public class DefaultMQConsumerWithTraceTest { ...@@ -235,6 +236,14 @@ public class DefaultMQConsumerWithTraceTest {
assertThat(msg.getTopic()).isEqualTo(topic); assertThat(msg.getTopic()).isEqualTo(topic);
assertThat(msg.getBody()).isEqualTo(new byte[] {'a'}); assertThat(msg.getBody()).isEqualTo(new byte[] {'a'});
} }
@Test
public void testPushConsumerWithTraceTLS() {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup", true);
consumer.setUseTLS(true);
AsyncTraceDispatcher asyncTraceDispatcher = (AsyncTraceDispatcher) consumer.getTraceDispatcher();
Assert.assertTrue(asyncTraceDispatcher.getTraceProducer().isUseTLS());
}
private PullRequest createPullRequest() { private PullRequest createPullRequest() {
PullRequest pullRequest = new PullRequest(); PullRequest pullRequest = new PullRequest();
......
...@@ -52,6 +52,7 @@ import org.apache.rocketmq.common.protocol.route.QueueData; ...@@ -52,6 +52,7 @@ import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.RPCHook;
import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
...@@ -146,6 +147,15 @@ public class DefaultMQLitePullConsumerWithTraceTest { ...@@ -146,6 +147,15 @@ public class DefaultMQLitePullConsumerWithTraceTest {
} }
} }
@Test
public void testLitePullConsumerWithTraceTLS() throws Exception {
DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("consumerGroup");
consumer.setUseTLS(true);
consumer.setEnableMsgTrace(true);
consumer.start();
AsyncTraceDispatcher asyncTraceDispatcher = (AsyncTraceDispatcher) consumer.getTraceDispatcher();
Assert.assertTrue(asyncTraceDispatcher.getTraceProducer().isUseTLS());
}
private DefaultLitePullConsumer createLitePullConsumerWithDefaultTraceTopic() throws Exception { private DefaultLitePullConsumer createLitePullConsumerWithDefaultTraceTopic() throws Exception {
DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer(consumerGroup + System.currentTimeMillis()); DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer(consumerGroup + System.currentTimeMillis());
...@@ -302,4 +312,4 @@ public class DefaultMQLitePullConsumerWithTraceTest { ...@@ -302,4 +312,4 @@ public class DefaultMQLitePullConsumerWithTraceTest {
doReturn(false).when(mQClientFactory).updateTopicRouteInfoFromNameServer(anyString()); doReturn(false).when(mQClientFactory).updateTopicRouteInfoFromNameServer(anyString());
} }
} }
\ No newline at end of file
...@@ -39,6 +39,7 @@ import org.apache.rocketmq.common.protocol.route.TopicRouteData; ...@@ -39,6 +39,7 @@ import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.remoting.exception.RemotingException;
import org.junit.After; import org.junit.After;
import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
...@@ -143,6 +144,15 @@ public class DefaultMQProducerWithTraceTest { ...@@ -143,6 +144,15 @@ public class DefaultMQProducerWithTraceTest {
} }
@Test
public void testProducerWithTraceTLS() {
DefaultMQProducer producer = new DefaultMQProducer(producerGroupTemp, true);
producer.setUseTLS(true);
AsyncTraceDispatcher asyncTraceDispatcher = (AsyncTraceDispatcher) producer.getTraceDispatcher();
Assert.assertTrue(asyncTraceDispatcher.getTraceProducer().isUseTLS());
}
@After @After
public void terminate() { public void terminate() {
producer.shutdown(); producer.shutdown();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册