未验证 提交 5f26710d 编写于 作者: W wallezhang 提交者: GitHub

fix pulsar plugin message listener error with multi partitions topic (#6918)

上级 94adbf1c
......@@ -45,7 +45,6 @@ public class ConsumerConstructorInterceptor implements InstanceConstructorInterc
requireInfo.setServiceUrl(pulsarClient.getLookup().getServiceUrl());
requireInfo.setTopic(topic);
requireInfo.setSubscriptionName(consumerConfigurationData.getSubscriptionName());
requireInfo.setHasMessageListener(consumerConfigurationData.getMessageListener() != null);
objInst.setSkyWalkingDynamicField(requireInfo);
}
}
......@@ -38,19 +38,6 @@ public class ConsumerEnhanceRequiredInfo {
*/
private String subscriptionName;
/**
* whether the consumer has a message listener
*/
private boolean hasMessageListener;
public boolean isHasMessageListener() {
return hasMessageListener;
}
public void setHasMessageListener(boolean hasMessageListener) {
this.hasMessageListener = hasMessageListener;
}
public String getServiceUrl() {
return serviceUrl;
}
......
......@@ -77,17 +77,15 @@ public class PulsarConsumerInterceptor implements InstanceMethodsAroundIntercept
if (allArguments[0] != null) {
final ConsumerEnhanceRequiredInfo requiredInfo = (ConsumerEnhanceRequiredInfo) objInst
.getSkyWalkingDynamicField();
if (requiredInfo.isHasMessageListener()) {
EnhancedInstance msg = (EnhancedInstance) allArguments[0];
MessageEnhanceRequiredInfo messageEnhanceRequiredInfo = (MessageEnhanceRequiredInfo) msg
.getSkyWalkingDynamicField();
if (messageEnhanceRequiredInfo == null) {
messageEnhanceRequiredInfo = new MessageEnhanceRequiredInfo();
msg.setSkyWalkingDynamicField(messageEnhanceRequiredInfo);
}
messageEnhanceRequiredInfo.setTopic(requiredInfo.getTopic());
messageEnhanceRequiredInfo.setContextSnapshot(ContextManager.capture());
EnhancedInstance msg = (EnhancedInstance) allArguments[0];
MessageEnhanceRequiredInfo messageEnhanceRequiredInfo = (MessageEnhanceRequiredInfo) msg
.getSkyWalkingDynamicField();
if (messageEnhanceRequiredInfo == null) {
messageEnhanceRequiredInfo = new MessageEnhanceRequiredInfo();
msg.setSkyWalkingDynamicField(messageEnhanceRequiredInfo);
}
messageEnhanceRequiredInfo.setTopic(requiredInfo.getTopic());
messageEnhanceRequiredInfo.setContextSnapshot(ContextManager.capture());
ContextManager.stopSpan();
}
return ret;
......
......@@ -58,7 +58,7 @@ public class PulsarConsumerListenerInterceptor implements InstanceMethodsAroundI
return ret == null ? null : (MessageListener) (consumer, message) -> {
final MessageEnhanceRequiredInfo requiredInfo = (MessageEnhanceRequiredInfo) ((EnhancedInstance) message)
.getSkyWalkingDynamicField();
if (requiredInfo == null) {
if (requiredInfo == null || requiredInfo.getContextSnapshot() == null) {
((MessageListener) ret).received(consumer, message);
} else {
AbstractSpan activeSpan = ContextManager
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.pulsar;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassEnhancePluginDefine;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;
/**
* Interceptor of pulsar topic message constructor.
* <p>
* The interceptor create {@link MessageEnhanceRequiredInfo} which is required by passing message span across
* threads. Another purpose of this interceptor is to make {@link ClassEnhancePluginDefine} enable enhanced class
* to implement {@link EnhancedInstance} interface. Because if {@link ClassEnhancePluginDefine} class will not create
* SkyWalkingDynamicField without any constructor or method interceptor.
*/
public class TopicMessageConstructorInterceptor implements InstanceConstructorInterceptor {
@Override
public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
final Object msgArgument = allArguments[2];
if (msgArgument instanceof EnhancedInstance) {
objInst.setSkyWalkingDynamicField(((EnhancedInstance) msgArgument).getSkyWalkingDynamicField());
} else {
objInst.setSkyWalkingDynamicField(new MessageEnhanceRequiredInfo());
}
}
}
......@@ -26,7 +26,7 @@ import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsIn
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
import static org.apache.skywalking.apm.agent.core.plugin.match.HierarchyMatch.byHierarchyMatch;
import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
/**
* Pulsar message instrumentation.
......@@ -39,7 +39,7 @@ import static org.apache.skywalking.apm.agent.core.plugin.match.HierarchyMatch.b
*/
public class MessageInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
public static final String ENHANCE_CLASS = "org.apache.pulsar.client.api.Message";
public static final String ENHANCE_CLASS = "org.apache.pulsar.client.impl.MessageImpl";
public static final String CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.pulsar.MessageConstructorInterceptor";
@Override
......@@ -66,6 +66,6 @@ public class MessageInstrumentation extends ClassInstanceMethodsEnhancePluginDef
@Override
protected ClassMatch enhanceClass() {
return byHierarchyMatch(ENHANCE_CLASS);
return byName(ENHANCE_CLASS);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.pulsar.define;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.matcher.BooleanMatcher;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
/**
* Pulsar topic message instrumentation.
* <p>
* The topic message enhanced object is only for passing message reception span across threads.
* <p>
* Enhanced message object will be injected {@link org.apache.skywalking.apm.plugin.pulsar.MessageEnhanceRequiredInfo}
* after message process method if consumer has a message listener.
* </p>
*/
public class TopicMessageInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
public static final String ENHANCE_CLASS = "org.apache.pulsar.client.impl.TopicMessageImpl";
public static final String CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.pulsar.TopicMessageConstructorInterceptor";
@Override
public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
return new ConstructorInterceptPoint[]{
new ConstructorInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getConstructorMatcher() {
return new BooleanMatcher<>(true);
}
@Override
public String getConstructorInterceptor() {
return CONSTRUCTOR_INTERCEPTOR_CLASS;
}
}
};
}
@Override
public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
return new InstanceMethodsInterceptPoint[0];
}
@Override
protected ClassMatch enhanceClass() {
return byName(ENHANCE_CLASS);
}
}
......@@ -18,4 +18,5 @@ pulsar=org.apache.skywalking.apm.plugin.pulsar.define.SendCallbackInstrumentatio
pulsar=org.apache.skywalking.apm.plugin.pulsar.define.PulsarConsumerInstrumentation
pulsar=org.apache.skywalking.apm.plugin.pulsar.define.PulsarConsumerListenerInstrumentation
pulsar=org.apache.skywalking.apm.plugin.pulsar.define.PulsarProducerInstrumentation
pulsar=org.apache.skywalking.apm.plugin.pulsar.define.MessageInstrumentation
\ No newline at end of file
pulsar=org.apache.skywalking.apm.plugin.pulsar.define.MessageInstrumentation
pulsar=org.apache.skywalking.apm.plugin.pulsar.define.TopicMessageInstrumentation
\ No newline at end of file
......@@ -18,7 +18,6 @@
package org.apache.skywalking.apm.plugin.pulsar;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.LookupService;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
......@@ -71,13 +70,6 @@ public class ConsumerConstructorInterceptorTest {
when(lookupService.getServiceUrl()).thenReturn(SERVICE_URL);
when(pulsarClient.getLookup()).thenReturn(lookupService);
when(consumerConfigurationData.getSubscriptionName()).thenReturn(SUBSCRIPTION_NAME);
when(consumerConfigurationData.getMessageListener()).thenReturn((consumer, message) -> {
try {
consumer.acknowledge(message);
} catch (PulsarClientException e) {
e.printStackTrace();
}
});
constructorInterceptor = new ConsumerConstructorInterceptor();
}
......@@ -92,6 +84,5 @@ public class ConsumerConstructorInterceptorTest {
assertThat(requiredInfo.getServiceUrl(), is(SERVICE_URL));
assertThat(requiredInfo.getTopic(), is(TOPIC_NAME));
assertThat(requiredInfo.getSubscriptionName(), is(SUBSCRIPTION_NAME));
assertThat(requiredInfo.isHasMessageListener(), is(true));
}
}
......@@ -122,7 +122,6 @@ public class PulsarConsumerInterceptorTest {
@Test
public void testConsumerWithMessageListener() throws Throwable {
consumerEnhanceRequiredInfo.setHasMessageListener(true);
consumerInterceptor.beforeMethod(consumerInstance, null, new Object[]{msg}, new Class[0], null);
consumerInterceptor.afterMethod(consumerInstance, null, new Object[]{msg}, new Class[0], null);
......
......@@ -96,7 +96,6 @@ public class PulsarConsumerListenerInterceptorTest {
consumerEnhanceRequiredInfo.setTopic("persistent://my-tenant/my-ns/my-topic");
consumerEnhanceRequiredInfo.setServiceUrl("pulsar://localhost:6650");
consumerEnhanceRequiredInfo.setSubscriptionName("my-sub");
consumerEnhanceRequiredInfo.setHasMessageListener(true);
msg = new MockMessage();
msg.getMessageBuilder()
.addProperties(PulsarApi.KeyValue.newBuilder()
......
......@@ -15,7 +15,7 @@
# limitations under the License.
segmentItems:
- serviceName: pulsar-scenario
segmentSize: ge 6
segmentSize: ge 9
segments:
- segmentId: not null
spans:
......@@ -34,6 +34,21 @@ segmentItems:
- { key: mq.broker, value: not null }
- { key: mq.topic, value: test }
skipAnalysis: 'false'
- operationName: Pulsar/testMultiPartition/Producer
operationId: 0
parentSpanId: 0
spanId: 2
spanLayer: MQ
startTime: nq 0
endTime: nq 0
componentId: 73
isError: false
spanType: Exit
peer: not null
skipAnalysis: false
tags:
- { key: mq.broker, value: not null }
- { key: mq.topic, value: testMultiPartition }
- operationName: /case/pulsar-case
operationId: 0
parentSpanId: -1
......@@ -46,7 +61,7 @@ segmentItems:
spanType: Entry
peer: ''
tags:
- { key: url, value: 'http://localhost:8080/pulsar-scenario/case/pulsar-case' }
- { key: url, value: 'http://localhost:8088/pulsar-scenario/case/pulsar-case' }
- { key: http.method, value: GET }
skipAnalysis: 'false'
- segmentId: not null
......@@ -134,3 +149,88 @@ segmentItems:
parentSpanId: 0, parentTraceSegmentId: not null, parentServiceInstance: not null,
parentService: pulsar-scenario, traceId: not null }
skipAnalysis: 'false'
- segmentId: not null
spans:
- operationName: Pulsar/Producer/Callback
operationId: 0
parentSpanId: -1
spanId: 0
spanLayer: MQ
startTime: nq 0
endTime: nq 0
componentId: 73
isError: false
spanType: Local
peer: ''
tags:
- { key: mq.topic, value: testMultiPartition }
refs:
- { parentEndpoint: /case/pulsar-case, networkAddress: '', refType: CrossThread,
parentSpanId: 2, parentTraceSegmentId: not null, parentServiceInstance: not
null, parentService: pulsar-scenario, traceId: not null }
skipAnalysis: 'false'
- segmentId: not null
spans:
- operationName: Pulsar/testMultiPartition/Consumer/test
operationId: 0
parentSpanId: -1
spanId: 0
spanLayer: MQ
startTime: nq 0
endTime: nq 0
componentId: 74
isError: false
spanType: Entry
peer: ''
tags:
- { key: transmission.latency, value: not null }
- { key: mq.broker, value: not null }
- { key: mq.topic, value: testMultiPartition }
refs:
- { parentEndpoint: /case/pulsar-case, networkAddress: not null, refType: CrossProcess,
parentSpanId: 2, parentTraceSegmentId: not null, parentServiceInstance: not
null, parentService: pulsar-scenario, traceId: not null }
skipAnalysis: 'false'
- segmentId: not null
spans:
- operationName: Pulsar/testMultiPartition/Consumer/testWithListener
operationId: 0
parentSpanId: -1
spanId: 0
spanLayer: MQ
startTime: nq 0
endTime: nq 0
componentId: 74
isError: false
spanType: Entry
peer: ''
skipAnalysis: 'false'
tags:
- { key: transmission.latency, value: not null }
- { key: mq.broker, value: not null }
- { key: mq.topic, value: testMultiPartition }
refs:
- { parentEndpoint: /case/pulsar-case, networkAddress: not null,
refType: CrossProcess, parentSpanId: 2, parentTraceSegmentId: not null,
parentServiceInstance: not null, parentService: pulsar-scenario,
traceId: not null }
- segmentId: not null
spans:
- operationName: Pulsar/testMultiPartition/Consumer/MessageListener
operationId: 0
parentSpanId: -1
spanId: 0
spanLayer: MQ
startTime: nq 0
endTime: nq 0
componentId: 74
isError: false
spanType: Local
peer: ''
tags:
- { key: mq.topic, value: testMultiPartition }
refs:
- { parentEndpoint: Pulsar/testMultiPartition/Consumer/testWithListener, networkAddress: '', refType: CrossThread,
parentSpanId: 0, parentTraceSegmentId: not null, parentServiceInstance: not null,
parentService: pulsar-scenario, traceId: not null }
skipAnalysis: 'false'
......@@ -15,8 +15,8 @@
# limitations under the License.
type: jvm
entryService: http://localhost:8080/pulsar-scenario/case/pulsar-case
healthCheck: http://localhost:8080/pulsar-scenario/case/healthCheck
entryService: http://localhost:8088/pulsar-scenario/case/pulsar-case
healthCheck: http://localhost:8088/pulsar-scenario/case/healthCheck
startScript: ./bin/startup.sh
environment:
- PULSAR_STANDALONE=pulsar-standalone:6650
......@@ -29,3 +29,4 @@ dependencies:
startScript: ["bin/pulsar","standalone"]
expose:
- 6650
- 8080
......@@ -31,6 +31,11 @@ import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import java.io.IOException;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
......@@ -48,10 +53,24 @@ public class CaseController {
@RequestMapping("/pulsar-case")
@ResponseBody
public String pulsarCase() throws PulsarClientException, InterruptedException {
public String pulsarCase() {
String topic = "test";
String topicOnePartition = "test";
String topicMultiPartition = "testMultiPartition";
try {
doSendAndReceiveMessage(topicOnePartition);
createTopic(topicMultiPartition, 2);
doSendAndReceiveMessage(topicMultiPartition);
} catch (IOException e) {
LOGGER.error("test error", e);
}
return "Success";
}
private void doSendAndReceiveMessage(String topic) throws PulsarClientException {
CountDownLatch latch = new CountDownLatch(2);
PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(PULSAR_DOMAIN + serviceUrl).build();
......@@ -60,7 +79,7 @@ public class CaseController {
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("test").subscribe();
pulsarClient.newConsumer().topic(topic)
final Consumer<byte[]> consumerWithListener = pulsarClient.newConsumer().topic(topic)
.subscriptionName("testWithListener")
.messageListener((c, msg) -> {
try {
......@@ -70,7 +89,8 @@ public class CaseController {
msg.getProperties()
.forEach((k, v) -> builder.append(String.format(propertiesFormat, k, v))
.append(", "));
LOGGER.info("Received message with messageId = {}, key = {}, value = {}, properties = {}",
LOGGER.info(
"Received message with messageId = {}, key = {}, value = {}, properties = {}",
msg.getMessageId(), msg
.getKey(), new String(msg.getValue()), builder.toString());
......@@ -113,13 +133,24 @@ public class CaseController {
} catch (InterruptedException e) {
LOGGER.error("Can get message from consumer", e);
t.interrupt();
throw e;
}
producer.close();
consumer.close();
consumerWithListener.close();
}
return "Success";
private void createTopic(String topic, int numOfPartitions) throws IOException {
final URL url = new URL("http://pulsar-standalone:8080/admin/v2/persistent/public/default/" + topic + "/partitions");
final HttpURLConnection connection = (HttpURLConnection) url.openConnection();
connection.setRequestMethod("PUT");
connection.setDoOutput(true);
final OutputStream outputStream = connection.getOutputStream();
outputStream.write(String.valueOf(numOfPartitions).getBytes(StandardCharsets.UTF_8));
outputStream.flush();
LOGGER.info("Create topic result:{}", connection.getResponseCode());
}
@RequestMapping("/healthCheck")
......
......@@ -15,5 +15,5 @@
# limitations under the License.
#
#
server.port=8080
server.port=8088
server.contextPath=/pulsar-scenario
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册