Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
润土的好友猹
SkyWalking
提交
9de37243
S
SkyWalking
项目概览
润土的好友猹
/
SkyWalking
与 Fork 源项目一致
Fork自
apache / SkyWalking
通知
9
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
S
SkyWalking
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
未验证
提交
9de37243
编写于
4月 26, 2021
作者:
Z
ZhangZhaoyuan
提交者:
GitHub
4月 26, 2021
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
feat: add enhance pulsar MessageListener instance (#6774)
上级
1aa5f268
变更
17
隐藏空白更改
内联
并排
Showing
17 changed file
with
918 addition
and
92 deletion
+918
-92
CHANGES.md
CHANGES.md
+1
-0
apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/ConsumerConstructorInterceptor.java
...ing/apm/plugin/pulsar/ConsumerConstructorInterceptor.java
+1
-0
apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/ConsumerEnhanceRequiredInfo.java
...alking/apm/plugin/pulsar/ConsumerEnhanceRequiredInfo.java
+13
-0
apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/MessageConstructorInterceptor.java
...king/apm/plugin/pulsar/MessageConstructorInterceptor.java
+39
-0
apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/MessageEnhanceRequiredInfo.java
...walking/apm/plugin/pulsar/MessageEnhanceRequiredInfo.java
+53
-0
apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/PulsarConsumerInterceptor.java
...ywalking/apm/plugin/pulsar/PulsarConsumerInterceptor.java
+15
-1
apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/PulsarConsumerListenerInterceptor.java
.../apm/plugin/pulsar/PulsarConsumerListenerInterceptor.java
+86
-0
apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/define/MessageInstrumentation.java
...king/apm/plugin/pulsar/define/MessageInstrumentation.java
+71
-0
apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/define/PulsarConsumerListenerInstrumentation.java
.../pulsar/define/PulsarConsumerListenerInstrumentation.java
+76
-0
apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/resources/skywalking-plugin.def
...in/pulsar-plugin/src/main/resources/skywalking-plugin.def
+3
-1
apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/ConsumerConstructorInterceptorTest.java
...apm/plugin/pulsar/ConsumerConstructorInterceptorTest.java
+9
-0
apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/MockConsumer.java
...org/apache/skywalking/apm/plugin/pulsar/MockConsumer.java
+181
-0
apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/MockMessage.java
.../org/apache/skywalking/apm/plugin/pulsar/MockMessage.java
+14
-1
apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/PulsarConsumerInterceptorTest.java
...king/apm/plugin/pulsar/PulsarConsumerInterceptorTest.java
+30
-3
apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/PulsarConsumerListenerInterceptorTest.java
.../plugin/pulsar/PulsarConsumerListenerInterceptorTest.java
+172
-0
test/plugin/scenarios/pulsar-scenario/config/expectedData.yaml
...plugin/scenarios/pulsar-scenario/config/expectedData.yaml
+118
-75
test/plugin/scenarios/pulsar-scenario/src/main/java/test/apache/skywalking/apm/testcase/pulsar/controller/CaseController.java
...alking/apm/testcase/pulsar/controller/CaseController.java
+36
-11
未找到文件。
CHANGES.md
浏览文件 @
9de37243
...
...
@@ -17,6 +17,7 @@ Release Notes.
*
Add Seata in the component definition. Seata plugin hosts on Seata project.
*
Extended Kafka plugin to properly trace consumers that have topic partitions directly assigned.
*
Support print SkyWalking context to logs.
*
Add
`MessageListener`
enhancement in pulsar plugin
#### OAP-Backend
*
BugFix: filter invalid Envoy access logs whose socket address is empty.
...
...
apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/ConsumerConstructorInterceptor.java
浏览文件 @
9de37243
...
...
@@ -45,6 +45,7 @@ public class ConsumerConstructorInterceptor implements InstanceConstructorInterc
requireInfo
.
setServiceUrl
(
pulsarClient
.
getLookup
().
getServiceUrl
());
requireInfo
.
setTopic
(
topic
);
requireInfo
.
setSubscriptionName
(
consumerConfigurationData
.
getSubscriptionName
());
requireInfo
.
setHasMessageListener
(
consumerConfigurationData
.
getMessageListener
()
!=
null
);
objInst
.
setSkyWalkingDynamicField
(
requireInfo
);
}
}
apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/ConsumerEnhanceRequiredInfo.java
浏览文件 @
9de37243
...
...
@@ -38,6 +38,19 @@ public class ConsumerEnhanceRequiredInfo {
*/
private
String
subscriptionName
;
/**
* whether the consumer has a message listener
*/
private
boolean
hasMessageListener
;
public
boolean
isHasMessageListener
()
{
return
hasMessageListener
;
}
public
void
setHasMessageListener
(
boolean
hasMessageListener
)
{
this
.
hasMessageListener
=
hasMessageListener
;
}
public
String
getServiceUrl
()
{
return
serviceUrl
;
}
...
...
apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/MessageConstructorInterceptor.java
0 → 100644
浏览文件 @
9de37243
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package
org.apache.skywalking.apm.plugin.pulsar
;
import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassEnhancePluginDefine
;
import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance
;
import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor
;
/**
* Interceptor of pulsar message constructor.
* <p>
* The interceptor create {@link MessageEnhanceRequiredInfo} which is required by passing message span across
* threads. Another purpose of this interceptor is to make {@link ClassEnhancePluginDefine} enable enhanced class
* to implement {@link EnhancedInstance} interface. Because if {@link ClassEnhancePluginDefine} class will not create
* SkyWalkingDynamicField without any constructor or method interceptor.
*/
public
class
MessageConstructorInterceptor
implements
InstanceConstructorInterceptor
{
@Override
public
void
onConstruct
(
EnhancedInstance
objInst
,
Object
[]
allArguments
)
{
objInst
.
setSkyWalkingDynamicField
(
new
MessageEnhanceRequiredInfo
());
}
}
apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/MessageEnhanceRequiredInfo.java
0 → 100644
浏览文件 @
9de37243
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package
org.apache.skywalking.apm.plugin.pulsar
;
import
org.apache.skywalking.apm.agent.core.context.ContextSnapshot
;
/**
* Pulsar message enhance required info is required by consumer enhanced object method interceptor
*/
public
class
MessageEnhanceRequiredInfo
{
/**
* topic name of the consumer
*/
private
String
topic
;
/**
* receiving message span snapshot
*/
private
ContextSnapshot
contextSnapshot
;
public
ContextSnapshot
getContextSnapshot
()
{
return
contextSnapshot
;
}
public
void
setContextSnapshot
(
ContextSnapshot
contextSnapshot
)
{
this
.
contextSnapshot
=
contextSnapshot
;
}
public
String
getTopic
()
{
return
topic
;
}
public
void
setTopic
(
String
topic
)
{
this
.
topic
=
topic
;
}
}
apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/PulsarConsumerInterceptor.java
浏览文件 @
9de37243
...
...
@@ -41,7 +41,8 @@ import java.lang.reflect.Method;
* 1. Get the @{@link ConsumerEnhanceRequiredInfo} and record the service url, topic name and subscription name
* 2. Create the entry span when call <code>messageProcessed</code> method
* 3. Extract all the <code>Trace Context</code> when call <code>messageProcessed</code> method
* 4. Stop the entry span when <code>messageProcessed</code> method finished.
* 4. Capture trace context and set into SkyWalkingDynamic field if consumer has a message listener when <code>messageProcessed</code> method finished
* 5. Stop the entry span.
* </pre>
*/
public
class
PulsarConsumerInterceptor
implements
InstanceMethodsAroundInterceptor
{
...
...
@@ -74,6 +75,19 @@ public class PulsarConsumerInterceptor implements InstanceMethodsAroundIntercept
public
Object
afterMethod
(
EnhancedInstance
objInst
,
Method
method
,
Object
[]
allArguments
,
Class
<?>[]
argumentsTypes
,
Object
ret
)
throws
Throwable
{
if
(
allArguments
[
0
]
!=
null
)
{
final
ConsumerEnhanceRequiredInfo
requiredInfo
=
(
ConsumerEnhanceRequiredInfo
)
objInst
.
getSkyWalkingDynamicField
();
if
(
requiredInfo
.
isHasMessageListener
())
{
EnhancedInstance
msg
=
(
EnhancedInstance
)
allArguments
[
0
];
MessageEnhanceRequiredInfo
messageEnhanceRequiredInfo
=
(
MessageEnhanceRequiredInfo
)
msg
.
getSkyWalkingDynamicField
();
if
(
messageEnhanceRequiredInfo
==
null
)
{
messageEnhanceRequiredInfo
=
new
MessageEnhanceRequiredInfo
();
msg
.
setSkyWalkingDynamicField
(
messageEnhanceRequiredInfo
);
}
messageEnhanceRequiredInfo
.
setTopic
(
requiredInfo
.
getTopic
());
messageEnhanceRequiredInfo
.
setContextSnapshot
(
ContextManager
.
capture
());
}
ContextManager
.
stopSpan
();
}
return
ret
;
...
...
apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/PulsarConsumerListenerInterceptor.java
0 → 100644
浏览文件 @
9de37243
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package
org.apache.skywalking.apm.plugin.pulsar
;
import
org.apache.pulsar.client.api.MessageListener
;
import
org.apache.skywalking.apm.agent.core.context.ContextManager
;
import
org.apache.skywalking.apm.agent.core.context.tag.Tags
;
import
org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan
;
import
org.apache.skywalking.apm.agent.core.context.trace.SpanLayer
;
import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance
;
import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor
;
import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult
;
import
org.apache.skywalking.apm.network.trace.component.ComponentsDefine
;
import
java.lang.reflect.Method
;
/**
* Interceptor for getting pulsar consumer message listener enhanced instance
* <p>
* Here is the intercept process steps:
*
* <pre>
* 1. Return null if {@link org.apache.pulsar.client.impl.conf.ConsumerConfigurationData} has no message listener
* 2. Return a new lambda expression wrap original message listener
* 3. New lambda will create local span that continued message reception span
* 4. Stop the local span when original message listener <code>received</code> method finished.
* </pre>
*/
public
class
PulsarConsumerListenerInterceptor
implements
InstanceMethodsAroundInterceptor
{
public
static
final
String
OPERATE_NAME_PREFIX
=
"Pulsar/"
;
public
static
final
String
CONSUMER_OPERATE_NAME
=
"/Consumer/MessageListener"
;
@Override
public
void
beforeMethod
(
EnhancedInstance
objInst
,
Method
method
,
Object
[]
allArguments
,
Class
<?>[]
argumentsTypes
,
MethodInterceptResult
result
)
throws
Throwable
{
}
@Override
public
Object
afterMethod
(
EnhancedInstance
objInst
,
Method
method
,
Object
[]
allArguments
,
Class
<?>[]
argumentsTypes
,
Object
ret
)
throws
Throwable
{
return
ret
==
null
?
null
:
(
MessageListener
)
(
consumer
,
message
)
->
{
final
MessageEnhanceRequiredInfo
requiredInfo
=
(
MessageEnhanceRequiredInfo
)
((
EnhancedInstance
)
message
)
.
getSkyWalkingDynamicField
();
if
(
requiredInfo
==
null
)
{
((
MessageListener
)
ret
).
received
(
consumer
,
message
);
}
else
{
AbstractSpan
activeSpan
=
ContextManager
.
createLocalSpan
(
OPERATE_NAME_PREFIX
+
requiredInfo
.
getTopic
()
+
CONSUMER_OPERATE_NAME
);
activeSpan
.
setComponent
(
ComponentsDefine
.
PULSAR_CONSUMER
);
SpanLayer
.
asMQ
(
activeSpan
);
Tags
.
MQ_TOPIC
.
set
(
activeSpan
,
requiredInfo
.
getTopic
());
ContextManager
.
continued
(
requiredInfo
.
getContextSnapshot
());
try
{
((
MessageListener
)
ret
).
received
(
consumer
,
message
);
}
catch
(
Exception
e
)
{
ContextManager
.
activeSpan
().
log
(
e
);
}
finally
{
ContextManager
.
stopSpan
();
}
}
};
}
@Override
public
void
handleMethodException
(
EnhancedInstance
objInst
,
Method
method
,
Object
[]
allArguments
,
Class
<?>[]
argumentsTypes
,
Throwable
t
)
{
}
}
apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/define/MessageInstrumentation.java
0 → 100644
浏览文件 @
9de37243
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package
org.apache.skywalking.apm.plugin.pulsar.define
;
import
net.bytebuddy.description.method.MethodDescription
;
import
net.bytebuddy.matcher.BooleanMatcher
;
import
net.bytebuddy.matcher.ElementMatcher
;
import
org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint
;
import
org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint
;
import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine
;
import
org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch
;
import
static
org
.
apache
.
skywalking
.
apm
.
agent
.
core
.
plugin
.
match
.
HierarchyMatch
.
byHierarchyMatch
;
/**
* Pulsar message instrumentation.
* <p>
* The message enhanced object is only for passing message reception span across threads.
* <p>
* Enhanced message object will be injected {@link org.apache.skywalking.apm.plugin.pulsar.MessageEnhanceRequiredInfo}
* after message process method if consumer has a message listener.
* </p>
*/
public
class
MessageInstrumentation
extends
ClassInstanceMethodsEnhancePluginDefine
{
public
static
final
String
ENHANCE_CLASS
=
"org.apache.pulsar.client.api.Message"
;
public
static
final
String
CONSTRUCTOR_INTERCEPTOR_CLASS
=
"org.apache.skywalking.apm.plugin.pulsar.MessageConstructorInterceptor"
;
@Override
public
ConstructorInterceptPoint
[]
getConstructorsInterceptPoints
()
{
return
new
ConstructorInterceptPoint
[]{
new
ConstructorInterceptPoint
()
{
@Override
public
ElementMatcher
<
MethodDescription
>
getConstructorMatcher
()
{
return
new
BooleanMatcher
<>(
true
);
}
@Override
public
String
getConstructorInterceptor
()
{
return
CONSTRUCTOR_INTERCEPTOR_CLASS
;
}
}
};
}
@Override
public
InstanceMethodsInterceptPoint
[]
getInstanceMethodsInterceptPoints
()
{
return
new
InstanceMethodsInterceptPoint
[
0
];
}
@Override
protected
ClassMatch
enhanceClass
()
{
return
byHierarchyMatch
(
ENHANCE_CLASS
);
}
}
apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/define/PulsarConsumerListenerInstrumentation.java
0 → 100644
浏览文件 @
9de37243
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package
org.apache.skywalking.apm.plugin.pulsar.define
;
import
net.bytebuddy.description.method.MethodDescription
;
import
net.bytebuddy.matcher.ElementMatcher
;
import
org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint
;
import
org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint
;
import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine
;
import
org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch
;
import
static
net
.
bytebuddy
.
matcher
.
ElementMatchers
.
named
;
import
static
org
.
apache
.
skywalking
.
apm
.
agent
.
core
.
plugin
.
match
.
NameMatch
.
byName
;
/**
* The pulsar consumer listener instrumentation use {@link org.apache.pulsar.client.api.MessageListener} as an enhanced
* class.
* <p>
* User will implement {@link org.apache.pulsar.client.api.MessageListener} interface to consume message and enhance
* all instances of {@link org.apache.pulsar.client.api.MessageListener} interface can let users get trace information
* in message listener thread.
*/
public
class
PulsarConsumerListenerInstrumentation
extends
ClassInstanceMethodsEnhancePluginDefine
{
public
static
final
String
INTERCEPTOR_CLASS
=
"org.apache.skywalking.apm.plugin.pulsar.PulsarConsumerListenerInterceptor"
;
public
static
final
String
ENHANCE_METHOD
=
"getMessageListener"
;
public
static
final
String
ENHANCE_CLASS
=
"org.apache.pulsar.client.impl.conf.ConsumerConfigurationData"
;
@Override
protected
ClassMatch
enhanceClass
()
{
return
byName
(
ENHANCE_CLASS
);
}
@Override
public
ConstructorInterceptPoint
[]
getConstructorsInterceptPoints
()
{
return
new
ConstructorInterceptPoint
[
0
];
}
@Override
public
InstanceMethodsInterceptPoint
[]
getInstanceMethodsInterceptPoints
()
{
return
new
InstanceMethodsInterceptPoint
[]{
new
InstanceMethodsInterceptPoint
()
{
@Override
public
ElementMatcher
<
MethodDescription
>
getMethodsMatcher
()
{
return
named
(
ENHANCE_METHOD
);
}
@Override
public
String
getMethodsInterceptor
()
{
return
INTERCEPTOR_CLASS
;
}
@Override
public
boolean
isOverrideArgs
()
{
return
false
;
}
}
};
}
}
apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/main/resources/skywalking-plugin.def
浏览文件 @
9de37243
...
...
@@ -16,4 +16,6 @@
pulsar=org.apache.skywalking.apm.plugin.pulsar.define.SendCallbackInstrumentation
pulsar=org.apache.skywalking.apm.plugin.pulsar.define.PulsarConsumerInstrumentation
pulsar=org.apache.skywalking.apm.plugin.pulsar.define.PulsarProducerInstrumentation
\ No newline at end of file
pulsar=org.apache.skywalking.apm.plugin.pulsar.define.PulsarConsumerListenerInstrumentation
pulsar=org.apache.skywalking.apm.plugin.pulsar.define.PulsarProducerInstrumentation
pulsar=org.apache.skywalking.apm.plugin.pulsar.define.MessageInstrumentation
\ No newline at end of file
apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/ConsumerConstructorInterceptorTest.java
浏览文件 @
9de37243
...
...
@@ -18,6 +18,7 @@
package
org.apache.skywalking.apm.plugin.pulsar
;
import
org.apache.pulsar.client.api.PulsarClientException
;
import
org.apache.pulsar.client.impl.LookupService
;
import
org.apache.pulsar.client.impl.PulsarClientImpl
;
import
org.apache.pulsar.client.impl.conf.ConsumerConfigurationData
;
...
...
@@ -70,6 +71,13 @@ public class ConsumerConstructorInterceptorTest {
when
(
lookupService
.
getServiceUrl
()).
thenReturn
(
SERVICE_URL
);
when
(
pulsarClient
.
getLookup
()).
thenReturn
(
lookupService
);
when
(
consumerConfigurationData
.
getSubscriptionName
()).
thenReturn
(
SUBSCRIPTION_NAME
);
when
(
consumerConfigurationData
.
getMessageListener
()).
thenReturn
((
consumer
,
message
)
->
{
try
{
consumer
.
acknowledge
(
message
);
}
catch
(
PulsarClientException
e
)
{
e
.
printStackTrace
();
}
});
constructorInterceptor
=
new
ConsumerConstructorInterceptor
();
}
...
...
@@ -84,5 +92,6 @@ public class ConsumerConstructorInterceptorTest {
assertThat
(
requiredInfo
.
getServiceUrl
(),
is
(
SERVICE_URL
));
assertThat
(
requiredInfo
.
getTopic
(),
is
(
TOPIC_NAME
));
assertThat
(
requiredInfo
.
getSubscriptionName
(),
is
(
SUBSCRIPTION_NAME
));
assertThat
(
requiredInfo
.
isHasMessageListener
(),
is
(
true
));
}
}
apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/MockConsumer.java
0 → 100644
浏览文件 @
9de37243
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package
org.apache.skywalking.apm.plugin.pulsar
;
import
org.apache.pulsar.client.api.Consumer
;
import
org.apache.pulsar.client.api.ConsumerStats
;
import
org.apache.pulsar.client.api.Message
;
import
org.apache.pulsar.client.api.MessageId
;
import
org.apache.pulsar.client.api.PulsarClientException
;
import
java.util.concurrent.CompletableFuture
;
import
java.util.concurrent.TimeUnit
;
public
class
MockConsumer
implements
Consumer
{
@Override
public
String
getTopic
()
{
return
null
;
}
@Override
public
String
getSubscription
()
{
return
null
;
}
@Override
public
void
unsubscribe
()
throws
PulsarClientException
{
}
@Override
public
CompletableFuture
<
Void
>
unsubscribeAsync
()
{
return
null
;
}
@Override
public
Message
receive
()
throws
PulsarClientException
{
return
null
;
}
@Override
public
CompletableFuture
<
Message
>
receiveAsync
()
{
return
null
;
}
@Override
public
Message
receive
(
int
i
,
TimeUnit
timeUnit
)
throws
PulsarClientException
{
return
null
;
}
@Override
public
void
acknowledge
(
MessageId
messageId
)
throws
PulsarClientException
{
}
@Override
public
void
negativeAcknowledge
(
MessageId
messageId
)
{
}
@Override
public
void
acknowledgeCumulative
(
MessageId
messageId
)
throws
PulsarClientException
{
}
@Override
public
CompletableFuture
<
Void
>
acknowledgeAsync
(
MessageId
messageId
)
{
return
null
;
}
@Override
public
CompletableFuture
<
Void
>
acknowledgeCumulativeAsync
(
MessageId
messageId
)
{
return
null
;
}
@Override
public
ConsumerStats
getStats
()
{
return
null
;
}
@Override
public
void
close
()
throws
PulsarClientException
{
}
@Override
public
CompletableFuture
<
Void
>
closeAsync
()
{
return
null
;
}
@Override
public
boolean
hasReachedEndOfTopic
()
{
return
false
;
}
@Override
public
void
redeliverUnacknowledgedMessages
()
{
}
@Override
public
void
seek
(
MessageId
messageId
)
throws
PulsarClientException
{
}
@Override
public
void
seek
(
long
l
)
throws
PulsarClientException
{
}
@Override
public
CompletableFuture
<
Void
>
seekAsync
(
MessageId
messageId
)
{
return
null
;
}
@Override
public
CompletableFuture
<
Void
>
seekAsync
(
long
l
)
{
return
null
;
}
@Override
public
boolean
isConnected
()
{
return
false
;
}
@Override
public
String
getConsumerName
()
{
return
null
;
}
@Override
public
void
pause
()
{
}
@Override
public
void
resume
()
{
}
@Override
public
CompletableFuture
<
Void
>
acknowledgeCumulativeAsync
(
Message
message
)
{
return
null
;
}
@Override
public
CompletableFuture
<
Void
>
acknowledgeAsync
(
Message
message
)
{
return
null
;
}
@Override
public
void
acknowledgeCumulative
(
Message
message
)
throws
PulsarClientException
{
}
@Override
public
void
negativeAcknowledge
(
Message
message
)
{
}
@Override
public
void
acknowledge
(
Message
message
)
throws
PulsarClientException
{
}
}
apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/MockMessage.java
浏览文件 @
9de37243
...
...
@@ -22,17 +22,20 @@ import org.apache.pulsar.client.api.Schema;
import
org.apache.pulsar.client.impl.MessageImpl
;
import
org.apache.pulsar.common.api.proto.PulsarApi
;
import
org.apache.pulsar.shade.io.netty.buffer.ByteBuf
;
import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance
;
import
java.util.Collections
;
import
java.util.HashMap
;
import
java.util.Map
;
public
class
MockMessage
extends
MessageImpl
{
public
class
MockMessage
extends
MessageImpl
implements
EnhancedInstance
{
private
PulsarApi
.
MessageMetadata
.
Builder
msgMetadataBuilder
=
PulsarApi
.
MessageMetadata
.
newBuilder
();
private
transient
Map
<
String
,
String
>
properties
;
private
Object
enhancedSkyWalkingField
;
public
MockMessage
()
{
this
(
null
,
"1:1"
,
new
HashMap
(),
null
,
null
);
}
...
...
@@ -66,4 +69,14 @@ public class MockMessage extends MessageImpl {
public
String
getProperty
(
String
name
)
{
return
this
.
getProperties
().
get
(
name
);
}
@Override
public
Object
getSkyWalkingDynamicField
()
{
return
enhancedSkyWalkingField
;
}
@Override
public
void
setSkyWalkingDynamicField
(
Object
value
)
{
this
.
enhancedSkyWalkingField
=
value
;
}
}
apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/PulsarConsumerInterceptorTest.java
浏览文件 @
9de37243
...
...
@@ -59,6 +59,8 @@ public class PulsarConsumerInterceptorTest {
private
ConsumerEnhanceRequiredInfo
consumerEnhanceRequiredInfo
;
private
MessageEnhanceRequiredInfo
messageEnhanceRequiredInfo
;
private
PulsarConsumerInterceptor
consumerInterceptor
;
private
MockMessage
msg
;
...
...
@@ -85,9 +87,11 @@ public class PulsarConsumerInterceptorTest {
consumerEnhanceRequiredInfo
.
setSubscriptionName
(
"my-sub"
);
msg
=
new
MockMessage
();
msg
.
getMessageBuilder
()
.
addProperties
(
PulsarApi
.
KeyValue
.
newBuilder
()
.
setKey
(
SW8CarrierItem
.
HEADER_NAME
)
.
setValue
(
"1-My40LjU=-MS4yLjM=-3-c2VydmljZQ==-aW5zdGFuY2U=-L2FwcA==-MTI3LjAuMC4xOjgwODA="
));
.
addProperties
(
PulsarApi
.
KeyValue
.
newBuilder
()
.
setKey
(
SW8CarrierItem
.
HEADER_NAME
)
.
setValue
(
"1-My40LjU=-MS4yLjM=-3-c2VydmljZQ==-aW5zdGFuY2U=-L2FwcA==-MTI3LjAuMC4xOjgwODA="
));
messageEnhanceRequiredInfo
=
new
MessageEnhanceRequiredInfo
();
msg
.
setSkyWalkingDynamicField
(
messageEnhanceRequiredInfo
);
}
@Test
...
...
@@ -116,6 +120,29 @@ public class PulsarConsumerInterceptorTest {
assertConsumerSpan
(
spans
.
get
(
0
));
}
@Test
public
void
testConsumerWithMessageListener
()
throws
Throwable
{
consumerEnhanceRequiredInfo
.
setHasMessageListener
(
true
);
consumerInterceptor
.
beforeMethod
(
consumerInstance
,
null
,
new
Object
[]{
msg
},
new
Class
[
0
],
null
);
consumerInterceptor
.
afterMethod
(
consumerInstance
,
null
,
new
Object
[]{
msg
},
new
Class
[
0
],
null
);
List
<
TraceSegment
>
traceSegments
=
segmentStorage
.
getTraceSegments
();
assertThat
(
traceSegments
.
size
(),
is
(
1
));
TraceSegment
traceSegment
=
traceSegments
.
get
(
0
);
assertNotNull
(
traceSegment
.
getRef
());
assertTraceSegmentRef
(
traceSegment
.
getRef
());
List
<
AbstractTracingSpan
>
spans
=
SegmentHelper
.
getSpans
(
traceSegment
);
assertThat
(
spans
.
size
(),
is
(
1
));
assertConsumerSpan
(
spans
.
get
(
0
));
final
MessageEnhanceRequiredInfo
requiredInfo
=
(
MessageEnhanceRequiredInfo
)
msg
.
getSkyWalkingDynamicField
();
assertThat
(
requiredInfo
.
getTopic
(),
is
(
((
ConsumerEnhanceRequiredInfo
)
consumerInstance
.
getSkyWalkingDynamicField
()).
getTopic
()));
assertNotNull
(
requiredInfo
.
getContextSnapshot
());
}
private
void
assertConsumerSpan
(
AbstractTracingSpan
span
)
{
SpanAssert
.
assertLayer
(
span
,
SpanLayer
.
MQ
);
SpanAssert
.
assertComponent
(
span
,
PULSAR_CONSUMER
);
...
...
apm-sniffer/apm-sdk-plugin/pulsar-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/PulsarConsumerListenerInterceptorTest.java
0 → 100644
浏览文件 @
9de37243
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package
org.apache.skywalking.apm.plugin.pulsar
;
import
org.apache.pulsar.client.api.MessageListener
;
import
org.apache.pulsar.common.api.proto.PulsarApi
;
import
org.apache.skywalking.apm.agent.core.context.SW8CarrierItem
;
import
org.apache.skywalking.apm.agent.core.context.trace.AbstractTracingSpan
;
import
org.apache.skywalking.apm.agent.core.context.trace.SpanLayer
;
import
org.apache.skywalking.apm.agent.core.context.trace.TraceSegment
;
import
org.apache.skywalking.apm.agent.core.context.trace.TraceSegmentRef
;
import
org.apache.skywalking.apm.agent.core.context.trace.TraceSegmentRef.SegmentRefType
;
import
org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance
;
import
org.apache.skywalking.apm.agent.test.helper.SegmentHelper
;
import
org.apache.skywalking.apm.agent.test.helper.SegmentRefHelper
;
import
org.apache.skywalking.apm.agent.test.tools.AgentServiceRule
;
import
org.apache.skywalking.apm.agent.test.tools.SegmentStorage
;
import
org.apache.skywalking.apm.agent.test.tools.SegmentStoragePoint
;
import
org.apache.skywalking.apm.agent.test.tools.SpanAssert
;
import
org.apache.skywalking.apm.agent.test.tools.TracingSegmentRunner
;
import
org.hamcrest.MatcherAssert
;
import
org.junit.Before
;
import
org.junit.Rule
;
import
org.junit.Test
;
import
org.junit.runner.RunWith
;
import
org.powermock.modules.junit4.PowerMockRunner
;
import
org.powermock.modules.junit4.PowerMockRunnerDelegate
;
import
java.util.List
;
import
static
org
.
apache
.
skywalking
.
apm
.
network
.
trace
.
component
.
ComponentsDefine
.
PULSAR_CONSUMER
;
import
static
org
.
hamcrest
.
CoreMatchers
.
is
;
import
static
org
.
junit
.
Assert
.
assertNotNull
;
import
static
org
.
junit
.
Assert
.
assertNull
;
import
static
org
.
junit
.
Assert
.
assertThat
;
@RunWith
(
PowerMockRunner
.
class
)
@PowerMockRunnerDelegate
(
TracingSegmentRunner
.
class
)
public
class
PulsarConsumerListenerInterceptorTest
{
@Rule
public
AgentServiceRule
serviceRule
=
new
AgentServiceRule
();
@SegmentStoragePoint
private
SegmentStorage
segmentStorage
;
private
final
EnhancedInstance
consumerConfigurationDataInstance
=
new
EnhancedInstance
()
{
@Override
public
Object
getSkyWalkingDynamicField
()
{
return
null
;
}
@Override
public
void
setSkyWalkingDynamicField
(
Object
value
)
{
}
};
private
PulsarConsumerListenerInterceptor
consumerListenerInterceptor
;
private
MockMessage
msg
;
private
PulsarConsumerInterceptor
consumerInterceptor
;
private
ConsumerEnhanceRequiredInfo
consumerEnhanceRequiredInfo
;
private
final
EnhancedInstance
consumerInstance
=
new
EnhancedInstance
()
{
@Override
public
Object
getSkyWalkingDynamicField
()
{
return
consumerEnhanceRequiredInfo
;
}
@Override
public
void
setSkyWalkingDynamicField
(
Object
value
)
{
consumerEnhanceRequiredInfo
=
(
ConsumerEnhanceRequiredInfo
)
value
;
}
};
private
MockConsumer
consumer
;
private
MessageListener
messageListener
;
@Before
public
void
setUp
()
{
consumerInterceptor
=
new
PulsarConsumerInterceptor
();
consumerListenerInterceptor
=
new
PulsarConsumerListenerInterceptor
();
messageListener
=
(
consumer
,
message
)
->
message
.
getTopicName
();
consumerEnhanceRequiredInfo
=
new
ConsumerEnhanceRequiredInfo
();
consumerEnhanceRequiredInfo
.
setTopic
(
"persistent://my-tenant/my-ns/my-topic"
);
consumerEnhanceRequiredInfo
.
setServiceUrl
(
"pulsar://localhost:6650"
);
consumerEnhanceRequiredInfo
.
setSubscriptionName
(
"my-sub"
);
consumerEnhanceRequiredInfo
.
setHasMessageListener
(
true
);
msg
=
new
MockMessage
();
msg
.
getMessageBuilder
()
.
addProperties
(
PulsarApi
.
KeyValue
.
newBuilder
()
.
setKey
(
SW8CarrierItem
.
HEADER_NAME
)
.
setValue
(
"1-My40LjU=-MS4yLjM=-3-c2VydmljZQ==-aW5zdGFuY2U=-L2FwcA==-MTI3LjAuMC4xOjgwODA="
));
msg
.
setSkyWalkingDynamicField
(
new
MessageEnhanceRequiredInfo
());
consumer
=
new
MockConsumer
();
}
@Test
public
void
testWithNoMessageListener
()
throws
Throwable
{
consumerListenerInterceptor
.
beforeMethod
(
consumerConfigurationDataInstance
,
null
,
new
Object
[
0
],
new
Class
[
0
],
null
);
final
MessageListener
messageListener
=
(
MessageListener
)
consumerListenerInterceptor
.
afterMethod
(
consumerConfigurationDataInstance
,
null
,
new
Object
[
0
],
new
Class
[
0
],
null
);
List
<
TraceSegment
>
traceSegments
=
segmentStorage
.
getTraceSegments
();
assertThat
(
traceSegments
.
size
(),
is
(
0
));
assertNull
(
messageListener
);
}
@Test
public
void
testWithMessageListenerHasNoRequiredInfo
()
throws
Throwable
{
consumerListenerInterceptor
.
beforeMethod
(
consumerConfigurationDataInstance
,
null
,
new
Object
[
0
],
new
Class
[
0
],
null
);
final
MessageListener
enhancedMessageListener
=
(
MessageListener
)
consumerListenerInterceptor
.
afterMethod
(
consumerConfigurationDataInstance
,
null
,
new
Object
[
0
],
new
Class
[
0
],
this
.
messageListener
);
assertNotNull
(
enhancedMessageListener
);
msg
.
setSkyWalkingDynamicField
(
null
);
enhancedMessageListener
.
received
(
consumer
,
msg
);
List
<
TraceSegment
>
traceSegments
=
segmentStorage
.
getTraceSegments
();
assertThat
(
traceSegments
.
size
(),
is
(
0
));
}
@Test
public
void
testWithMessageListenerHasRequiredInfo
()
throws
Throwable
{
consumerInterceptor
.
beforeMethod
(
consumerInstance
,
null
,
new
Object
[]{
msg
},
new
Class
[
0
],
null
);
consumerInterceptor
.
afterMethod
(
consumerInstance
,
null
,
new
Object
[]{
msg
},
new
Class
[
0
],
null
);
consumerListenerInterceptor
.
beforeMethod
(
consumerConfigurationDataInstance
,
null
,
new
Object
[
0
],
new
Class
[
0
],
null
);
final
MessageListener
enhancedMessageListener
=
(
MessageListener
)
consumerListenerInterceptor
.
afterMethod
(
consumerConfigurationDataInstance
,
null
,
new
Object
[
0
],
new
Class
[
0
],
this
.
messageListener
);
assertNotNull
(
enhancedMessageListener
);
enhancedMessageListener
.
received
(
consumer
,
msg
);
List
<
TraceSegment
>
traceSegments
=
segmentStorage
.
getTraceSegments
();
assertThat
(
traceSegments
.
size
(),
is
(
2
));
TraceSegment
traceSegment
=
traceSegments
.
get
(
1
);
assertNotNull
(
traceSegment
.
getRef
());
assertTraceSegmentRef
(
traceSegment
.
getRef
());
List
<
AbstractTracingSpan
>
spans
=
SegmentHelper
.
getSpans
(
traceSegment
);
assertThat
(
spans
.
size
(),
is
(
1
));
assertConsumerSpan
(
spans
.
get
(
0
));
}
private
void
assertConsumerSpan
(
AbstractTracingSpan
span
)
{
SpanAssert
.
assertLayer
(
span
,
SpanLayer
.
MQ
);
SpanAssert
.
assertComponent
(
span
,
PULSAR_CONSUMER
);
}
private
void
assertTraceSegmentRef
(
TraceSegmentRef
ref
)
{
MatcherAssert
.
assertThat
(
ref
.
getParentEndpoint
(),
is
(
"Pulsar/persistent://my-tenant/my-ns/my-topic/Consumer/my-sub"
));
MatcherAssert
.
assertThat
(
SegmentRefHelper
.
getSpanId
(
ref
),
is
(
0
));
MatcherAssert
.
assertThat
(
SegmentRefHelper
.
getTraceSegmentId
(
ref
),
is
(
"3.4.5"
));
MatcherAssert
.
assertThat
(
ref
.
getType
(),
is
(
SegmentRefType
.
CROSS_THREAD
));
}
}
test/plugin/scenarios/pulsar-scenario/config/expectedData.yaml
浏览文件 @
9de37243
...
...
@@ -15,79 +15,122 @@
# limitations under the License.
segmentItems
:
-
serviceName
:
pulsar-scenario
segmentSize
:
ge
3
segmentSize
:
ge
6
segments
:
-
segmentId
:
not
null
spans
:
-
operationName
:
Pulsar/test/Producer
operationId
:
0
parentSpanId
:
0
spanId
:
1
spanLayer
:
MQ
startTime
:
nq
0
endTime
:
nq
0
componentId
:
73
isError
:
false
spanType
:
Exit
peer
:
not
null
tags
:
-
{
key
:
mq.broker
,
value
:
not null
}
-
{
key
:
mq.topic
,
value
:
test
}
skipAnalysis
:
'
false'
-
operationName
:
/case/pulsar-case
operationId
:
0
parentSpanId
:
-1
spanId
:
0
spanLayer
:
Http
startTime
:
nq
0
endTime
:
nq
0
componentId
:
14
isError
:
false
spanType
:
Entry
peer
:
'
'
tags
:
-
{
key
:
url
,
value
:
'
http://localhost:8080/pulsar-scenario/case/pulsar-case'
}
-
{
key
:
http.method
,
value
:
GET
}
skipAnalysis
:
'
false'
-
segmentId
:
not
null
spans
:
-
operationName
:
Pulsar/Producer/Callback
operationId
:
0
parentSpanId
:
-1
spanId
:
0
spanLayer
:
MQ
startTime
:
nq
0
endTime
:
nq
0
componentId
:
73
isError
:
false
spanType
:
Local
peer
:
'
'
tags
:
-
{
key
:
mq.topic
,
value
:
test
}
refs
:
-
{
parentEndpoint
:
/case/pulsar-case
,
networkAddress
:
'
'
,
refType
:
CrossThread
,
parentSpanId
:
1
,
parentTraceSegmentId
:
not null
,
parentServiceInstance
:
not
null
,
parentService
:
pulsar-scenario
,
traceId
:
not null
}
skipAnalysis
:
'
false'
-
segmentId
:
not
null
spans
:
-
operationName
:
Pulsar/test/Consumer/test
operationId
:
0
parentSpanId
:
-1
spanId
:
0
spanLayer
:
MQ
startTime
:
nq
0
endTime
:
nq
0
componentId
:
74
isError
:
false
spanType
:
Entry
peer
:
'
'
tags
:
-
{
key
:
transmission.latency
,
value
:
not null
}
-
{
key
:
mq.broker
,
value
:
not null
}
-
{
key
:
mq.topic
,
value
:
test
}
refs
:
-
{
parentEndpoint
:
/case/pulsar-case
,
networkAddress
:
not null
,
refType
:
CrossProcess
,
parentSpanId
:
1
,
parentTraceSegmentId
:
not null
,
parentServiceInstance
:
not
null
,
parentService
:
pulsar-scenario
,
traceId
:
not null
}
skipAnalysis
:
'
false'
-
segmentId
:
not
null
spans
:
-
operationName
:
Pulsar/test/Producer
operationId
:
0
parentSpanId
:
0
spanId
:
1
spanLayer
:
MQ
startTime
:
nq
0
endTime
:
nq
0
componentId
:
73
isError
:
false
spanType
:
Exit
peer
:
not
null
tags
:
-
{
key
:
mq.broker
,
value
:
not null
}
-
{
key
:
mq.topic
,
value
:
test
}
skipAnalysis
:
'
false'
-
operationName
:
/case/pulsar-case
operationId
:
0
parentSpanId
:
-1
spanId
:
0
spanLayer
:
Http
startTime
:
nq
0
endTime
:
nq
0
componentId
:
14
isError
:
false
spanType
:
Entry
peer
:
'
'
tags
:
-
{
key
:
url
,
value
:
'
http://localhost:8080/pulsar-scenario/case/pulsar-case'
}
-
{
key
:
http.method
,
value
:
GET
}
skipAnalysis
:
'
false'
-
segmentId
:
not
null
spans
:
-
operationName
:
Pulsar/Producer/Callback
operationId
:
0
parentSpanId
:
-1
spanId
:
0
spanLayer
:
MQ
startTime
:
nq
0
endTime
:
nq
0
componentId
:
73
isError
:
false
spanType
:
Local
peer
:
'
'
tags
:
-
{
key
:
mq.topic
,
value
:
test
}
refs
:
-
{
parentEndpoint
:
/case/pulsar-case
,
networkAddress
:
'
'
,
refType
:
CrossThread
,
parentSpanId
:
1
,
parentTraceSegmentId
:
not null
,
parentServiceInstance
:
not
null
,
parentService
:
pulsar-scenario
,
traceId
:
not null
}
skipAnalysis
:
'
false'
-
segmentId
:
not
null
spans
:
-
operationName
:
Pulsar/test/Consumer/test
operationId
:
0
parentSpanId
:
-1
spanId
:
0
spanLayer
:
MQ
startTime
:
nq
0
endTime
:
nq
0
componentId
:
74
isError
:
false
spanType
:
Entry
peer
:
'
'
tags
:
-
{
key
:
transmission.latency
,
value
:
not null
}
-
{
key
:
mq.broker
,
value
:
not null
}
-
{
key
:
mq.topic
,
value
:
test
}
refs
:
-
{
parentEndpoint
:
/case/pulsar-case
,
networkAddress
:
not null
,
refType
:
CrossProcess
,
parentSpanId
:
1
,
parentTraceSegmentId
:
not null
,
parentServiceInstance
:
not
null
,
parentService
:
pulsar-scenario
,
traceId
:
not null
}
skipAnalysis
:
'
false'
-
segmentId
:
not
null
spans
:
-
operationName
:
Pulsar/test/Consumer/testWithListener
operationId
:
0
parentSpanId
:
-1
spanId
:
0
spanLayer
:
MQ
startTime
:
nq
0
endTime
:
nq
0
componentId
:
74
isError
:
false
spanType
:
Entry
peer
:
'
'
skipAnalysis
:
'
false'
tags
:
-
{
key
:
transmission.latency
,
value
:
not null
}
-
{
key
:
mq.broker
,
value
:
not null
}
-
{
key
:
mq.topic
,
value
:
test
}
refs
:
-
{
parentEndpoint
:
/case/pulsar-case
,
networkAddress
:
not null
,
refType
:
CrossProcess
,
parentSpanId
:
1
,
parentTraceSegmentId
:
not null
,
parentServiceInstance
:
not null
,
parentService
:
pulsar-scenario
,
traceId
:
not null
}
-
segmentId
:
not
null
spans
:
-
operationName
:
Pulsar/test/Consumer/MessageListener
operationId
:
0
parentSpanId
:
-1
spanId
:
0
spanLayer
:
MQ
startTime
:
nq
0
endTime
:
nq
0
componentId
:
74
isError
:
false
spanType
:
Local
peer
:
'
'
tags
:
-
{
key
:
mq.topic
,
value
:
test
}
refs
:
-
{
parentEndpoint
:
Pulsar/test/Consumer/testWithListener
,
networkAddress
:
'
'
,
refType
:
CrossThread
,
parentSpanId
:
0
,
parentTraceSegmentId
:
not null
,
parentServiceInstance
:
not null
,
parentService
:
pulsar-scenario
,
traceId
:
not null
}
skipAnalysis
:
'
false'
test/plugin/scenarios/pulsar-scenario/src/main/java/test/apache/skywalking/apm/testcase/pulsar/controller/CaseController.java
浏览文件 @
9de37243
...
...
@@ -18,8 +18,6 @@
package
test.apache.skywalking.apm.testcase.pulsar.controller
;
import
java.util.concurrent.CountDownLatch
;
import
java.util.concurrent.TimeUnit
;
import
org.apache.logging.log4j.LogManager
;
import
org.apache.logging.log4j.Logger
;
import
org.apache.pulsar.client.api.Consumer
;
...
...
@@ -33,6 +31,9 @@ import org.springframework.stereotype.Controller;
import
org.springframework.web.bind.annotation.RequestMapping
;
import
org.springframework.web.bind.annotation.ResponseBody
;
import
java.util.concurrent.CountDownLatch
;
import
java.util.concurrent.TimeUnit
;
@Controller
@RequestMapping
(
"/case"
)
@PropertySource
(
"classpath:application.properties"
)
...
...
@@ -51,15 +52,38 @@ public class CaseController {
String
topic
=
"test"
;
CountDownLatch
latch
=
new
CountDownLatch
(
2
);
PulsarClient
pulsarClient
=
PulsarClient
.
builder
().
serviceUrl
(
PULSAR_DOMAIN
+
serviceUrl
).
build
();
Producer
<
byte
[]>
producer
=
pulsarClient
.
newProducer
().
topic
(
topic
).
create
();
Consumer
<
byte
[]>
consumer
=
pulsarClient
.
newConsumer
().
topic
(
topic
).
subscriptionName
(
"test"
).
subscribe
();
producer
.
newMessage
().
key
(
"testKey"
).
value
(
Integer
.
toString
(
1
).
getBytes
()).
property
(
"TEST"
,
"TEST"
).
send
();
pulsarClient
.
newConsumer
().
topic
(
topic
)
.
subscriptionName
(
"testWithListener"
)
.
messageListener
((
c
,
msg
)
->
{
try
{
if
(
msg
!=
null
)
{
String
propertiesFormat
=
"key = %s, value = %s"
;
StringBuilder
builder
=
new
StringBuilder
();
msg
.
getProperties
()
.
forEach
((
k
,
v
)
->
builder
.
append
(
String
.
format
(
propertiesFormat
,
k
,
v
))
.
append
(
", "
));
LOGGER
.
info
(
"Received message with messageId = {}, key = {}, value = {}, properties = {}"
,
msg
.
getMessageId
(),
msg
.
getKey
(),
new
String
(
msg
.
getValue
()),
builder
.
toString
());
}
c
.
acknowledge
(
msg
);
}
catch
(
Exception
e
)
{
LOGGER
.
error
(
"Receive message error"
,
e
);
}
finally
{
latch
.
countDown
();
}
}).
subscribe
();
CountDownLatch
latch
=
new
CountDownLatch
(
1
);
producer
.
newMessage
().
key
(
"testKey"
).
value
(
Integer
.
toString
(
1
).
getBytes
()).
property
(
"TEST"
,
"TEST"
).
send
(
);
Thread
t
=
new
Thread
(()
->
{
try
{
...
...
@@ -68,9 +92,10 @@ public class CaseController {
String
propertiesFormat
=
"key = %s, value = %s"
;
StringBuilder
builder
=
new
StringBuilder
();
msg
.
getProperties
()
.
forEach
((
k
,
v
)
->
builder
.
append
(
String
.
format
(
propertiesFormat
,
k
,
v
)).
append
(
", "
));
LOGGER
.
info
(
"Received message with messageId = {}, key = {}, value = {}, properties = {}"
,
msg
.
getMessageId
(),
msg
.
getKey
(),
new
String
(
msg
.
getValue
()),
builder
.
toString
());
.
forEach
((
k
,
v
)
->
builder
.
append
(
String
.
format
(
propertiesFormat
,
k
,
v
)).
append
(
", "
));
LOGGER
.
info
(
"Received message with messageId = {}, key = {}, value = {}, properties = {}"
,
msg
.
getMessageId
(),
msg
.
getKey
(),
new
String
(
msg
.
getValue
()),
builder
.
toString
());
}
consumer
.
acknowledge
(
msg
);
...
...
@@ -101,10 +126,10 @@ public class CaseController {
@ResponseBody
public
String
healthCheck
()
throws
InterruptedException
{
try
(
PulsarClient
pulsarClient
=
PulsarClient
.
builder
()
.
serviceUrl
(
PULSAR_DOMAIN
+
serviceUrl
)
.
build
();
Producer
<
byte
[]>
producer
=
pulsarClient
.
newProducer
()
.
topic
(
"healthCheck"
)
.
create
())
{
.
serviceUrl
(
PULSAR_DOMAIN
+
serviceUrl
)
.
build
();
Producer
<
byte
[]>
producer
=
pulsarClient
.
newProducer
()
.
topic
(
"healthCheck"
)
.
create
())
{
if
(
producer
.
isConnected
())
{
return
"Success"
;
}
else
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录