diff --git a/client/pom.xml b/client/pom.xml index 0a2fe2d4f50e5958e229b1c43b323ccecacf7f97..164082c9b26ac7e6fc6384b275266e3e5820057b 100644 --- a/client/pom.xml +++ b/client/pom.xml @@ -47,6 +47,18 @@ org.apache.commons commons-lang3 + + io.opentracing + opentracing-api + 0.33.0 + provided + + + io.opentracing + opentracing-mock + 0.33.0 + test + org.apache.logging.log4j log4j-core diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/TraceConstants.java b/client/src/main/java/org/apache/rocketmq/client/trace/TraceConstants.java index 27622cd30223602143d4d6948e906a6da59daf81..1ad4b6105153d4dd4e167887dc54d3bee451bb56 100644 --- a/client/src/main/java/org/apache/rocketmq/client/trace/TraceConstants.java +++ b/client/src/main/java/org/apache/rocketmq/client/trace/TraceConstants.java @@ -25,4 +25,20 @@ public class TraceConstants { public static final char FIELD_SPLITOR = (char) 2; public static final String TRACE_INSTANCE_NAME = "PID_CLIENT_INNER_TRACE_PRODUCER"; public static final String TRACE_TOPIC_PREFIX = TopicValidator.SYSTEM_TOPIC_PREFIX + "TRACE_DATA_"; + public static final String TO_PREFIX = "To_"; + public static final String FROM_PREFIX = "From_"; + public static final String END_TRANSACTION = "EndTransaction"; + public static final String ROCKETMQ_SERVICE = "rocketmq"; + public static final String ROCKETMQ_SUCCESS = "rocketmq.success"; + public static final String ROCKETMQ_TAGS = "rocketmq.tags"; + public static final String ROCKETMQ_KEYS = "rocketmq.keys"; + public static final String ROCKETMQ_SOTRE_HOST = "rocketmq.store_host"; + public static final String ROCKETMQ_BODY_LENGTH = "rocketmq.body_length"; + public static final String ROCKETMQ_MSG_ID = "rocketmq.mgs_id"; + public static final String ROCKETMQ_MSG_TYPE = "rocketmq.mgs_type"; + public static final String ROCKETMQ_REGION_ID = "rocketmq.region_id"; + public static final String ROCKETMQ_TRANSACTION_ID = "rocketmq.transaction_id"; + public static final String ROCKETMQ_TRANSACTION_STATE = "rocketmq.transaction_state"; + public static final String ROCKETMQ_IS_FROM_TRANSACTION_CHECK = "rocketmq.is_from_transaction_check"; + public static final String ROCKETMQ_RETRY_TIMERS = "rocketmq.retry_times"; } diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/hook/ConsumeMessageOpenTracingHookImpl.java b/client/src/main/java/org/apache/rocketmq/client/trace/hook/ConsumeMessageOpenTracingHookImpl.java new file mode 100644 index 0000000000000000000000000000000000000000..28fccae06f8d358925ff3e539ee0cf1c4c0ed52d --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/trace/hook/ConsumeMessageOpenTracingHookImpl.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.trace.hook; + +import io.opentracing.Span; +import io.opentracing.SpanContext; +import io.opentracing.Tracer; +import io.opentracing.propagation.Format; +import io.opentracing.propagation.TextMapAdapter; +import io.opentracing.tag.Tags; +import org.apache.rocketmq.client.hook.ConsumeMessageContext; +import org.apache.rocketmq.client.hook.ConsumeMessageHook; +import org.apache.rocketmq.client.trace.TraceConstants; +import org.apache.rocketmq.common.message.MessageConst; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.protocol.NamespaceUtil; + +import java.util.ArrayList; +import java.util.List; + + +public class ConsumeMessageOpenTracingHookImpl implements ConsumeMessageHook { + + private Tracer tracer; + + public ConsumeMessageOpenTracingHookImpl(Tracer tracer) { + this.tracer = tracer; + } + + @Override + public String hookName() { + return "ConsumeMessageOpenTracingHook"; + } + + @Override + public void consumeMessageBefore(ConsumeMessageContext context) { + if (context == null || context.getMsgList() == null || context.getMsgList().isEmpty()) { + return; + } + List spanList = new ArrayList<>(); + for (MessageExt msg : context.getMsgList()) { + if (msg == null) { + continue; + } + Tracer.SpanBuilder spanBuilder = tracer + .buildSpan(TraceConstants.FROM_PREFIX + msg.getTopic()) + .withTag(Tags.SPAN_KIND, Tags.SPAN_KIND_CONSUMER); + SpanContext spanContext = tracer.extract(Format.Builtin.TEXT_MAP, new TextMapAdapter(msg.getProperties())); + if (spanContext != null) { + spanBuilder.asChildOf(spanContext); + } + Span span = spanBuilder.start(); + + span.setTag(Tags.PEER_SERVICE, TraceConstants.ROCKETMQ_SERVICE); + span.setTag(Tags.MESSAGE_BUS_DESTINATION, NamespaceUtil.withoutNamespace(msg.getTopic())); + span.setTag(TraceConstants.ROCKETMQ_MSG_ID, msg.getMsgId()); + span.setTag(TraceConstants.ROCKETMQ_TAGS, msg.getTags()); + span.setTag(TraceConstants.ROCKETMQ_KEYS, msg.getKeys()); + span.setTag(TraceConstants.ROCKETMQ_BODY_LENGTH, msg.getStoreSize()); + span.setTag(TraceConstants.ROCKETMQ_RETRY_TIMERS, msg.getReconsumeTimes()); + span.setTag(TraceConstants.ROCKETMQ_REGION_ID, msg.getProperty(MessageConst.PROPERTY_MSG_REGION)); + spanList.add(span); + } + context.setMqTraceContext(spanList); + } + + @Override + public void consumeMessageAfter(ConsumeMessageContext context) { + if (context == null || context.getMsgList() == null || context.getMsgList().isEmpty()) { + return; + } + List spanList = (List) context.getMqTraceContext(); + if (spanList == null) { + return; + } + for (Span span : spanList) { + span.setTag(TraceConstants.ROCKETMQ_SUCCESS, context.isSuccess()); + span.finish(); + } + } +} diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/hook/EndTransactionOpenTracingHookImpl.java b/client/src/main/java/org/apache/rocketmq/client/trace/hook/EndTransactionOpenTracingHookImpl.java new file mode 100644 index 0000000000000000000000000000000000000000..62d310f1961f0971f11803367d9d83b04a8ee7b0 --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/trace/hook/EndTransactionOpenTracingHookImpl.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.trace.hook; + +import io.opentracing.Span; +import io.opentracing.SpanContext; +import io.opentracing.Tracer; +import io.opentracing.propagation.Format; +import io.opentracing.propagation.TextMapAdapter; +import io.opentracing.tag.Tags; +import org.apache.rocketmq.client.hook.EndTransactionContext; +import org.apache.rocketmq.client.hook.EndTransactionHook; +import org.apache.rocketmq.client.trace.TraceConstants; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageType; + +public class EndTransactionOpenTracingHookImpl implements EndTransactionHook { + + private Tracer tracer; + + public EndTransactionOpenTracingHookImpl(Tracer tracer) { + this.tracer = tracer; + } + + @Override + public String hookName() { + return "EndTransactionOpenTracingHook"; + } + + @Override + public void endTransaction(EndTransactionContext context) { + if (context == null) { + return; + } + Message msg = context.getMessage(); + Tracer.SpanBuilder spanBuilder = tracer + .buildSpan(TraceConstants.END_TRANSACTION) + .withTag(Tags.SPAN_KIND, Tags.SPAN_KIND_PRODUCER); + SpanContext spanContext = tracer.extract(Format.Builtin.TEXT_MAP, new TextMapAdapter(msg.getProperties())); + if (spanContext != null) { + spanBuilder.asChildOf(spanContext); + } + + Span span = spanBuilder.start(); + span.setTag(Tags.PEER_SERVICE, TraceConstants.ROCKETMQ_SERVICE); + span.setTag(Tags.MESSAGE_BUS_DESTINATION, msg.getTopic()); + span.setTag(TraceConstants.ROCKETMQ_TAGS, msg.getTags()); + span.setTag(TraceConstants.ROCKETMQ_KEYS, msg.getKeys()); + span.setTag(TraceConstants.ROCKETMQ_SOTRE_HOST, context.getBrokerAddr()); + span.setTag(TraceConstants.ROCKETMQ_MSG_ID, context.getMsgId()); + span.setTag(TraceConstants.ROCKETMQ_MSG_TYPE, MessageType.Trans_msg_Commit.name()); + span.setTag(TraceConstants.ROCKETMQ_TRANSACTION_ID, context.getTransactionId()); + span.setTag(TraceConstants.ROCKETMQ_TRANSACTION_STATE, context.getTransactionState().name()); + span.setTag(TraceConstants.ROCKETMQ_IS_FROM_TRANSACTION_CHECK, context.isFromTransactionCheck()); + span.finish(); + } + +} diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageOpenTracingHookImpl.java b/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageOpenTracingHookImpl.java new file mode 100644 index 0000000000000000000000000000000000000000..60c18a22a77ad8f265ebfb38ca91c239dce8a01d --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/trace/hook/SendMessageOpenTracingHookImpl.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.trace.hook; + +import io.opentracing.Span; +import io.opentracing.SpanContext; +import io.opentracing.Tracer; +import io.opentracing.propagation.Format; +import io.opentracing.propagation.TextMapAdapter; +import io.opentracing.tag.Tags; +import org.apache.rocketmq.client.hook.SendMessageContext; +import org.apache.rocketmq.client.hook.SendMessageHook; +import org.apache.rocketmq.client.producer.SendStatus; +import org.apache.rocketmq.client.trace.TraceConstants; +import org.apache.rocketmq.common.message.Message; + +public class SendMessageOpenTracingHookImpl implements SendMessageHook { + + private Tracer tracer; + + public SendMessageOpenTracingHookImpl(Tracer tracer) { + this.tracer = tracer; + } + + @Override + public String hookName() { + return "SendMessageOpenTracingHook"; + } + + @Override + public void sendMessageBefore(SendMessageContext context) { + if (context == null) { + return; + } + Message msg = context.getMessage(); + Tracer.SpanBuilder spanBuilder = tracer + .buildSpan(TraceConstants.TO_PREFIX + msg.getTopic()) + .withTag(Tags.SPAN_KIND, Tags.SPAN_KIND_PRODUCER); + SpanContext spanContext = tracer.extract(Format.Builtin.TEXT_MAP, new TextMapAdapter(msg.getProperties())); + if (spanContext != null) { + spanBuilder.asChildOf(spanContext); + } + Span span = spanBuilder.start(); + tracer.inject(span.context(), Format.Builtin.TEXT_MAP, new TextMapAdapter(msg.getProperties())); + span.setTag(Tags.PEER_SERVICE, TraceConstants.ROCKETMQ_SERVICE); + span.setTag(Tags.MESSAGE_BUS_DESTINATION, msg.getTopic()); + span.setTag(TraceConstants.ROCKETMQ_TAGS, msg.getTags()); + span.setTag(TraceConstants.ROCKETMQ_KEYS, msg.getKeys()); + span.setTag(TraceConstants.ROCKETMQ_SOTRE_HOST, context.getBrokerAddr()); + span.setTag(TraceConstants.ROCKETMQ_MSG_TYPE, context.getMsgType().name()); + span.setTag(TraceConstants.ROCKETMQ_BODY_LENGTH, msg.getBody().length); + context.setMqTraceContext(span); + } + + @Override + public void sendMessageAfter(SendMessageContext context) { + if (context == null || context.getMqTraceContext() == null) { + return; + } + if (context.getSendResult() == null) { + return; + } + + if (context.getSendResult().getRegionId() == null) { + return; + } + + Span span = (Span) context.getMqTraceContext(); + span.setTag(TraceConstants.ROCKETMQ_SUCCESS, context.getSendResult().getSendStatus().equals(SendStatus.SEND_OK)); + span.setTag(TraceConstants.ROCKETMQ_MSG_ID, context.getSendResult().getMsgId()); + span.setTag(TraceConstants.ROCKETMQ_REGION_ID, context.getSendResult().getRegionId()); + span.finish(); + } +} diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithOpenTracingTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithOpenTracingTest.java new file mode 100644 index 0000000000000000000000000000000000000000..16a3d0275b314b4b3cd833901fafb7a34c03a6ef --- /dev/null +++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithOpenTracingTest.java @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.client.trace; + +import io.opentracing.mock.MockSpan; +import io.opentracing.mock.MockTracer; +import io.opentracing.tag.Tags; +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.PullCallback; +import org.apache.rocketmq.client.consumer.PullResult; +import org.apache.rocketmq.client.consumer.PullStatus; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.impl.CommunicationMode; +import org.apache.rocketmq.client.impl.FindBrokerResult; +import org.apache.rocketmq.client.impl.MQClientAPIImpl; +import org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService; +import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl; +import org.apache.rocketmq.client.impl.consumer.ProcessQueue; +import org.apache.rocketmq.client.impl.consumer.PullAPIWrapper; +import org.apache.rocketmq.client.impl.consumer.PullMessageService; +import org.apache.rocketmq.client.impl.consumer.PullRequest; +import org.apache.rocketmq.client.impl.consumer.PullResultExt; +import org.apache.rocketmq.client.impl.consumer.RebalancePushImpl; +import org.apache.rocketmq.client.impl.factory.MQClientInstance; +import org.apache.rocketmq.client.trace.hook.ConsumeMessageOpenTracingHookImpl; +import org.apache.rocketmq.common.message.MessageClientExt; +import org.apache.rocketmq.common.message.MessageDecoder; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader; +import org.apache.rocketmq.remoting.exception.RemotingException; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PowerMockIgnore; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import java.io.ByteArrayOutputStream; +import java.lang.reflect.Field; +import java.net.InetSocketAddress; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.nullable; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +@RunWith(PowerMockRunner.class) +@PrepareForTest(DefaultMQPushConsumerImpl.class) +@PowerMockIgnore("javax.management.*") +public class DefaultMQConsumerWithOpenTracingTest { + private String consumerGroup; + + private String topic = "FooBar"; + private String brokerName = "BrokerA"; + private MQClientInstance mQClientFactory; + + @Mock + private MQClientAPIImpl mQClientAPIImpl; + private PullAPIWrapper pullAPIWrapper; + private RebalancePushImpl rebalancePushImpl; + private DefaultMQPushConsumer pushConsumer; + private MockTracer tracer = new MockTracer(); + + @Before + public void init() throws Exception { + consumerGroup = "FooBarGroup" + System.currentTimeMillis(); + pushConsumer = new DefaultMQPushConsumer(consumerGroup); + pushConsumer.getDefaultMQPushConsumerImpl().registerConsumeMessageHook( + new ConsumeMessageOpenTracingHookImpl(tracer)); + pushConsumer.setNamesrvAddr("127.0.0.1:9876"); + pushConsumer.setPullInterval(60 * 1000); + + pushConsumer.registerMessageListener(new MessageListenerConcurrently() { + @Override + public ConsumeConcurrentlyStatus consumeMessage(List msgs, + ConsumeConcurrentlyContext context) { + return null; + } + }); + + PowerMockito.suppress(PowerMockito.method(DefaultMQPushConsumerImpl.class, "updateTopicSubscribeInfoWhenSubscriptionChanged")); + DefaultMQPushConsumerImpl pushConsumerImpl = pushConsumer.getDefaultMQPushConsumerImpl(); + rebalancePushImpl = spy(new RebalancePushImpl(pushConsumer.getDefaultMQPushConsumerImpl())); + Field field = DefaultMQPushConsumerImpl.class.getDeclaredField("rebalanceImpl"); + field.setAccessible(true); + field.set(pushConsumerImpl, rebalancePushImpl); + pushConsumer.subscribe(topic, "*"); + + pushConsumer.start(); + + mQClientFactory = spy(pushConsumerImpl.getmQClientFactory()); + + field = DefaultMQPushConsumerImpl.class.getDeclaredField("mQClientFactory"); + field.setAccessible(true); + field.set(pushConsumerImpl, mQClientFactory); + + field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl"); + field.setAccessible(true); + field.set(mQClientFactory, mQClientAPIImpl); + + pullAPIWrapper = spy(new PullAPIWrapper(mQClientFactory, consumerGroup, false)); + field = DefaultMQPushConsumerImpl.class.getDeclaredField("pullAPIWrapper"); + field.setAccessible(true); + field.set(pushConsumerImpl, pullAPIWrapper); + + pushConsumer.getDefaultMQPushConsumerImpl().getRebalanceImpl().setmQClientFactory(mQClientFactory); + mQClientFactory.registerConsumer(consumerGroup, pushConsumerImpl); + + when(mQClientFactory.getMQClientAPIImpl().pullMessage(anyString(), any(PullMessageRequestHeader.class), + anyLong(), any(CommunicationMode.class), nullable(PullCallback.class))) + .thenAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock mock) throws Throwable { + PullMessageRequestHeader requestHeader = mock.getArgument(1); + MessageClientExt messageClientExt = new MessageClientExt(); + messageClientExt.setTopic(topic); + messageClientExt.setQueueId(0); + messageClientExt.setMsgId("123"); + messageClientExt.setBody(new byte[]{'a'}); + messageClientExt.setOffsetMsgId("234"); + messageClientExt.setBornHost(new InetSocketAddress(8080)); + messageClientExt.setStoreHost(new InetSocketAddress(8080)); + PullResult pullResult = createPullResult(requestHeader, PullStatus.FOUND, Collections.singletonList(messageClientExt)); + ((PullCallback) mock.getArgument(4)).onSuccess(pullResult); + return pullResult; + } + }); + + doReturn(new FindBrokerResult("127.0.0.1:10911", false)).when(mQClientFactory).findBrokerAddressInSubscribe(anyString(), anyLong(), anyBoolean()); + Set messageQueueSet = new HashSet(); + messageQueueSet.add(createPullRequest().getMessageQueue()); + pushConsumer.getDefaultMQPushConsumerImpl().updateTopicSubscribeInfo(topic, messageQueueSet); + } + + @After + public void terminate() { + pushConsumer.shutdown(); + } + + @Test + public void testPullMessage_WithTrace_Success() throws InterruptedException, RemotingException, MQBrokerException, MQClientException { + final CountDownLatch countDownLatch = new CountDownLatch(1); + final MessageExt[] messageExts = new MessageExt[1]; + pushConsumer.getDefaultMQPushConsumerImpl().setConsumeMessageService(new ConsumeMessageConcurrentlyService(pushConsumer.getDefaultMQPushConsumerImpl(), new MessageListenerConcurrently() { + @Override + public ConsumeConcurrentlyStatus consumeMessage(List msgs, + ConsumeConcurrentlyContext context) { + messageExts[0] = msgs.get(0); + countDownLatch.countDown(); + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + } + })); + + PullMessageService pullMessageService = mQClientFactory.getPullMessageService(); + pullMessageService.executePullRequestImmediately(createPullRequest()); + countDownLatch.await(3000L, TimeUnit.MILLISECONDS); + assertThat(messageExts[0].getTopic()).isEqualTo(topic); + assertThat(messageExts[0].getBody()).isEqualTo(new byte[]{'a'}); + + assertThat(tracer.finishedSpans().size()).isEqualTo(1); + MockSpan span = tracer.finishedSpans().get(0); + assertThat(span.tags().get(Tags.MESSAGE_BUS_DESTINATION.getKey())).isEqualTo(topic); + assertThat(span.tags().get(Tags.SPAN_KIND.getKey())).isEqualTo(Tags.SPAN_KIND_CONSUMER); + assertThat(span.tags().get(TraceConstants.ROCKETMQ_SUCCESS)).isEqualTo(true); + } + + private PullRequest createPullRequest() { + PullRequest pullRequest = new PullRequest(); + pullRequest.setConsumerGroup(consumerGroup); + pullRequest.setNextOffset(1024); + + MessageQueue messageQueue = new MessageQueue(); + messageQueue.setBrokerName(brokerName); + messageQueue.setQueueId(0); + messageQueue.setTopic(topic); + pullRequest.setMessageQueue(messageQueue); + ProcessQueue processQueue = new ProcessQueue(); + processQueue.setLocked(true); + processQueue.setLastLockTimestamp(System.currentTimeMillis()); + pullRequest.setProcessQueue(processQueue); + + return pullRequest; + } + + private PullResultExt createPullResult(PullMessageRequestHeader requestHeader, PullStatus pullStatus, + List messageExtList) throws Exception { + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + for (MessageExt messageExt : messageExtList) { + outputStream.write(MessageDecoder.encode(messageExt, false)); + } + return new PullResultExt(pullStatus, requestHeader.getQueueOffset() + messageExtList.size(), 123, 2048, messageExtList, 0, outputStream.toByteArray()); + } + +} diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithOpenTracingTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithOpenTracingTest.java new file mode 100644 index 0000000000000000000000000000000000000000..5d64a93f03987f80122a3e468e7af28a7046b374 --- /dev/null +++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithOpenTracingTest.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.client.trace; + +import io.opentracing.mock.MockSpan; +import io.opentracing.mock.MockTracer; +import io.opentracing.tag.Tags; +import org.apache.rocketmq.client.ClientConfig; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.hook.SendMessageContext; +import org.apache.rocketmq.client.impl.CommunicationMode; +import org.apache.rocketmq.client.impl.MQClientAPIImpl; +import org.apache.rocketmq.client.impl.MQClientManager; +import org.apache.rocketmq.client.impl.factory.MQClientInstance; +import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl; +import org.apache.rocketmq.client.impl.producer.TopicPublishInfo; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.SendCallback; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.client.producer.SendStatus; +import org.apache.rocketmq.client.trace.hook.SendMessageOpenTracingHookImpl; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageType; +import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader; +import org.apache.rocketmq.common.protocol.route.BrokerData; +import org.apache.rocketmq.common.protocol.route.QueueData; +import org.apache.rocketmq.common.protocol.route.TopicRouteData; +import org.apache.rocketmq.common.topic.TopicValidator; +import org.apache.rocketmq.remoting.exception.RemotingException; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Spy; +import org.mockito.junit.MockitoJUnitRunner; + +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.nullable; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class DefaultMQProducerWithOpenTracingTest { + + @Spy + private MQClientInstance mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig()); + @Mock + private MQClientAPIImpl mQClientAPIImpl; + + private DefaultMQProducer producer; + + private Message message; + private String topic = "FooBar"; + private String producerGroupPrefix = "FooBar_PID"; + private String producerGroupTemp = producerGroupPrefix + System.currentTimeMillis(); + private String producerGroupTraceTemp = TopicValidator.RMQ_SYS_TRACE_TOPIC + System.currentTimeMillis(); + private MockTracer tracer = new MockTracer(); + + @Before + public void init() throws Exception { + + producer = new DefaultMQProducer(producerGroupTemp); + producer.getDefaultMQProducerImpl().registerSendMessageHook( + new SendMessageOpenTracingHookImpl(tracer)); + producer.setNamesrvAddr("127.0.0.1:9876"); + message = new Message(topic, new byte[] {'a', 'b', 'c'}); + + producer.start(); + + Field field = DefaultMQProducerImpl.class.getDeclaredField("mQClientFactory"); + field.setAccessible(true); + field.set(producer.getDefaultMQProducerImpl(), mQClientFactory); + + field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl"); + field.setAccessible(true); + field.set(mQClientFactory, mQClientAPIImpl); + + producer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTemp, producer.getDefaultMQProducerImpl()); + + when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class), + nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))).thenCallRealMethod(); + when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class), + nullable(SendCallback.class), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))) + .thenReturn(createSendResult(SendStatus.SEND_OK)); + + } + + @Test + public void testSendMessageSync_WithTrace_Success() throws RemotingException, InterruptedException, MQBrokerException, MQClientException { + producer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTraceTemp, producer.getDefaultMQProducerImpl()); + when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute()); + producer.send(message); + assertThat(tracer.finishedSpans().size()).isEqualTo(1); + MockSpan span = tracer.finishedSpans().get(0); + assertThat(span.tags().get(Tags.MESSAGE_BUS_DESTINATION.getKey())).isEqualTo(topic); + assertThat(span.tags().get(Tags.SPAN_KIND.getKey())).isEqualTo(Tags.SPAN_KIND_PRODUCER); + assertThat(span.tags().get(TraceConstants.ROCKETMQ_MSG_ID)).isEqualTo("123"); + assertThat(span.tags().get(TraceConstants.ROCKETMQ_BODY_LENGTH)).isEqualTo(3); + assertThat(span.tags().get(TraceConstants.ROCKETMQ_REGION_ID)).isEqualTo("HZ"); + assertThat(span.tags().get(TraceConstants.ROCKETMQ_MSG_TYPE)).isEqualTo(MessageType.Normal_Msg.name()); + assertThat(span.tags().get(TraceConstants.ROCKETMQ_SOTRE_HOST)).isEqualTo("127.0.0.1:10911"); + } + + @After + public void terminate() { + producer.shutdown(); + } + + public static TopicRouteData createTopicRoute() { + TopicRouteData topicRouteData = new TopicRouteData(); + + topicRouteData.setFilterServerTable(new HashMap>()); + List brokerDataList = new ArrayList(); + BrokerData brokerData = new BrokerData(); + brokerData.setBrokerName("BrokerA"); + brokerData.setCluster("DefaultCluster"); + HashMap brokerAddrs = new HashMap(); + brokerAddrs.put(0L, "127.0.0.1:10911"); + brokerData.setBrokerAddrs(brokerAddrs); + brokerDataList.add(brokerData); + topicRouteData.setBrokerDatas(brokerDataList); + + List queueDataList = new ArrayList(); + QueueData queueData = new QueueData(); + queueData.setBrokerName("BrokerA"); + queueData.setPerm(6); + queueData.setReadQueueNums(3); + queueData.setWriteQueueNums(4); + queueData.setTopicSysFlag(0); + queueDataList.add(queueData); + topicRouteData.setQueueDatas(queueDataList); + return topicRouteData; + } + + private SendResult createSendResult(SendStatus sendStatus) { + SendResult sendResult = new SendResult(); + sendResult.setMsgId("123"); + sendResult.setOffsetMsgId("123"); + sendResult.setQueueOffset(456); + sendResult.setSendStatus(sendStatus); + sendResult.setRegionId("HZ"); + return sendResult; + } + +} diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithOpenTracingTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithOpenTracingTest.java new file mode 100644 index 0000000000000000000000000000000000000000..dd6d1083ce03853f62eec696fdc256fa53aced7a --- /dev/null +++ b/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithOpenTracingTest.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.client.trace; + +import io.opentracing.mock.MockSpan; +import io.opentracing.mock.MockTracer; +import io.opentracing.tag.Tags; +import org.apache.rocketmq.client.ClientConfig; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.hook.SendMessageContext; +import org.apache.rocketmq.client.impl.CommunicationMode; +import org.apache.rocketmq.client.impl.MQClientAPIImpl; +import org.apache.rocketmq.client.impl.MQClientManager; +import org.apache.rocketmq.client.impl.factory.MQClientInstance; +import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl; +import org.apache.rocketmq.client.impl.producer.TopicPublishInfo; +import org.apache.rocketmq.client.producer.LocalTransactionState; +import org.apache.rocketmq.client.producer.SendCallback; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.client.producer.SendStatus; +import org.apache.rocketmq.client.producer.TransactionListener; +import org.apache.rocketmq.client.producer.TransactionMQProducer; +import org.apache.rocketmq.client.trace.hook.EndTransactionOpenTracingHookImpl; +import org.apache.rocketmq.client.trace.hook.SendMessageOpenTracingHookImpl; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageDecoder; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.message.MessageType; +import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader; +import org.apache.rocketmq.common.protocol.route.BrokerData; +import org.apache.rocketmq.common.protocol.route.QueueData; +import org.apache.rocketmq.common.protocol.route.TopicRouteData; +import org.apache.rocketmq.common.topic.TopicValidator; +import org.apache.rocketmq.remoting.exception.RemotingException; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Spy; +import org.mockito.junit.MockitoJUnitRunner; + +import java.lang.reflect.Field; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.nullable; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class TransactionMQProducerWithOpenTracingTest { + + @Spy + private MQClientInstance mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig()); + @Mock + private MQClientAPIImpl mQClientAPIImpl; + + private TransactionMQProducer producer; + + private Message message; + private String topic = "FooBar"; + private String producerGroupPrefix = "FooBar_PID"; + private String producerGroupTemp = producerGroupPrefix + System.currentTimeMillis(); + private String producerGroupTraceTemp = TopicValidator.RMQ_SYS_TRACE_TOPIC + System.currentTimeMillis(); + private MockTracer tracer = new MockTracer(); + @Before + public void init() throws Exception { + TransactionListener transactionListener = new TransactionListener() { + @Override + public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { + return LocalTransactionState.COMMIT_MESSAGE; + } + + @Override + public LocalTransactionState checkLocalTransaction(MessageExt msg) { + return LocalTransactionState.COMMIT_MESSAGE; + } + }; + producer = new TransactionMQProducer(producerGroupTemp); + producer.getDefaultMQProducerImpl().registerSendMessageHook(new SendMessageOpenTracingHookImpl(tracer)); + producer.getDefaultMQProducerImpl().registerEndTransactionHook(new EndTransactionOpenTracingHookImpl(tracer)); + producer.setTransactionListener(transactionListener); + + producer.setNamesrvAddr("127.0.0.1:9876"); + message = new Message(topic, new byte[] {'a', 'b', 'c'}); + + producer.start(); + + Field field = DefaultMQProducerImpl.class.getDeclaredField("mQClientFactory"); + field.setAccessible(true); + field.set(producer.getDefaultMQProducerImpl(), mQClientFactory); + + field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl"); + field.setAccessible(true); + field.set(mQClientFactory, mQClientAPIImpl); + + producer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTemp, producer.getDefaultMQProducerImpl()); + + when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class), + nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))).thenCallRealMethod(); + when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class), + nullable(SendCallback.class), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))) + .thenReturn(createSendResult(SendStatus.SEND_OK)); + + } + + @Test + public void testSendMessageSync_WithTrace_Success() throws RemotingException, InterruptedException, MQBrokerException, MQClientException { + producer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTraceTemp, producer.getDefaultMQProducerImpl()); + when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute()); + producer.sendMessageInTransaction(message, null); + + assertThat(tracer.finishedSpans().size()).isEqualTo(2); + MockSpan span = tracer.finishedSpans().get(1); + assertThat(span.tags().get(Tags.MESSAGE_BUS_DESTINATION.getKey())).isEqualTo(topic); + assertThat(span.tags().get(Tags.SPAN_KIND.getKey())).isEqualTo(Tags.SPAN_KIND_PRODUCER); + assertThat(span.tags().get(TraceConstants.ROCKETMQ_MSG_ID)).isEqualTo("123"); + assertThat(span.tags().get(TraceConstants.ROCKETMQ_MSG_TYPE)).isEqualTo(MessageType.Trans_msg_Commit.name()); + assertThat(span.tags().get(TraceConstants.ROCKETMQ_TRANSACTION_STATE)).isEqualTo(LocalTransactionState.COMMIT_MESSAGE.name()); + assertThat(span.tags().get(TraceConstants.ROCKETMQ_IS_FROM_TRANSACTION_CHECK)).isEqualTo(false); + } + + @After + public void terminate() { + producer.shutdown(); + } + + public static TopicRouteData createTopicRoute() { + TopicRouteData topicRouteData = new TopicRouteData(); + + topicRouteData.setFilterServerTable(new HashMap>()); + List brokerDataList = new ArrayList(); + BrokerData brokerData = new BrokerData(); + brokerData.setBrokerName("BrokerA"); + brokerData.setCluster("DefaultCluster"); + HashMap brokerAddrs = new HashMap(); + brokerAddrs.put(0L, "127.0.0.1:10911"); + brokerData.setBrokerAddrs(brokerAddrs); + brokerDataList.add(brokerData); + topicRouteData.setBrokerDatas(brokerDataList); + + List queueDataList = new ArrayList(); + QueueData queueData = new QueueData(); + queueData.setBrokerName("BrokerA"); + queueData.setPerm(6); + queueData.setReadQueueNums(3); + queueData.setWriteQueueNums(4); + queueData.setTopicSysFlag(0); + queueDataList.add(queueData); + topicRouteData.setQueueDatas(queueDataList); + return topicRouteData; + } + + private SendResult createSendResult(SendStatus sendStatus) { + SendResult sendResult = new SendResult(); + sendResult.setMsgId("123"); + sendResult.setOffsetMsgId(MessageDecoder.createMessageId(new InetSocketAddress("127.0.0.1", 12), 1)); + sendResult.setQueueOffset(456); + sendResult.setSendStatus(sendStatus); + sendResult.setRegionId("HZ"); + sendResult.setMessageQueue(new MessageQueue(topic, "broker-trace", 0)); + return sendResult; + } + +} diff --git a/example/pom.xml b/example/pom.xml index abe0b95619b3002d01f1aed4e8b9c5805b4904dc..568dd802954112270b634fc436d0928db7e2aa14 100644 --- a/example/pom.xml +++ b/example/pom.xml @@ -52,5 +52,15 @@ org.javassist javassist + + io.jaegertracing + jaeger-core + 1.6.0 + + + io.jaegertracing + jaeger-client + 1.6.0 + diff --git a/example/src/main/java/org/apache/rocketmq/example/tracemessage/OpenTracingProducer.java b/example/src/main/java/org/apache/rocketmq/example/tracemessage/OpenTracingProducer.java new file mode 100644 index 0000000000000000000000000000000000000000..cd9ae2792b667ed6122605775eaaf34055664ca4 --- /dev/null +++ b/example/src/main/java/org/apache/rocketmq/example/tracemessage/OpenTracingProducer.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.example.tracemessage; + +import io.jaegertracing.Configuration; +import io.jaegertracing.internal.samplers.ConstSampler; +import io.opentracing.Tracer; +import io.opentracing.util.GlobalTracer; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.client.trace.hook.SendMessageOpenTracingHookImpl; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.remoting.common.RemotingHelper; + +public class OpenTracingProducer { + public static void main(String[] args) throws MQClientException { + + Tracer tracer = initTracer(); + + DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); + producer.getDefaultMQProducerImpl().registerSendMessageHook(new SendMessageOpenTracingHookImpl(tracer)); + producer.start(); + + try { + Message msg = new Message("TopicTest", + "TagA", + "OrderID188", + "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); + SendResult sendResult = producer.send(msg); + System.out.printf("%s%n", sendResult); + + } catch (Exception e) { + e.printStackTrace(); + } + + producer.shutdown(); + } + + private static Tracer initTracer() { + Configuration.SamplerConfiguration samplerConfig = Configuration.SamplerConfiguration.fromEnv() + .withType(ConstSampler.TYPE) + .withParam(1); + Configuration.ReporterConfiguration reporterConfig = Configuration.ReporterConfiguration.fromEnv() + .withLogSpans(true); + + Configuration config = new Configuration("rocketmq") + .withSampler(samplerConfig) + .withReporter(reporterConfig); + GlobalTracer.registerIfAbsent(config.getTracer()); + return config.getTracer(); + } +} diff --git a/example/src/main/java/org/apache/rocketmq/example/tracemessage/OpenTracingPushConsumer.java b/example/src/main/java/org/apache/rocketmq/example/tracemessage/OpenTracingPushConsumer.java new file mode 100644 index 0000000000000000000000000000000000000000..1d5d8a273b2dbc4d7b29a132356b17d4389c734b --- /dev/null +++ b/example/src/main/java/org/apache/rocketmq/example/tracemessage/OpenTracingPushConsumer.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.example.tracemessage; + +import io.jaegertracing.Configuration; +import io.jaegertracing.internal.samplers.ConstSampler; +import io.opentracing.Tracer; +import io.opentracing.util.GlobalTracer; +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.trace.hook.ConsumeMessageOpenTracingHookImpl; +import org.apache.rocketmq.common.consumer.ConsumeFromWhere; +import org.apache.rocketmq.common.message.MessageExt; + +import java.util.List; + +public class OpenTracingPushConsumer { + public static void main(String[] args) throws InterruptedException, MQClientException { + Tracer tracer = initTracer(); + + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1"); + consumer.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(new ConsumeMessageOpenTracingHookImpl(tracer)); + + consumer.subscribe("TopicTest", "*"); + consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); + + consumer.setConsumeTimestamp("20181109221800"); + consumer.registerMessageListener(new MessageListenerConcurrently() { + + @Override + public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { + System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + } + }); + consumer.start(); + System.out.printf("Consumer Started.%n"); + } + + private static Tracer initTracer() { + Configuration.SamplerConfiguration samplerConfig = Configuration.SamplerConfiguration.fromEnv() + .withType(ConstSampler.TYPE) + .withParam(1); + Configuration.ReporterConfiguration reporterConfig = Configuration.ReporterConfiguration.fromEnv() + .withLogSpans(true); + + Configuration config = new Configuration("rocketmq") + .withSampler(samplerConfig) + .withReporter(reporterConfig); + GlobalTracer.registerIfAbsent(config.getTracer()); + return config.getTracer(); + } +} diff --git a/example/src/main/java/org/apache/rocketmq/example/tracemessage/OpenTracingTransactionProducer.java b/example/src/main/java/org/apache/rocketmq/example/tracemessage/OpenTracingTransactionProducer.java new file mode 100644 index 0000000000000000000000000000000000000000..428640a383b6149093965155ab5638515794acbc --- /dev/null +++ b/example/src/main/java/org/apache/rocketmq/example/tracemessage/OpenTracingTransactionProducer.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.example.tracemessage; + +import io.jaegertracing.Configuration; +import io.jaegertracing.internal.samplers.ConstSampler; +import io.opentracing.Tracer; +import io.opentracing.util.GlobalTracer; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.LocalTransactionState; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.client.producer.TransactionListener; +import org.apache.rocketmq.client.producer.TransactionMQProducer; +import org.apache.rocketmq.client.trace.hook.EndTransactionOpenTracingHookImpl; +import org.apache.rocketmq.client.trace.hook.SendMessageOpenTracingHookImpl; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.remoting.common.RemotingHelper; + +import java.io.UnsupportedEncodingException; + +public class OpenTracingTransactionProducer { + public static void main(String[] args) throws MQClientException, InterruptedException { + Tracer tracer = initTracer(); + + TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name"); + producer.getDefaultMQProducerImpl().registerSendMessageHook(new SendMessageOpenTracingHookImpl(tracer)); + producer.getDefaultMQProducerImpl().registerEndTransactionHook(new EndTransactionOpenTracingHookImpl(tracer)); + + producer.setTransactionListener(new TransactionListener() { + @Override + public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { + return LocalTransactionState.COMMIT_MESSAGE; + } + + @Override + public LocalTransactionState checkLocalTransaction(MessageExt msg) { + return LocalTransactionState.COMMIT_MESSAGE; + } + }); + producer.start(); + + try { + Message msg = new Message("TopicTest", "Tag", "KEY", + ("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET)); + SendResult sendResult = producer.sendMessageInTransaction(msg, null); + System.out.printf("%s%n", sendResult); + } catch (MQClientException | UnsupportedEncodingException e) { + e.printStackTrace(); + } + + for (int i = 0; i < 100000; i++) { + Thread.sleep(1000); + } + producer.shutdown(); + } + + private static Tracer initTracer() { + Configuration.SamplerConfiguration samplerConfig = Configuration.SamplerConfiguration.fromEnv() + .withType(ConstSampler.TYPE) + .withParam(1); + Configuration.ReporterConfiguration reporterConfig = Configuration.ReporterConfiguration.fromEnv() + .withLogSpans(true); + + Configuration config = new Configuration("rocketmq") + .withSampler(samplerConfig) + .withReporter(reporterConfig); + GlobalTracer.registerIfAbsent(config.getTracer()); + return config.getTracer(); + } +}