提交 45f44544 编写于 作者: X XiaoFu 提交者: wu-sheng

Support rabbitmq plugin (#2000)

* Fix support rabbitmq plugin

* fix Unit test rename sw3 to sw6

* Update pom.xml

* fix adjust rabbitmq Unit test

* fix add rabbitmq component in docker component-libraries.yml

* Update RabbitMQProducerInterceptor.java

* Update RabbitMQProducerInterceptor.java
上级 f8d384ee
......@@ -100,6 +100,10 @@ public class ComponentsDefine {
public static final OfficialComponent UNDERTOW = new OfficialComponent(49, "Undertow");
public static final OfficialComponent RABBITMQ_PRODUCER = new OfficialComponent(52,"rabbitmq-producer");
public static final OfficialComponent RABBITMQ_CONSUMER = new OfficialComponent(53,"rabbitmq-consumer");
private static ComponentsDefine INSTANCE = new ComponentsDefine();
private String[] components;
......@@ -109,7 +113,7 @@ public class ComponentsDefine {
}
public ComponentsDefine() {
components = new String[50];
components = new String[54];
addComponent(TOMCAT);
addComponent(HTTPCLIENT);
addComponent(DUBBO);
......@@ -146,6 +150,8 @@ public class ComponentsDefine {
addComponent(ACTIVEMQ_PRODUCER);
addComponent(ACTIVEMQ_CONSUMER);
addComponent(UNDERTOW);
addComponent(RABBITMQ_PRODUCER);
addComponent(RABBITMQ_CONSUMER);
}
private void addComponent(OfficialComponent component) {
......
......@@ -60,6 +60,7 @@
<module>activemq-5.x-plugin</module>
<module>elasticsearch-5.x-plugin</module>
<module>undertow-plugins</module>
<module>rabbitmq-5.x-plugin</module>
</modules>
<packaging>pom</packaging>
......
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm-sdk-plugin</artifactId>
<groupId>org.apache.skywalking</groupId>
<version>6.0.0-beta-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>apm-rabbitmq-5.x-plugin</artifactId>
<name>rabbitmq-5.x-plugin</name>
<packaging>jar</packaging>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<rabbitmq-client.version>5.2.0</rabbitmq-client.version>
</properties>
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>${rabbitmq-client.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
</project>
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.rabbitmq;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Envelope;
import org.apache.skywalking.apm.agent.core.context.CarrierItem;
import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.context.tag.Tags;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
import java.lang.reflect.Method;
public class RabbitMQConsumerInterceptor implements InstanceMethodsAroundInterceptor {
public static final String OPERATE_NAME_PREFIX = "RabbitMQ/";
public static final String CONSUMER_OPERATE_NAME_SUFFIX = "/Consumer";
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
ContextCarrier contextCarrier = new ContextCarrier();
String url = (String) objInst.getSkyWalkingDynamicField();
Envelope envelope = (Envelope) allArguments[2];
AMQP.BasicProperties properties = (AMQP.BasicProperties) allArguments[3];
AbstractSpan activeSpan = ContextManager.createEntrySpan(OPERATE_NAME_PREFIX + "Topic/" + envelope.getExchange() + "Queue/" + envelope.getRoutingKey() + CONSUMER_OPERATE_NAME_SUFFIX, null).start(System.currentTimeMillis());
Tags.MQ_BROKER.set(activeSpan,url);
Tags.MQ_TOPIC.set(activeSpan,envelope.getExchange());
Tags.MQ_QUEUE.set(activeSpan, envelope.getRoutingKey());
activeSpan.setComponent(ComponentsDefine.RABBITMQ_CONSUMER);
SpanLayer.asMQ(activeSpan);
CarrierItem next = contextCarrier.items();
while (next.hasNext()) {
next = next.next();
next.setHeadValue(properties.getHeaders().get(next.getHeadKey()).toString());
}
ContextManager.extract(contextCarrier);
}
@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret) throws Throwable {
ContextManager.stopSpan();
return ret;
}
@Override
public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Throwable t) {
ContextManager.activeSpan().errorOccurred().log(t);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.rabbitmq;
import com.rabbitmq.client.Connection;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;
public class RabbitMQProducerAndConsumerConstructorInterceptor implements InstanceConstructorInterceptor {
@Override
public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
Connection connection = (Connection)allArguments[0];
String url = connection.getAddress().toString().replace("/","") + ":" + connection.getPort();
objInst.setSkyWalkingDynamicField(url);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.rabbitmq;
import com.rabbitmq.client.AMQP;
import org.apache.skywalking.apm.agent.core.context.CarrierItem;
import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.context.tag.Tags;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
public class RabbitMQProducerInterceptor implements InstanceMethodsAroundInterceptor {
public static final String OPERATE_NAME_PREFIX = "RabbitMQ/";
public static final String PRODUCER_OPERATE_NAME_SUFFIX = "/Producer";
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
MethodInterceptResult result) throws Throwable {
ContextCarrier contextCarrier = new ContextCarrier();
AMQP.BasicProperties properties = (AMQP.BasicProperties)allArguments[4];
AMQP.BasicProperties.Builder propertiesBuilder;
Map<String, Object> headers = new HashMap<String, Object>();
if (properties != null) {
propertiesBuilder = properties.builder().appId(properties.getAppId())
.clusterId(properties.getClusterId())
.contentEncoding(properties.getContentEncoding())
.contentType(properties.getContentType())
.correlationId(properties.getCorrelationId())
.deliveryMode(properties.getDeliveryMode())
.expiration(properties.getExpiration())
.messageId(properties.getMessageId())
.priority(properties.getPriority())
.replyTo(properties.getReplyTo())
.timestamp(properties.getTimestamp())
.type(properties.getType())
.userId(properties.getUserId());
// copy origin headers
if (properties.getHeaders() != null) {
headers.putAll(properties.getHeaders());
}
} else {
propertiesBuilder = new AMQP.BasicProperties.Builder();
}
String exChangeName = (String)allArguments[0];
String queueName = (String)allArguments[1];
String url = (String)objInst.getSkyWalkingDynamicField();
AbstractSpan activeSpan = ContextManager.createExitSpan(OPERATE_NAME_PREFIX + "Topic/" + exChangeName + "Queue/" + queueName + PRODUCER_OPERATE_NAME_SUFFIX, contextCarrier, url);
Tags.MQ_BROKER.set(activeSpan, url);
Tags.MQ_QUEUE.set(activeSpan, queueName);
Tags.MQ_TOPIC.set(activeSpan, exChangeName);
SpanLayer.asMQ(activeSpan);
activeSpan.setComponent(ComponentsDefine.RABBITMQ_PRODUCER);
CarrierItem next = contextCarrier.items();
while (next.hasNext()) {
next = next.next();
headers.put(next.getHeadKey(), next.getHeadValue());
}
allArguments[4] = propertiesBuilder.headers(headers).build();
}
@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
Object ret) throws Throwable {
ContextManager.stopSpan();
return ret;
}
@Override
public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes, Throwable t) {
ContextManager.activeSpan().errorOccurred().log(t);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.rabbitmq.define;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
import org.apache.skywalking.apm.agent.core.plugin.match.MultiClassNameMatch;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static org.apache.skywalking.apm.agent.core.plugin.bytebuddy.ArgumentTypeNameMatch.takesArgumentWithType;
public class RabbitMQConsumerInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
public static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.rabbitmq.RabbitMQConsumerInterceptor";
public static final String ENHANCE_CLASS_PRODUCER = "com.rabbitmq.client.impl.ConsumerDispatcher";
public static final String ENHANCE_METHOD_DISPATCH = "handleDelivery";
public static final String INTERCEPTOR_CONSTRUCTOR = "org.apache.skywalking.apm.plugin.rabbitmq.RabbitMQProducerAndConsumerConstructorInterceptor";
@Override
protected ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
return new ConstructorInterceptPoint[] {
new ConstructorInterceptPoint() {
@Override public ElementMatcher<MethodDescription> getConstructorMatcher() {
return takesArgumentWithType(0,"com.rabbitmq.client.impl.AMQConnection");
}
@Override public String getConstructorInterceptor() {
return INTERCEPTOR_CONSTRUCTOR;
}
}
};
}
@Override
protected InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
return new InstanceMethodsInterceptPoint[] {
new InstanceMethodsInterceptPoint() {
@Override public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named(ENHANCE_METHOD_DISPATCH).and(takesArgumentWithType(3,"com.rabbitmq.client.AMQP$BasicProperties"));
}
@Override public String getMethodsInterceptor() {
return INTERCEPTOR_CLASS;
}
@Override public boolean isOverrideArgs() {
return true;
}
}
};
}
@Override
protected ClassMatch enhanceClass() {
return MultiClassNameMatch.byMultiClassMatch(ENHANCE_CLASS_PRODUCER);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.rabbitmq.define;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
import org.apache.skywalking.apm.agent.core.plugin.match.MultiClassNameMatch;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static org.apache.skywalking.apm.agent.core.plugin.bytebuddy.ArgumentTypeNameMatch.takesArgumentWithType;
public class RabbitMQProducerInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
public static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.rabbitmq.RabbitMQProducerInterceptor";
public static final String ENHANCE_CLASS_PRODUCER = "com.rabbitmq.client.impl.ChannelN";
public static final String ENHANCE_METHOD_DISPATCH = "basicPublish";
public static final String INTERCEPTOR_CONSTRUCTOR = "org.apache.skywalking.apm.plugin.rabbitmq.RabbitMQProducerAndConsumerConstructorInterceptor";
@Override
protected ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
return new ConstructorInterceptPoint[] {
new ConstructorInterceptPoint() {
@Override public ElementMatcher<MethodDescription> getConstructorMatcher() {
return takesArgumentWithType(3,"com.rabbitmq.client.MetricsCollector");
}
@Override public String getConstructorInterceptor() {
return INTERCEPTOR_CONSTRUCTOR;
}
}
};
}
@Override
protected InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
return new InstanceMethodsInterceptPoint[] {
new InstanceMethodsInterceptPoint() {
@Override public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named(ENHANCE_METHOD_DISPATCH).and(takesArgumentWithType(4,"com.rabbitmq.client.AMQP$BasicProperties"));
}
@Override public String getMethodsInterceptor() {
return INTERCEPTOR_CLASS;
}
@Override public boolean isOverrideArgs() {
return true;
}
}
};
}
@Override
protected ClassMatch enhanceClass() {
return MultiClassNameMatch.byMultiClassMatch(ENHANCE_CLASS_PRODUCER);
}
}
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
rabbitmq-5.x=org.apache.skywalking.apm.plugin.rabbitmq.define.RabbitMQProducerInstrumentation
rabbitmq-5.x=org.apache.skywalking.apm.plugin.rabbitmq.define.RabbitMQConsumerInstrumentation
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.rabbitmq;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Envelope;
import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.test.tools.AgentServiceRule;
import org.apache.skywalking.apm.agent.test.tools.SegmentStorage;
import org.apache.skywalking.apm.agent.test.tools.SegmentStoragePoint;
import org.apache.skywalking.apm.agent.test.tools.TracingSegmentRunner;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.modules.junit4.PowerMockRunnerDelegate;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.hamcrest.CoreMatchers.is;
@RunWith(PowerMockRunner.class)
@PowerMockRunnerDelegate(TracingSegmentRunner.class)
public class RabbitMQConsumerInterceptorTest {
@SegmentStoragePoint
private SegmentStorage segmentStorage;
@Rule
public AgentServiceRule serviceRule = new AgentServiceRule();
private EnhancedInstance enhancedInstance = new EnhancedInstance() {
@Override
public Object getSkyWalkingDynamicField() {
return "127.0.0.1:5272";
}
@Override
public void setSkyWalkingDynamicField(Object value) {
}
};
private RabbitMQConsumerInterceptor rabbitMQConsumerInterceptor;
private Object[] arguments;
@Before
public void setUp() throws Exception {
rabbitMQConsumerInterceptor = new RabbitMQConsumerInterceptor();
Envelope envelope = new Envelope(1111,false,"","rabbitmq-test");
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("sw6","1-MS4xLjE1NDM5NzU1OTEwMTQwMDAx-MS4xLjE1NDM5NzU1OTA5OTcwMDAw-0-1-1-IzEyNy4wLjAuMTo1Mjcy-I1JhYmJpdE1RL1RvcGljL1F1ZXVlL3JhYmJpdG1xLXRlc3QvUHJvZHVjZXI=-I1JhYmJpdE1RL1RvcGljL1F1ZXVlL3JhYmJpdG1xLXRlc3QvUHJvZHVjZXI=");
AMQP.BasicProperties.Builder propsBuilder = new AMQP.BasicProperties.Builder();
arguments = new Object[] {0,0,envelope,propsBuilder.headers(headers).build()};
}
@Test
public void TestRabbitMQConsumerInterceptor() throws Throwable {
rabbitMQConsumerInterceptor.beforeMethod(enhancedInstance,null,arguments,null,null);
rabbitMQConsumerInterceptor.afterMethod(enhancedInstance,null,arguments,null,null);
List<TraceSegment> traceSegments = segmentStorage.getTraceSegments();
Assert.assertThat(traceSegments.size(), is(1));
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.rabbitmq;
import com.rabbitmq.client.*;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.test.tools.TracingSegmentRunner;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.modules.junit4.PowerMockRunnerDelegate;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Map;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
@RunWith(PowerMockRunner.class)
@PowerMockRunnerDelegate(TracingSegmentRunner.class)
public class RabbitMQProducerAndConsumerConstructorInterceptorTest {
private RabbitMQProducerAndConsumerConstructorInterceptor rabbitMQProducerAndConsumerConstructorInterceptor;
private EnhancedInstance enhancedInstance = new EnhancedInstance() {
private String test;
@Override
public Object getSkyWalkingDynamicField() {
return test;
}
@Override
public void setSkyWalkingDynamicField(Object value) {
test = (String)value;
}
};
public class TestConnection implements Connection {
@Override
public InetAddress getAddress() {
try {
return InetAddress.getByName("127.0.0.1");
} catch (UnknownHostException e) {
e.printStackTrace();
return null;
}
}
@Override
public int getPort() {
return 5672;
}
@Override
public int getChannelMax() {
return 0;
}
@Override
public int getFrameMax() {
return 0;
}
@Override
public int getHeartbeat() {
return 0;
}
@Override
public Map<String, Object> getClientProperties() {
return null;
}
@Override
public String getClientProvidedName() {
return null;
}
@Override
public Map<String, Object> getServerProperties() {
return null;
}
@Override
public Channel createChannel() throws IOException {
return null;
}
@Override
public Channel createChannel(int i) throws IOException {
return null;
}
@Override
public void close() throws IOException {
}
@Override
public void close(int i, String s) throws IOException {
}
@Override
public void close(int i) throws IOException {
}
@Override
public void close(int i, String s, int i1) throws IOException {
}
@Override
public void abort() {
}
@Override
public void abort(int i, String s) {
}
@Override
public void abort(int i) {
}
@Override
public void abort(int i, String s, int i1) {
}
@Override
public void addBlockedListener(BlockedListener blockedListener) {
}
@Override
public BlockedListener addBlockedListener(BlockedCallback blockedCallback, UnblockedCallback unblockedCallback) {
return null;
}
@Override
public boolean removeBlockedListener(BlockedListener blockedListener) {
return false;
}
@Override
public void clearBlockedListeners() {
}
@Override
public ExceptionHandler getExceptionHandler() {
return null;
}
@Override
public String getId() {
return null;
}
@Override
public void setId(String s) {
}
@Override
public void addShutdownListener(ShutdownListener shutdownListener) {
}
@Override
public void removeShutdownListener(ShutdownListener shutdownListener) {
}
@Override
public ShutdownSignalException getCloseReason() {
return null;
}
@Override
public void notifyListeners() {
}
@Override
public boolean isOpen() {
return false;
}
}
private Connection testConnection;
@Before
public void setUp() throws Exception {
testConnection = new TestConnection();
}
@Test
public void TestRabbitMQConsumerAndProducerConstructorInterceptor() {
rabbitMQProducerAndConsumerConstructorInterceptor = new RabbitMQProducerAndConsumerConstructorInterceptor();
rabbitMQProducerAndConsumerConstructorInterceptor.onConstruct(enhancedInstance,new Object[] {testConnection});
assertThat((String) enhancedInstance.getSkyWalkingDynamicField(), is("127.0.0.1:5672"));
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.rabbitmq;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractTracingSpan;
import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.test.helper.SegmentHelper;
import org.apache.skywalking.apm.agent.test.tools.*;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.modules.junit4.PowerMockRunnerDelegate;
import java.util.List;
import static org.apache.skywalking.apm.network.trace.component.ComponentsDefine.RABBITMQ_PRODUCER;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
@RunWith(PowerMockRunner.class)
@PowerMockRunnerDelegate(TracingSegmentRunner.class)
public class RabbitMQProducerInterceptorTest {
@SegmentStoragePoint
private SegmentStorage segmentStorage;
@Rule
public AgentServiceRule serviceRule = new AgentServiceRule();
private EnhancedInstance enhancedInstance = new EnhancedInstance() {
@Override
public Object getSkyWalkingDynamicField() {
return "127.0.0.1:5272";
}
@Override
public void setSkyWalkingDynamicField(Object value) {
}
};
private RabbitMQProducerInterceptor rabbitMQProducerInterceptor;
private Object[] arguments;
@Before
public void setUp() throws Exception {
rabbitMQProducerInterceptor = new RabbitMQProducerInterceptor();
arguments = new Object[] {"","rabbitmq-test",0,0,null};
}
@Test
public void TestRabbitMQProducerInterceptor() throws Throwable {
rabbitMQProducerInterceptor.beforeMethod(enhancedInstance,null,arguments,null,null);
rabbitMQProducerInterceptor.afterMethod(enhancedInstance,null,arguments,null,null);
List<TraceSegment> traceSegmentList = segmentStorage.getTraceSegments();
assertThat(traceSegmentList.size(), is(1));
TraceSegment segment = traceSegmentList.get(0);
List<AbstractTracingSpan> spans = SegmentHelper.getSpans(segment);
assertThat(spans.size(), is(1));
assertRabbitMQSpan(spans.get(0));
}
private void assertRabbitMQSpan(AbstractTracingSpan span) {
SpanAssert.assertTag(span, 0, "127.0.0.1:5272");
SpanAssert.assertTag(span, 1, "rabbitmq-test");
SpanAssert.assertComponent(span, RABBITMQ_PRODUCER);
SpanAssert.assertLayer(span, SpanLayer.MQ);
assertThat(span.getOperationName(), is("RabbitMQ/Topic/Queue/rabbitmq-test/Producer"));
}
}
......@@ -177,6 +177,15 @@ http:
rpc:
id: 50
languages: Java,C#,Node.js
RabbitMQ:
id: 51
languages: Java
rabbitmq-producer:
id: 52
languages: Java
rabbitmq-consumer:
id: 53
languages: Java
# .NET/.NET Core components
# [3000, 4000) for C#/.NET only
......@@ -252,6 +261,8 @@ Component-Server-Mappings:
kafka-consumer: Kafka
activemq-producer: ActiveMQ
activemq-consumer: ActiveMQ
rabbitmq-producer: RabbitMQ
rabbitmq-consumer: RabbitMQ
postgresql-jdbc-driver: PostgreSQL
Xmemcached: Memcached
Spymemcached: Memcached
......
......@@ -177,6 +177,15 @@ http:
rpc:
id: 50
languages: Java,C#,Node.js
RabbitMQ:
id: 51
languages: Java
rabbitmq-producer:
id: 52
languages: Java
rabbitmq-consumer:
id: 53
languages: Java
# .NET/.NET Core components
# [3000, 4000) for C#/.NET only
......@@ -252,6 +261,8 @@ Component-Server-Mappings:
kafka-consumer: Kafka
activemq-producer: ActiveMQ
activemq-consumer: ActiveMQ
rabbitmq-producer: RabbitMQ
rabbitmq-consumer: RabbitMQ
postgresql-jdbc-driver: PostgreSQL
Xmemcached: Memcached
Spymemcached: Memcached
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册