From f2a5a747fbf248776340f4a99118d421de2a5310 Mon Sep 17 00:00:00 2001
From: yuz10 <845238369@qq.com>
Date: Mon, 10 May 2021 16:21:34 +0800
Subject: [PATCH] Support OpenTracing(#2861)
---
client/pom.xml | 12 +
.../rocketmq/client/trace/TraceConstants.java | 16 ++
.../ConsumeMessageOpenTracingHookImpl.java | 95 ++++++++
.../EndTransactionOpenTracingHookImpl.java | 72 ++++++
.../hook/SendMessageOpenTracingHookImpl.java | 88 +++++++
.../DefaultMQConsumerWithOpenTracingTest.java | 230 ++++++++++++++++++
.../DefaultMQProducerWithOpenTracingTest.java | 170 +++++++++++++
...nsactionMQProducerWithOpenTracingTest.java | 189 ++++++++++++++
example/pom.xml | 10 +
.../tracemessage/OpenTracingProducer.java | 68 ++++++
.../tracemessage/OpenTracingPushConsumer.java | 71 ++++++
.../OpenTracingTransactionProducer.java | 86 +++++++
12 files changed, 1107 insertions(+)
create mode 100644 client/src/main/java/org/apache/rocketmq/client/trace/hook/ConsumeMessageOpenTracingHookImpl.java
create mode 100644 client/src/main/java/org/apache/rocketmq/client/trace/hook/EndTransactionOpenTracingHookImpl.java
create mode 100644 client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageOpenTracingHookImpl.java
create mode 100644 client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithOpenTracingTest.java
create mode 100644 client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithOpenTracingTest.java
create mode 100644 client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithOpenTracingTest.java
create mode 100644 example/src/main/java/org/apache/rocketmq/example/tracemessage/OpenTracingProducer.java
create mode 100644 example/src/main/java/org/apache/rocketmq/example/tracemessage/OpenTracingPushConsumer.java
create mode 100644 example/src/main/java/org/apache/rocketmq/example/tracemessage/OpenTracingTransactionProducer.java
diff --git a/client/pom.xml b/client/pom.xml
index 0a2fe2d4..164082c9 100644
--- a/client/pom.xml
+++ b/client/pom.xml
@@ -47,6 +47,18 @@
org.apache.commons
commons-lang3
+
+ io.opentracing
+ opentracing-api
+ 0.33.0
+ provided
+
+
+ io.opentracing
+ opentracing-mock
+ 0.33.0
+ test
+
org.apache.logging.log4j
log4j-core
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/TraceConstants.java b/client/src/main/java/org/apache/rocketmq/client/trace/TraceConstants.java
index 27622cd3..1ad4b610 100644
--- a/client/src/main/java/org/apache/rocketmq/client/trace/TraceConstants.java
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/TraceConstants.java
@@ -25,4 +25,20 @@ public class TraceConstants {
public static final char FIELD_SPLITOR = (char) 2;
public static final String TRACE_INSTANCE_NAME = "PID_CLIENT_INNER_TRACE_PRODUCER";
public static final String TRACE_TOPIC_PREFIX = TopicValidator.SYSTEM_TOPIC_PREFIX + "TRACE_DATA_";
+ public static final String TO_PREFIX = "To_";
+ public static final String FROM_PREFIX = "From_";
+ public static final String END_TRANSACTION = "EndTransaction";
+ public static final String ROCKETMQ_SERVICE = "rocketmq";
+ public static final String ROCKETMQ_SUCCESS = "rocketmq.success";
+ public static final String ROCKETMQ_TAGS = "rocketmq.tags";
+ public static final String ROCKETMQ_KEYS = "rocketmq.keys";
+ public static final String ROCKETMQ_SOTRE_HOST = "rocketmq.store_host";
+ public static final String ROCKETMQ_BODY_LENGTH = "rocketmq.body_length";
+ public static final String ROCKETMQ_MSG_ID = "rocketmq.mgs_id";
+ public static final String ROCKETMQ_MSG_TYPE = "rocketmq.mgs_type";
+ public static final String ROCKETMQ_REGION_ID = "rocketmq.region_id";
+ public static final String ROCKETMQ_TRANSACTION_ID = "rocketmq.transaction_id";
+ public static final String ROCKETMQ_TRANSACTION_STATE = "rocketmq.transaction_state";
+ public static final String ROCKETMQ_IS_FROM_TRANSACTION_CHECK = "rocketmq.is_from_transaction_check";
+ public static final String ROCKETMQ_RETRY_TIMERS = "rocketmq.retry_times";
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/hook/ConsumeMessageOpenTracingHookImpl.java b/client/src/main/java/org/apache/rocketmq/client/trace/hook/ConsumeMessageOpenTracingHookImpl.java
new file mode 100644
index 00000000..28fccae0
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/hook/ConsumeMessageOpenTracingHookImpl.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.trace.hook;
+
+import io.opentracing.Span;
+import io.opentracing.SpanContext;
+import io.opentracing.Tracer;
+import io.opentracing.propagation.Format;
+import io.opentracing.propagation.TextMapAdapter;
+import io.opentracing.tag.Tags;
+import org.apache.rocketmq.client.hook.ConsumeMessageContext;
+import org.apache.rocketmq.client.hook.ConsumeMessageHook;
+import org.apache.rocketmq.client.trace.TraceConstants;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.protocol.NamespaceUtil;
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+public class ConsumeMessageOpenTracingHookImpl implements ConsumeMessageHook {
+
+ private Tracer tracer;
+
+ public ConsumeMessageOpenTracingHookImpl(Tracer tracer) {
+ this.tracer = tracer;
+ }
+
+ @Override
+ public String hookName() {
+ return "ConsumeMessageOpenTracingHook";
+ }
+
+ @Override
+ public void consumeMessageBefore(ConsumeMessageContext context) {
+ if (context == null || context.getMsgList() == null || context.getMsgList().isEmpty()) {
+ return;
+ }
+ List spanList = new ArrayList<>();
+ for (MessageExt msg : context.getMsgList()) {
+ if (msg == null) {
+ continue;
+ }
+ Tracer.SpanBuilder spanBuilder = tracer
+ .buildSpan(TraceConstants.FROM_PREFIX + msg.getTopic())
+ .withTag(Tags.SPAN_KIND, Tags.SPAN_KIND_CONSUMER);
+ SpanContext spanContext = tracer.extract(Format.Builtin.TEXT_MAP, new TextMapAdapter(msg.getProperties()));
+ if (spanContext != null) {
+ spanBuilder.asChildOf(spanContext);
+ }
+ Span span = spanBuilder.start();
+
+ span.setTag(Tags.PEER_SERVICE, TraceConstants.ROCKETMQ_SERVICE);
+ span.setTag(Tags.MESSAGE_BUS_DESTINATION, NamespaceUtil.withoutNamespace(msg.getTopic()));
+ span.setTag(TraceConstants.ROCKETMQ_MSG_ID, msg.getMsgId());
+ span.setTag(TraceConstants.ROCKETMQ_TAGS, msg.getTags());
+ span.setTag(TraceConstants.ROCKETMQ_KEYS, msg.getKeys());
+ span.setTag(TraceConstants.ROCKETMQ_BODY_LENGTH, msg.getStoreSize());
+ span.setTag(TraceConstants.ROCKETMQ_RETRY_TIMERS, msg.getReconsumeTimes());
+ span.setTag(TraceConstants.ROCKETMQ_REGION_ID, msg.getProperty(MessageConst.PROPERTY_MSG_REGION));
+ spanList.add(span);
+ }
+ context.setMqTraceContext(spanList);
+ }
+
+ @Override
+ public void consumeMessageAfter(ConsumeMessageContext context) {
+ if (context == null || context.getMsgList() == null || context.getMsgList().isEmpty()) {
+ return;
+ }
+ List spanList = (List) context.getMqTraceContext();
+ if (spanList == null) {
+ return;
+ }
+ for (Span span : spanList) {
+ span.setTag(TraceConstants.ROCKETMQ_SUCCESS, context.isSuccess());
+ span.finish();
+ }
+ }
+}
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/hook/EndTransactionOpenTracingHookImpl.java b/client/src/main/java/org/apache/rocketmq/client/trace/hook/EndTransactionOpenTracingHookImpl.java
new file mode 100644
index 00000000..62d310f1
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/hook/EndTransactionOpenTracingHookImpl.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.trace.hook;
+
+import io.opentracing.Span;
+import io.opentracing.SpanContext;
+import io.opentracing.Tracer;
+import io.opentracing.propagation.Format;
+import io.opentracing.propagation.TextMapAdapter;
+import io.opentracing.tag.Tags;
+import org.apache.rocketmq.client.hook.EndTransactionContext;
+import org.apache.rocketmq.client.hook.EndTransactionHook;
+import org.apache.rocketmq.client.trace.TraceConstants;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageType;
+
+public class EndTransactionOpenTracingHookImpl implements EndTransactionHook {
+
+ private Tracer tracer;
+
+ public EndTransactionOpenTracingHookImpl(Tracer tracer) {
+ this.tracer = tracer;
+ }
+
+ @Override
+ public String hookName() {
+ return "EndTransactionOpenTracingHook";
+ }
+
+ @Override
+ public void endTransaction(EndTransactionContext context) {
+ if (context == null) {
+ return;
+ }
+ Message msg = context.getMessage();
+ Tracer.SpanBuilder spanBuilder = tracer
+ .buildSpan(TraceConstants.END_TRANSACTION)
+ .withTag(Tags.SPAN_KIND, Tags.SPAN_KIND_PRODUCER);
+ SpanContext spanContext = tracer.extract(Format.Builtin.TEXT_MAP, new TextMapAdapter(msg.getProperties()));
+ if (spanContext != null) {
+ spanBuilder.asChildOf(spanContext);
+ }
+
+ Span span = spanBuilder.start();
+ span.setTag(Tags.PEER_SERVICE, TraceConstants.ROCKETMQ_SERVICE);
+ span.setTag(Tags.MESSAGE_BUS_DESTINATION, msg.getTopic());
+ span.setTag(TraceConstants.ROCKETMQ_TAGS, msg.getTags());
+ span.setTag(TraceConstants.ROCKETMQ_KEYS, msg.getKeys());
+ span.setTag(TraceConstants.ROCKETMQ_SOTRE_HOST, context.getBrokerAddr());
+ span.setTag(TraceConstants.ROCKETMQ_MSG_ID, context.getMsgId());
+ span.setTag(TraceConstants.ROCKETMQ_MSG_TYPE, MessageType.Trans_msg_Commit.name());
+ span.setTag(TraceConstants.ROCKETMQ_TRANSACTION_ID, context.getTransactionId());
+ span.setTag(TraceConstants.ROCKETMQ_TRANSACTION_STATE, context.getTransactionState().name());
+ span.setTag(TraceConstants.ROCKETMQ_IS_FROM_TRANSACTION_CHECK, context.isFromTransactionCheck());
+ span.finish();
+ }
+
+}
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageOpenTracingHookImpl.java b/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageOpenTracingHookImpl.java
new file mode 100644
index 00000000..60c18a22
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageOpenTracingHookImpl.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.trace.hook;
+
+import io.opentracing.Span;
+import io.opentracing.SpanContext;
+import io.opentracing.Tracer;
+import io.opentracing.propagation.Format;
+import io.opentracing.propagation.TextMapAdapter;
+import io.opentracing.tag.Tags;
+import org.apache.rocketmq.client.hook.SendMessageContext;
+import org.apache.rocketmq.client.hook.SendMessageHook;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.client.trace.TraceConstants;
+import org.apache.rocketmq.common.message.Message;
+
+public class SendMessageOpenTracingHookImpl implements SendMessageHook {
+
+ private Tracer tracer;
+
+ public SendMessageOpenTracingHookImpl(Tracer tracer) {
+ this.tracer = tracer;
+ }
+
+ @Override
+ public String hookName() {
+ return "SendMessageOpenTracingHook";
+ }
+
+ @Override
+ public void sendMessageBefore(SendMessageContext context) {
+ if (context == null) {
+ return;
+ }
+ Message msg = context.getMessage();
+ Tracer.SpanBuilder spanBuilder = tracer
+ .buildSpan(TraceConstants.TO_PREFIX + msg.getTopic())
+ .withTag(Tags.SPAN_KIND, Tags.SPAN_KIND_PRODUCER);
+ SpanContext spanContext = tracer.extract(Format.Builtin.TEXT_MAP, new TextMapAdapter(msg.getProperties()));
+ if (spanContext != null) {
+ spanBuilder.asChildOf(spanContext);
+ }
+ Span span = spanBuilder.start();
+ tracer.inject(span.context(), Format.Builtin.TEXT_MAP, new TextMapAdapter(msg.getProperties()));
+ span.setTag(Tags.PEER_SERVICE, TraceConstants.ROCKETMQ_SERVICE);
+ span.setTag(Tags.MESSAGE_BUS_DESTINATION, msg.getTopic());
+ span.setTag(TraceConstants.ROCKETMQ_TAGS, msg.getTags());
+ span.setTag(TraceConstants.ROCKETMQ_KEYS, msg.getKeys());
+ span.setTag(TraceConstants.ROCKETMQ_SOTRE_HOST, context.getBrokerAddr());
+ span.setTag(TraceConstants.ROCKETMQ_MSG_TYPE, context.getMsgType().name());
+ span.setTag(TraceConstants.ROCKETMQ_BODY_LENGTH, msg.getBody().length);
+ context.setMqTraceContext(span);
+ }
+
+ @Override
+ public void sendMessageAfter(SendMessageContext context) {
+ if (context == null || context.getMqTraceContext() == null) {
+ return;
+ }
+ if (context.getSendResult() == null) {
+ return;
+ }
+
+ if (context.getSendResult().getRegionId() == null) {
+ return;
+ }
+
+ Span span = (Span) context.getMqTraceContext();
+ span.setTag(TraceConstants.ROCKETMQ_SUCCESS, context.getSendResult().getSendStatus().equals(SendStatus.SEND_OK));
+ span.setTag(TraceConstants.ROCKETMQ_MSG_ID, context.getSendResult().getMsgId());
+ span.setTag(TraceConstants.ROCKETMQ_REGION_ID, context.getSendResult().getRegionId());
+ span.finish();
+ }
+}
diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithOpenTracingTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithOpenTracingTest.java
new file mode 100644
index 00000000..16a3d027
--- /dev/null
+++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithOpenTracingTest.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.trace;
+
+import io.opentracing.mock.MockSpan;
+import io.opentracing.mock.MockTracer;
+import io.opentracing.tag.Tags;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.PullCallback;
+import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.client.consumer.PullStatus;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.CommunicationMode;
+import org.apache.rocketmq.client.impl.FindBrokerResult;
+import org.apache.rocketmq.client.impl.MQClientAPIImpl;
+import org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService;
+import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
+import org.apache.rocketmq.client.impl.consumer.ProcessQueue;
+import org.apache.rocketmq.client.impl.consumer.PullAPIWrapper;
+import org.apache.rocketmq.client.impl.consumer.PullMessageService;
+import org.apache.rocketmq.client.impl.consumer.PullRequest;
+import org.apache.rocketmq.client.impl.consumer.PullResultExt;
+import org.apache.rocketmq.client.impl.consumer.RebalancePushImpl;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.client.trace.hook.ConsumeMessageOpenTracingHookImpl;
+import org.apache.rocketmq.common.message.MessageClientExt;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.io.ByteArrayOutputStream;
+import java.lang.reflect.Field;
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.nullable;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(DefaultMQPushConsumerImpl.class)
+@PowerMockIgnore("javax.management.*")
+public class DefaultMQConsumerWithOpenTracingTest {
+ private String consumerGroup;
+
+ private String topic = "FooBar";
+ private String brokerName = "BrokerA";
+ private MQClientInstance mQClientFactory;
+
+ @Mock
+ private MQClientAPIImpl mQClientAPIImpl;
+ private PullAPIWrapper pullAPIWrapper;
+ private RebalancePushImpl rebalancePushImpl;
+ private DefaultMQPushConsumer pushConsumer;
+ private MockTracer tracer = new MockTracer();
+
+ @Before
+ public void init() throws Exception {
+ consumerGroup = "FooBarGroup" + System.currentTimeMillis();
+ pushConsumer = new DefaultMQPushConsumer(consumerGroup);
+ pushConsumer.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(
+ new ConsumeMessageOpenTracingHookImpl(tracer));
+ pushConsumer.setNamesrvAddr("127.0.0.1:9876");
+ pushConsumer.setPullInterval(60 * 1000);
+
+ pushConsumer.registerMessageListener(new MessageListenerConcurrently() {
+ @Override
+ public ConsumeConcurrentlyStatus consumeMessage(List msgs,
+ ConsumeConcurrentlyContext context) {
+ return null;
+ }
+ });
+
+ PowerMockito.suppress(PowerMockito.method(DefaultMQPushConsumerImpl.class, "updateTopicSubscribeInfoWhenSubscriptionChanged"));
+ DefaultMQPushConsumerImpl pushConsumerImpl = pushConsumer.getDefaultMQPushConsumerImpl();
+ rebalancePushImpl = spy(new RebalancePushImpl(pushConsumer.getDefaultMQPushConsumerImpl()));
+ Field field = DefaultMQPushConsumerImpl.class.getDeclaredField("rebalanceImpl");
+ field.setAccessible(true);
+ field.set(pushConsumerImpl, rebalancePushImpl);
+ pushConsumer.subscribe(topic, "*");
+
+ pushConsumer.start();
+
+ mQClientFactory = spy(pushConsumerImpl.getmQClientFactory());
+
+ field = DefaultMQPushConsumerImpl.class.getDeclaredField("mQClientFactory");
+ field.setAccessible(true);
+ field.set(pushConsumerImpl, mQClientFactory);
+
+ field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
+ field.setAccessible(true);
+ field.set(mQClientFactory, mQClientAPIImpl);
+
+ pullAPIWrapper = spy(new PullAPIWrapper(mQClientFactory, consumerGroup, false));
+ field = DefaultMQPushConsumerImpl.class.getDeclaredField("pullAPIWrapper");
+ field.setAccessible(true);
+ field.set(pushConsumerImpl, pullAPIWrapper);
+
+ pushConsumer.getDefaultMQPushConsumerImpl().getRebalanceImpl().setmQClientFactory(mQClientFactory);
+ mQClientFactory.registerConsumer(consumerGroup, pushConsumerImpl);
+
+ when(mQClientFactory.getMQClientAPIImpl().pullMessage(anyString(), any(PullMessageRequestHeader.class),
+ anyLong(), any(CommunicationMode.class), nullable(PullCallback.class)))
+ .thenAnswer(new Answer
+
+ io.jaegertracing
+ jaeger-core
+ 1.6.0
+
+
+ io.jaegertracing
+ jaeger-client
+ 1.6.0
+
diff --git a/example/src/main/java/org/apache/rocketmq/example/tracemessage/OpenTracingProducer.java b/example/src/main/java/org/apache/rocketmq/example/tracemessage/OpenTracingProducer.java
new file mode 100644
index 00000000..cd9ae279
--- /dev/null
+++ b/example/src/main/java/org/apache/rocketmq/example/tracemessage/OpenTracingProducer.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.example.tracemessage;
+
+import io.jaegertracing.Configuration;
+import io.jaegertracing.internal.samplers.ConstSampler;
+import io.opentracing.Tracer;
+import io.opentracing.util.GlobalTracer;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.trace.hook.SendMessageOpenTracingHookImpl;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+
+public class OpenTracingProducer {
+ public static void main(String[] args) throws MQClientException {
+
+ Tracer tracer = initTracer();
+
+ DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
+ producer.getDefaultMQProducerImpl().registerSendMessageHook(new SendMessageOpenTracingHookImpl(tracer));
+ producer.start();
+
+ try {
+ Message msg = new Message("TopicTest",
+ "TagA",
+ "OrderID188",
+ "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
+ SendResult sendResult = producer.send(msg);
+ System.out.printf("%s%n", sendResult);
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ producer.shutdown();
+ }
+
+ private static Tracer initTracer() {
+ Configuration.SamplerConfiguration samplerConfig = Configuration.SamplerConfiguration.fromEnv()
+ .withType(ConstSampler.TYPE)
+ .withParam(1);
+ Configuration.ReporterConfiguration reporterConfig = Configuration.ReporterConfiguration.fromEnv()
+ .withLogSpans(true);
+
+ Configuration config = new Configuration("rocketmq")
+ .withSampler(samplerConfig)
+ .withReporter(reporterConfig);
+ GlobalTracer.registerIfAbsent(config.getTracer());
+ return config.getTracer();
+ }
+}
diff --git a/example/src/main/java/org/apache/rocketmq/example/tracemessage/OpenTracingPushConsumer.java b/example/src/main/java/org/apache/rocketmq/example/tracemessage/OpenTracingPushConsumer.java
new file mode 100644
index 00000000..1d5d8a27
--- /dev/null
+++ b/example/src/main/java/org/apache/rocketmq/example/tracemessage/OpenTracingPushConsumer.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.example.tracemessage;
+
+import io.jaegertracing.Configuration;
+import io.jaegertracing.internal.samplers.ConstSampler;
+import io.opentracing.Tracer;
+import io.opentracing.util.GlobalTracer;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.trace.hook.ConsumeMessageOpenTracingHookImpl;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.message.MessageExt;
+
+import java.util.List;
+
+public class OpenTracingPushConsumer {
+ public static void main(String[] args) throws InterruptedException, MQClientException {
+ Tracer tracer = initTracer();
+
+ DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
+ consumer.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(new ConsumeMessageOpenTracingHookImpl(tracer));
+
+ consumer.subscribe("TopicTest", "*");
+ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+
+ consumer.setConsumeTimestamp("20181109221800");
+ consumer.registerMessageListener(new MessageListenerConcurrently() {
+
+ @Override
+ public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
+ System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+ }
+ });
+ consumer.start();
+ System.out.printf("Consumer Started.%n");
+ }
+
+ private static Tracer initTracer() {
+ Configuration.SamplerConfiguration samplerConfig = Configuration.SamplerConfiguration.fromEnv()
+ .withType(ConstSampler.TYPE)
+ .withParam(1);
+ Configuration.ReporterConfiguration reporterConfig = Configuration.ReporterConfiguration.fromEnv()
+ .withLogSpans(true);
+
+ Configuration config = new Configuration("rocketmq")
+ .withSampler(samplerConfig)
+ .withReporter(reporterConfig);
+ GlobalTracer.registerIfAbsent(config.getTracer());
+ return config.getTracer();
+ }
+}
diff --git a/example/src/main/java/org/apache/rocketmq/example/tracemessage/OpenTracingTransactionProducer.java b/example/src/main/java/org/apache/rocketmq/example/tracemessage/OpenTracingTransactionProducer.java
new file mode 100644
index 00000000..428640a3
--- /dev/null
+++ b/example/src/main/java/org/apache/rocketmq/example/tracemessage/OpenTracingTransactionProducer.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.example.tracemessage;
+
+import io.jaegertracing.Configuration;
+import io.jaegertracing.internal.samplers.ConstSampler;
+import io.opentracing.Tracer;
+import io.opentracing.util.GlobalTracer;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.LocalTransactionState;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.TransactionListener;
+import org.apache.rocketmq.client.producer.TransactionMQProducer;
+import org.apache.rocketmq.client.trace.hook.EndTransactionOpenTracingHookImpl;
+import org.apache.rocketmq.client.trace.hook.SendMessageOpenTracingHookImpl;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+
+import java.io.UnsupportedEncodingException;
+
+public class OpenTracingTransactionProducer {
+ public static void main(String[] args) throws MQClientException, InterruptedException {
+ Tracer tracer = initTracer();
+
+ TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
+ producer.getDefaultMQProducerImpl().registerSendMessageHook(new SendMessageOpenTracingHookImpl(tracer));
+ producer.getDefaultMQProducerImpl().registerEndTransactionHook(new EndTransactionOpenTracingHookImpl(tracer));
+
+ producer.setTransactionListener(new TransactionListener() {
+ @Override
+ public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
+ return LocalTransactionState.COMMIT_MESSAGE;
+ }
+
+ @Override
+ public LocalTransactionState checkLocalTransaction(MessageExt msg) {
+ return LocalTransactionState.COMMIT_MESSAGE;
+ }
+ });
+ producer.start();
+
+ try {
+ Message msg = new Message("TopicTest", "Tag", "KEY",
+ ("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET));
+ SendResult sendResult = producer.sendMessageInTransaction(msg, null);
+ System.out.printf("%s%n", sendResult);
+ } catch (MQClientException | UnsupportedEncodingException e) {
+ e.printStackTrace();
+ }
+
+ for (int i = 0; i < 100000; i++) {
+ Thread.sleep(1000);
+ }
+ producer.shutdown();
+ }
+
+ private static Tracer initTracer() {
+ Configuration.SamplerConfiguration samplerConfig = Configuration.SamplerConfiguration.fromEnv()
+ .withType(ConstSampler.TYPE)
+ .withParam(1);
+ Configuration.ReporterConfiguration reporterConfig = Configuration.ReporterConfiguration.fromEnv()
+ .withLogSpans(true);
+
+ Configuration config = new Configuration("rocketmq")
+ .withSampler(samplerConfig)
+ .withReporter(reporterConfig);
+ GlobalTracer.registerIfAbsent(config.getTracer());
+ return config.getTracer();
+ }
+}
--
GitLab