提交 7a91ef7a 编写于 作者: wu-sheng's avatar wu-sheng

Merge branch 'master' of https://github.com/wu-sheng/sky-walking

* 'master' of https://github.com/wu-sheng/sky-walking:
  fix miss transport entryServiceName and entryServiceId value
  fix miss entryServiceId and entryServiceName in ContextSnapshot  issue
  fix issue that open tracing activation set tag failed
  Make sure the JVM stop hook works #295
  Add jvm shutdown hook

# Conflicts:
#	apm-sniffer/apm-agent-core/src/main/java/org/skywalking/apm/agent/core/context/ContextSnapshot.java
......@@ -68,16 +68,19 @@ public class SkywalkingSpan implements Span {
return SkywalkingContext.INSTANCE;
}
@NeedSnifferActivation(
"1. ContextManager#activeSpan()" +
"2. SkywalkingSpan#setTag(String, String)")
@Override public Span setTag(String key, String value) {
return null;
return this;
}
@Override public Span setTag(String key, boolean value) {
return null;
return setTag(key, String.valueOf(value));
}
@Override public Span setTag(String key, Number value) {
return null;
return setTag(key, String.valueOf(value));
}
@Override
......
......@@ -20,6 +20,7 @@ public class ConsumerPool<T> {
this(channels, num);
for (int i = 0; i < num; i++) {
consumerThreads[i] = new ConsumerThread("DataCarrier.Consumser." + i + ".Thread", getNewConsumerInstance(consumerClass));
consumerThreads[i].setDaemon(true);
}
}
......
......@@ -13,4 +13,6 @@ public interface BootService {
void boot() throws Throwable;
void afterBoot() throws Throwable;
void shutdown() throws Throwable;
}
......@@ -27,6 +27,16 @@ public enum ServiceManager {
afterBoot();
}
public void shutdown() {
for (BootService service : bootedServices.values()) {
try {
service.shutdown();
} catch (Throwable e) {
logger.error(e, "ServiceManager try to shutdown [{}] fail.", service.getClass().getName());
}
}
}
private Map<Class, BootService> loadAllServices() {
HashMap<Class, BootService> bootedServices = new HashMap<Class, BootService>();
Iterator<BootService> serviceIterator = load().iterator();
......
......@@ -162,6 +162,10 @@ public class ContextManager implements TracingContextListener, BootService, Igno
}
@Override public void shutdown() throws Throwable {
}
@Override
public void afterFinished(TraceSegment traceSegment) {
CONTEXT.remove();
......
......@@ -2,6 +2,8 @@ package org.skywalking.apm.agent.core.context;
import java.util.List;
import org.skywalking.apm.agent.core.context.ids.DistributedTraceId;
import org.skywalking.apm.agent.core.dictionary.DictionaryUtil;
import org.skywalking.apm.util.StringUtil;
/**
* The <code>ContextSnapshot</code> is a snapshot for current context. The snapshot carries the info for building
......@@ -20,16 +22,24 @@ public class ContextSnapshot {
*/
private int spanId = -1;
private String entryOperationName;
/**
* {@link DistributedTraceId}
*/
private DistributedTraceId primaryDistributedTraceId;
ContextSnapshot(String traceSegmentId, int spanId,
List<DistributedTraceId> distributedTraceIds) {
List<DistributedTraceId> distributedTraceIds, int entryServiceId, String entryOperationName) {
this.traceSegmentId = traceSegmentId;
this.spanId = spanId;
this.primaryDistributedTraceId = distributedTraceIds.get(0);
if (entryServiceId == DictionaryUtil.nullValue()) {
this.entryOperationName = "#" + entryOperationName;
} else {
this.entryOperationName = String.valueOf(entryServiceId);
}
}
public DistributedTraceId getDistributedTraceId() {
......@@ -47,6 +57,11 @@ public class ContextSnapshot {
public boolean isValid() {
return traceSegmentId != null
&& spanId > -1
&& primaryDistributedTraceId != null;
&& primaryDistributedTraceId != null
&& !StringUtil.isEmpty(entryOperationName);
}
public String getEntryOperationName() {
return entryOperationName;
}
}
......@@ -33,7 +33,7 @@ public class IgnoredTracerContext implements AbstractTracerContext {
}
@Override public ContextSnapshot capture() {
return new ContextSnapshot(null, -1, null);
return new ContextSnapshot(null, -1, null, 0, null);
}
@Override public void continued(ContextSnapshot snapshot) {
......
......@@ -132,10 +132,20 @@ public class TracingContext implements AbstractTracerContext {
*/
@Override
public ContextSnapshot capture() {
return new ContextSnapshot(segment.getTraceSegmentId(),
activeSpan().getSpanId(),
segment.getRelatedGlobalTraces()
);
List<TraceSegmentRef> refs = this.segment.getRefs();
if (refs != null && refs.size() > 0) {
TraceSegmentRef ref = refs.get(0);
return new ContextSnapshot(segment.getTraceSegmentId(),
activeSpan().getSpanId(),
segment.getRelatedGlobalTraces(), ref.getOperationId(), ref.getOperationName()
);
} else {
AbstractTracingSpan firstSpan = first();
return new ContextSnapshot(segment.getTraceSegmentId(),
activeSpan().getSpanId(),
segment.getRelatedGlobalTraces(), firstSpan.getOperationId(), firstSpan.getOperationName()
);
}
}
/**
......
......@@ -57,6 +57,12 @@ public class TraceSegmentRef {
this.type = SegmentRefType.CROSS_THREAD;
this.traceSegmentId = snapshot.getTraceSegmentId();
this.spanId = snapshot.getSpanId();
String entryOperationName = snapshot.getEntryOperationName();
if (entryOperationName.charAt(0) == '#') {
this.operationName = entryOperationName.substring(1);
} else {
this.operationId = Integer.parseInt(entryOperationName);
}
}
public String getOperationName() {
......@@ -77,17 +83,17 @@ public class TraceSegmentRef {
} else {
refBuilder.setNetworkAddressId(peerId);
}
if (operationId == DictionaryUtil.nullValue()) {
refBuilder.setEntryServiceName(operationName);
} else {
refBuilder.setEntryServiceId(operationId);
}
} else {
refBuilder.setRefType(RefType.CrossThread);
}
refBuilder.setParentTraceSegmentId(traceSegmentId);
refBuilder.setParentSpanId(spanId);
if (operationId == DictionaryUtil.nullValue()) {
refBuilder.setEntryServiceName(operationName);
} else {
refBuilder.setEntryServiceId(operationId);
}
return refBuilder.build();
}
......
......@@ -62,6 +62,12 @@ public class JVMService implements BootService, Runnable {
}
@Override
public void shutdown() throws Throwable {
collectMetricFuture.cancel(true);
sendMetricFuture.cancel(true);
}
@Override
public void run() {
if (RemoteDownstreamConfig.Agent.APPLICATION_ID != DictionaryUtil.nullValue()
......
......@@ -77,6 +77,11 @@ public class AppAndServiceRegisterClient implements BootService, GRPCChannelList
TracingContext.ListenerManager.add(this);
}
@Override
public void shutdown() throws Throwable {
applicationRegisterFuture.cancel(true);
}
@Override
public void run() {
if (CONNECTED.equals(status)) {
......
package org.skywalking.apm.agent.core.remote;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.skywalking.apm.agent.core.boot.BootService;
import org.skywalking.apm.agent.core.conf.Config;
......@@ -11,6 +12,8 @@ import org.skywalking.apm.agent.core.conf.Config;
* @author wusheng
*/
public class CollectorDiscoveryService implements BootService {
private ScheduledFuture<?> future;
@Override
public void beforeBoot() throws Throwable {
......@@ -18,7 +21,7 @@ public class CollectorDiscoveryService implements BootService {
@Override
public void boot() throws Throwable {
Executors.newSingleThreadScheduledExecutor()
future = Executors.newSingleThreadScheduledExecutor()
.scheduleAtFixedRate(new DiscoveryRestServiceClient(), 0,
Config.Collector.DISCOVERY_CHECK_INTERVAL, TimeUnit.SECONDS);
}
......@@ -27,4 +30,9 @@ public class CollectorDiscoveryService implements BootService {
public void afterBoot() throws Throwable {
}
@Override
public void shutdown() throws Throwable {
future.cancel(true);
}
}
......@@ -49,6 +49,12 @@ public class GRPCChannelManager implements BootService, Runnable {
}
@Override
public void shutdown() throws Throwable {
connectCheckFuture.cancel(true);
managedChannel.shutdownNow();
}
@Override
public void run() {
if (reconnect) {
......
......@@ -49,6 +49,11 @@ public class TraceSegmentServiceClient implements BootService, IConsumer<TraceSe
TracingContext.ListenerManager.add(this);
}
@Override
public void shutdown() throws Throwable {
carrier.shutdownConsumers();
}
@Override
public void init() {
......
......@@ -61,6 +61,11 @@ public class SamplingService implements BootService {
}
@Override
public void shutdown() throws Throwable {
scheduledFuture.cancel(true);
}
/**
* @return true, if sampling mechanism is on, and get the sampling factor successfully.
*/
......
......@@ -44,6 +44,12 @@ public class SkyWalkingAgent {
ServiceManager.INSTANCE.boot();
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override public void run() {
ServiceManager.INSTANCE.shutdown();
}
}, "skywalking service shutdown thread"));
new AgentBuilder.Default().type(pluginFinder.buildMatch()).transform(new AgentBuilder.Transformer() {
@Override
public DynamicType.Builder<?> transform(DynamicType.Builder<?> builder, TypeDescription typeDescription,
......
......@@ -48,6 +48,7 @@ public class SkywalkingSpanActivation extends ClassInstanceMethodsEnhancePluginD
private static final String FINISH_METHOD_INTERCEPTOR = "org.skywalking.apm.toolkit.activation.opentracing.span.SpanFinishInterceptor";
private static final String LOG_INTERCEPTOR = "org.skywalking.apm.toolkit.activation.opentracing.span.SpanLogInterceptor";
private static final String SET_OPERATION_NAME_INTERCEPTOR = "org.skywalking.apm.toolkit.activation.opentracing.span.SpanSetOperationNameInterceptor";
private static final String SET_TAG_INTERCEPTOR = "org.skywalking.apm.toolkit.activation.opentracing.span.SpanSetTagInterceptor";
@Override
protected ClassMatch enhanceClass() {
......@@ -132,6 +133,19 @@ public class SkywalkingSpanActivation extends ClassInstanceMethodsEnhancePluginD
public boolean isOverrideArgs() {
return false;
}
},
new InstanceMethodsInterceptPoint() {
@Override public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named("setTag").and(takesArgument(0, String.class)).and(takesArgument(1, String.class));
}
@Override public String getMethodsInterceptor() {
return SET_TAG_INTERCEPTOR;
}
@Override public boolean isOverrideArgs() {
return false;
}
}
};
}
......
package org.skywalking.apm.toolkit.activation.opentracing.span;
import io.opentracing.tag.Tags;
import org.skywalking.apm.agent.core.context.ContextManager;
import org.skywalking.apm.agent.core.context.trace.AbstractSpan;
import org.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import org.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import org.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
public class SpanSetTagInterceptor implements InstanceMethodsAroundInterceptor {
@Override
public void beforeMethod(EnhancedInstance objInst, String methodName, Object[] allArguments,
Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
}
@Override
public Object afterMethod(EnhancedInstance objInst, String methodName, Object[] allArguments,
Class<?>[] argumentsTypes, Object ret) throws Throwable {
AbstractSpan activeSpan = ContextManager.activeSpan();
String tagKey = String.valueOf(allArguments[0]);
String tagValue = String.valueOf(allArguments[1]);
if (Tags.COMPONENT.getKey().equals(tagKey)) {
activeSpan.setComponent(tagValue);
} else if (Tags.PEER_SERVICE.getKey().equals(tagKey)) {
activeSpan.setOperationName(tagValue);
} else {
activeSpan.tag(tagKey, tagValue);
}
return ret;
}
@Override
public void handleMethodException(EnhancedInstance objInst, String methodName, Object[] allArguments,
Class<?>[] argumentsTypes, Throwable t) {
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册