Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
阿信在这里
SkyWalking
提交
30dbdc82
S
SkyWalking
项目概览
阿信在这里
/
SkyWalking
与 Fork 源项目一致
Fork自
山不在高_有仙则灵 / SkyWalking
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
S
SkyWalking
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
30dbdc82
编写于
11月 05, 2017
作者:
A
ascrutae
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
support rocket mq plugin
上级
a1276c86
变更
18
显示空白变更内容
内联
并排
Showing
18 changed file
with
1150 addition
and
1 deletion
+1150
-1
apm-network/src/main/java/org/skywalking/apm/network/trace/component/ComponentsDefine.java
...walking/apm/network/trace/component/ComponentsDefine.java
+4
-1
apm-sniffer/apm-sdk-plugin/pom.xml
apm-sniffer/apm-sdk-plugin/pom.xml
+1
-0
apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/pom.xml
apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/pom.xml
+68
-0
apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/rocketmq/common/message/AbstractMessageConsumeInterceptor.java
...tmq/common/message/AbstractMessageConsumeInterceptor.java
+77
-0
apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/rocketmq/common/message/MessageConcurrentlyConsumeInterceptor.java
...common/message/MessageConcurrentlyConsumeInterceptor.java
+50
-0
apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/rocketmq/common/message/MessageOrderlyConsumeInterceptor.java
...etmq/common/message/MessageOrderlyConsumeInterceptor.java
+51
-0
apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/skywalking/apm/plugin/rocketMQ/v4/MessageSendInterceptor.java
...alking/apm/plugin/rocketMQ/v4/MessageSendInterceptor.java
+100
-0
apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/skywalking/apm/plugin/rocketMQ/v4/OnExceptionInterceptor.java
...alking/apm/plugin/rocketMQ/v4/OnExceptionInterceptor.java
+61
-0
apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/skywalking/apm/plugin/rocketMQ/v4/OnSuccessInterceptor.java
...ywalking/apm/plugin/rocketMQ/v4/OnSuccessInterceptor.java
+68
-0
apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/skywalking/apm/plugin/rocketMQ/v4/define/ConsumeMessageConcurrentlyInstrumentation.java
.../v4/define/ConsumeMessageConcurrentlyInstrumentation.java
+68
-0
apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/skywalking/apm/plugin/rocketMQ/v4/define/ConsumeMessageOrderlyInstrumentation.java
...ketMQ/v4/define/ConsumeMessageOrderlyInstrumentation.java
+68
-0
apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/skywalking/apm/plugin/rocketMQ/v4/define/MQClientAPIImplInstrumentation.java
...in/rocketMQ/v4/define/MQClientAPIImplInstrumentation.java
+73
-0
apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/skywalking/apm/plugin/rocketMQ/v4/define/SendCallBackEnhanceInfo.java
...pm/plugin/rocketMQ/v4/define/SendCallBackEnhanceInfo.java
+44
-0
apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/skywalking/apm/plugin/rocketMQ/v4/define/SendCallbackInstrumentation.java
...lugin/rocketMQ/v4/define/SendCallbackInstrumentation.java
+87
-0
apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/resources/skywalking-plugin.def
...ketMQ-4.x-plugin/src/main/resources/skywalking-plugin.def
+4
-0
apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/test/java/org/skywalking/apm/plugin/rocketMQ/v4/MessageSendInterceptorTest.java
...ng/apm/plugin/rocketMQ/v4/MessageSendInterceptorTest.java
+126
-0
apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/test/java/org/skywalking/apm/plugin/rocketMQ/v4/OnExceptionInterceptorTest.java
...ng/apm/plugin/rocketMQ/v4/OnExceptionInterceptorTest.java
+87
-0
apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/test/java/org/skywalking/apm/plugin/rocketMQ/v4/OnSuccessInterceptorTest.java
...king/apm/plugin/rocketMQ/v4/OnSuccessInterceptorTest.java
+113
-0
未找到文件。
apm-network/src/main/java/org/skywalking/apm/network/trace/component/ComponentsDefine.java
浏览文件 @
30dbdc82
...
...
@@ -71,6 +71,8 @@ public class ComponentsDefine {
public
static
final
OfficialComponent
GRPC
=
new
OfficialComponent
(
23
,
"GRPC"
);
public
static
final
OfficialComponent
ROCKET_MQ
=
new
OfficialComponent
(
24
,
"RocketMQ"
);
private
static
ComponentsDefine
instance
=
new
ComponentsDefine
();
private
String
[]
components
;
...
...
@@ -80,7 +82,7 @@ public class ComponentsDefine {
}
public
ComponentsDefine
()
{
components
=
new
String
[
2
4
];
components
=
new
String
[
2
5
];
addComponent
(
TOMCAT
);
addComponent
(
HTTPCLIENT
);
addComponent
(
DUBBO
);
...
...
@@ -104,6 +106,7 @@ public class ComponentsDefine {
addComponent
(
SHARDING_JDBC
);
addComponent
(
POSTGRESQL
);
addComponent
(
GRPC
);
addComponent
(
ROCKET_MQ
);
}
private
void
addComponent
(
OfficialComponent
component
)
{
...
...
apm-sniffer/apm-sdk-plugin/pom.xml
浏览文件 @
30dbdc82
...
...
@@ -52,6 +52,7 @@
<module>
h2-1.x-plugin
</module>
<module>
postgresql-8.x-plugin
</module>
<module>
oracle-10.x-plugin
</module>
<module>
rocketMQ-4.x-plugin
</module>
</modules>
<packaging>
pom
</packaging>
...
...
apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/pom.xml
0 → 100644
浏览文件 @
30dbdc82
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright 2017, OpenSkywalking Organization All rights reserved.
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
~ Project repository: https://github.com/OpenSkywalking/skywalking
-->
<project
xmlns=
"http://maven.apache.org/POM/4.0.0"
xmlns:xsi=
"http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation=
"http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
>
<parent>
<artifactId>
apm-sdk-plugin
</artifactId>
<groupId>
org.skywalking
</groupId>
<version>
3.2.3-2017
</version>
</parent>
<modelVersion>
4.0.0
</modelVersion>
<artifactId>
apm-rocketmq-4.x-plugin
</artifactId>
<name>
rocketMQ-4.x-plugin
</name>
<properties>
<project.build.sourceEncoding>
UTF-8
</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>
org.apache.rocketmq
</groupId>
<artifactId>
rocketmq-client
</artifactId>
<version>
4.1.0-incubating
</version>
<scope>
provided
</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>
org.apache.maven.plugins
</groupId>
<artifactId>
maven-deploy-plugin
</artifactId>
</plugin>
<plugin>
<!-- 源码插件 -->
<groupId>
org.apache.maven.plugins
</groupId>
<artifactId>
maven-source-plugin
</artifactId>
<!-- 发布时自动将源码同时发布的配置 -->
<executions>
<execution>
<id>
attach-sources
</id>
<goals>
<goal>
jar
</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/rocketmq/common/message/AbstractMessageConsumeInterceptor.java
0 → 100644
浏览文件 @
30dbdc82
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package
org.apache.rocketmq.common.message
;
import
java.lang.reflect.Method
;
import
java.util.List
;
import
org.skywalking.apm.agent.core.context.CarrierItem
;
import
org.skywalking.apm.agent.core.context.ContextCarrier
;
import
org.skywalking.apm.agent.core.context.ContextManager
;
import
org.skywalking.apm.agent.core.context.trace.AbstractSpan
;
import
org.skywalking.apm.agent.core.context.trace.SpanLayer
;
import
org.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance
;
import
org.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor
;
import
org.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult
;
import
org.skywalking.apm.network.trace.component.ComponentsDefine
;
/**
* {@link AbstractMessageConsumeInterceptor} create entry span when the <code>consumeMessage</code> in the {@link
* org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently} and {@link
* org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly} class.
*
* @author zhangxin
*/
public
abstract
class
AbstractMessageConsumeInterceptor
implements
InstanceMethodsAroundInterceptor
{
public
static
final
String
COMSUMER_OPERATION_NAME_PREFIX
=
"RocketMQ/"
;
@Override
public
final
void
beforeMethod
(
EnhancedInstance
objInst
,
Method
method
,
Object
[]
allArguments
,
Class
<?>[]
argumentsTypes
,
MethodInterceptResult
result
)
throws
Throwable
{
List
<
MessageExt
>
msgs
=
(
List
<
MessageExt
>)
allArguments
[
0
];
ContextCarrier
contextCarrier
=
getContextCarrierFromMessage
(
msgs
.
get
(
0
));
AbstractSpan
span
=
ContextManager
.
createEntrySpan
(
COMSUMER_OPERATION_NAME_PREFIX
+
msgs
.
get
(
0
).
getTopic
()
+
"/Consumer"
,
contextCarrier
);
span
.
setComponent
(
ComponentsDefine
.
ROCKET_MQ
);
span
.
setLayer
(
SpanLayer
.
MQ
);
for
(
int
i
=
1
;
i
<
msgs
.
size
();
i
++)
{
ContextManager
.
extract
(
getContextCarrierFromMessage
(
msgs
.
get
(
i
)));
}
}
@Override
public
final
void
handleMethodException
(
EnhancedInstance
objInst
,
Method
method
,
Object
[]
allArguments
,
Class
<?>[]
argumentsTypes
,
Throwable
t
)
{
ContextManager
.
activeSpan
().
errorOccurred
().
log
(
t
);
}
private
ContextCarrier
getContextCarrierFromMessage
(
MessageExt
message
)
{
ContextCarrier
contextCarrier
=
new
ContextCarrier
();
CarrierItem
next
=
contextCarrier
.
items
();
while
(
next
.
hasNext
())
{
next
=
next
.
next
();
next
.
setHeadValue
(
message
.
getUserProperty
(
next
.
getHeadKey
()));
}
return
contextCarrier
;
}
}
apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/rocketmq/common/message/MessageConcurrentlyConsumeInterceptor.java
0 → 100644
浏览文件 @
30dbdc82
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package
org.apache.rocketmq.common.message
;
import
java.lang.reflect.Method
;
import
org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus
;
import
org.skywalking.apm.agent.core.context.ContextManager
;
import
org.skywalking.apm.agent.core.context.tag.Tags
;
import
org.skywalking.apm.agent.core.context.trace.AbstractSpan
;
import
org.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance
;
/**
* {@link MessageConcurrentlyConsumeInterceptor} set the process status after the {@link
* org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently#consumeMessage(java.util.List,
* org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext)} method execute.
*
* @author zhang xin
*/
public
class
MessageConcurrentlyConsumeInterceptor
extends
AbstractMessageConsumeInterceptor
{
@Override
public
Object
afterMethod
(
EnhancedInstance
objInst
,
Method
method
,
Object
[]
allArguments
,
Class
<?>[]
argumentsTypes
,
Object
ret
)
throws
Throwable
{
ConsumeConcurrentlyStatus
status
=
(
ConsumeConcurrentlyStatus
)
ret
;
if
(
status
==
ConsumeConcurrentlyStatus
.
RECONSUME_LATER
)
{
AbstractSpan
activeSpan
=
ContextManager
.
activeSpan
();
activeSpan
.
errorOccurred
();
Tags
.
STATUS_CODE
.
set
(
activeSpan
,
status
.
name
());
}
ContextManager
.
stopSpan
();
return
ret
;
}
}
apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/rocketmq/common/message/MessageOrderlyConsumeInterceptor.java
0 → 100644
浏览文件 @
30dbdc82
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package
org.apache.rocketmq.common.message
;
import
java.lang.reflect.Method
;
import
org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus
;
import
org.skywalking.apm.agent.core.context.ContextManager
;
import
org.skywalking.apm.agent.core.context.tag.Tags
;
import
org.skywalking.apm.agent.core.context.trace.AbstractSpan
;
import
org.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance
;
/**
* {@link MessageOrderlyConsumeInterceptor} set the process status after the {@link
* org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly#consumeMessage(java.util.List,
* org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext)} method execute.
*
* @author zhang xin
*/
public
class
MessageOrderlyConsumeInterceptor
extends
AbstractMessageConsumeInterceptor
{
@Override
public
Object
afterMethod
(
EnhancedInstance
objInst
,
Method
method
,
Object
[]
allArguments
,
Class
<?>[]
argumentsTypes
,
Object
ret
)
throws
Throwable
{
ConsumeOrderlyStatus
status
=
(
ConsumeOrderlyStatus
)
ret
;
if
(
status
==
ConsumeOrderlyStatus
.
SUSPEND_CURRENT_QUEUE_A_MOMENT
)
{
AbstractSpan
activeSpan
=
ContextManager
.
activeSpan
();
activeSpan
.
errorOccurred
();
Tags
.
STATUS_CODE
.
set
(
activeSpan
,
status
.
name
());
}
ContextManager
.
stopSpan
();
return
ret
;
}
}
apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/skywalking/apm/plugin/rocketMQ/v4/MessageSendInterceptor.java
0 → 100644
浏览文件 @
30dbdc82
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package
org.skywalking.apm.plugin.rocketMQ.v4
;
import
java.lang.reflect.Method
;
import
org.apache.rocketmq.client.impl.CommunicationMode
;
import
org.apache.rocketmq.common.message.Message
;
import
org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader
;
import
org.skywalking.apm.agent.core.context.CarrierItem
;
import
org.skywalking.apm.agent.core.context.ContextCarrier
;
import
org.skywalking.apm.agent.core.context.ContextManager
;
import
org.skywalking.apm.agent.core.context.trace.AbstractSpan
;
import
org.skywalking.apm.agent.core.context.trace.SpanLayer
;
import
org.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance
;
import
org.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor
;
import
org.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult
;
import
org.skywalking.apm.network.trace.component.ComponentsDefine
;
import
org.skywalking.apm.plugin.rocketMQ.v4.define.SendCallBackEnhanceInfo
;
import
org.skywalking.apm.util.StringUtil
;
import
static
org
.
apache
.
rocketmq
.
common
.
message
.
MessageDecoder
.
NAME_VALUE_SEPARATOR
;
import
static
org
.
apache
.
rocketmq
.
common
.
message
.
MessageDecoder
.
PROPERTY_SEPARATOR
;
/**
* {@link MessageSendInterceptor} create exit span when the method {@link org.apache.rocketmq.client.impl.MQClientAPIImpl#sendMessage(String,
* String, Message, org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader, long,
* org.apache.rocketmq.client.impl.CommunicationMode, org.apache.rocketmq.client.producer.SendCallback,
* org.apache.rocketmq.client.impl.producer.TopicPublishInfo, org.apache.rocketmq.client.impl.factory.MQClientInstance,
* int, org.apache.rocketmq.client.hook.SendMessageContext, org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl)}
* execute.
*
* @author zhang xin
*/
public
class
MessageSendInterceptor
implements
InstanceMethodsAroundInterceptor
{
public
static
final
String
ASYNC_SEND_OPERATION_NAME_PREFIX
=
"RocketMQ/"
;
@Override
public
void
beforeMethod
(
EnhancedInstance
objInst
,
Method
method
,
Object
[]
allArguments
,
Class
<?>[]
argumentsTypes
,
MethodInterceptResult
result
)
throws
Throwable
{
Message
message
=
(
Message
)
allArguments
[
2
];
ContextCarrier
contextCarrier
=
new
ContextCarrier
();
AbstractSpan
span
=
ContextManager
.
createExitSpan
(
buildOperationName
(
message
.
getTopic
()),
contextCarrier
,
(
String
)
allArguments
[
0
]);
span
.
setComponent
(
ComponentsDefine
.
ROCKET_MQ
);
span
.
setLayer
(
SpanLayer
.
MQ
);
span
.
tag
(
"brokerName"
,
(
String
)
allArguments
[
1
]);
span
.
tag
(
"tags"
,
message
.
getTags
());
span
.
tag
(
"communication.mode"
,
((
CommunicationMode
)
allArguments
[
5
]).
name
());
SendMessageRequestHeader
requestHeader
=
(
SendMessageRequestHeader
)
allArguments
[
3
];
StringBuilder
properties
=
new
StringBuilder
(
requestHeader
.
getProperties
());
CarrierItem
next
=
contextCarrier
.
items
();
while
(
next
.
hasNext
())
{
next
=
next
.
next
();
if
(!
StringUtil
.
isEmpty
(
next
.
getHeadValue
()))
{
properties
.
append
(
next
.
getHeadKey
());
properties
.
append
(
NAME_VALUE_SEPARATOR
);
properties
.
append
(
next
.
getHeadValue
());
properties
.
append
(
PROPERTY_SEPARATOR
);
}
}
requestHeader
.
setProperties
(
properties
.
toString
());
if
(
allArguments
[
6
]
!=
null
)
{
((
EnhancedInstance
)
allArguments
[
6
]).
setSkyWalkingDynamicField
(
new
SendCallBackEnhanceInfo
(
message
.
getTopic
(),
ContextManager
.
capture
()));
}
}
@Override
public
Object
afterMethod
(
EnhancedInstance
objInst
,
Method
method
,
Object
[]
allArguments
,
Class
<?>[]
argumentsTypes
,
Object
ret
)
throws
Throwable
{
ContextManager
.
stopSpan
();
return
ret
;
}
@Override
public
void
handleMethodException
(
EnhancedInstance
objInst
,
Method
method
,
Object
[]
allArguments
,
Class
<?>[]
argumentsTypes
,
Throwable
t
)
{
ContextManager
.
activeSpan
().
errorOccurred
().
log
(
t
);
}
private
String
buildOperationName
(
String
topicName
)
{
return
ASYNC_SEND_OPERATION_NAME_PREFIX
+
topicName
+
"/Producer"
;
}
}
apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/skywalking/apm/plugin/rocketMQ/v4/OnExceptionInterceptor.java
0 → 100644
浏览文件 @
30dbdc82
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package
org.skywalking.apm.plugin.rocketMQ.v4
;
import
java.lang.reflect.Method
;
import
org.skywalking.apm.agent.core.context.ContextManager
;
import
org.skywalking.apm.agent.core.context.trace.AbstractSpan
;
import
org.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance
;
import
org.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor
;
import
org.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult
;
import
org.skywalking.apm.network.trace.component.ComponentsDefine
;
import
org.skywalking.apm.plugin.rocketMQ.v4.define.SendCallBackEnhanceInfo
;
/**
* {@link OnExceptionInterceptor} create local span when the method {@link org.apache.rocketmq.client.producer.SendCallback#onException(Throwable)}
* execute.
*
* @author zhang xin
*/
public
class
OnExceptionInterceptor
implements
InstanceMethodsAroundInterceptor
{
public
static
final
String
CALLBACK_OPERATION_NAME_PREFIX
=
"RocketMQ/"
;
@Override
public
void
beforeMethod
(
EnhancedInstance
objInst
,
Method
method
,
Object
[]
allArguments
,
Class
<?>[]
argumentsTypes
,
MethodInterceptResult
result
)
throws
Throwable
{
SendCallBackEnhanceInfo
enhanceInfo
=
(
SendCallBackEnhanceInfo
)
objInst
.
getSkyWalkingDynamicField
();
AbstractSpan
activeSpan
=
ContextManager
.
createLocalSpan
(
CALLBACK_OPERATION_NAME_PREFIX
+
enhanceInfo
.
getTopicId
()
+
"/Producer/Callback"
);
activeSpan
.
setComponent
(
ComponentsDefine
.
ROCKET_MQ
);
activeSpan
.
errorOccurred
().
log
((
Throwable
)
allArguments
[
0
]);
ContextManager
.
continued
(
enhanceInfo
.
getContextSnapshot
());
}
@Override
public
Object
afterMethod
(
EnhancedInstance
objInst
,
Method
method
,
Object
[]
allArguments
,
Class
<?>[]
argumentsTypes
,
Object
ret
)
throws
Throwable
{
ContextManager
.
stopSpan
();
return
ret
;
}
@Override
public
void
handleMethodException
(
EnhancedInstance
objInst
,
Method
method
,
Object
[]
allArguments
,
Class
<?>[]
argumentsTypes
,
Throwable
t
)
{
ContextManager
.
activeSpan
().
log
(
t
);
}
}
apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/skywalking/apm/plugin/rocketMQ/v4/OnSuccessInterceptor.java
0 → 100644
浏览文件 @
30dbdc82
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package
org.skywalking.apm.plugin.rocketMQ.v4
;
import
java.lang.reflect.Method
;
import
org.apache.rocketmq.client.producer.SendResult
;
import
org.apache.rocketmq.client.producer.SendStatus
;
import
org.skywalking.apm.agent.core.context.ContextManager
;
import
org.skywalking.apm.agent.core.context.tag.Tags
;
import
org.skywalking.apm.agent.core.context.trace.AbstractSpan
;
import
org.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance
;
import
org.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor
;
import
org.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult
;
import
org.skywalking.apm.network.trace.component.ComponentsDefine
;
import
org.skywalking.apm.plugin.rocketMQ.v4.define.SendCallBackEnhanceInfo
;
/**
* {@link OnSuccessInterceptor} create local span when the method {@link org.apache.rocketmq.client.producer.SendCallback#onSuccess(SendResult)}
* execute.
*
* @author zhang xin
*/
public
class
OnSuccessInterceptor
implements
InstanceMethodsAroundInterceptor
{
public
static
final
String
CALLBACK_OPERATION_NAME_PREFIX
=
"RocketMQ/"
;
@Override
public
void
beforeMethod
(
EnhancedInstance
objInst
,
Method
method
,
Object
[]
allArguments
,
Class
<?>[]
argumentsTypes
,
MethodInterceptResult
result
)
throws
Throwable
{
SendCallBackEnhanceInfo
enhanceInfo
=
(
SendCallBackEnhanceInfo
)
objInst
.
getSkyWalkingDynamicField
();
AbstractSpan
activeSpan
=
ContextManager
.
createLocalSpan
(
CALLBACK_OPERATION_NAME_PREFIX
+
enhanceInfo
.
getTopicId
()
+
"/Producer/Callback"
);
activeSpan
.
setComponent
(
ComponentsDefine
.
ROCKET_MQ
);
SendStatus
sendStatus
=
((
SendResult
)
allArguments
[
0
]).
getSendStatus
();
if
(
sendStatus
!=
SendStatus
.
SEND_OK
)
{
activeSpan
.
errorOccurred
();
Tags
.
STATUS_CODE
.
set
(
activeSpan
,
sendStatus
.
name
());
}
ContextManager
.
continued
(
enhanceInfo
.
getContextSnapshot
());
}
@Override
public
Object
afterMethod
(
EnhancedInstance
objInst
,
Method
method
,
Object
[]
allArguments
,
Class
<?>[]
argumentsTypes
,
Object
ret
)
throws
Throwable
{
ContextManager
.
stopSpan
();
return
ret
;
}
@Override
public
void
handleMethodException
(
EnhancedInstance
objInst
,
Method
method
,
Object
[]
allArguments
,
Class
<?>[]
argumentsTypes
,
Throwable
t
)
{
ContextManager
.
activeSpan
().
errorOccurred
().
log
(
t
);
}
}
apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/skywalking/apm/plugin/rocketMQ/v4/define/ConsumeMessageConcurrentlyInstrumentation.java
0 → 100644
浏览文件 @
30dbdc82
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package
org.skywalking.apm.plugin.rocketMQ.v4.define
;
import
net.bytebuddy.description.method.MethodDescription
;
import
net.bytebuddy.matcher.ElementMatcher
;
import
org.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint
;
import
org.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint
;
import
org.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine
;
import
org.skywalking.apm.agent.core.plugin.match.ClassMatch
;
import
static
net
.
bytebuddy
.
matcher
.
ElementMatchers
.
named
;
import
static
org
.
skywalking
.
apm
.
agent
.
core
.
plugin
.
match
.
HierarchyMatch
.
byHierarchyMatch
;
/**
* {@link ConsumeMessageConcurrentlyInstrumentation} intercepts the {@link org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently#consumeMessage(java.util.List,
* org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext)} method by using {@link
* org.apache.rocketmq.common.message.MessageConcurrentlyConsumeInterceptor}.
*
* @author zhang xin
*/
public
class
ConsumeMessageConcurrentlyInstrumentation
extends
ClassInstanceMethodsEnhancePluginDefine
{
private
static
final
String
ENHANCE_CLASS
=
"org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently"
;
private
static
final
String
CONSUMER_MESSAGE_METHOD
=
"consumeMessage"
;
private
static
final
String
INTERCEPTOR_CLASS
=
"org.apache.rocketmq.common.message.MessageConcurrentlyConsumeInterceptor"
;
@Override
protected
ConstructorInterceptPoint
[]
getConstructorsInterceptPoints
()
{
return
new
ConstructorInterceptPoint
[
0
];
}
@Override
protected
InstanceMethodsInterceptPoint
[]
getInstanceMethodsInterceptPoints
()
{
return
new
InstanceMethodsInterceptPoint
[]
{
new
InstanceMethodsInterceptPoint
()
{
@Override
public
ElementMatcher
<
MethodDescription
>
getMethodsMatcher
()
{
return
named
(
CONSUMER_MESSAGE_METHOD
);
}
@Override
public
String
getMethodsInterceptor
()
{
return
INTERCEPTOR_CLASS
;
}
@Override
public
boolean
isOverrideArgs
()
{
return
false
;
}
}
};
}
@Override
protected
ClassMatch
enhanceClass
()
{
return
byHierarchyMatch
(
new
String
[]
{
ENHANCE_CLASS
});
}
}
apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/skywalking/apm/plugin/rocketMQ/v4/define/ConsumeMessageOrderlyInstrumentation.java
0 → 100644
浏览文件 @
30dbdc82
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package
org.skywalking.apm.plugin.rocketMQ.v4.define
;
import
net.bytebuddy.description.method.MethodDescription
;
import
net.bytebuddy.matcher.ElementMatcher
;
import
org.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint
;
import
org.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint
;
import
org.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine
;
import
org.skywalking.apm.agent.core.plugin.match.ClassMatch
;
import
static
net
.
bytebuddy
.
matcher
.
ElementMatchers
.
named
;
import
static
org
.
skywalking
.
apm
.
agent
.
core
.
plugin
.
match
.
HierarchyMatch
.
byHierarchyMatch
;
/**
* {@link ConsumeMessageOrderlyInstrumentation} intercepts the {@link org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly#consumeMessage(java.util.List,
* org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext)} method by using {@link
* org.apache.rocketmq.common.message.MessageConcurrentlyConsumeInterceptor}.
*
* @author zhang xin
*/
public
class
ConsumeMessageOrderlyInstrumentation
extends
ClassInstanceMethodsEnhancePluginDefine
{
private
static
final
String
ENHANCE_CLASS
=
"org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly"
;
private
static
final
String
ENHANCE_METHOD
=
"consumeMessage"
;
private
static
final
String
INTERCEPTOR_CLASS
=
"org.apache.rocketmq.common.message.MessageOrderlyConsumeInterceptor"
;
@Override
protected
ConstructorInterceptPoint
[]
getConstructorsInterceptPoints
()
{
return
new
ConstructorInterceptPoint
[
0
];
}
@Override
protected
InstanceMethodsInterceptPoint
[]
getInstanceMethodsInterceptPoints
()
{
return
new
InstanceMethodsInterceptPoint
[]
{
new
InstanceMethodsInterceptPoint
()
{
@Override
public
ElementMatcher
<
MethodDescription
>
getMethodsMatcher
()
{
return
named
(
ENHANCE_METHOD
);
}
@Override
public
String
getMethodsInterceptor
()
{
return
INTERCEPTOR_CLASS
;
}
@Override
public
boolean
isOverrideArgs
()
{
return
false
;
}
}
};
}
@Override
protected
ClassMatch
enhanceClass
()
{
return
byHierarchyMatch
(
new
String
[]
{
ENHANCE_CLASS
});
}
}
apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/skywalking/apm/plugin/rocketMQ/v4/define/MQClientAPIImplInstrumentation.java
0 → 100644
浏览文件 @
30dbdc82
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package
org.skywalking.apm.plugin.rocketMQ.v4.define
;
import
net.bytebuddy.description.method.MethodDescription
;
import
net.bytebuddy.matcher.ElementMatcher
;
import
org.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint
;
import
org.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint
;
import
org.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine
;
import
org.skywalking.apm.agent.core.plugin.match.ClassMatch
;
import
static
net
.
bytebuddy
.
matcher
.
ElementMatchers
.
named
;
import
static
net
.
bytebuddy
.
matcher
.
ElementMatchers
.
takesArguments
;
import
static
org
.
skywalking
.
apm
.
agent
.
core
.
plugin
.
match
.
NameMatch
.
byName
;
/**
* {@link MQClientAPIImplInstrumentation} intercepts the {@link org.apache.rocketmq.client.impl.MQClientAPIImpl#sendMessage(String,
* String, org.apache.rocketmq.common.message.Message, org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader,
* long, org.apache.rocketmq.client.impl.CommunicationMode, org.apache.rocketmq.client.producer.SendCallback,
* org.apache.rocketmq.client.impl.producer.TopicPublishInfo, org.apache.rocketmq.client.impl.factory.MQClientInstance,
* int, org.apache.rocketmq.client.hook.SendMessageContext, org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl)}
* method by using {@link org.skywalking.apm.plugin.rocketMQ.v4.MessageSendInterceptor}.
*
* @author zhang xin
*/
public
class
MQClientAPIImplInstrumentation
extends
ClassInstanceMethodsEnhancePluginDefine
{
private
static
final
String
ENHANCE_CLASS
=
"org.apache.rocketmq.client.impl.MQClientAPIImpl"
;
private
static
final
String
SEND_MESSAGE_METHOD_NAME
=
"sendMessage"
;
private
static
final
String
ASYNC_METHOD_INTERCEPTOR
=
"org.skywalking.apm.plugin.rocketMQ.v4.MessageSendInterceptor"
;
@Override
protected
ConstructorInterceptPoint
[]
getConstructorsInterceptPoints
()
{
return
new
ConstructorInterceptPoint
[
0
];
}
@Override
protected
InstanceMethodsInterceptPoint
[]
getInstanceMethodsInterceptPoints
()
{
return
new
InstanceMethodsInterceptPoint
[]
{
new
InstanceMethodsInterceptPoint
()
{
@Override
public
ElementMatcher
<
MethodDescription
>
getMethodsMatcher
()
{
return
named
(
SEND_MESSAGE_METHOD_NAME
).
and
(
takesArguments
(
12
));
}
@Override
public
String
getMethodsInterceptor
()
{
return
ASYNC_METHOD_INTERCEPTOR
;
}
@Override
public
boolean
isOverrideArgs
()
{
return
false
;
}
}
};
}
@Override
protected
ClassMatch
enhanceClass
()
{
return
byName
(
ENHANCE_CLASS
);
}
}
apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/skywalking/apm/plugin/rocketMQ/v4/define/SendCallBackEnhanceInfo.java
0 → 100644
浏览文件 @
30dbdc82
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package
org.skywalking.apm.plugin.rocketMQ.v4.define
;
import
org.skywalking.apm.agent.core.context.ContextSnapshot
;
/**
* {@link SendCallBackEnhanceInfo} saves the topic Id and {@link ContextSnapshot} instance for trace.
*
* @author zhang xin
*/
public
class
SendCallBackEnhanceInfo
{
private
String
topicId
;
private
ContextSnapshot
contextSnapshot
;
public
SendCallBackEnhanceInfo
(
String
topicId
,
ContextSnapshot
contextSnapshot
)
{
this
.
topicId
=
topicId
;
this
.
contextSnapshot
=
contextSnapshot
;
}
public
String
getTopicId
()
{
return
topicId
;
}
public
ContextSnapshot
getContextSnapshot
()
{
return
contextSnapshot
;
}
}
apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/skywalking/apm/plugin/rocketMQ/v4/define/SendCallbackInstrumentation.java
0 → 100644
浏览文件 @
30dbdc82
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package
org.skywalking.apm.plugin.rocketMQ.v4.define
;
import
net.bytebuddy.description.method.MethodDescription
;
import
net.bytebuddy.matcher.ElementMatcher
;
import
org.apache.rocketmq.client.producer.SendResult
;
import
org.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint
;
import
org.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint
;
import
org.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine
;
import
org.skywalking.apm.agent.core.plugin.match.ClassMatch
;
import
static
net
.
bytebuddy
.
matcher
.
ElementMatchers
.
named
;
import
static
org
.
skywalking
.
apm
.
agent
.
core
.
plugin
.
bytebuddy
.
ArgumentTypeNameMatch
.
takesArgumentWithType
;
import
static
org
.
skywalking
.
apm
.
agent
.
core
.
plugin
.
match
.
HierarchyMatch
.
byHierarchyMatch
;
/**
* {@link SendCallbackInstrumentation} intercepts {@link org.apache.rocketmq.client.producer.SendCallback#onSuccess(SendResult)}
* method by using {@link org.skywalking.apm.plugin.rocketMQ.v4.OnSuccessInterceptor} and also intercepts {@link
* org.apache.rocketmq.client.producer.SendCallback#onException(Throwable)} by using {@link
* org.skywalking.apm.plugin.rocketMQ.v4.OnExceptionInterceptor}.
*
* @author zhang xin
*/
public
class
SendCallbackInstrumentation
extends
ClassInstanceMethodsEnhancePluginDefine
{
private
static
final
String
ENHANCE_CLASS
=
"org.apache.rocketmq.client.producer.SendCallback"
;
private
static
final
String
ON_SUCCESS_ENHANCE_METHOD
=
"onSuccess"
;
private
static
final
String
ON_SUCCESS_INTERCEPTOR
=
"org.skywalking.apm.plugin.rocketMQ.v4.OnSuccessInterceptor"
;
private
static
final
String
ON_EXCEPTION_METHOD
=
"onException"
;
private
static
final
String
ON_EXCEPTION_INTERCEPTOR
=
"org.skywalking.apm.plugin.rocketMQ.v4.OnExceptionInterceptor"
;
@Override
protected
ConstructorInterceptPoint
[]
getConstructorsInterceptPoints
()
{
return
new
ConstructorInterceptPoint
[
0
];
}
@Override
protected
InstanceMethodsInterceptPoint
[]
getInstanceMethodsInterceptPoints
()
{
return
new
InstanceMethodsInterceptPoint
[]
{
new
InstanceMethodsInterceptPoint
()
{
@Override
public
ElementMatcher
<
MethodDescription
>
getMethodsMatcher
()
{
return
named
(
ON_SUCCESS_ENHANCE_METHOD
).
and
(
takesArgumentWithType
(
0
,
"org.apache.rocketmq.client.producer.SendResult"
));
}
@Override
public
String
getMethodsInterceptor
()
{
return
ON_SUCCESS_INTERCEPTOR
;
}
@Override
public
boolean
isOverrideArgs
()
{
return
false
;
}
},
new
InstanceMethodsInterceptPoint
()
{
@Override
public
ElementMatcher
<
MethodDescription
>
getMethodsMatcher
()
{
return
named
(
ON_EXCEPTION_METHOD
).
and
(
takesArgumentWithType
(
0
,
"java.lang.Throwable"
));
}
@Override
public
String
getMethodsInterceptor
()
{
return
ON_EXCEPTION_INTERCEPTOR
;
}
@Override
public
boolean
isOverrideArgs
()
{
return
false
;
}
}
};
}
@Override
protected
ClassMatch
enhanceClass
()
{
return
byHierarchyMatch
(
new
String
[]
{
ENHANCE_CLASS
});
}
}
apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/resources/skywalking-plugin.def
0 → 100644
浏览文件 @
30dbdc82
rocketMQ-4.x=org.skywalking.apm.plugin.rocketMQ.v4.define.ConsumeMessageConcurrentlyInstrumentation
rocketMQ-4.x=org.skywalking.apm.plugin.rocketMQ.v4.define.ConsumeMessageOrderlyInstrumentation
rocketMQ-4.x=org.skywalking.apm.plugin.rocketMQ.v4.define.MQClientAPIImplInstrumentation
rocketMQ-4.x=org.skywalking.apm.plugin.rocketMQ.v4.define.SendCallbackInstrumentation
apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/test/java/org/skywalking/apm/plugin/rocketMQ/v4/MessageSendInterceptorTest.java
0 → 100644
浏览文件 @
30dbdc82
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package
org.skywalking.apm.plugin.rocketMQ.v4
;
import
java.util.List
;
import
org.apache.rocketmq.client.impl.CommunicationMode
;
import
org.apache.rocketmq.common.message.Message
;
import
org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader
;
import
org.junit.Before
;
import
org.junit.Rule
;
import
org.junit.Test
;
import
org.junit.runner.RunWith
;
import
org.mockito.Matchers
;
import
org.mockito.Mock
;
import
org.powermock.modules.junit4.PowerMockRunner
;
import
org.powermock.modules.junit4.PowerMockRunnerDelegate
;
import
org.skywalking.apm.agent.core.context.trace.AbstractTracingSpan
;
import
org.skywalking.apm.agent.core.context.trace.SpanLayer
;
import
org.skywalking.apm.agent.core.context.trace.TraceSegment
;
import
org.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance
;
import
org.skywalking.apm.agent.test.helper.SegmentHelper
;
import
org.skywalking.apm.agent.test.tools.AgentServiceRule
;
import
org.skywalking.apm.agent.test.tools.SegmentStorage
;
import
org.skywalking.apm.agent.test.tools.SegmentStoragePoint
;
import
org.skywalking.apm.agent.test.tools.SpanAssert
;
import
org.skywalking.apm.agent.test.tools.TracingSegmentRunner
;
import
org.skywalking.apm.network.trace.component.ComponentsDefine
;
import
static
org
.
hamcrest
.
CoreMatchers
.
is
;
import
static
org
.
junit
.
Assert
.
assertThat
;
import
static
org
.
mockito
.
Matchers
.
anyString
;
import
static
org
.
mockito
.
Mockito
.
times
;
import
static
org
.
mockito
.
Mockito
.
verify
;
import
static
org
.
powermock
.
api
.
mockito
.
PowerMockito
.
when
;
@RunWith
(
PowerMockRunner
.
class
)
@PowerMockRunnerDelegate
(
TracingSegmentRunner
.
class
)
public
class
MessageSendInterceptorTest
{
private
MessageSendInterceptor
messageSendInterceptor
;
@SegmentStoragePoint
private
SegmentStorage
segmentStorage
;
@Rule
public
AgentServiceRule
serviceRule
=
new
AgentServiceRule
();
private
Object
[]
arguments
;
private
Object
[]
argumentsWithoutCallback
;
@Mock
private
Message
message
;
@Mock
private
SendMessageRequestHeader
messageRequestHeader
;
@Mock
private
EnhancedInstance
callBack
;
@Before
public
void
setUp
()
{
messageSendInterceptor
=
new
MessageSendInterceptor
();
arguments
=
new
Object
[]
{
"127.0.0.1"
,
"test"
,
message
,
messageRequestHeader
,
null
,
CommunicationMode
.
ASYNC
,
callBack
};
argumentsWithoutCallback
=
new
Object
[]
{
"127.0.0.1"
,
"test"
,
message
,
messageRequestHeader
,
null
,
CommunicationMode
.
ASYNC
,
null
};
when
(
messageRequestHeader
.
getProperties
()).
thenReturn
(
""
);
when
(
message
.
getTags
()).
thenReturn
(
"TagA"
);
}
@Test
public
void
testSendMessage
()
throws
Throwable
{
messageSendInterceptor
.
beforeMethod
(
null
,
null
,
arguments
,
null
,
null
);
messageSendInterceptor
.
afterMethod
(
null
,
null
,
arguments
,
null
,
null
);
assertThat
(
segmentStorage
.
getTraceSegments
().
size
(),
is
(
1
));
TraceSegment
traceSegment
=
segmentStorage
.
getTraceSegments
().
get
(
0
);
List
<
AbstractTracingSpan
>
spans
=
SegmentHelper
.
getSpans
(
traceSegment
);
assertThat
(
spans
.
size
(),
is
(
1
));
AbstractTracingSpan
mqSpan
=
spans
.
get
(
0
);
SpanAssert
.
assertLayer
(
mqSpan
,
SpanLayer
.
MQ
);
SpanAssert
.
assertComponent
(
mqSpan
,
ComponentsDefine
.
ROCKET_MQ
);
SpanAssert
.
assertTag
(
mqSpan
,
0
,
"test"
);
SpanAssert
.
assertTag
(
mqSpan
,
1
,
"TagA"
);
verify
(
messageRequestHeader
,
times
(
1
)).
setProperties
(
anyString
());
verify
(
callBack
,
times
(
1
)).
setSkyWalkingDynamicField
(
Matchers
.
any
());
}
@Test
public
void
testSendMessageWithoutCallBack
()
throws
Throwable
{
messageSendInterceptor
.
beforeMethod
(
null
,
null
,
argumentsWithoutCallback
,
null
,
null
);
messageSendInterceptor
.
afterMethod
(
null
,
null
,
argumentsWithoutCallback
,
null
,
null
);
assertThat
(
segmentStorage
.
getTraceSegments
().
size
(),
is
(
1
));
TraceSegment
traceSegment
=
segmentStorage
.
getTraceSegments
().
get
(
0
);
List
<
AbstractTracingSpan
>
spans
=
SegmentHelper
.
getSpans
(
traceSegment
);
assertThat
(
spans
.
size
(),
is
(
1
));
AbstractTracingSpan
mqSpan
=
spans
.
get
(
0
);
SpanAssert
.
assertLayer
(
mqSpan
,
SpanLayer
.
MQ
);
SpanAssert
.
assertComponent
(
mqSpan
,
ComponentsDefine
.
ROCKET_MQ
);
SpanAssert
.
assertTag
(
mqSpan
,
0
,
"test"
);
SpanAssert
.
assertTag
(
mqSpan
,
1
,
"TagA"
);
verify
(
messageRequestHeader
,
times
(
1
)).
setProperties
(
anyString
());
}
}
apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/test/java/org/skywalking/apm/plugin/rocketMQ/v4/OnExceptionInterceptorTest.java
0 → 100644
浏览文件 @
30dbdc82
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package
org.skywalking.apm.plugin.rocketMQ.v4
;
import
java.util.List
;
import
org.junit.Before
;
import
org.junit.Rule
;
import
org.junit.Test
;
import
org.junit.runner.RunWith
;
import
org.mockito.Mock
;
import
org.powermock.modules.junit4.PowerMockRunner
;
import
org.powermock.modules.junit4.PowerMockRunnerDelegate
;
import
org.skywalking.apm.agent.core.context.ContextSnapshot
;
import
org.skywalking.apm.agent.core.context.trace.AbstractTracingSpan
;
import
org.skywalking.apm.agent.core.context.trace.TraceSegment
;
import
org.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance
;
import
org.skywalking.apm.agent.test.helper.SegmentHelper
;
import
org.skywalking.apm.agent.test.helper.SpanHelper
;
import
org.skywalking.apm.agent.test.tools.AgentServiceRule
;
import
org.skywalking.apm.agent.test.tools.SegmentStorage
;
import
org.skywalking.apm.agent.test.tools.SegmentStoragePoint
;
import
org.skywalking.apm.agent.test.tools.SpanAssert
;
import
org.skywalking.apm.agent.test.tools.TracingSegmentRunner
;
import
org.skywalking.apm.plugin.rocketMQ.v4.define.SendCallBackEnhanceInfo
;
import
static
org
.
hamcrest
.
CoreMatchers
.
is
;
import
static
org
.
junit
.
Assert
.
assertThat
;
import
static
org
.
powermock
.
api
.
mockito
.
PowerMockito
.
when
;
@RunWith
(
PowerMockRunner
.
class
)
@PowerMockRunnerDelegate
(
TracingSegmentRunner
.
class
)
public
class
OnExceptionInterceptorTest
{
private
OnExceptionInterceptor
exceptionInterceptor
;
@SegmentStoragePoint
private
SegmentStorage
segmentStorage
;
@Rule
public
AgentServiceRule
serviceRule
=
new
AgentServiceRule
();
@Mock
private
ContextSnapshot
contextSnapshot
;
private
SendCallBackEnhanceInfo
enhanceInfo
;
@Mock
private
EnhancedInstance
enhancedInstance
;
@Before
public
void
setUp
()
{
exceptionInterceptor
=
new
OnExceptionInterceptor
();
enhanceInfo
=
new
SendCallBackEnhanceInfo
(
"test"
,
contextSnapshot
);
when
(
enhancedInstance
.
getSkyWalkingDynamicField
()).
thenReturn
(
enhanceInfo
);
}
@Test
public
void
testOnException
()
throws
Throwable
{
exceptionInterceptor
.
beforeMethod
(
enhancedInstance
,
null
,
new
Object
[]
{
new
RuntimeException
()},
null
,
null
);
exceptionInterceptor
.
afterMethod
(
enhancedInstance
,
null
,
new
Object
[]
{
new
RuntimeException
()},
null
,
null
);
assertThat
(
segmentStorage
.
getTraceSegments
().
size
(),
is
(
1
));
TraceSegment
traceSegment
=
segmentStorage
.
getTraceSegments
().
get
(
0
);
List
<
AbstractTracingSpan
>
spans
=
SegmentHelper
.
getSpans
(
traceSegment
);
assertThat
(
spans
.
size
(),
is
(
1
));
AbstractTracingSpan
exceptionSpan
=
spans
.
get
(
0
);
SpanAssert
.
assertException
(
SpanHelper
.
getLogs
(
exceptionSpan
).
get
(
0
),
RuntimeException
.
class
);
SpanAssert
.
assertOccurException
(
exceptionSpan
,
true
);
}
}
apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/test/java/org/skywalking/apm/plugin/rocketMQ/v4/OnSuccessInterceptorTest.java
0 → 100644
浏览文件 @
30dbdc82
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package
org.skywalking.apm.plugin.rocketMQ.v4
;
import
java.util.List
;
import
org.apache.rocketmq.client.producer.SendResult
;
import
org.apache.rocketmq.client.producer.SendStatus
;
import
org.junit.Before
;
import
org.junit.Rule
;
import
org.junit.Test
;
import
org.junit.runner.RunWith
;
import
org.mockito.Mock
;
import
org.powermock.modules.junit4.PowerMockRunner
;
import
org.powermock.modules.junit4.PowerMockRunnerDelegate
;
import
org.skywalking.apm.agent.core.context.ContextSnapshot
;
import
org.skywalking.apm.agent.core.context.trace.AbstractTracingSpan
;
import
org.skywalking.apm.agent.core.context.trace.TraceSegment
;
import
org.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance
;
import
org.skywalking.apm.agent.test.helper.SegmentHelper
;
import
org.skywalking.apm.agent.test.tools.AgentServiceRule
;
import
org.skywalking.apm.agent.test.tools.SegmentStorage
;
import
org.skywalking.apm.agent.test.tools.SegmentStoragePoint
;
import
org.skywalking.apm.agent.test.tools.SpanAssert
;
import
org.skywalking.apm.agent.test.tools.TracingSegmentRunner
;
import
org.skywalking.apm.network.trace.component.ComponentsDefine
;
import
org.skywalking.apm.plugin.rocketMQ.v4.define.SendCallBackEnhanceInfo
;
import
static
org
.
hamcrest
.
CoreMatchers
.
is
;
import
static
org
.
junit
.
Assert
.
assertThat
;
import
static
org
.
powermock
.
api
.
mockito
.
PowerMockito
.
when
;
@RunWith
(
PowerMockRunner
.
class
)
@PowerMockRunnerDelegate
(
TracingSegmentRunner
.
class
)
public
class
OnSuccessInterceptorTest
{
private
OnSuccessInterceptor
successInterceptor
;
@SegmentStoragePoint
private
SegmentStorage
segmentStorage
;
@Rule
public
AgentServiceRule
serviceRule
=
new
AgentServiceRule
();
@Mock
private
ContextSnapshot
contextSnapshot
;
@Mock
private
SendResult
sendResult
;
private
SendCallBackEnhanceInfo
enhanceInfo
;
@Mock
private
EnhancedInstance
enhancedInstance
;
@Before
public
void
setUp
()
{
successInterceptor
=
new
OnSuccessInterceptor
();
enhanceInfo
=
new
SendCallBackEnhanceInfo
(
"test"
,
contextSnapshot
);
when
(
enhancedInstance
.
getSkyWalkingDynamicField
()).
thenReturn
(
enhanceInfo
);
when
(
sendResult
.
getSendStatus
()).
thenReturn
(
SendStatus
.
SEND_OK
);
}
@Test
public
void
testOnSuccess
()
throws
Throwable
{
successInterceptor
.
beforeMethod
(
enhancedInstance
,
null
,
new
Object
[]
{
sendResult
},
null
,
null
);
successInterceptor
.
afterMethod
(
enhancedInstance
,
null
,
new
Object
[]
{
sendResult
},
null
,
null
);
assertThat
(
segmentStorage
.
getTraceSegments
().
size
(),
is
(
1
));
TraceSegment
traceSegment
=
segmentStorage
.
getTraceSegments
().
get
(
0
);
List
<
AbstractTracingSpan
>
spans
=
SegmentHelper
.
getSpans
(
traceSegment
);
assertThat
(
spans
.
size
(),
is
(
1
));
AbstractTracingSpan
successSpan
=
spans
.
get
(
0
);
SpanAssert
.
assertComponent
(
successSpan
,
ComponentsDefine
.
ROCKET_MQ
);
}
@Test
public
void
testOnSuccessWithErrorStatus
()
throws
Throwable
{
when
(
sendResult
.
getSendStatus
()).
thenReturn
(
SendStatus
.
FLUSH_SLAVE_TIMEOUT
);
successInterceptor
.
beforeMethod
(
enhancedInstance
,
null
,
new
Object
[]
{
sendResult
},
null
,
null
);
successInterceptor
.
afterMethod
(
enhancedInstance
,
null
,
new
Object
[]
{
sendResult
},
null
,
null
);
assertThat
(
segmentStorage
.
getTraceSegments
().
size
(),
is
(
1
));
TraceSegment
traceSegment
=
segmentStorage
.
getTraceSegments
().
get
(
0
);
List
<
AbstractTracingSpan
>
spans
=
SegmentHelper
.
getSpans
(
traceSegment
);
assertThat
(
spans
.
size
(),
is
(
1
));
AbstractTracingSpan
successSpan
=
spans
.
get
(
0
);
SpanAssert
.
assertComponent
(
successSpan
,
ComponentsDefine
.
ROCKET_MQ
);
SpanAssert
.
assertOccurException
(
successSpan
,
true
);
}
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录