提交 049dc2fa 编写于 作者: wu-sheng's avatar wu-sheng 提交者: 彭勇升 pengys

Support collector service instrumentation (#730)

* Collector instrument agent.

* Make the agent.jar output to the /agent folder in collector package.

* Try to add metric annotation.

* Change the instrument ways.

* Reformat.

* Finish the agent codes. Wait for @peng-yongsheng 's metric requirements.

* Debug new instrument.

* Fix a detect bug.

* Finish the instrument

* Fix compile issue.
上级 4390d8fd
......@@ -36,6 +36,7 @@ import org.apache.skywalking.apm.collector.analysis.segment.parser.provider.pars
import org.apache.skywalking.apm.collector.analysis.segment.parser.provider.parser.standardization.SegmentStandardization;
import org.apache.skywalking.apm.collector.analysis.segment.parser.provider.parser.standardization.SpanIdExchanger;
import org.apache.skywalking.apm.collector.core.UnexpectedException;
import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric;
import org.apache.skywalking.apm.collector.core.graph.Graph;
import org.apache.skywalking.apm.collector.core.graph.GraphManager;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
......@@ -67,6 +68,7 @@ public class SegmentParse {
this.spanListeners = new LinkedList<>();
}
@GraphComputingMetric(name = "/segment/parse")
public boolean parse(UpstreamSegment segment, ISegmentParseService.Source source) {
createSpanListeners();
......@@ -95,6 +97,7 @@ public class SegmentParse {
return false;
}
@GraphComputingMetric(name = "/segment/parse/preBuild")
private boolean preBuild(List<UniqueId> traceIds, SegmentDecorator segmentDecorator) {
StringBuilder segmentIdBuilder = new StringBuilder();
......@@ -161,6 +164,7 @@ public class SegmentParse {
return true;
}
@GraphComputingMetric(name = "/segment/parse/buildSegment")
private void buildSegment(String id, byte[] dataBinary) {
Segment segment = new Segment();
segment.setId(id);
......@@ -170,6 +174,7 @@ public class SegmentParse {
graph.start(segment);
}
@GraphComputingMetric(name = "/segment/parse/bufferFile/write")
private void writeToBufferFile(String id, UpstreamSegment upstreamSegment) {
logger.debug("push to segment buffer write worker, id: {}", id);
SegmentStandardization standardization = new SegmentStandardization(id);
......
......@@ -20,6 +20,8 @@ package org.apache.skywalking.apm.collector.analysis.worker.model.base;
import java.util.Iterator;
import java.util.List;
import org.apache.skywalking.apm.collector.core.annotations.trace.BatchParameter;
import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric;
import org.apache.skywalking.apm.collector.core.data.QueueData;
import org.apache.skywalking.apm.collector.core.graph.NodeProcessor;
import org.apache.skywalking.apm.collector.core.queue.EndOfBatchContext;
......@@ -45,7 +47,9 @@ public class LocalAsyncWorkerRef<INPUT extends QueueData, OUTPUT extends QueueDa
this.dataCarrier = dataCarrier;
}
@Override public void consume(List<INPUT> data) {
@GraphComputingMetric(name = "/worker/async/consume")
@Override
public void consume(@BatchParameter List<INPUT> data) {
Iterator<INPUT> inputIterator = data.iterator();
int i = 0;
......
......@@ -175,6 +175,14 @@
<version>${project.version}</version>
</dependency>
<!-- alarm provider -->
<!-- instrument provided dependency -->
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>apm-collector-instrument</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
......
......@@ -53,5 +53,12 @@
</includes>
<outputDirectory>/config</outputDirectory>
</fileSet>
<fileSet>
<directory>${project.basedir}/../apm-collector-instrument/target</directory>
<includes>
<include>collector-instrument-agent.jar</include>
</includes>
<outputDirectory>/agent</outputDirectory>
</fileSet>
</fileSets>
</assembly>
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.core.annotations.trace;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* @author wusheng
*/
@Target(ElementType.PARAMETER)
@Retention(RetentionPolicy.RUNTIME)
public @interface BatchParameter {
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.core.annotations.trace;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* The method with this annotation should be traced,
* and the metrics(avg response time, call count, success rate) could be collected by the instrument agent.
*
* This is an optional annotation.
*
* @author wusheng
*/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface GraphComputingMetric {
String name();
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.core.annotations.trace;
import java.util.List;
/**
* This is an example about how to use the tracing annotation in collector.
* These annotations effect only in `-instrument` mode active.
*
* @author wusheng
*/
public class TracedGraphElement {
@GraphComputingMetric(name = "/traced/element/run")
public void run() {
}
@GraphComputingMetric(name = "/traced/element/runWithBatch")
public void runWithBatch(@BatchParameter List<Object> data) {
}
}
......@@ -32,6 +32,7 @@
<groupId>org.apache.skywalking</groupId>
<artifactId>apm-collector-core</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>net.bytebuddy</groupId>
......@@ -39,4 +40,44 @@
<version>1.7.8</version>
</dependency>
</dependencies>
<build>
<finalName>collector-instrument-agent</finalName>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<shadedArtifactAttached>false</shadedArtifactAttached>
<createDependencyReducedPom>true</createDependencyReducedPom>
<createSourcesJar>true</createSourcesJar>
<shadeSourcesContent>true</shadeSourcesContent>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<manifestEntries>
<Premain-Class>org.apache.skywalking.apm.collector.instrument.CollectorInstrumentAgent</Premain-Class>
</manifestEntries>
</transformer>
</transformers>
<artifactSet>
<excludes>
<exclude>com.*:*</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>org.apache.logging.log4j:*</exclude>
</excludes>
</artifactSet>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
......@@ -16,58 +16,75 @@
*
*/
package org.apache.skywalking.apm.collector.instrument;
import net.bytebuddy.ByteBuddy;
import java.lang.instrument.Instrumentation;
import net.bytebuddy.agent.builder.AgentBuilder;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.dynamic.DynamicType;
import net.bytebuddy.implementation.MethodDelegation;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.skywalking.apm.collector.core.module.Service;
import net.bytebuddy.utility.JavaModule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static net.bytebuddy.matcher.ElementMatchers.isStatic;
import static net.bytebuddy.matcher.ElementMatchers.declaresMethod;
import static net.bytebuddy.matcher.ElementMatchers.isAnnotatedWith;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.not;
/**
* The <code>ServiceInstrumentation</code> create the dynamic service implementations based on the provider
* implementation. So the new implementation will report performance metric to {@link MetricCollector}.
* There are a lot of monitoring requirements in collector side.
* The agent way is easy, pluggable, and match the target services/graph-nodes automatically.
*
* This agent is designed and expected running in the same class loader of the collector application,
* so I will keep all class loader issue out of concern,
* in order to keep the trace and monitor codes as simple as possible.
*
* @author wu-sheng
*/
public enum ServiceInstrumentation {
INSTANCE;
public class CollectorInstrumentAgent {
private final static Logger logger = LoggerFactory.getLogger(CollectorInstrumentAgent.class);
public static void premain(String agentArgs, Instrumentation instrumentation) {
new AgentBuilder.Default().type(
declaresMethod(isAnnotationedMatch())
).transform((builder, typeDescription, classLoader, module) -> {
builder = builder.method(isAnnotationedMatch())
.intercept(MethodDelegation.withDefaultConfiguration()
.to(new ServiceMetricTracing()));
return builder;
}).with(new AgentBuilder.Listener() {
@Override
public void onDiscovery(String typeName, ClassLoader classLoader, JavaModule module, boolean loaded) {
}
@Override
public void onTransformation(TypeDescription typeDescription, ClassLoader classLoader, JavaModule module,
boolean loaded, DynamicType dynamicType) {
}
@Override
public void onIgnored(TypeDescription typeDescription, ClassLoader classLoader, JavaModule module,
boolean loaded) {
}
@Override public void onError(String typeName, ClassLoader classLoader, JavaModule module, boolean loaded,
Throwable throwable) {
logger.error("Enhance service " + typeName + " error.", throwable);
}
private final Logger logger = LoggerFactory.getLogger(ServiceInstrumentation.class);
private ElementMatcher<? super MethodDescription> excludeObjectMethodsMatcher;
@Override
public void onComplete(String typeName, ClassLoader classLoader, JavaModule module, boolean loaded) {
public Service buildServiceUnderMonitor(String moduleName, String providerName, Service implementation) {
if (implementation instanceof TracedService) {
// Duplicate service instrument, ignore.
return implementation;
}
try {
return new ByteBuddy().subclass(implementation.getClass())
.implement(TracedService.class)
.method(getDefaultMatcher()).intercept(
MethodDelegation.withDefaultConfiguration().to(new ServiceMetricTracing(moduleName, providerName, implementation.getClass().getName()))
).make().load(getClass().getClassLoader()
).getLoaded().newInstance();
} catch (InstantiationException e) {
logger.error("Create instrumented service " + implementation.getClass() + " fail.", e);
} catch (IllegalAccessException e) {
logger.error("Create instrumented service " + implementation.getClass() + " fail.", e);
}
return implementation;
}
}).installOn(instrumentation);
}
private ElementMatcher<? super MethodDescription> getDefaultMatcher() {
if (excludeObjectMethodsMatcher == null) {
excludeObjectMethodsMatcher = not(isStatic().or(named("getClass")).or(named("hashCode")).or(named("equals")).or(named("clone"))
.or(named("toString")).or(named("notify")).or(named("notifyAll")).or(named("wait")).or(named("finalize")));
}
return excludeObjectMethodsMatcher;
private static ElementMatcher<? super MethodDescription> isAnnotationedMatch() {
return isAnnotatedWith(named("org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric"));
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.instrument;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The <code>MetricCollector</code> collects the service metrics by Module/Provider/Service structure.
*/
public enum MetricCollector implements Runnable {
INSTANCE;
private final Logger logger = LoggerFactory.getLogger(MetricCollector.class);
private HashMap<String, ModuleMetric> modules = new HashMap<>();
MetricCollector() {
ScheduledExecutorService service = Executors
.newSingleThreadScheduledExecutor();
service.scheduleAtFixedRate(this, 10, 60, TimeUnit.SECONDS);
}
@Override
public void run() {
if (!logger.isDebugEnabled()) {
return;
}
StringBuilder report = new StringBuilder();
report.append("\n");
report.append("##################################################################################################################\n");
report.append("# Collector Service Report #\n");
report.append("##################################################################################################################\n");
modules.forEach((moduleName, moduleMetric) -> {
report.append(moduleName).append(":\n");
moduleMetric.providers.forEach((providerName, providerMetric) -> {
report.append("\t").append(providerName).append(":\n");
providerMetric.services.forEach((serviceName, serviceMetric) -> {
serviceMetric.methodMetrics.forEach((method, metric) -> {
report.append("\t\t").append(method).append(":\n");
report.append("\t\t\t").append(metric).append("\n");
serviceMetric.methodMetrics.put(method, new ServiceMethodMetric());
});
});
});
});
logger.debug(report.toString());
}
ServiceMetric registerService(String module, String provider, String service) {
return initIfAbsent(module).initIfAbsent(provider).initIfAbsent(service);
}
private ModuleMetric initIfAbsent(String moduleName) {
if (!modules.containsKey(moduleName)) {
ModuleMetric metric = new ModuleMetric(moduleName);
modules.put(moduleName, metric);
return metric;
}
return modules.get(moduleName);
}
private class ModuleMetric {
private String moduleName;
private HashMap<String, ProviderMetric> providers = new HashMap<>();
public ModuleMetric(String moduleName) {
this.moduleName = moduleName;
}
private ProviderMetric initIfAbsent(String providerName) {
if (!providers.containsKey(providerName)) {
ProviderMetric metric = new ProviderMetric(providerName);
providers.put(providerName, metric);
return metric;
}
return providers.get(providerName);
}
}
private class ProviderMetric {
private String providerName;
private HashMap<String, ServiceMetric> services = new HashMap<>();
public ProviderMetric(String providerName) {
this.providerName = providerName;
}
private ServiceMetric initIfAbsent(String serviceName) {
if (!services.containsKey(serviceName)) {
ServiceMetric metric = new ServiceMetric(serviceName);
services.put(serviceName, metric);
return metric;
}
return services.get(serviceName);
}
}
class ServiceMetric {
private String serviceName;
private ConcurrentHashMap<Method, ServiceMethodMetric> methodMetrics = new ConcurrentHashMap<>();
public ServiceMetric(String serviceName) {
this.serviceName = serviceName;
}
void trace(Method method, long nano, boolean occurException) {
if (logger.isDebugEnabled()) {
ServiceMethodMetric metric = methodMetrics.get(method);
if (metric == null) {
ServiceMethodMetric methodMetric = new ServiceMethodMetric();
methodMetrics.putIfAbsent(method, methodMetric);
metric = methodMetrics.get(method);
}
metric.add(nano, occurException);
}
}
}
private class ServiceMethodMetric {
private AtomicLong totalTimeNano;
private AtomicLong counter;
private AtomicLong errorCounter;
public ServiceMethodMetric() {
totalTimeNano = new AtomicLong(0);
counter = new AtomicLong(0);
errorCounter = new AtomicLong(0);
}
private void add(long nano, boolean occurException) {
totalTimeNano.addAndGet(nano);
counter.incrementAndGet();
if (occurException)
errorCounter.incrementAndGet();
}
@Override public String toString() {
if (counter.longValue() == 0) {
return "Avg=N/A";
}
return "Avg=" + (totalTimeNano.longValue() / counter.longValue()) + " (nano)" +
", Success Rate=" + (counter.longValue() - errorCounter.longValue()) * 100 / counter.longValue() +
"%";
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.instrument;
import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.skywalking.apm.collector.core.annotations.trace.BatchParameter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author wusheng
*/
public enum MetricTree implements Runnable {
INSTANCE;
private final Logger logger = LoggerFactory.getLogger(MetricTree.class);
private ScheduledFuture<?> scheduledFuture;
private List<MetricNode> metrics = new LinkedList<>();
private String lineSeparator = System.getProperty("line.separator");
MetricTree() {
ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
scheduledFuture = service.scheduleAtFixedRate(this, 60, 60, TimeUnit.SECONDS);
}
synchronized MetricNode lookup(String metricName) {
MetricNode node = new MetricNode(metricName);
metrics.add(node);
return node;
}
@Override
public void run() {
try {
metrics.forEach((metric) -> {
metric.exchange();
});
try {
Thread.sleep(5 * 1000);
} catch (InterruptedException e) {
}
StringBuilder logBuffer = new StringBuilder();
logBuffer.append(lineSeparator);
logBuffer.append("##################################################################################################################").append(lineSeparator);
logBuffer.append("# Collector Service Report #").append(lineSeparator);
logBuffer.append("##################################################################################################################").append(lineSeparator);
metrics.forEach((metric) -> {
metric.toOutput(new ReportWriter() {
@Override public void writeMetricName(String name) {
logBuffer.append(name).append("").append(lineSeparator);
}
@Override public void writeMetric(String metrics) {
logBuffer.append("\t");
logBuffer.append(metrics).append("").append(lineSeparator);
}
});
});
logger.warn(logBuffer.toString());
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
}
class MetricNode {
private String metricName;
private volatile ServiceMetric metric;
public MetricNode(String metricName) {
this.metricName = metricName;
}
ServiceMetric getMetric(Method targetMethod, Object[] allArguments) {
if (metric == null) {
synchronized (metricName) {
if (metric == null) {
int detectedBatchIndex = -1;
String batchNodeNameSuffix = null;
if (targetMethod != null) {
Annotation[][] annotations = targetMethod.getParameterAnnotations();
if (annotations != null) {
int index = 0;
for (Annotation[] parameterAnnotation : annotations) {
if (parameterAnnotation != null) {
for (Annotation annotation : parameterAnnotation) {
if (annotation instanceof BatchParameter) {
detectedBatchIndex = index;
break;
}
}
}
if (detectedBatchIndex > -1) {
break;
}
index++;
}
if (detectedBatchIndex > -1) {
Object listArgs = allArguments[index];
if (listArgs instanceof List) {
List args = (List)listArgs;
batchNodeNameSuffix = "/" + args.get(0).getClass().getSimpleName();
metricName += batchNodeNameSuffix;
}
}
}
}
metric = new ServiceMetric(metricName, detectedBatchIndex);
if (batchNodeNameSuffix != null) {
this.metricName += batchNodeNameSuffix;
}
}
}
}
return metric;
}
void exchange() {
if (metric != null) {
metric.exchangeWindows();
}
}
void toOutput(ReportWriter writer) {
writer.writeMetricName(metricName);
if (metric != null) {
metric.toOutput(writer);
}
}
}
}
......@@ -16,15 +16,14 @@
*
*/
package org.apache.skywalking.apm.collector.instrument;
/**
* The <code>TracedService</code> implementation are dynamic class, generated by {@link ServiceInstrumentation}.
*
* By that, all the services metrics are collected, and report in the certain cycle through console.
*
* @author wu-sheng
* @author wusheng
*/
public interface TracedService {
public interface ReportWriter {
void writeMetricName(String name);
void writeMetric(String metrics);
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.instrument;
import java.util.List;
/**
* @author wusheng
*/
public class ServiceMetric {
private String metricName;
private ServiceMetricRecord winA;
private ServiceMetricRecord winB;
private volatile boolean isUsingWinA;
private volatile int detectedBatchIndex;
ServiceMetric(String metricName, int detectedBatchIndex) {
this.metricName = metricName;
winA = detectedBatchIndex > -1 ? new ServiceMetricBatchRecord() : new ServiceMetricRecord();
winB = detectedBatchIndex > -1 ? new ServiceMetricBatchRecord() : new ServiceMetricRecord();
isUsingWinA = true;
this.detectedBatchIndex = detectedBatchIndex;
}
public void trace(long nano, boolean occurException, Object[] allArguments) {
ServiceMetricRecord usingRecord = isUsingWinA ? winA : winB;
if (detectedBatchIndex > -1) {
List listArgs = (List)allArguments[detectedBatchIndex];
((ServiceMetricBatchRecord)usingRecord).add(nano, occurException, listArgs == null ? 0 : listArgs.size());
} else {
usingRecord.add(nano, occurException);
}
}
void exchangeWindows() {
isUsingWinA = !isUsingWinA;
}
public void toOutput(ReportWriter writer) {
/**
* If using A, then B is available and free to output.
*/
writer.writeMetric(isUsingWinA ? winB.toString() : winA.toString());
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.instrument;
import java.util.concurrent.atomic.AtomicLong;
/**
* @author wusheng
*/
public class ServiceMetricBatchRecord extends ServiceMetricRecord {
private AtomicLong batchRowSize;
public ServiceMetricBatchRecord() {
super();
batchRowSize = new AtomicLong(0);
}
void add(long nano, boolean occurException, int rowSize) {
super.add(nano, occurException);
batchRowSize.addAndGet(rowSize);
}
@Override void clear() {
super.clear();
batchRowSize.set(0);
}
@Override
public String toString() {
if (counter.longValue() == 0) {
return "Avg=N/A";
}
return super.toString() + " Rows per call = " + (batchRowSize.get() / counter.get());
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.instrument;
import java.util.concurrent.atomic.AtomicLong;
/**
* @author wusheng
*/
public class ServiceMetricRecord {
private AtomicLong totalTimeNano;
protected AtomicLong counter;
private AtomicLong errorCounter;
public ServiceMetricRecord() {
totalTimeNano = new AtomicLong(0);
counter = new AtomicLong(0);
errorCounter = new AtomicLong(0);
}
void add(long nano, boolean occurException) {
totalTimeNano.addAndGet(nano);
counter.incrementAndGet();
if (occurException)
errorCounter.incrementAndGet();
}
void clear() {
totalTimeNano.set(0);
counter.set(0);
errorCounter.set(0);
}
@Override public String toString() {
if (counter.longValue() == 0) {
return "Avg=N/A";
}
return "Avg=" + (totalTimeNano.longValue() / counter.longValue()) + " (nano)" +
", Success Rate=" + (counter.longValue() - errorCounter.longValue()) * 100 / counter.longValue() +
"%, Calls=" + counter.longValue();
}
}
......@@ -16,33 +16,45 @@
*
*/
package org.apache.skywalking.apm.collector.instrument;
import java.lang.reflect.Method;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import net.bytebuddy.implementation.bind.annotation.AllArguments;
import net.bytebuddy.implementation.bind.annotation.Origin;
import net.bytebuddy.implementation.bind.annotation.RuntimeType;
import net.bytebuddy.implementation.bind.annotation.SuperCall;
import net.bytebuddy.implementation.bind.annotation.This;
import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric;
/**
* @author wu-sheng
*/
public class ServiceMetricTracing {
private MetricCollector.ServiceMetric serviceMetric;
private volatile ConcurrentHashMap<Method, ServiceMetric> metrics = new ConcurrentHashMap<>();
public ServiceMetricTracing(String module, String provider, String service) {
serviceMetric = MetricCollector.INSTANCE.registerService(module, provider, service);
public ServiceMetricTracing() {
}
@RuntimeType
public Object intercept(@This Object obj,
@AllArguments Object[] allArguments,
public Object intercept(
@This Object inst,
@SuperCall Callable<?> zuper,
@AllArguments Object[] allArguments,
@Origin Method method
) throws Throwable {
ServiceMetric metric = this.metrics.get(method);
if (metric == null) {
GraphComputingMetric annotation = method.getAnnotation(GraphComputingMetric.class);
String metricName = annotation.name();
synchronized (inst) {
MetricTree.MetricNode metricNode = MetricTree.INSTANCE.lookup(metricName);
ServiceMetric serviceMetric = metricNode.getMetric(method, allArguments);
metrics.put(method, serviceMetric);
metric = serviceMetric;
}
}
boolean occurError = false;
long startNano = System.nanoTime();
long endNano;
......@@ -53,7 +65,8 @@ public class ServiceMetricTracing {
throw t;
} finally {
endNano = System.nanoTime();
serviceMetric.trace(method, endNano - startNano, occurError);
metric.trace(endNano - startNano, occurError, allArguments);
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册