diff --git a/apm-network/src/main/java/org/skywalking/apm/network/trace/component/ComponentsDefine.java b/apm-network/src/main/java/org/skywalking/apm/network/trace/component/ComponentsDefine.java
index 8d411d8476acdc0e3a98b4a97dd6fcff29d22ffb..20476f96d1412b332b1c0373f832a597c13b5b3a 100644
--- a/apm-network/src/main/java/org/skywalking/apm/network/trace/component/ComponentsDefine.java
+++ b/apm-network/src/main/java/org/skywalking/apm/network/trace/component/ComponentsDefine.java
@@ -72,6 +72,9 @@ public class ComponentsDefine {
public static final OfficialComponent GRPC = new OfficialComponent(23, "GRPC");
public static final OfficialComponent ELASTIC_JOB = new OfficialComponent(24, "ElasticJob");
+
+ public static final OfficialComponent ROCKET_MQ = new OfficialComponent(25, "RocketMQ");
+
private static ComponentsDefine instance = new ComponentsDefine();
@@ -82,7 +85,7 @@ public class ComponentsDefine {
}
public ComponentsDefine() {
- components = new String[25];
+ components = new String[26];
addComponent(TOMCAT);
addComponent(HTTPCLIENT);
addComponent(DUBBO);
@@ -107,6 +110,7 @@ public class ComponentsDefine {
addComponent(POSTGRESQL);
addComponent(GRPC);
addComponent(ELASTIC_JOB);
+ addComponent(ROCKET_MQ);
}
private void addComponent(OfficialComponent component) {
diff --git a/apm-sniffer/apm-sdk-plugin/pom.xml b/apm-sniffer/apm-sdk-plugin/pom.xml
index edfd655fbf6a88101bf3df95296f43e6ea7e7747..56db2cbec3024cbd80094f900883551024cae1ff 100644
--- a/apm-sniffer/apm-sdk-plugin/pom.xml
+++ b/apm-sniffer/apm-sdk-plugin/pom.xml
@@ -52,6 +52,7 @@
h2-1.x-plugin
postgresql-8.x-plugin
oracle-10.x-plugin
+ rocketMQ-4.x-plugin
elastic-job-2.x-plugin
mongodb-2.x-plugin
diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/pom.xml b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/pom.xml
new file mode 100644
index 0000000000000000000000000000000000000000..76977bd2f3e42279425edbbe92e0ace322221b64
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/pom.xml
@@ -0,0 +1,68 @@
+
+
+
+
+
+ apm-sdk-plugin
+ org.skywalking
+ 3.3.0-2017
+
+ 4.0.0
+
+ apm-rocketmq-4.x-plugin
+ rocketMQ-4.x-plugin
+
+
+ UTF-8
+
+
+
+
+ org.apache.rocketmq
+ rocketmq-client
+ 4.1.0-incubating
+ provided
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-deploy-plugin
+
+
+
+ org.apache.maven.plugins
+ maven-source-plugin
+
+
+
+ attach-sources
+
+ jar
+
+
+
+
+
+
+
diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/skywalking/apm/plugin/rocketMQ/v4/AbstractMessageConsumeInterceptor.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/skywalking/apm/plugin/rocketMQ/v4/AbstractMessageConsumeInterceptor.java
new file mode 100644
index 0000000000000000000000000000000000000000..b428759a852e3f2f07e445fad8fcd7aefcc3349f
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/skywalking/apm/plugin/rocketMQ/v4/AbstractMessageConsumeInterceptor.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2017, OpenSkywalking Organization All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * Project repository: https://github.com/OpenSkywalking/skywalking
+ */
+
+package org.skywalking.apm.plugin.rocketMQ.v4;
+
+import java.lang.reflect.Method;
+import java.util.List;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.skywalking.apm.agent.core.context.CarrierItem;
+import org.skywalking.apm.agent.core.context.ContextCarrier;
+import org.skywalking.apm.agent.core.context.ContextManager;
+import org.skywalking.apm.agent.core.context.trace.AbstractSpan;
+import org.skywalking.apm.agent.core.context.trace.SpanLayer;
+import org.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import org.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
+import org.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
+import org.skywalking.apm.network.trace.component.ComponentsDefine;
+
+/**
+ * {@link AbstractMessageConsumeInterceptor} create entry span when the consumeMessage
in the {@link
+ * org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently} and {@link
+ * org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly} class.
+ *
+ * @author zhangxin
+ */
+public abstract class AbstractMessageConsumeInterceptor implements InstanceMethodsAroundInterceptor {
+
+ public static final String COMSUMER_OPERATION_NAME_PREFIX = "RocketMQ/";
+
+ @Override
+ public final void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments,
+ Class>[] argumentsTypes,
+ MethodInterceptResult result) throws Throwable {
+ List msgs = (List)allArguments[0];
+
+ ContextCarrier contextCarrier = getContextCarrierFromMessage(msgs.get(0));
+ AbstractSpan span = ContextManager.createEntrySpan(COMSUMER_OPERATION_NAME_PREFIX + msgs.get(0).getTopic() + "/Consumer", contextCarrier);
+
+ span.setComponent(ComponentsDefine.ROCKET_MQ);
+ span.setLayer(SpanLayer.MQ);
+ for (int i = 1; i < msgs.size(); i++) {
+ ContextManager.extract(getContextCarrierFromMessage(msgs.get(i)));
+ }
+
+ }
+
+ @Override public final void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
+ Class>[] argumentsTypes, Throwable t) {
+ ContextManager.activeSpan().errorOccurred().log(t);
+ }
+
+ private ContextCarrier getContextCarrierFromMessage(MessageExt message) {
+ ContextCarrier contextCarrier = new ContextCarrier();
+
+ CarrierItem next = contextCarrier.items();
+ while (next.hasNext()) {
+ next = next.next();
+ next.setHeadValue(message.getUserProperty(next.getHeadKey()));
+ }
+
+ return contextCarrier;
+ }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/skywalking/apm/plugin/rocketMQ/v4/MessageConcurrentlyConsumeInterceptor.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/skywalking/apm/plugin/rocketMQ/v4/MessageConcurrentlyConsumeInterceptor.java
new file mode 100644
index 0000000000000000000000000000000000000000..2e786e6517a408ee5c623d26141617fdaad1da30
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/skywalking/apm/plugin/rocketMQ/v4/MessageConcurrentlyConsumeInterceptor.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2017, OpenSkywalking Organization All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * Project repository: https://github.com/OpenSkywalking/skywalking
+ */
+
+package org.skywalking.apm.plugin.rocketMQ.v4;
+
+import java.lang.reflect.Method;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.skywalking.apm.agent.core.context.ContextManager;
+import org.skywalking.apm.agent.core.context.tag.Tags;
+import org.skywalking.apm.agent.core.context.trace.AbstractSpan;
+import org.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+
+/**
+ * {@link MessageConcurrentlyConsumeInterceptor} set the process status after the {@link
+ * org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently#consumeMessage(java.util.List,
+ * org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext)} method execute.
+ *
+ * @author zhang xin
+ */
+public class MessageConcurrentlyConsumeInterceptor extends AbstractMessageConsumeInterceptor {
+
+ @Override
+ public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class>[] argumentsTypes,
+ Object ret) throws Throwable {
+ ConsumeConcurrentlyStatus status = (ConsumeConcurrentlyStatus)ret;
+ if (status == ConsumeConcurrentlyStatus.RECONSUME_LATER) {
+ AbstractSpan activeSpan = ContextManager.activeSpan();
+ activeSpan.errorOccurred();
+ Tags.STATUS_CODE.set(activeSpan, status.name());
+ }
+ ContextManager.stopSpan();
+ return ret;
+ }
+}
+
diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/skywalking/apm/plugin/rocketMQ/v4/MessageOrderlyConsumeInterceptor.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/skywalking/apm/plugin/rocketMQ/v4/MessageOrderlyConsumeInterceptor.java
new file mode 100644
index 0000000000000000000000000000000000000000..151bc8788895ce7c49b96a987fab00bbdbc49e4a
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/skywalking/apm/plugin/rocketMQ/v4/MessageOrderlyConsumeInterceptor.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2017, OpenSkywalking Organization All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * Project repository: https://github.com/OpenSkywalking/skywalking
+ */
+
+package org.skywalking.apm.plugin.rocketMQ.v4;
+
+import java.lang.reflect.Method;
+import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
+import org.skywalking.apm.agent.core.context.ContextManager;
+import org.skywalking.apm.agent.core.context.tag.Tags;
+import org.skywalking.apm.agent.core.context.trace.AbstractSpan;
+import org.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+
+/**
+ * {@link MessageOrderlyConsumeInterceptor} set the process status after the {@link
+ * org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly#consumeMessage(java.util.List,
+ * org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext)} method execute.
+ *
+ * @author zhang xin
+ */
+public class MessageOrderlyConsumeInterceptor extends AbstractMessageConsumeInterceptor {
+
+ @Override
+ public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class>[] argumentsTypes,
+ Object ret) throws Throwable {
+
+ ConsumeOrderlyStatus status = (ConsumeOrderlyStatus)ret;
+ if (status == ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT) {
+ AbstractSpan activeSpan = ContextManager.activeSpan();
+ activeSpan.errorOccurred();
+ Tags.STATUS_CODE.set(activeSpan, status.name());
+ }
+ ContextManager.stopSpan();
+ return ret;
+ }
+
+}
diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/skywalking/apm/plugin/rocketMQ/v4/MessageSendInterceptor.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/skywalking/apm/plugin/rocketMQ/v4/MessageSendInterceptor.java
new file mode 100644
index 0000000000000000000000000000000000000000..d8f9eae70d1f516f61b6442fdf18c1c3184e3927
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/skywalking/apm/plugin/rocketMQ/v4/MessageSendInterceptor.java
@@ -0,0 +1,101 @@
+/*
+ * Copyright 2017, OpenSkywalking Organization All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * Project repository: https://github.com/OpenSkywalking/skywalking
+ */
+
+package org.skywalking.apm.plugin.rocketMQ.v4;
+
+import java.lang.reflect.Method;
+import org.apache.rocketmq.client.impl.CommunicationMode;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
+import org.skywalking.apm.agent.core.context.CarrierItem;
+import org.skywalking.apm.agent.core.context.ContextCarrier;
+import org.skywalking.apm.agent.core.context.ContextManager;
+import org.skywalking.apm.agent.core.context.trace.AbstractSpan;
+import org.skywalking.apm.agent.core.context.trace.SpanLayer;
+import org.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import org.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
+import org.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
+import org.skywalking.apm.network.trace.component.ComponentsDefine;
+import org.skywalking.apm.plugin.rocketMQ.v4.define.SendCallBackEnhanceInfo;
+import org.skywalking.apm.util.StringUtil;
+
+import static org.apache.rocketmq.common.message.MessageDecoder.NAME_VALUE_SEPARATOR;
+import static org.apache.rocketmq.common.message.MessageDecoder.PROPERTY_SEPARATOR;
+
+/**
+ * {@link MessageSendInterceptor} create exit span when the method {@link org.apache.rocketmq.client.impl.MQClientAPIImpl#sendMessage(String,
+ * String, Message, org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader, long,
+ * org.apache.rocketmq.client.impl.CommunicationMode, org.apache.rocketmq.client.producer.SendCallback,
+ * org.apache.rocketmq.client.impl.producer.TopicPublishInfo, org.apache.rocketmq.client.impl.factory.MQClientInstance,
+ * int, org.apache.rocketmq.client.hook.SendMessageContext, org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl)}
+ * execute.
+ *
+ * @author zhang xin
+ */
+public class MessageSendInterceptor implements InstanceMethodsAroundInterceptor {
+
+ public static final String ASYNC_SEND_OPERATION_NAME_PREFIX = "RocketMQ/";
+
+ @Override
+ public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class>[] argumentsTypes,
+ MethodInterceptResult result) throws Throwable {
+ Message message = (Message)allArguments[2];
+ ContextCarrier contextCarrier = new ContextCarrier();
+ String namingServiceAddress = String.valueOf(objInst.getSkyWalkingDynamicField());
+ AbstractSpan span = ContextManager.createExitSpan(buildOperationName(message.getTopic()), contextCarrier, namingServiceAddress);
+ span.setComponent(ComponentsDefine.ROCKET_MQ);
+ span.setLayer(SpanLayer.MQ);
+ span.tag("brokerName", (String)allArguments[1]);
+ span.tag("tags", message.getTags());
+ span.tag("communication.mode", ((CommunicationMode)allArguments[5]).name());
+
+ SendMessageRequestHeader requestHeader = (SendMessageRequestHeader)allArguments[3];
+ StringBuilder properties = new StringBuilder(requestHeader.getProperties());
+ CarrierItem next = contextCarrier.items();
+ while (next.hasNext()) {
+ next = next.next();
+ if (!StringUtil.isEmpty(next.getHeadValue())) {
+ properties.append(next.getHeadKey());
+ properties.append(NAME_VALUE_SEPARATOR);
+ properties.append(next.getHeadValue());
+ properties.append(PROPERTY_SEPARATOR);
+ }
+ }
+ requestHeader.setProperties(properties.toString());
+
+ if (allArguments[6] != null) {
+ ((EnhancedInstance)allArguments[6]).setSkyWalkingDynamicField(new SendCallBackEnhanceInfo(message.getTopic(), ContextManager.capture()));
+ }
+ }
+
+ @Override
+ public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class>[] argumentsTypes,
+ Object ret) throws Throwable {
+ ContextManager.stopSpan();
+ return ret;
+ }
+
+ @Override public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
+ Class>[] argumentsTypes, Throwable t) {
+ ContextManager.activeSpan().errorOccurred().log(t);
+ }
+
+ private String buildOperationName(String topicName) {
+ return ASYNC_SEND_OPERATION_NAME_PREFIX + topicName + "/Producer";
+ }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/skywalking/apm/plugin/rocketMQ/v4/OnExceptionInterceptor.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/skywalking/apm/plugin/rocketMQ/v4/OnExceptionInterceptor.java
new file mode 100644
index 0000000000000000000000000000000000000000..067303538b32582cecbfdaf27f5d81b34cd6af1e
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/skywalking/apm/plugin/rocketMQ/v4/OnExceptionInterceptor.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2017, OpenSkywalking Organization All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * Project repository: https://github.com/OpenSkywalking/skywalking
+ */
+
+package org.skywalking.apm.plugin.rocketMQ.v4;
+
+import java.lang.reflect.Method;
+import org.skywalking.apm.agent.core.context.ContextManager;
+import org.skywalking.apm.agent.core.context.trace.AbstractSpan;
+import org.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import org.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
+import org.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
+import org.skywalking.apm.network.trace.component.ComponentsDefine;
+import org.skywalking.apm.plugin.rocketMQ.v4.define.SendCallBackEnhanceInfo;
+
+/**
+ * {@link OnExceptionInterceptor} create local span when the method {@link org.apache.rocketmq.client.producer.SendCallback#onException(Throwable)}
+ * execute.
+ *
+ * @author zhang xin
+ */
+public class OnExceptionInterceptor implements InstanceMethodsAroundInterceptor {
+
+ public static final String CALLBACK_OPERATION_NAME_PREFIX = "RocketMQ/";
+
+ @Override
+ public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class>[] argumentsTypes,
+ MethodInterceptResult result) throws Throwable {
+ SendCallBackEnhanceInfo enhanceInfo = (SendCallBackEnhanceInfo)objInst.getSkyWalkingDynamicField();
+ AbstractSpan activeSpan = ContextManager.createLocalSpan(CALLBACK_OPERATION_NAME_PREFIX + enhanceInfo.getTopicId() + "/Producer/Callback");
+ activeSpan.setComponent(ComponentsDefine.ROCKET_MQ);
+ activeSpan.errorOccurred().log((Throwable)allArguments[0]);
+ ContextManager.continued(enhanceInfo.getContextSnapshot());
+ }
+
+ @Override
+ public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class>[] argumentsTypes,
+ Object ret) throws Throwable {
+ ContextManager.stopSpan();
+ return ret;
+ }
+
+ @Override public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
+ Class>[] argumentsTypes, Throwable t) {
+ ContextManager.activeSpan().log(t);
+ }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/skywalking/apm/plugin/rocketMQ/v4/OnSuccessInterceptor.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/skywalking/apm/plugin/rocketMQ/v4/OnSuccessInterceptor.java
new file mode 100644
index 0000000000000000000000000000000000000000..cf927d46be0446cd2108e8058f3c5aefcc9c824a
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/skywalking/apm/plugin/rocketMQ/v4/OnSuccessInterceptor.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2017, OpenSkywalking Organization All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * Project repository: https://github.com/OpenSkywalking/skywalking
+ */
+
+package org.skywalking.apm.plugin.rocketMQ.v4;
+
+import java.lang.reflect.Method;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.skywalking.apm.agent.core.context.ContextManager;
+import org.skywalking.apm.agent.core.context.tag.Tags;
+import org.skywalking.apm.agent.core.context.trace.AbstractSpan;
+import org.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import org.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
+import org.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
+import org.skywalking.apm.network.trace.component.ComponentsDefine;
+import org.skywalking.apm.plugin.rocketMQ.v4.define.SendCallBackEnhanceInfo;
+
+/**
+ * {@link OnSuccessInterceptor} create local span when the method {@link org.apache.rocketmq.client.producer.SendCallback#onSuccess(SendResult)}
+ * execute.
+ *
+ * @author zhang xin
+ */
+public class OnSuccessInterceptor implements InstanceMethodsAroundInterceptor {
+
+ public static final String CALLBACK_OPERATION_NAME_PREFIX = "RocketMQ/";
+
+ @Override
+ public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class>[] argumentsTypes,
+ MethodInterceptResult result) throws Throwable {
+ SendCallBackEnhanceInfo enhanceInfo = (SendCallBackEnhanceInfo)objInst.getSkyWalkingDynamicField();
+ AbstractSpan activeSpan = ContextManager.createLocalSpan(CALLBACK_OPERATION_NAME_PREFIX + enhanceInfo.getTopicId() + "/Producer/Callback");
+ activeSpan.setComponent(ComponentsDefine.ROCKET_MQ);
+ SendStatus sendStatus = ((SendResult)allArguments[0]).getSendStatus();
+ if (sendStatus != SendStatus.SEND_OK) {
+ activeSpan.errorOccurred();
+ Tags.STATUS_CODE.set(activeSpan, sendStatus.name());
+ }
+ ContextManager.continued(enhanceInfo.getContextSnapshot());
+ }
+
+ @Override
+ public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class>[] argumentsTypes,
+ Object ret) throws Throwable {
+ ContextManager.stopSpan();
+ return ret;
+ }
+
+ @Override public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
+ Class>[] argumentsTypes, Throwable t) {
+ ContextManager.activeSpan().errorOccurred().log(t);
+ }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/skywalking/apm/plugin/rocketMQ/v4/UpdateNameServerInterceptor.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/skywalking/apm/plugin/rocketMQ/v4/UpdateNameServerInterceptor.java
new file mode 100644
index 0000000000000000000000000000000000000000..59d755e5e183fbe03ae6d4a6022b68f735700fac
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/skywalking/apm/plugin/rocketMQ/v4/UpdateNameServerInterceptor.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2017, OpenSkywalking Organization All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * Project repository: https://github.com/OpenSkywalking/skywalking
+ */
+
+package org.skywalking.apm.plugin.rocketMQ.v4;
+
+import java.lang.reflect.Method;
+import org.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import org.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
+import org.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
+
+public class UpdateNameServerInterceptor implements InstanceMethodsAroundInterceptor {
+ @Override
+ public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class>[] argumentsTypes,
+ MethodInterceptResult result) throws Throwable {
+ objInst.setSkyWalkingDynamicField(allArguments[0]);
+ }
+
+ @Override
+ public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class>[] argumentsTypes,
+ Object ret) throws Throwable {
+ return ret;
+ }
+
+ @Override public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
+ Class>[] argumentsTypes, Throwable t) {
+
+ }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/skywalking/apm/plugin/rocketMQ/v4/define/ConsumeMessageConcurrentlyInstrumentation.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/skywalking/apm/plugin/rocketMQ/v4/define/ConsumeMessageConcurrentlyInstrumentation.java
new file mode 100644
index 0000000000000000000000000000000000000000..18214fe3ed5281611385af1dcbbfda0b987bc387
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/skywalking/apm/plugin/rocketMQ/v4/define/ConsumeMessageConcurrentlyInstrumentation.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2017, OpenSkywalking Organization All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * Project repository: https://github.com/OpenSkywalking/skywalking
+ */
+
+package org.skywalking.apm.plugin.rocketMQ.v4.define;
+
+import net.bytebuddy.description.method.MethodDescription;
+import net.bytebuddy.matcher.ElementMatcher;
+import org.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
+import org.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
+import org.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
+import org.skywalking.apm.agent.core.plugin.match.ClassMatch;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static org.skywalking.apm.agent.core.plugin.match.HierarchyMatch.byHierarchyMatch;
+
+/**
+ * {@link ConsumeMessageConcurrentlyInstrumentation} intercepts the {@link org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently#consumeMessage(java.util.List,
+ * org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext)} method by using {@link
+ * org.skywalking.apm.plugin.rocketMQ.v4.MessageConcurrentlyConsumeInterceptor}.
+ *
+ * @author zhang xin
+ */
+public class ConsumeMessageConcurrentlyInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
+ private static final String ENHANCE_CLASS = "org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently";
+ private static final String CONSUMER_MESSAGE_METHOD = "consumeMessage";
+ private static final String INTERCEPTOR_CLASS = "org.apache.rocketmq.common.message.MessageConcurrentlyConsumeInterceptor";
+
+ @Override protected ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
+ return new ConstructorInterceptPoint[0];
+ }
+
+ @Override protected InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
+ return new InstanceMethodsInterceptPoint[] {
+ new InstanceMethodsInterceptPoint() {
+ @Override public ElementMatcher getMethodsMatcher() {
+ return named(CONSUMER_MESSAGE_METHOD);
+ }
+
+ @Override public String getMethodsInterceptor() {
+ return INTERCEPTOR_CLASS;
+ }
+
+ @Override public boolean isOverrideArgs() {
+ return false;
+ }
+ }
+ };
+ }
+
+ @Override protected ClassMatch enhanceClass() {
+ return byHierarchyMatch(new String[] {ENHANCE_CLASS});
+ }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/skywalking/apm/plugin/rocketMQ/v4/define/ConsumeMessageOrderlyInstrumentation.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/skywalking/apm/plugin/rocketMQ/v4/define/ConsumeMessageOrderlyInstrumentation.java
new file mode 100644
index 0000000000000000000000000000000000000000..968259ab60c8891a6eb9e1ff51d1887403e47eb2
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/skywalking/apm/plugin/rocketMQ/v4/define/ConsumeMessageOrderlyInstrumentation.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2017, OpenSkywalking Organization All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * Project repository: https://github.com/OpenSkywalking/skywalking
+ */
+
+package org.skywalking.apm.plugin.rocketMQ.v4.define;
+
+import net.bytebuddy.description.method.MethodDescription;
+import net.bytebuddy.matcher.ElementMatcher;
+import org.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
+import org.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
+import org.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
+import org.skywalking.apm.agent.core.plugin.match.ClassMatch;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static org.skywalking.apm.agent.core.plugin.match.HierarchyMatch.byHierarchyMatch;
+
+/**
+ * {@link ConsumeMessageOrderlyInstrumentation} intercepts the {@link org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly#consumeMessage(java.util.List,
+ * org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext)} method by using {@link
+ * org.skywalking.apm.plugin.rocketMQ.v4.MessageConcurrentlyConsumeInterceptor}.
+ *
+ * @author zhang xin
+ */
+public class ConsumeMessageOrderlyInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
+ private static final String ENHANCE_CLASS = "org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly";
+ private static final String ENHANCE_METHOD = "consumeMessage";
+ private static final String INTERCEPTOR_CLASS = "org.apache.rocketmq.common.message.MessageOrderlyConsumeInterceptor";
+
+ @Override protected ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
+ return new ConstructorInterceptPoint[0];
+ }
+
+ @Override protected InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
+ return new InstanceMethodsInterceptPoint[] {
+ new InstanceMethodsInterceptPoint() {
+ @Override public ElementMatcher getMethodsMatcher() {
+ return named(ENHANCE_METHOD);
+ }
+
+ @Override public String getMethodsInterceptor() {
+ return INTERCEPTOR_CLASS;
+ }
+
+ @Override public boolean isOverrideArgs() {
+ return false;
+ }
+ }
+ };
+ }
+
+ @Override protected ClassMatch enhanceClass() {
+ return byHierarchyMatch(new String[] {ENHANCE_CLASS});
+ }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/skywalking/apm/plugin/rocketMQ/v4/define/MQClientAPIImplInstrumentation.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/skywalking/apm/plugin/rocketMQ/v4/define/MQClientAPIImplInstrumentation.java
new file mode 100644
index 0000000000000000000000000000000000000000..5cf5d8a9ff39dc5eb7e0ce90abf8daa834dd0d3b
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/skywalking/apm/plugin/rocketMQ/v4/define/MQClientAPIImplInstrumentation.java
@@ -0,0 +1,88 @@
+/*
+ * Copyright 2017, OpenSkywalking Organization All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * Project repository: https://github.com/OpenSkywalking/skywalking
+ */
+
+package org.skywalking.apm.plugin.rocketMQ.v4.define;
+
+import net.bytebuddy.description.method.MethodDescription;
+import net.bytebuddy.matcher.ElementMatcher;
+import org.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
+import org.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
+import org.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
+import org.skywalking.apm.agent.core.plugin.match.ClassMatch;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
+import static org.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
+
+/**
+ * {@link MQClientAPIImplInstrumentation} intercepts the {@link org.apache.rocketmq.client.impl.MQClientAPIImpl#sendMessage(String,
+ * String, org.apache.rocketmq.common.message.Message, org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader,
+ * long, org.apache.rocketmq.client.impl.CommunicationMode, org.apache.rocketmq.client.producer.SendCallback,
+ * org.apache.rocketmq.client.impl.producer.TopicPublishInfo, org.apache.rocketmq.client.impl.factory.MQClientInstance,
+ * int, org.apache.rocketmq.client.hook.SendMessageContext, org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl)}
+ * method by using {@link org.skywalking.apm.plugin.rocketMQ.v4.MessageSendInterceptor}.
+ *
+ * @author zhang xin
+ */
+public class MQClientAPIImplInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
+
+ private static final String ENHANCE_CLASS = "org.apache.rocketmq.client.impl.MQClientAPIImpl";
+ private static final String SEND_MESSAGE_METHOD_NAME = "sendMessage";
+ private static final String ASYNC_METHOD_INTERCEPTOR = "org.skywalking.apm.plugin.rocketMQ.v4.MessageSendInterceptor";
+ public static final String UPDATE_NAME_SERVER_INTERCEPT_CLASS = "org.skywalking.apm.plugin.rocketMQ.v4.UpdateNameServerInterceptor";
+ public static final String UPDATE_NAME_SERVER_METHOD_NAME = "updateNameServerAddressList";
+
+ @Override protected ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
+ return new ConstructorInterceptPoint[0];
+ }
+
+ @Override protected InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
+ return new InstanceMethodsInterceptPoint[] {
+ new InstanceMethodsInterceptPoint() {
+ @Override public ElementMatcher getMethodsMatcher() {
+ return named(SEND_MESSAGE_METHOD_NAME).and(takesArguments(12));
+ }
+
+ @Override public String getMethodsInterceptor() {
+ return ASYNC_METHOD_INTERCEPTOR;
+ }
+
+ @Override public boolean isOverrideArgs() {
+ return false;
+ }
+ },
+ new InstanceMethodsInterceptPoint() {
+ @Override public ElementMatcher getMethodsMatcher() {
+ return named(UPDATE_NAME_SERVER_METHOD_NAME);
+ }
+
+ @Override public String getMethodsInterceptor() {
+ return UPDATE_NAME_SERVER_INTERCEPT_CLASS;
+ }
+
+ @Override public boolean isOverrideArgs() {
+ return false;
+ }
+ }
+ };
+ }
+
+ @Override protected ClassMatch enhanceClass() {
+ return byName(ENHANCE_CLASS);
+ }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/skywalking/apm/plugin/rocketMQ/v4/define/SendCallBackEnhanceInfo.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/skywalking/apm/plugin/rocketMQ/v4/define/SendCallBackEnhanceInfo.java
new file mode 100644
index 0000000000000000000000000000000000000000..c5fb2172861a64e132df78baf83136245456911c
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/skywalking/apm/plugin/rocketMQ/v4/define/SendCallBackEnhanceInfo.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2017, OpenSkywalking Organization All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * Project repository: https://github.com/OpenSkywalking/skywalking
+ */
+
+package org.skywalking.apm.plugin.rocketMQ.v4.define;
+
+import org.skywalking.apm.agent.core.context.ContextSnapshot;
+
+/**
+ * {@link SendCallBackEnhanceInfo} saves the topic Id and {@link ContextSnapshot} instance for trace.
+ *
+ * @author zhang xin
+ */
+public class SendCallBackEnhanceInfo {
+ private String topicId;
+ private ContextSnapshot contextSnapshot;
+
+ public SendCallBackEnhanceInfo(String topicId, ContextSnapshot contextSnapshot) {
+ this.topicId = topicId;
+ this.contextSnapshot = contextSnapshot;
+ }
+
+ public String getTopicId() {
+ return topicId;
+ }
+
+ public ContextSnapshot getContextSnapshot() {
+ return contextSnapshot;
+ }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/skywalking/apm/plugin/rocketMQ/v4/define/SendCallbackInstrumentation.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/skywalking/apm/plugin/rocketMQ/v4/define/SendCallbackInstrumentation.java
new file mode 100644
index 0000000000000000000000000000000000000000..d63c0a3d7193bb078ec1b8fc905e3784aaaf2d46
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/skywalking/apm/plugin/rocketMQ/v4/define/SendCallbackInstrumentation.java
@@ -0,0 +1,87 @@
+/*
+ * Copyright 2017, OpenSkywalking Organization All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * Project repository: https://github.com/OpenSkywalking/skywalking
+ */
+
+package org.skywalking.apm.plugin.rocketMQ.v4.define;
+
+import net.bytebuddy.description.method.MethodDescription;
+import net.bytebuddy.matcher.ElementMatcher;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
+import org.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
+import org.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
+import org.skywalking.apm.agent.core.plugin.match.ClassMatch;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static org.skywalking.apm.agent.core.plugin.bytebuddy.ArgumentTypeNameMatch.takesArgumentWithType;
+import static org.skywalking.apm.agent.core.plugin.match.HierarchyMatch.byHierarchyMatch;
+
+/**
+ * {@link SendCallbackInstrumentation} intercepts {@link org.apache.rocketmq.client.producer.SendCallback#onSuccess(SendResult)}
+ * method by using {@link org.skywalking.apm.plugin.rocketMQ.v4.OnSuccessInterceptor} and also intercepts {@link
+ * org.apache.rocketmq.client.producer.SendCallback#onException(Throwable)} by using {@link
+ * org.skywalking.apm.plugin.rocketMQ.v4.OnExceptionInterceptor}.
+ *
+ * @author zhang xin
+ */
+public class SendCallbackInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
+
+ private static final String ENHANCE_CLASS = "org.apache.rocketmq.client.producer.SendCallback";
+ private static final String ON_SUCCESS_ENHANCE_METHOD = "onSuccess";
+ private static final String ON_SUCCESS_INTERCEPTOR = "org.skywalking.apm.plugin.rocketMQ.v4.OnSuccessInterceptor";
+ private static final String ON_EXCEPTION_METHOD = "onException";
+ private static final String ON_EXCEPTION_INTERCEPTOR = "org.skywalking.apm.plugin.rocketMQ.v4.OnExceptionInterceptor";
+
+ @Override protected ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
+ return new ConstructorInterceptPoint[0];
+ }
+
+ @Override protected InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
+ return new InstanceMethodsInterceptPoint[] {
+ new InstanceMethodsInterceptPoint() {
+ @Override public ElementMatcher getMethodsMatcher() {
+ return named(ON_SUCCESS_ENHANCE_METHOD).and(takesArgumentWithType(0, "org.apache.rocketmq.client.producer.SendResult"));
+ }
+
+ @Override public String getMethodsInterceptor() {
+ return ON_SUCCESS_INTERCEPTOR;
+ }
+
+ @Override public boolean isOverrideArgs() {
+ return false;
+ }
+ },
+ new InstanceMethodsInterceptPoint() {
+ @Override public ElementMatcher getMethodsMatcher() {
+ return named(ON_EXCEPTION_METHOD).and(takesArgumentWithType(0, "java.lang.Throwable"));
+ }
+
+ @Override public String getMethodsInterceptor() {
+ return ON_EXCEPTION_INTERCEPTOR;
+ }
+
+ @Override public boolean isOverrideArgs() {
+ return false;
+ }
+ }
+ };
+ }
+
+ @Override protected ClassMatch enhanceClass() {
+ return byHierarchyMatch(new String[] {ENHANCE_CLASS});
+ }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/resources/skywalking-plugin.def b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/resources/skywalking-plugin.def
new file mode 100644
index 0000000000000000000000000000000000000000..1accc2bff023ea3bfafbbb6460459ec06fceb2af
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/resources/skywalking-plugin.def
@@ -0,0 +1,4 @@
+rocketMQ-4.x=org.skywalking.apm.plugin.rocketMQ.v4.define.ConsumeMessageConcurrentlyInstrumentation
+rocketMQ-4.x=org.skywalking.apm.plugin.rocketMQ.v4.define.ConsumeMessageOrderlyInstrumentation
+rocketMQ-4.x=org.skywalking.apm.plugin.rocketMQ.v4.define.MQClientAPIImplInstrumentation
+rocketMQ-4.x=org.skywalking.apm.plugin.rocketMQ.v4.define.SendCallbackInstrumentation
diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/test/java/org/skywalking/apm/plugin/rocketMQ/v4/MessageSendInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/test/java/org/skywalking/apm/plugin/rocketMQ/v4/MessageSendInterceptorTest.java
new file mode 100644
index 0000000000000000000000000000000000000000..d4d7837126016eec9da9ebc40a36785d1bf33603
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/test/java/org/skywalking/apm/plugin/rocketMQ/v4/MessageSendInterceptorTest.java
@@ -0,0 +1,137 @@
+/*
+ * Copyright 2017, OpenSkywalking Organization All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * Project repository: https://github.com/OpenSkywalking/skywalking
+ */
+
+package org.skywalking.apm.plugin.rocketMQ.v4;
+
+import java.util.List;
+import org.apache.rocketmq.client.impl.CommunicationMode;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Matchers;
+import org.mockito.Mock;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.modules.junit4.PowerMockRunnerDelegate;
+import org.skywalking.apm.agent.core.context.trace.AbstractTracingSpan;
+import org.skywalking.apm.agent.core.context.trace.SpanLayer;
+import org.skywalking.apm.agent.core.context.trace.TraceSegment;
+import org.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import org.skywalking.apm.agent.test.helper.SegmentHelper;
+import org.skywalking.apm.agent.test.tools.AgentServiceRule;
+import org.skywalking.apm.agent.test.tools.SegmentStorage;
+import org.skywalking.apm.agent.test.tools.SegmentStoragePoint;
+import org.skywalking.apm.agent.test.tools.SpanAssert;
+import org.skywalking.apm.agent.test.tools.TracingSegmentRunner;
+import org.skywalking.apm.network.trace.component.ComponentsDefine;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+@RunWith(PowerMockRunner.class)
+@PowerMockRunnerDelegate(TracingSegmentRunner.class)
+public class MessageSendInterceptorTest {
+
+ private MessageSendInterceptor messageSendInterceptor;
+
+ @SegmentStoragePoint
+ private SegmentStorage segmentStorage;
+
+ @Rule
+ public AgentServiceRule serviceRule = new AgentServiceRule();
+
+ private Object[] arguments;
+
+ private Object[] argumentsWithoutCallback;
+
+ @Mock
+ private Message message;
+
+ @Mock
+ private SendMessageRequestHeader messageRequestHeader;
+
+ @Mock
+ private EnhancedInstance callBack;
+
+ private EnhancedInstance enhancedInstance;
+
+ @Before
+ public void setUp() {
+ messageSendInterceptor = new MessageSendInterceptor();
+ enhancedInstance = new EnhancedInstance() {
+ @Override public Object getSkyWalkingDynamicField() {
+ return "127.0.0.1:6543";
+ }
+
+ @Override public void setSkyWalkingDynamicField(Object value) {
+
+ }
+ };
+
+ arguments = new Object[] {"127.0.0.1", "test", message, messageRequestHeader, null, CommunicationMode.ASYNC, callBack};
+ argumentsWithoutCallback = new Object[] {"127.0.0.1", "test", message, messageRequestHeader, null, CommunicationMode.ASYNC, null};
+ when(messageRequestHeader.getProperties()).thenReturn("");
+ when(message.getTags()).thenReturn("TagA");
+ }
+
+ @Test
+ public void testSendMessage() throws Throwable {
+ messageSendInterceptor.beforeMethod(enhancedInstance, null, arguments, null, null);
+ messageSendInterceptor.afterMethod(enhancedInstance, null, arguments, null, null);
+
+ assertThat(segmentStorage.getTraceSegments().size(), is(1));
+ TraceSegment traceSegment = segmentStorage.getTraceSegments().get(0);
+ List spans = SegmentHelper.getSpans(traceSegment);
+ assertThat(spans.size(), is(1));
+
+ AbstractTracingSpan mqSpan = spans.get(0);
+
+ SpanAssert.assertLayer(mqSpan, SpanLayer.MQ);
+ SpanAssert.assertComponent(mqSpan, ComponentsDefine.ROCKET_MQ);
+ SpanAssert.assertTag(mqSpan, 0, "test");
+ SpanAssert.assertTag(mqSpan, 1, "TagA");
+ verify(messageRequestHeader, times(1)).setProperties(anyString());
+ verify(callBack, times(1)).setSkyWalkingDynamicField(Matchers.any());
+ }
+
+ @Test
+ public void testSendMessageWithoutCallBack() throws Throwable {
+ messageSendInterceptor.beforeMethod(enhancedInstance, null, argumentsWithoutCallback, null, null);
+ messageSendInterceptor.afterMethod(enhancedInstance, null, argumentsWithoutCallback, null, null);
+
+ assertThat(segmentStorage.getTraceSegments().size(), is(1));
+ TraceSegment traceSegment = segmentStorage.getTraceSegments().get(0);
+ List spans = SegmentHelper.getSpans(traceSegment);
+ assertThat(spans.size(), is(1));
+
+ AbstractTracingSpan mqSpan = spans.get(0);
+
+ SpanAssert.assertLayer(mqSpan, SpanLayer.MQ);
+ SpanAssert.assertComponent(mqSpan, ComponentsDefine.ROCKET_MQ);
+ SpanAssert.assertTag(mqSpan, 0, "test");
+ SpanAssert.assertTag(mqSpan, 1, "TagA");
+ verify(messageRequestHeader, times(1)).setProperties(anyString());
+ }
+
+}
diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/test/java/org/skywalking/apm/plugin/rocketMQ/v4/OnExceptionInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/test/java/org/skywalking/apm/plugin/rocketMQ/v4/OnExceptionInterceptorTest.java
new file mode 100644
index 0000000000000000000000000000000000000000..bc749493ac496f8441c624ee8c34e2655846c7a3
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/test/java/org/skywalking/apm/plugin/rocketMQ/v4/OnExceptionInterceptorTest.java
@@ -0,0 +1,87 @@
+/*
+ * Copyright 2017, OpenSkywalking Organization All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * Project repository: https://github.com/OpenSkywalking/skywalking
+ */
+
+package org.skywalking.apm.plugin.rocketMQ.v4;
+
+import java.util.List;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.modules.junit4.PowerMockRunnerDelegate;
+import org.skywalking.apm.agent.core.context.ContextSnapshot;
+import org.skywalking.apm.agent.core.context.trace.AbstractTracingSpan;
+import org.skywalking.apm.agent.core.context.trace.TraceSegment;
+import org.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import org.skywalking.apm.agent.test.helper.SegmentHelper;
+import org.skywalking.apm.agent.test.helper.SpanHelper;
+import org.skywalking.apm.agent.test.tools.AgentServiceRule;
+import org.skywalking.apm.agent.test.tools.SegmentStorage;
+import org.skywalking.apm.agent.test.tools.SegmentStoragePoint;
+import org.skywalking.apm.agent.test.tools.SpanAssert;
+import org.skywalking.apm.agent.test.tools.TracingSegmentRunner;
+import org.skywalking.apm.plugin.rocketMQ.v4.define.SendCallBackEnhanceInfo;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+@RunWith(PowerMockRunner.class)
+@PowerMockRunnerDelegate(TracingSegmentRunner.class)
+public class OnExceptionInterceptorTest {
+
+ private OnExceptionInterceptor exceptionInterceptor;
+
+ @SegmentStoragePoint
+ private SegmentStorage segmentStorage;
+
+ @Rule
+ public AgentServiceRule serviceRule = new AgentServiceRule();
+
+ @Mock
+ private ContextSnapshot contextSnapshot;
+ private SendCallBackEnhanceInfo enhanceInfo;
+
+ @Mock
+ private EnhancedInstance enhancedInstance;
+
+ @Before
+ public void setUp() {
+ exceptionInterceptor = new OnExceptionInterceptor();
+
+ enhanceInfo = new SendCallBackEnhanceInfo("test", contextSnapshot);
+ when(enhancedInstance.getSkyWalkingDynamicField()).thenReturn(enhanceInfo);
+ }
+
+ @Test
+ public void testOnException() throws Throwable {
+ exceptionInterceptor.beforeMethod(enhancedInstance, null, new Object[] {new RuntimeException()}, null, null);
+ exceptionInterceptor.afterMethod(enhancedInstance, null, new Object[] {new RuntimeException()}, null, null);
+
+ assertThat(segmentStorage.getTraceSegments().size(), is(1));
+ TraceSegment traceSegment = segmentStorage.getTraceSegments().get(0);
+ List spans = SegmentHelper.getSpans(traceSegment);
+ assertThat(spans.size(), is(1));
+
+ AbstractTracingSpan exceptionSpan = spans.get(0);
+ SpanAssert.assertException(SpanHelper.getLogs(exceptionSpan).get(0), RuntimeException.class);
+ SpanAssert.assertOccurException(exceptionSpan, true);
+ }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/test/java/org/skywalking/apm/plugin/rocketMQ/v4/OnSuccessInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/test/java/org/skywalking/apm/plugin/rocketMQ/v4/OnSuccessInterceptorTest.java
new file mode 100644
index 0000000000000000000000000000000000000000..5fcbb0cf2bbc53c05ee1ca6541723f2ae6bd9ba1
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/test/java/org/skywalking/apm/plugin/rocketMQ/v4/OnSuccessInterceptorTest.java
@@ -0,0 +1,113 @@
+/*
+ * Copyright 2017, OpenSkywalking Organization All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * Project repository: https://github.com/OpenSkywalking/skywalking
+ */
+
+package org.skywalking.apm.plugin.rocketMQ.v4;
+
+import java.util.List;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.modules.junit4.PowerMockRunnerDelegate;
+import org.skywalking.apm.agent.core.context.ContextSnapshot;
+import org.skywalking.apm.agent.core.context.trace.AbstractTracingSpan;
+import org.skywalking.apm.agent.core.context.trace.TraceSegment;
+import org.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import org.skywalking.apm.agent.test.helper.SegmentHelper;
+import org.skywalking.apm.agent.test.tools.AgentServiceRule;
+import org.skywalking.apm.agent.test.tools.SegmentStorage;
+import org.skywalking.apm.agent.test.tools.SegmentStoragePoint;
+import org.skywalking.apm.agent.test.tools.SpanAssert;
+import org.skywalking.apm.agent.test.tools.TracingSegmentRunner;
+import org.skywalking.apm.network.trace.component.ComponentsDefine;
+import org.skywalking.apm.plugin.rocketMQ.v4.define.SendCallBackEnhanceInfo;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+@RunWith(PowerMockRunner.class)
+@PowerMockRunnerDelegate(TracingSegmentRunner.class)
+public class OnSuccessInterceptorTest {
+
+ private OnSuccessInterceptor successInterceptor;
+
+ @SegmentStoragePoint
+ private SegmentStorage segmentStorage;
+
+ @Rule
+ public AgentServiceRule serviceRule = new AgentServiceRule();
+
+ @Mock
+ private ContextSnapshot contextSnapshot;
+ @Mock
+ private SendResult sendResult;
+
+ private SendCallBackEnhanceInfo enhanceInfo;
+
+ @Mock
+ private EnhancedInstance enhancedInstance;
+
+ @Before
+ public void setUp() {
+ successInterceptor = new OnSuccessInterceptor();
+
+ enhanceInfo = new SendCallBackEnhanceInfo("test", contextSnapshot);
+ when(enhancedInstance.getSkyWalkingDynamicField()).thenReturn(enhanceInfo);
+ when(sendResult.getSendStatus()).thenReturn(SendStatus.SEND_OK);
+ }
+
+ @Test
+ public void testOnSuccess() throws Throwable {
+ successInterceptor.beforeMethod(enhancedInstance, null, new Object[] {sendResult}, null, null);
+ successInterceptor.afterMethod(enhancedInstance, null, new Object[] {sendResult}, null, null);
+
+ assertThat(segmentStorage.getTraceSegments().size(), is(1));
+ TraceSegment traceSegment = segmentStorage.getTraceSegments().get(0);
+ List spans = SegmentHelper.getSpans(traceSegment);
+ assertThat(spans.size(), is(1));
+
+ AbstractTracingSpan successSpan = spans.get(0);
+
+ SpanAssert.assertComponent(successSpan, ComponentsDefine.ROCKET_MQ);
+
+ }
+
+ @Test
+ public void testOnSuccessWithErrorStatus() throws Throwable {
+ when(sendResult.getSendStatus()).thenReturn(SendStatus.FLUSH_SLAVE_TIMEOUT);
+ successInterceptor.beforeMethod(enhancedInstance, null, new Object[] {sendResult}, null, null);
+ successInterceptor.afterMethod(enhancedInstance, null, new Object[] {sendResult}, null, null);
+
+ assertThat(segmentStorage.getTraceSegments().size(), is(1));
+ TraceSegment traceSegment = segmentStorage.getTraceSegments().get(0);
+ List spans = SegmentHelper.getSpans(traceSegment);
+ assertThat(spans.size(), is(1));
+
+ AbstractTracingSpan successSpan = spans.get(0);
+
+ SpanAssert.assertComponent(successSpan, ComponentsDefine.ROCKET_MQ);
+ SpanAssert.assertOccurException(successSpan, true);
+
+ }
+
+}