提交 c15d1a56 编写于 作者: G gaohongtao

Fix #603 Elastic-Job plugin

上级 df97dc34
......@@ -71,6 +71,8 @@ public class ComponentsDefine {
public static final OfficialComponent GRPC = new OfficialComponent(23, "GRPC");
public static final OfficialComponent ELASTIC_JOB = new OfficialComponent(24, "ElasticJob");
private static ComponentsDefine instance = new ComponentsDefine();
private String[] components;
......@@ -80,7 +82,7 @@ public class ComponentsDefine {
}
public ComponentsDefine() {
components = new String[24];
components = new String[25];
addComponent(TOMCAT);
addComponent(HTTPCLIENT);
addComponent(DUBBO);
......@@ -104,6 +106,7 @@ public class ComponentsDefine {
addComponent(SHARDING_JDBC);
addComponent(POSTGRESQL);
addComponent(GRPC);
addComponent(ELASTIC_JOB);
}
private void addComponent(OfficialComponent component) {
......
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm-sdk-plugin</artifactId>
<groupId>org.skywalking</groupId>
<version>3.2.4-2017</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>apm-elastic-job-2.x-plugin</artifactId>
<packaging>jar</packaging>
<name>elastic-job-2.x-plugin</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-common-core</artifactId>
<version>[2.0.0,3.0.0)</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-deploy-plugin</artifactId>
</plugin>
<plugin>
<!-- 源码插件 -->
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<!-- 发布时自动将源码同时发布的配置 -->
<executions>
<execution>
<id>attach-sources</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
\ No newline at end of file
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.plugin.esjob;
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.executor.ShardingContexts;
import com.google.common.base.Strings;
import java.lang.reflect.Method;
import org.skywalking.apm.agent.core.context.ContextManager;
import org.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import org.skywalking.apm.network.trace.component.ComponentsDefine;
/**
* {@link JobExecutorInterceptor} enhances {@link com.dangdang.ddframe.job.executor.AbstractElasticJobExecutor#process(ShardingContext)}
* ,creating a local span that records job execution.
*
* @author gaohongtao
*/
public class JobExecutorInterceptor implements InstanceMethodsAroundInterceptor {
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
MethodInterceptResult result) throws Throwable {
ShardingContexts shardingContexts = (ShardingContexts)allArguments[0];
Integer item = (Integer)allArguments[1];
ShardingContext shardingContext = new ShardingContext(shardingContexts, item);
String operateName = shardingContext.getJobName();
if (!Strings.isNullOrEmpty(shardingContext.getShardingParameter())) {
operateName += "-" + shardingContext.getShardingParameter();
}
AbstractSpan span = ContextManager.createLocalSpan(operateName);
span.setComponent(ComponentsDefine.ELASTIC_JOB);
span.tag("sharding_context", shardingContext.toString());
}
@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
Object ret) throws Throwable {
ContextManager.stopSpan();
return ret;
}
@Override public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes, Throwable t) {
ContextManager.activeSpan().errorOccurred().log(t);
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.plugin.esjob.define;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
import org.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
import org.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
import org.skywalking.apm.agent.core.plugin.match.ClassMatch;
import static org.skywalking.apm.agent.core.plugin.bytebuddy.ArgumentTypeNameMatch.takesArgumentWithType;
import static org.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
/**
* {@link JobExecutorInstrumentation} presents that skywalking intercepts {@link com.dangdang.ddframe.job.executor.AbstractElasticJobExecutor}.
*
* @author gaohongtao
*/
public class JobExecutorInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
private static final String ENHANCE_CLASS = "com.dangdang.ddframe.job.executor.AbstractElasticJobExecutor";
private static final String JOB_EXECUTOR_INTERCEPTOR_CLASS = "org.skywalking.apm.plugin.esjob.JobExecutorInterceptor";
@Override protected ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
return new ConstructorInterceptPoint[0];
}
@Override protected InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
return new InstanceMethodsInterceptPoint[]{
new InstanceMethodsInterceptPoint() {
@Override public ElementMatcher<MethodDescription> getMethodsMatcher() {
return takesArgumentWithType(2, "com.dangdang.ddframe.job.event.type.JobExecutionEvent");
}
@Override public String getMethodsInterceptor() {
return JOB_EXECUTOR_INTERCEPTOR_CLASS;
}
@Override public boolean isOverrideArgs() {
return false;
}
}
};
}
@Override protected ClassMatch enhanceClass() {
return byName(ENHANCE_CLASS);
}
}
elastic-job-2.x=org.skywalking.apm.plugin.esjob.define.JobExecutorInstrumentation
\ No newline at end of file
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.plugin.esjob;
import com.dangdang.ddframe.job.executor.ShardingContexts;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.modules.junit4.PowerMockRunnerDelegate;
import org.skywalking.apm.agent.core.context.trace.AbstractTracingSpan;
import org.skywalking.apm.agent.core.context.trace.TraceSegment;
import org.skywalking.apm.agent.test.helper.SegmentHelper;
import org.skywalking.apm.agent.test.tools.AgentServiceRule;
import org.skywalking.apm.agent.test.tools.SegmentStorage;
import org.skywalking.apm.agent.test.tools.SegmentStoragePoint;
import org.skywalking.apm.agent.test.tools.TracingSegmentRunner;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
@RunWith(PowerMockRunner.class)
@PowerMockRunnerDelegate(TracingSegmentRunner.class)
public class JobExecutorInterceptorTest {
@SegmentStoragePoint
private SegmentStorage segmentStorage;
@Rule
public AgentServiceRule serviceRule = new AgentServiceRule();
private JobExecutorInterceptor jobExecutorInterceptor;
@Before
public void setUp() throws SQLException {
jobExecutorInterceptor = new JobExecutorInterceptor();
}
@Test
public void assertSuccess() throws Throwable {
jobExecutorInterceptor.beforeMethod(null, null, new Object[]{mockShardingContext("fooJob", 1), 1}, null, null);
jobExecutorInterceptor.afterMethod(null, null, null, null, null);
TraceSegment segment = segmentStorage.getTraceSegments().get(0);
List<AbstractTracingSpan> spans = SegmentHelper.getSpans(segment);
assertNotNull(spans);
assertThat(spans.size(), is(1));
assertThat(spans.get(0).transform().getOperationName(), is("fooJob-test"));
assertThat(spans.get(0).transform().getComponentId(), is(23));
assertThat(spans.get(0).transform().getTags(0).getKey(), is("sharding_context"));
assertThat(spans.get(0).transform().getTags(0).getValue(), is("ShardingContext(jobName=fooJob, taskId=fooJob1, shardingTotalCount=2, jobParameter=, shardingItem=1, shardingParameter=test)"));
}
@Test
public void assertSuccessWithoutSharding() throws Throwable {
jobExecutorInterceptor.beforeMethod(null, null, new Object[]{mockShardingContext("fooJob", 0), 0}, null, null);
jobExecutorInterceptor.afterMethod(null, null, null, null, null);
TraceSegment segment = segmentStorage.getTraceSegments().get(0);
List<AbstractTracingSpan> spans = SegmentHelper.getSpans(segment);
assertNotNull(spans);
assertThat(spans.size(), is(1));
assertThat(spans.get(0).transform().getOperationName(), is("fooJob"));
assertThat(spans.get(0).transform().getTags(0).getValue(), is("ShardingContext(jobName=fooJob, taskId=fooJob0, shardingTotalCount=1, jobParameter=, shardingItem=0, shardingParameter=null)"));
}
@Test
public void assertError() throws Throwable {
jobExecutorInterceptor.beforeMethod(null, null, new Object[]{mockShardingContext("fooJob", 0), 0}, null, null);
jobExecutorInterceptor.handleMethodException(null, null, null, null, new Exception("fooError"));
jobExecutorInterceptor.afterMethod(null, null, null, null, null);
TraceSegment segment = segmentStorage.getTraceSegments().get(0);
List<AbstractTracingSpan> spans = SegmentHelper.getSpans(segment);
assertNotNull(spans);
assertThat(spans.size(), is(1));
assertThat(spans.get(0).transform().getIsError(), is(true));
assertThat(spans.get(0).transform().getLogs(0).getDataList().size(), is(4));
}
private ShardingContexts mockShardingContext(String jobName, int shardingItem) {
Map<Integer, String> shardingMap = new HashMap<Integer, String>(1);
if (shardingItem >= 1) {
shardingMap.put(shardingItem, "test");
}
return new ShardingContexts(jobName + shardingItem, jobName, shardingItem + 1, "", shardingMap);
}
}
\ No newline at end of file
......@@ -52,6 +52,7 @@
<module>h2-1.x-plugin</module>
<module>postgresql-8.x-plugin</module>
<module>oracle-10.x-plugin</module>
<module>elastic-job-2.x-plugin</module>
</modules>
<packaging>pom</packaging>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册