提交 addf5101 编写于 作者: X XiaoFu 提交者: Xin,Zhang

Support canal plugin (#2035)

* fix add Canal plugin

* fix add UnitTest

* fix   format pom.xml

* Delete CanalApplication.java

* fix support cluster url

* fix add Licensed

* fix bug

* fix
上级 4d8bd155
...@@ -104,6 +104,8 @@ public class ComponentsDefine { ...@@ -104,6 +104,8 @@ public class ComponentsDefine {
public static final OfficialComponent RABBITMQ_CONSUMER = new OfficialComponent(53,"rabbitmq-consumer"); public static final OfficialComponent RABBITMQ_CONSUMER = new OfficialComponent(53,"rabbitmq-consumer");
public static final OfficialComponent CANAL = new OfficialComponent(54,"Canal");
private static ComponentsDefine INSTANCE = new ComponentsDefine(); private static ComponentsDefine INSTANCE = new ComponentsDefine();
private String[] components; private String[] components;
...@@ -113,7 +115,7 @@ public class ComponentsDefine { ...@@ -113,7 +115,7 @@ public class ComponentsDefine {
} }
public ComponentsDefine() { public ComponentsDefine() {
components = new String[54]; components = new String[55];
addComponent(TOMCAT); addComponent(TOMCAT);
addComponent(HTTPCLIENT); addComponent(HTTPCLIENT);
addComponent(DUBBO); addComponent(DUBBO);
...@@ -152,6 +154,7 @@ public class ComponentsDefine { ...@@ -152,6 +154,7 @@ public class ComponentsDefine {
addComponent(UNDERTOW); addComponent(UNDERTOW);
addComponent(RABBITMQ_PRODUCER); addComponent(RABBITMQ_PRODUCER);
addComponent(RABBITMQ_CONSUMER); addComponent(RABBITMQ_CONSUMER);
addComponent(CANAL);
} }
private void addComponent(OfficialComponent component) { private void addComponent(OfficialComponent component) {
......
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm-sdk-plugin</artifactId>
<groupId>org.apache.skywalking</groupId>
<version>6.0.0-beta-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>apm-canal-1.x-plugin</artifactId>
<name>canal-1.x-plugin</name>
<packaging>jar</packaging>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<canal-client.version>1.1.2</canal-client.version>
</properties>
<dependencies>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>${canal-client.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
</project>
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.canal;
import java.net.InetSocketAddress;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;
/**
* @author withlin
*/
public class CanalConstructorInterceptor implements InstanceConstructorInterceptor {
@Override
public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
InetSocketAddress address = (InetSocketAddress) allArguments[0];
String destination = allArguments[3].toString();
CanalEnhanceInfo canalEnhanceInfo = new CanalEnhanceInfo();
if (address != null) {
String url = address.getAddress().toString() + ":" + address.getPort();
canalEnhanceInfo.setUrl(url.replace('/',' '));
}
canalEnhanceInfo.setDestination(destination);
objInst.setSkyWalkingDynamicField(canalEnhanceInfo);
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.canal;
/**
* @author withlin
*/
public class CanalEnhanceInfo {
public String getUrl() {
return url;
}
public CanalEnhanceInfo setUrl(String url) {
this.url = url;
return this;
}
public String getDestination() {
return destination;
}
public CanalEnhanceInfo setDestination(String destination) {
this.destination = destination;
return this;
}
/**
* canal-server address
*/
private String url;
/**
* canal destination
*/
private String destination;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.canal;
import com.alibaba.otter.canal.client.impl.SimpleCanalConnector;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.util.List;
/**
* @author withlin
*/
public class CanalInterceptor implements InstanceMethodsAroundInterceptor {
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes,
MethodInterceptResult result) throws Throwable {
CanalEnhanceInfo canalEnhanceInfo = (CanalEnhanceInfo)objInst.getSkyWalkingDynamicField();
SimpleCanalConnector connector = (SimpleCanalConnector) objInst;
String url = canalEnhanceInfo.getUrl();
if (url == "" || url == null) {
InetSocketAddress address = (InetSocketAddress)connector.getNextAddress();
String runningAddress = address.getAddress().toString() + ":" + address.getPort();
runningAddress = runningAddress.replace('/',' ');
url = runningAddress;
List<InetSocketAddress> socketAddressList = (List<InetSocketAddress>)ContextManager.getRuntimeContext().get("currentAddress");
if (socketAddressList != null && socketAddressList.size() > 0) {
for (InetSocketAddress socketAddress : socketAddressList) {
String currentAddress = socketAddress.getAddress().toString() + ":" + socketAddress.getPort();
currentAddress = currentAddress.replace('/',' ');
if (!currentAddress.equals(runningAddress)) {
url = url + "," + currentAddress;
}
}
}
}
String batchSize = allArguments[0].toString();
String destination = canalEnhanceInfo.getDestination();
AbstractSpan activeSpan = ContextManager.createExitSpan("Canal/" + destination,url).start(System.currentTimeMillis());
activeSpan.setComponent(ComponentsDefine.CANAL);
activeSpan.tag("batchSize",batchSize);
activeSpan.tag("destination",destination);
}
@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes,
Object ret) throws Throwable {
ContextManager.stopSpan();
return ret;
}
@Override
public void handleMethodException(EnhancedInstance objInst, Method method,
Object[] allArguments,
Class<?>[] argumentsTypes, Throwable t) {
ContextManager.activeSpan().errorOccurred().log(t);
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.canal;
import com.alibaba.otter.canal.client.impl.ClusterNodeAccessStrategy;
import com.alibaba.otter.canal.common.zookeeper.ZkClientx;
import com.alibaba.otter.canal.common.zookeeper.ZookeeperPathUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
/**
* @author withlin
*/
public class ClusterNodeConstructInterceptor implements InstanceConstructorInterceptor {
@Override
public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
String clusterPath = ZookeeperPathUtils.getDestinationClusterRoot(allArguments[0].toString());
ZkClientx zkClientx = ((ClusterNodeAccessStrategy) objInst).getZkClient();
ContextManager.getRuntimeContext().put("currentAddress",getCurrentAddress(zkClientx.getChildren(clusterPath)));
}
private List<InetSocketAddress> getCurrentAddress(List<String> currentChilds) {
List<InetSocketAddress> addresses = new ArrayList<InetSocketAddress>();
for (String address : currentChilds) {
String[] strs = StringUtils.split(address, ":");
if (strs != null && strs.length == 2) {
addresses.add(new InetSocketAddress(strs[0], Integer.valueOf(strs[1])));
}
}
return addresses;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.canal.define;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
import org.apache.skywalking.apm.agent.core.plugin.match.MultiClassNameMatch;
/**
* @author withlin
*/
public class CanalInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
public static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.canal.CanalInterceptor";
public static final String ENHANCE_CLASS = "com.alibaba.otter.canal.client.impl.SimpleCanalConnector";
public static final String CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.canal.CanalConstructorInterceptor";
public static final String ENHANCE_METHOD_DISPATCH = "getWithoutAck";
@Override
protected ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
return new ConstructorInterceptPoint[] {
new ConstructorInterceptPoint() {
@Override public ElementMatcher<MethodDescription> getConstructorMatcher() {
return takesArgument(4, int.class);
}
@Override public String getConstructorInterceptor() {
return CONSTRUCTOR_INTERCEPTOR_CLASS;
}
}
};
}
@Override
protected InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
return new InstanceMethodsInterceptPoint[] {
new InstanceMethodsInterceptPoint() {
@Override public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named(ENHANCE_METHOD_DISPATCH);
}
@Override public String getMethodsInterceptor() {
return INTERCEPTOR_CLASS;
}
@Override public boolean isOverrideArgs() {
return false;
}
}
};
}
@Override
protected ClassMatch enhanceClass() {
return MultiClassNameMatch.byMultiClassMatch(ENHANCE_CLASS);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.canal.define;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
import org.apache.skywalking.apm.agent.core.plugin.match.MultiClassNameMatch;
import static net.bytebuddy.matcher.ElementMatchers.any;
/**
* @author withlin
*/
public class ClusterNodeInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
public static final String ENHANCE_CLASS = "com.alibaba.otter.canal.client.impl.ClusterNodeAccessStrategy";
public static final String CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.canal.ClusterNodeConstructInterceptor";
@Override
protected ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
return new ConstructorInterceptPoint[] {
new ConstructorInterceptPoint() {
@Override public ElementMatcher<MethodDescription> getConstructorMatcher() {
return any();
}
@Override public String getConstructorInterceptor() {
return CONSTRUCTOR_INTERCEPTOR_CLASS;
}
}
};
}
@Override
protected InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
return new InstanceMethodsInterceptPoint[0];
}
@Override
protected ClassMatch enhanceClass() {
return MultiClassNameMatch.byMultiClassMatch(ENHANCE_CLASS);
}
}
\ No newline at end of file
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
canal-1.x=org.apache.skywalking.apm.plugin.canal.define.CanalInstrumentation
canal-1.x=org.apache.skywalking.apm.plugin.canal.define.ClusterNodeInstrumentation
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.plugin.canal;
import com.alibaba.otter.canal.client.impl.SimpleCanalConnector;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import org.apache.skywalking.apm.agent.core.context.trace.AbstractTracingSpan;
import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.apache.skywalking.apm.agent.test.helper.SegmentHelper;
import org.apache.skywalking.apm.agent.test.tools.AgentServiceRule;
import org.apache.skywalking.apm.agent.test.tools.SegmentStorage;
import org.apache.skywalking.apm.agent.test.tools.SegmentStoragePoint;
import org.apache.skywalking.apm.agent.test.tools.TracingSegmentRunner;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.modules.junit4.PowerMockRunnerDelegate;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.List;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
@RunWith(PowerMockRunner.class)
@PowerMockRunnerDelegate(TracingSegmentRunner.class)
public class CanalInterceptorTest {
private CanalInterceptor canalInterceptor;
@SegmentStoragePoint
private SegmentStorage segmentStorage;
@Rule
public AgentServiceRule serviceRule = new AgentServiceRule();
private Object[] arguments;
private Class[] argumentType;
private class CanalConnector extends SimpleCanalConnector implements EnhancedInstance {
public CanalConnector(SocketAddress address, String username, String password, String destination) {
super(address, username, password, destination);
}
@Override
public Object getSkyWalkingDynamicField() {
return null;
}
@Override
public void setSkyWalkingDynamicField(Object value) {
}
}
private EnhancedInstance enhancedInstance = new CanalConnector(new InetSocketAddress(AddressUtils.getHostIp(), 11111), "example", "", "") {
@Override
public Object getSkyWalkingDynamicField() {
CanalEnhanceInfo canalEnhanceInfo = new CanalEnhanceInfo();
canalEnhanceInfo.setUrl("localhost:11111");
canalEnhanceInfo.setDestination("example");
return canalEnhanceInfo;
}
@Override
public void setSkyWalkingDynamicField(Object value) {
}
};
@Before
public void setUp() {
canalInterceptor = new CanalInterceptor();
arguments = new Object[] {100};
}
@Test
public void testSendMessage() throws Throwable {
canalInterceptor.beforeMethod(enhancedInstance, null, arguments, null, null);
canalInterceptor.afterMethod(enhancedInstance, null, arguments, null, null);
List<TraceSegment> traceSegmentList = segmentStorage.getTraceSegments();
assertThat(traceSegmentList.size(), is(1));
TraceSegment segment = traceSegmentList.get(0);
List<AbstractTracingSpan> spans = SegmentHelper.getSpans(segment);
assertThat(spans.size(), is(1));
}
}
...@@ -60,7 +60,8 @@ ...@@ -60,7 +60,8 @@
<module>activemq-5.x-plugin</module> <module>activemq-5.x-plugin</module>
<module>elasticsearch-5.x-plugin</module> <module>elasticsearch-5.x-plugin</module>
<module>undertow-plugins</module> <module>undertow-plugins</module>
<module>rabbitmq-5.x-plugin</module> <module>rabbitmq-5.x-plugin</module>
<module>canal-1.x-plugin</module>
</modules> </modules>
<packaging>pom</packaging> <packaging>pom</packaging>
......
...@@ -186,6 +186,9 @@ rabbitmq-producer: ...@@ -186,6 +186,9 @@ rabbitmq-producer:
rabbitmq-consumer: rabbitmq-consumer:
id: 53 id: 53
languages: Java languages: Java
Canal:
id: 54
languages: Java
# .NET/.NET Core components # .NET/.NET Core components
# [3000, 4000) for C#/.NET only # [3000, 4000) for C#/.NET only
......
...@@ -186,6 +186,10 @@ rabbitmq-producer: ...@@ -186,6 +186,10 @@ rabbitmq-producer:
rabbitmq-consumer: rabbitmq-consumer:
id: 53 id: 53
languages: Java languages: Java
Canal:
id: 54
languages: Java
# .NET/.NET Core components # .NET/.NET Core components
# [3000, 4000) for C#/.NET only # [3000, 4000) for C#/.NET only
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册