未验证 提交 f2a5a747 编写于 作者: Y yuz10 提交者: GitHub

Support OpenTracing(#2861)

上级 fb0d0193
......@@ -47,6 +47,18 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>io.opentracing</groupId>
<artifactId>opentracing-api</artifactId>
<version>0.33.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.opentracing</groupId>
<artifactId>opentracing-mock</artifactId>
<version>0.33.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
......
......@@ -25,4 +25,20 @@ public class TraceConstants {
public static final char FIELD_SPLITOR = (char) 2;
public static final String TRACE_INSTANCE_NAME = "PID_CLIENT_INNER_TRACE_PRODUCER";
public static final String TRACE_TOPIC_PREFIX = TopicValidator.SYSTEM_TOPIC_PREFIX + "TRACE_DATA_";
public static final String TO_PREFIX = "To_";
public static final String FROM_PREFIX = "From_";
public static final String END_TRANSACTION = "EndTransaction";
public static final String ROCKETMQ_SERVICE = "rocketmq";
public static final String ROCKETMQ_SUCCESS = "rocketmq.success";
public static final String ROCKETMQ_TAGS = "rocketmq.tags";
public static final String ROCKETMQ_KEYS = "rocketmq.keys";
public static final String ROCKETMQ_SOTRE_HOST = "rocketmq.store_host";
public static final String ROCKETMQ_BODY_LENGTH = "rocketmq.body_length";
public static final String ROCKETMQ_MSG_ID = "rocketmq.mgs_id";
public static final String ROCKETMQ_MSG_TYPE = "rocketmq.mgs_type";
public static final String ROCKETMQ_REGION_ID = "rocketmq.region_id";
public static final String ROCKETMQ_TRANSACTION_ID = "rocketmq.transaction_id";
public static final String ROCKETMQ_TRANSACTION_STATE = "rocketmq.transaction_state";
public static final String ROCKETMQ_IS_FROM_TRANSACTION_CHECK = "rocketmq.is_from_transaction_check";
public static final String ROCKETMQ_RETRY_TIMERS = "rocketmq.retry_times";
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.trace.hook;
import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.Tracer;
import io.opentracing.propagation.Format;
import io.opentracing.propagation.TextMapAdapter;
import io.opentracing.tag.Tags;
import org.apache.rocketmq.client.hook.ConsumeMessageContext;
import org.apache.rocketmq.client.hook.ConsumeMessageHook;
import org.apache.rocketmq.client.trace.TraceConstants;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
import java.util.ArrayList;
import java.util.List;
public class ConsumeMessageOpenTracingHookImpl implements ConsumeMessageHook {
private Tracer tracer;
public ConsumeMessageOpenTracingHookImpl(Tracer tracer) {
this.tracer = tracer;
}
@Override
public String hookName() {
return "ConsumeMessageOpenTracingHook";
}
@Override
public void consumeMessageBefore(ConsumeMessageContext context) {
if (context == null || context.getMsgList() == null || context.getMsgList().isEmpty()) {
return;
}
List<Span> spanList = new ArrayList<>();
for (MessageExt msg : context.getMsgList()) {
if (msg == null) {
continue;
}
Tracer.SpanBuilder spanBuilder = tracer
.buildSpan(TraceConstants.FROM_PREFIX + msg.getTopic())
.withTag(Tags.SPAN_KIND, Tags.SPAN_KIND_CONSUMER);
SpanContext spanContext = tracer.extract(Format.Builtin.TEXT_MAP, new TextMapAdapter(msg.getProperties()));
if (spanContext != null) {
spanBuilder.asChildOf(spanContext);
}
Span span = spanBuilder.start();
span.setTag(Tags.PEER_SERVICE, TraceConstants.ROCKETMQ_SERVICE);
span.setTag(Tags.MESSAGE_BUS_DESTINATION, NamespaceUtil.withoutNamespace(msg.getTopic()));
span.setTag(TraceConstants.ROCKETMQ_MSG_ID, msg.getMsgId());
span.setTag(TraceConstants.ROCKETMQ_TAGS, msg.getTags());
span.setTag(TraceConstants.ROCKETMQ_KEYS, msg.getKeys());
span.setTag(TraceConstants.ROCKETMQ_BODY_LENGTH, msg.getStoreSize());
span.setTag(TraceConstants.ROCKETMQ_RETRY_TIMERS, msg.getReconsumeTimes());
span.setTag(TraceConstants.ROCKETMQ_REGION_ID, msg.getProperty(MessageConst.PROPERTY_MSG_REGION));
spanList.add(span);
}
context.setMqTraceContext(spanList);
}
@Override
public void consumeMessageAfter(ConsumeMessageContext context) {
if (context == null || context.getMsgList() == null || context.getMsgList().isEmpty()) {
return;
}
List<Span> spanList = (List<Span>) context.getMqTraceContext();
if (spanList == null) {
return;
}
for (Span span : spanList) {
span.setTag(TraceConstants.ROCKETMQ_SUCCESS, context.isSuccess());
span.finish();
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.trace.hook;
import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.Tracer;
import io.opentracing.propagation.Format;
import io.opentracing.propagation.TextMapAdapter;
import io.opentracing.tag.Tags;
import org.apache.rocketmq.client.hook.EndTransactionContext;
import org.apache.rocketmq.client.hook.EndTransactionHook;
import org.apache.rocketmq.client.trace.TraceConstants;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageType;
public class EndTransactionOpenTracingHookImpl implements EndTransactionHook {
private Tracer tracer;
public EndTransactionOpenTracingHookImpl(Tracer tracer) {
this.tracer = tracer;
}
@Override
public String hookName() {
return "EndTransactionOpenTracingHook";
}
@Override
public void endTransaction(EndTransactionContext context) {
if (context == null) {
return;
}
Message msg = context.getMessage();
Tracer.SpanBuilder spanBuilder = tracer
.buildSpan(TraceConstants.END_TRANSACTION)
.withTag(Tags.SPAN_KIND, Tags.SPAN_KIND_PRODUCER);
SpanContext spanContext = tracer.extract(Format.Builtin.TEXT_MAP, new TextMapAdapter(msg.getProperties()));
if (spanContext != null) {
spanBuilder.asChildOf(spanContext);
}
Span span = spanBuilder.start();
span.setTag(Tags.PEER_SERVICE, TraceConstants.ROCKETMQ_SERVICE);
span.setTag(Tags.MESSAGE_BUS_DESTINATION, msg.getTopic());
span.setTag(TraceConstants.ROCKETMQ_TAGS, msg.getTags());
span.setTag(TraceConstants.ROCKETMQ_KEYS, msg.getKeys());
span.setTag(TraceConstants.ROCKETMQ_SOTRE_HOST, context.getBrokerAddr());
span.setTag(TraceConstants.ROCKETMQ_MSG_ID, context.getMsgId());
span.setTag(TraceConstants.ROCKETMQ_MSG_TYPE, MessageType.Trans_msg_Commit.name());
span.setTag(TraceConstants.ROCKETMQ_TRANSACTION_ID, context.getTransactionId());
span.setTag(TraceConstants.ROCKETMQ_TRANSACTION_STATE, context.getTransactionState().name());
span.setTag(TraceConstants.ROCKETMQ_IS_FROM_TRANSACTION_CHECK, context.isFromTransactionCheck());
span.finish();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.trace.hook;
import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.Tracer;
import io.opentracing.propagation.Format;
import io.opentracing.propagation.TextMapAdapter;
import io.opentracing.tag.Tags;
import org.apache.rocketmq.client.hook.SendMessageContext;
import org.apache.rocketmq.client.hook.SendMessageHook;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.client.trace.TraceConstants;
import org.apache.rocketmq.common.message.Message;
public class SendMessageOpenTracingHookImpl implements SendMessageHook {
private Tracer tracer;
public SendMessageOpenTracingHookImpl(Tracer tracer) {
this.tracer = tracer;
}
@Override
public String hookName() {
return "SendMessageOpenTracingHook";
}
@Override
public void sendMessageBefore(SendMessageContext context) {
if (context == null) {
return;
}
Message msg = context.getMessage();
Tracer.SpanBuilder spanBuilder = tracer
.buildSpan(TraceConstants.TO_PREFIX + msg.getTopic())
.withTag(Tags.SPAN_KIND, Tags.SPAN_KIND_PRODUCER);
SpanContext spanContext = tracer.extract(Format.Builtin.TEXT_MAP, new TextMapAdapter(msg.getProperties()));
if (spanContext != null) {
spanBuilder.asChildOf(spanContext);
}
Span span = spanBuilder.start();
tracer.inject(span.context(), Format.Builtin.TEXT_MAP, new TextMapAdapter(msg.getProperties()));
span.setTag(Tags.PEER_SERVICE, TraceConstants.ROCKETMQ_SERVICE);
span.setTag(Tags.MESSAGE_BUS_DESTINATION, msg.getTopic());
span.setTag(TraceConstants.ROCKETMQ_TAGS, msg.getTags());
span.setTag(TraceConstants.ROCKETMQ_KEYS, msg.getKeys());
span.setTag(TraceConstants.ROCKETMQ_SOTRE_HOST, context.getBrokerAddr());
span.setTag(TraceConstants.ROCKETMQ_MSG_TYPE, context.getMsgType().name());
span.setTag(TraceConstants.ROCKETMQ_BODY_LENGTH, msg.getBody().length);
context.setMqTraceContext(span);
}
@Override
public void sendMessageAfter(SendMessageContext context) {
if (context == null || context.getMqTraceContext() == null) {
return;
}
if (context.getSendResult() == null) {
return;
}
if (context.getSendResult().getRegionId() == null) {
return;
}
Span span = (Span) context.getMqTraceContext();
span.setTag(TraceConstants.ROCKETMQ_SUCCESS, context.getSendResult().getSendStatus().equals(SendStatus.SEND_OK));
span.setTag(TraceConstants.ROCKETMQ_MSG_ID, context.getSendResult().getMsgId());
span.setTag(TraceConstants.ROCKETMQ_REGION_ID, context.getSendResult().getRegionId());
span.finish();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.trace;
import io.opentracing.mock.MockSpan;
import io.opentracing.mock.MockTracer;
import io.opentracing.tag.Tags;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.PullCallback;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.CommunicationMode;
import org.apache.rocketmq.client.impl.FindBrokerResult;
import org.apache.rocketmq.client.impl.MQClientAPIImpl;
import org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService;
import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
import org.apache.rocketmq.client.impl.consumer.ProcessQueue;
import org.apache.rocketmq.client.impl.consumer.PullAPIWrapper;
import org.apache.rocketmq.client.impl.consumer.PullMessageService;
import org.apache.rocketmq.client.impl.consumer.PullRequest;
import org.apache.rocketmq.client.impl.consumer.PullResultExt;
import org.apache.rocketmq.client.impl.consumer.RebalancePushImpl;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.trace.hook.ConsumeMessageOpenTracingHookImpl;
import org.apache.rocketmq.common.message.MessageClientExt;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import java.io.ByteArrayOutputStream;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
@RunWith(PowerMockRunner.class)
@PrepareForTest(DefaultMQPushConsumerImpl.class)
@PowerMockIgnore("javax.management.*")
public class DefaultMQConsumerWithOpenTracingTest {
private String consumerGroup;
private String topic = "FooBar";
private String brokerName = "BrokerA";
private MQClientInstance mQClientFactory;
@Mock
private MQClientAPIImpl mQClientAPIImpl;
private PullAPIWrapper pullAPIWrapper;
private RebalancePushImpl rebalancePushImpl;
private DefaultMQPushConsumer pushConsumer;
private MockTracer tracer = new MockTracer();
@Before
public void init() throws Exception {
consumerGroup = "FooBarGroup" + System.currentTimeMillis();
pushConsumer = new DefaultMQPushConsumer(consumerGroup);
pushConsumer.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(
new ConsumeMessageOpenTracingHookImpl(tracer));
pushConsumer.setNamesrvAddr("127.0.0.1:9876");
pushConsumer.setPullInterval(60 * 1000);
pushConsumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
return null;
}
});
PowerMockito.suppress(PowerMockito.method(DefaultMQPushConsumerImpl.class, "updateTopicSubscribeInfoWhenSubscriptionChanged"));
DefaultMQPushConsumerImpl pushConsumerImpl = pushConsumer.getDefaultMQPushConsumerImpl();
rebalancePushImpl = spy(new RebalancePushImpl(pushConsumer.getDefaultMQPushConsumerImpl()));
Field field = DefaultMQPushConsumerImpl.class.getDeclaredField("rebalanceImpl");
field.setAccessible(true);
field.set(pushConsumerImpl, rebalancePushImpl);
pushConsumer.subscribe(topic, "*");
pushConsumer.start();
mQClientFactory = spy(pushConsumerImpl.getmQClientFactory());
field = DefaultMQPushConsumerImpl.class.getDeclaredField("mQClientFactory");
field.setAccessible(true);
field.set(pushConsumerImpl, mQClientFactory);
field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
field.setAccessible(true);
field.set(mQClientFactory, mQClientAPIImpl);
pullAPIWrapper = spy(new PullAPIWrapper(mQClientFactory, consumerGroup, false));
field = DefaultMQPushConsumerImpl.class.getDeclaredField("pullAPIWrapper");
field.setAccessible(true);
field.set(pushConsumerImpl, pullAPIWrapper);
pushConsumer.getDefaultMQPushConsumerImpl().getRebalanceImpl().setmQClientFactory(mQClientFactory);
mQClientFactory.registerConsumer(consumerGroup, pushConsumerImpl);
when(mQClientFactory.getMQClientAPIImpl().pullMessage(anyString(), any(PullMessageRequestHeader.class),
anyLong(), any(CommunicationMode.class), nullable(PullCallback.class)))
.thenAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock mock) throws Throwable {
PullMessageRequestHeader requestHeader = mock.getArgument(1);
MessageClientExt messageClientExt = new MessageClientExt();
messageClientExt.setTopic(topic);
messageClientExt.setQueueId(0);
messageClientExt.setMsgId("123");
messageClientExt.setBody(new byte[]{'a'});
messageClientExt.setOffsetMsgId("234");
messageClientExt.setBornHost(new InetSocketAddress(8080));
messageClientExt.setStoreHost(new InetSocketAddress(8080));
PullResult pullResult = createPullResult(requestHeader, PullStatus.FOUND, Collections.<MessageExt>singletonList(messageClientExt));
((PullCallback) mock.getArgument(4)).onSuccess(pullResult);
return pullResult;
}
});
doReturn(new FindBrokerResult("127.0.0.1:10911", false)).when(mQClientFactory).findBrokerAddressInSubscribe(anyString(), anyLong(), anyBoolean());
Set<MessageQueue> messageQueueSet = new HashSet<MessageQueue>();
messageQueueSet.add(createPullRequest().getMessageQueue());
pushConsumer.getDefaultMQPushConsumerImpl().updateTopicSubscribeInfo(topic, messageQueueSet);
}
@After
public void terminate() {
pushConsumer.shutdown();
}
@Test
public void testPullMessage_WithTrace_Success() throws InterruptedException, RemotingException, MQBrokerException, MQClientException {
final CountDownLatch countDownLatch = new CountDownLatch(1);
final MessageExt[] messageExts = new MessageExt[1];
pushConsumer.getDefaultMQPushConsumerImpl().setConsumeMessageService(new ConsumeMessageConcurrentlyService(pushConsumer.getDefaultMQPushConsumerImpl(), new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
messageExts[0] = msgs.get(0);
countDownLatch.countDown();
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}));
PullMessageService pullMessageService = mQClientFactory.getPullMessageService();
pullMessageService.executePullRequestImmediately(createPullRequest());
countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
assertThat(messageExts[0].getTopic()).isEqualTo(topic);
assertThat(messageExts[0].getBody()).isEqualTo(new byte[]{'a'});
assertThat(tracer.finishedSpans().size()).isEqualTo(1);
MockSpan span = tracer.finishedSpans().get(0);
assertThat(span.tags().get(Tags.MESSAGE_BUS_DESTINATION.getKey())).isEqualTo(topic);
assertThat(span.tags().get(Tags.SPAN_KIND.getKey())).isEqualTo(Tags.SPAN_KIND_CONSUMER);
assertThat(span.tags().get(TraceConstants.ROCKETMQ_SUCCESS)).isEqualTo(true);
}
private PullRequest createPullRequest() {
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);
pullRequest.setNextOffset(1024);
MessageQueue messageQueue = new MessageQueue();
messageQueue.setBrokerName(brokerName);
messageQueue.setQueueId(0);
messageQueue.setTopic(topic);
pullRequest.setMessageQueue(messageQueue);
ProcessQueue processQueue = new ProcessQueue();
processQueue.setLocked(true);
processQueue.setLastLockTimestamp(System.currentTimeMillis());
pullRequest.setProcessQueue(processQueue);
return pullRequest;
}
private PullResultExt createPullResult(PullMessageRequestHeader requestHeader, PullStatus pullStatus,
List<MessageExt> messageExtList) throws Exception {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
for (MessageExt messageExt : messageExtList) {
outputStream.write(MessageDecoder.encode(messageExt, false));
}
return new PullResultExt(pullStatus, requestHeader.getQueueOffset() + messageExtList.size(), 123, 2048, messageExtList, 0, outputStream.toByteArray());
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.trace;
import io.opentracing.mock.MockSpan;
import io.opentracing.mock.MockTracer;
import io.opentracing.tag.Tags;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.hook.SendMessageContext;
import org.apache.rocketmq.client.impl.CommunicationMode;
import org.apache.rocketmq.client.impl.MQClientAPIImpl;
import org.apache.rocketmq.client.impl.MQClientManager;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.client.trace.hook.SendMessageOpenTracingHookImpl;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageType;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.junit.MockitoJUnitRunner;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class DefaultMQProducerWithOpenTracingTest {
@Spy
private MQClientInstance mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
@Mock
private MQClientAPIImpl mQClientAPIImpl;
private DefaultMQProducer producer;
private Message message;
private String topic = "FooBar";
private String producerGroupPrefix = "FooBar_PID";
private String producerGroupTemp = producerGroupPrefix + System.currentTimeMillis();
private String producerGroupTraceTemp = TopicValidator.RMQ_SYS_TRACE_TOPIC + System.currentTimeMillis();
private MockTracer tracer = new MockTracer();
@Before
public void init() throws Exception {
producer = new DefaultMQProducer(producerGroupTemp);
producer.getDefaultMQProducerImpl().registerSendMessageHook(
new SendMessageOpenTracingHookImpl(tracer));
producer.setNamesrvAddr("127.0.0.1:9876");
message = new Message(topic, new byte[] {'a', 'b', 'c'});
producer.start();
Field field = DefaultMQProducerImpl.class.getDeclaredField("mQClientFactory");
field.setAccessible(true);
field.set(producer.getDefaultMQProducerImpl(), mQClientFactory);
field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
field.setAccessible(true);
field.set(mQClientFactory, mQClientAPIImpl);
producer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTemp, producer.getDefaultMQProducerImpl());
when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class),
nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))).thenCallRealMethod();
when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class),
nullable(SendCallback.class), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class)))
.thenReturn(createSendResult(SendStatus.SEND_OK));
}
@Test
public void testSendMessageSync_WithTrace_Success() throws RemotingException, InterruptedException, MQBrokerException, MQClientException {
producer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTraceTemp, producer.getDefaultMQProducerImpl());
when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
producer.send(message);
assertThat(tracer.finishedSpans().size()).isEqualTo(1);
MockSpan span = tracer.finishedSpans().get(0);
assertThat(span.tags().get(Tags.MESSAGE_BUS_DESTINATION.getKey())).isEqualTo(topic);
assertThat(span.tags().get(Tags.SPAN_KIND.getKey())).isEqualTo(Tags.SPAN_KIND_PRODUCER);
assertThat(span.tags().get(TraceConstants.ROCKETMQ_MSG_ID)).isEqualTo("123");
assertThat(span.tags().get(TraceConstants.ROCKETMQ_BODY_LENGTH)).isEqualTo(3);
assertThat(span.tags().get(TraceConstants.ROCKETMQ_REGION_ID)).isEqualTo("HZ");
assertThat(span.tags().get(TraceConstants.ROCKETMQ_MSG_TYPE)).isEqualTo(MessageType.Normal_Msg.name());
assertThat(span.tags().get(TraceConstants.ROCKETMQ_SOTRE_HOST)).isEqualTo("127.0.0.1:10911");
}
@After
public void terminate() {
producer.shutdown();
}
public static TopicRouteData createTopicRoute() {
TopicRouteData topicRouteData = new TopicRouteData();
topicRouteData.setFilterServerTable(new HashMap<String, List<String>>());
List<BrokerData> brokerDataList = new ArrayList<BrokerData>();
BrokerData brokerData = new BrokerData();
brokerData.setBrokerName("BrokerA");
brokerData.setCluster("DefaultCluster");
HashMap<Long, String> brokerAddrs = new HashMap<Long, String>();
brokerAddrs.put(0L, "127.0.0.1:10911");
brokerData.setBrokerAddrs(brokerAddrs);
brokerDataList.add(brokerData);
topicRouteData.setBrokerDatas(brokerDataList);
List<QueueData> queueDataList = new ArrayList<QueueData>();
QueueData queueData = new QueueData();
queueData.setBrokerName("BrokerA");
queueData.setPerm(6);
queueData.setReadQueueNums(3);
queueData.setWriteQueueNums(4);
queueData.setTopicSysFlag(0);
queueDataList.add(queueData);
topicRouteData.setQueueDatas(queueDataList);
return topicRouteData;
}
private SendResult createSendResult(SendStatus sendStatus) {
SendResult sendResult = new SendResult();
sendResult.setMsgId("123");
sendResult.setOffsetMsgId("123");
sendResult.setQueueOffset(456);
sendResult.setSendStatus(sendStatus);
sendResult.setRegionId("HZ");
return sendResult;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.trace;
import io.opentracing.mock.MockSpan;
import io.opentracing.mock.MockTracer;
import io.opentracing.tag.Tags;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.hook.SendMessageContext;
import org.apache.rocketmq.client.impl.CommunicationMode;
import org.apache.rocketmq.client.impl.MQClientAPIImpl;
import org.apache.rocketmq.client.impl.MQClientManager;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.trace.hook.EndTransactionOpenTracingHookImpl;
import org.apache.rocketmq.client.trace.hook.SendMessageOpenTracingHookImpl;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.message.MessageType;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.junit.MockitoJUnitRunner;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class TransactionMQProducerWithOpenTracingTest {
@Spy
private MQClientInstance mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
@Mock
private MQClientAPIImpl mQClientAPIImpl;
private TransactionMQProducer producer;
private Message message;
private String topic = "FooBar";
private String producerGroupPrefix = "FooBar_PID";
private String producerGroupTemp = producerGroupPrefix + System.currentTimeMillis();
private String producerGroupTraceTemp = TopicValidator.RMQ_SYS_TRACE_TOPIC + System.currentTimeMillis();
private MockTracer tracer = new MockTracer();
@Before
public void init() throws Exception {
TransactionListener transactionListener = new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
return LocalTransactionState.COMMIT_MESSAGE;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
return LocalTransactionState.COMMIT_MESSAGE;
}
};
producer = new TransactionMQProducer(producerGroupTemp);
producer.getDefaultMQProducerImpl().registerSendMessageHook(new SendMessageOpenTracingHookImpl(tracer));
producer.getDefaultMQProducerImpl().registerEndTransactionHook(new EndTransactionOpenTracingHookImpl(tracer));
producer.setTransactionListener(transactionListener);
producer.setNamesrvAddr("127.0.0.1:9876");
message = new Message(topic, new byte[] {'a', 'b', 'c'});
producer.start();
Field field = DefaultMQProducerImpl.class.getDeclaredField("mQClientFactory");
field.setAccessible(true);
field.set(producer.getDefaultMQProducerImpl(), mQClientFactory);
field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
field.setAccessible(true);
field.set(mQClientFactory, mQClientAPIImpl);
producer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTemp, producer.getDefaultMQProducerImpl());
when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class),
nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))).thenCallRealMethod();
when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class),
nullable(SendCallback.class), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class)))
.thenReturn(createSendResult(SendStatus.SEND_OK));
}
@Test
public void testSendMessageSync_WithTrace_Success() throws RemotingException, InterruptedException, MQBrokerException, MQClientException {
producer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTraceTemp, producer.getDefaultMQProducerImpl());
when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
producer.sendMessageInTransaction(message, null);
assertThat(tracer.finishedSpans().size()).isEqualTo(2);
MockSpan span = tracer.finishedSpans().get(1);
assertThat(span.tags().get(Tags.MESSAGE_BUS_DESTINATION.getKey())).isEqualTo(topic);
assertThat(span.tags().get(Tags.SPAN_KIND.getKey())).isEqualTo(Tags.SPAN_KIND_PRODUCER);
assertThat(span.tags().get(TraceConstants.ROCKETMQ_MSG_ID)).isEqualTo("123");
assertThat(span.tags().get(TraceConstants.ROCKETMQ_MSG_TYPE)).isEqualTo(MessageType.Trans_msg_Commit.name());
assertThat(span.tags().get(TraceConstants.ROCKETMQ_TRANSACTION_STATE)).isEqualTo(LocalTransactionState.COMMIT_MESSAGE.name());
assertThat(span.tags().get(TraceConstants.ROCKETMQ_IS_FROM_TRANSACTION_CHECK)).isEqualTo(false);
}
@After
public void terminate() {
producer.shutdown();
}
public static TopicRouteData createTopicRoute() {
TopicRouteData topicRouteData = new TopicRouteData();
topicRouteData.setFilterServerTable(new HashMap<String, List<String>>());
List<BrokerData> brokerDataList = new ArrayList<BrokerData>();
BrokerData brokerData = new BrokerData();
brokerData.setBrokerName("BrokerA");
brokerData.setCluster("DefaultCluster");
HashMap<Long, String> brokerAddrs = new HashMap<Long, String>();
brokerAddrs.put(0L, "127.0.0.1:10911");
brokerData.setBrokerAddrs(brokerAddrs);
brokerDataList.add(brokerData);
topicRouteData.setBrokerDatas(brokerDataList);
List<QueueData> queueDataList = new ArrayList<QueueData>();
QueueData queueData = new QueueData();
queueData.setBrokerName("BrokerA");
queueData.setPerm(6);
queueData.setReadQueueNums(3);
queueData.setWriteQueueNums(4);
queueData.setTopicSysFlag(0);
queueDataList.add(queueData);
topicRouteData.setQueueDatas(queueDataList);
return topicRouteData;
}
private SendResult createSendResult(SendStatus sendStatus) {
SendResult sendResult = new SendResult();
sendResult.setMsgId("123");
sendResult.setOffsetMsgId(MessageDecoder.createMessageId(new InetSocketAddress("127.0.0.1", 12), 1));
sendResult.setQueueOffset(456);
sendResult.setSendStatus(sendStatus);
sendResult.setRegionId("HZ");
sendResult.setMessageQueue(new MessageQueue(topic, "broker-trace", 0));
return sendResult;
}
}
......@@ -52,5 +52,15 @@
<groupId>org.javassist</groupId>
<artifactId>javassist</artifactId>
</dependency>
<dependency>
<groupId>io.jaegertracing</groupId>
<artifactId>jaeger-core</artifactId>
<version>1.6.0</version>
</dependency>
<dependency>
<groupId>io.jaegertracing</groupId>
<artifactId>jaeger-client</artifactId>
<version>1.6.0</version>
</dependency>
</dependencies>
</project>
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.example.tracemessage;
import io.jaegertracing.Configuration;
import io.jaegertracing.internal.samplers.ConstSampler;
import io.opentracing.Tracer;
import io.opentracing.util.GlobalTracer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.trace.hook.SendMessageOpenTracingHookImpl;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class OpenTracingProducer {
public static void main(String[] args) throws MQClientException {
Tracer tracer = initTracer();
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.getDefaultMQProducerImpl().registerSendMessageHook(new SendMessageOpenTracingHookImpl(tracer));
producer.start();
try {
Message msg = new Message("TopicTest",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
}
private static Tracer initTracer() {
Configuration.SamplerConfiguration samplerConfig = Configuration.SamplerConfiguration.fromEnv()
.withType(ConstSampler.TYPE)
.withParam(1);
Configuration.ReporterConfiguration reporterConfig = Configuration.ReporterConfiguration.fromEnv()
.withLogSpans(true);
Configuration config = new Configuration("rocketmq")
.withSampler(samplerConfig)
.withReporter(reporterConfig);
GlobalTracer.registerIfAbsent(config.getTracer());
return config.getTracer();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.example.tracemessage;
import io.jaegertracing.Configuration;
import io.jaegertracing.internal.samplers.ConstSampler;
import io.opentracing.Tracer;
import io.opentracing.util.GlobalTracer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.trace.hook.ConsumeMessageOpenTracingHookImpl;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class OpenTracingPushConsumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
Tracer tracer = initTracer();
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
consumer.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(new ConsumeMessageOpenTracingHookImpl(tracer));
consumer.subscribe("TopicTest", "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.setConsumeTimestamp("20181109221800");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
private static Tracer initTracer() {
Configuration.SamplerConfiguration samplerConfig = Configuration.SamplerConfiguration.fromEnv()
.withType(ConstSampler.TYPE)
.withParam(1);
Configuration.ReporterConfiguration reporterConfig = Configuration.ReporterConfiguration.fromEnv()
.withLogSpans(true);
Configuration config = new Configuration("rocketmq")
.withSampler(samplerConfig)
.withReporter(reporterConfig);
GlobalTracer.registerIfAbsent(config.getTracer());
return config.getTracer();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.example.tracemessage;
import io.jaegertracing.Configuration;
import io.jaegertracing.internal.samplers.ConstSampler;
import io.opentracing.Tracer;
import io.opentracing.util.GlobalTracer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.trace.hook.EndTransactionOpenTracingHookImpl;
import org.apache.rocketmq.client.trace.hook.SendMessageOpenTracingHookImpl;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.io.UnsupportedEncodingException;
public class OpenTracingTransactionProducer {
public static void main(String[] args) throws MQClientException, InterruptedException {
Tracer tracer = initTracer();
TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
producer.getDefaultMQProducerImpl().registerSendMessageHook(new SendMessageOpenTracingHookImpl(tracer));
producer.getDefaultMQProducerImpl().registerEndTransactionHook(new EndTransactionOpenTracingHookImpl(tracer));
producer.setTransactionListener(new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
return LocalTransactionState.COMMIT_MESSAGE;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
return LocalTransactionState.COMMIT_MESSAGE;
}
});
producer.start();
try {
Message msg = new Message("TopicTest", "Tag", "KEY",
("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
} catch (MQClientException | UnsupportedEncodingException e) {
e.printStackTrace();
}
for (int i = 0; i < 100000; i++) {
Thread.sleep(1000);
}
producer.shutdown();
}
private static Tracer initTracer() {
Configuration.SamplerConfiguration samplerConfig = Configuration.SamplerConfiguration.fromEnv()
.withType(ConstSampler.TYPE)
.withParam(1);
Configuration.ReporterConfiguration reporterConfig = Configuration.ReporterConfiguration.fromEnv()
.withLogSpans(true);
Configuration config = new Configuration("rocketmq")
.withSampler(samplerConfig)
.withReporter(reporterConfig);
GlobalTracer.registerIfAbsent(config.getTracer());
return config.getTracer();
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册