未验证 提交 ecc0f944 编写于 作者: wu-sheng's avatar wu-sheng 提交者: GitHub

Zipkin receiver in SkyWalking collector (#1273)

* Add Zipkin receiver
上级 da784202
Apache SkyWalking | [中文](README_ZH.md)
Apache SkyWalking
==========
<img src="https://skywalkingtest.github.io/page-resources/3.0/skywalking.png" alt="Sky Walking logo" height="90px" align="right" />
......@@ -40,6 +40,9 @@ including:
- Modern and cool Web UI
- Log integration
- Alarm for slow or unstable(low SLA) application, instance and service
- [**Incubating**] Support accepting other tracer data formats.
- Zipkin JSON, Thrift, Protobuf v1 and v2 formats, powered by [OpenZipkin](https://github.com/openzipkin/zipkin) libs
- Jaeger in [Zipkin Thrift or JSON v1/v2 formats](https://github.com/jaegertracing/jaeger#backwards-compatibility-with-zipkin)
# Document
- [Documents in English](docs/README.md)
......@@ -53,7 +56,6 @@ This project adheres to the Contributor Covenant [code of conduct](CODE_OF_CONDU
# Live Demo
- Host in Beijing. [goto](http://49.4.12.44:8080/)
- Host in HK. [goto](http://159.138.0.181:8080/)
# Screenshot
<img src="https://skywalkingtest.github.io/page-resources/5.0.0-beta/Dashboard.png"/>
......@@ -61,11 +63,8 @@ This project adheres to the Contributor Covenant [code of conduct](CODE_OF_CONDU
- [See all screenshots](/docs/Screenshots.md)
# Test reports
- Automatic integration test reports
- [Java Agent test report](https://github.com/SkywalkingTest/agent-integration-test-report)
- Performance test reports
- [Java Agent test report](https://skywalkingtest.github.io/Agent-Benchmarks/)
# Compiling project
Follow this [document](https://github.com/apache/incubator-skywalking/blob/master/docs/en/How-to-build.md).
# Contact Us
* Submit an issue
......
......@@ -38,6 +38,9 @@ Apache SkyWalking | [English](README.md)
- 现代化Web UI
- 日志集成
- 应用、实例和服务的告警
- [**Incubating**]支持接口其他探针的数据
- 接受Zipkin v1 v2格式数据,采用JSON, Thrift, Protobuf序列化方式。Powered by [OpenZipkin](https://github.com/openzipkin/zipkin) libs
- 接受Jaeger 使用 [Zipkin Thrift 或 JSON v1/v2 格式](https://github.com/jaegertracing/jaeger#backwards-compatibility-with-zipkin)
# Document
- [英文文档](docs/README.md)
......@@ -52,7 +55,6 @@ This project adheres to the Contributor Covenant [code of conduct](CODE_OF_CONDU
# Live Demo
- 北京服务器. [前往](http://49.4.12.44:8080/)
- 香港服务器. [前往](http://159.138.0.181:8080/)
# Screenshot
<img src="https://skywalkingtest.github.io/page-resources/5.0.0-beta/Dashboard.png"/>
......@@ -60,16 +62,8 @@ This project adheres to the Contributor Covenant [code of conduct](CODE_OF_CONDU
- [查看所有系统截图](/docs/Screenshots.md)
# Test reports
- 自动化集成测试报告
- [Java探针测试报告](https://github.com/SkywalkingTest/agent-integration-test-report)
- 性能测试报告
- [Java探针测试报告](https://skywalkingtest.github.io/Agent-Benchmarks/)
# Users
<img src="https://skywalkingtest.github.io/page-resources/users/users-2018-06-07.png"/>
[报告新的用户案例](https://github.com/apache/incubator-skywalking/issues/443)
# Compiling project
查看[编译指南](https://github.com/apache/incubator-skywalking/blob/master/docs/cn/How-to-build-CN.md)
# Contact Us
* 直接提交Issue
......@@ -77,5 +71,11 @@ This project adheres to the Contributor Covenant [code of conduct](CODE_OF_CONDU
* [Gitter](https://gitter.im/openskywalking/Lobby)
* QQ群: 392443393
# Users
<img src="https://skywalkingtest.github.io/page-resources/users/users-2018-06-07.png"/>
[报告新的用户案例](https://github.com/apache/incubator-skywalking/issues/443)
# License
[Apache 2.0 License.](/LICENSE)
......@@ -18,12 +18,11 @@
package org.apache.skywalking.apm.collector.agent.grpc.provider.handler;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import io.grpc.stub.StreamObserver;
import org.apache.skywalking.apm.collector.analysis.metric.define.AnalysisMetricModule;
import org.apache.skywalking.apm.collector.analysis.metric.define.service.IInstanceHeartBeatService;
import org.apache.skywalking.apm.collector.analysis.register.define.AnalysisRegisterModule;
import org.apache.skywalking.apm.collector.analysis.register.define.service.AgentOsInfo;
import org.apache.skywalking.apm.collector.analysis.register.define.service.IInstanceIDService;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
import org.apache.skywalking.apm.collector.server.grpc.GRPCHandler;
......@@ -54,7 +53,14 @@ public class InstanceDiscoveryServiceHandler extends InstanceDiscoveryServiceGrp
@Override
public void registerInstance(ApplicationInstance request,
StreamObserver<ApplicationInstanceMapping> responseObserver) {
int instanceId = instanceIDService.getOrCreateByAgentUUID(request.getApplicationId(), request.getAgentUUID(), request.getRegisterTime(), buildOsInfo(request.getOsinfo()));
OSInfo osinfo = request.getOsinfo();
AgentOsInfo agentOsInfo = new AgentOsInfo();
agentOsInfo.setHostname(osinfo.getHostname());
agentOsInfo.setOsName(osinfo.getOsName());
agentOsInfo.setProcessNo(osinfo.getProcessNo());
agentOsInfo.setIpv4s(osinfo.getIpv4SList());
int instanceId = instanceIDService.getOrCreateByAgentUUID(request.getApplicationId(), request.getAgentUUID(), request.getRegisterTime(), agentOsInfo);
ApplicationInstanceMapping.Builder builder = ApplicationInstanceMapping.newBuilder();
builder.setApplicationId(request.getApplicationId());
builder.setApplicationInstanceId(instanceId);
......@@ -69,18 +75,4 @@ public class InstanceDiscoveryServiceHandler extends InstanceDiscoveryServiceGrp
responseObserver.onNext(Downstream.getDefaultInstance());
responseObserver.onCompleted();
}
private String buildOsInfo(OSInfo osinfo) {
JsonObject osInfoJson = new JsonObject();
osInfoJson.addProperty("osName", osinfo.getOsName());
osInfoJson.addProperty("hostName", osinfo.getHostname());
osInfoJson.addProperty("processId", osinfo.getProcessNo());
JsonArray ipv4Array = new JsonArray();
for (String ipv4 : osinfo.getIpv4SList()) {
ipv4Array.add(ipv4);
}
osInfoJson.add("ipv4s", ipv4Array);
return osInfoJson.toString();
}
}
......@@ -16,7 +16,6 @@
*
*/
package org.apache.skywalking.apm.collector.agent.grpc.provider.handler.naming;
import com.google.gson.JsonArray;
......@@ -24,12 +23,12 @@ import com.google.gson.JsonElement;
import java.util.Set;
import javax.servlet.http.HttpServletRequest;
import org.apache.skywalking.apm.collector.server.jetty.ArgumentsParseException;
import org.apache.skywalking.apm.collector.server.jetty.JettyHandler;
import org.apache.skywalking.apm.collector.server.jetty.JettyJsonHandler;
/**
* @author peng-yongsheng
*/
public class AgentGRPCNamingHandler extends JettyHandler {
public class AgentGRPCNamingHandler extends JettyJsonHandler {
private final AgentGRPCNamingListener namingListener;
......
......@@ -18,11 +18,16 @@
package org.apache.skywalking.apm.collector.agent.grpc.provider.handler;
import io.grpc.stub.StreamObserver;
import java.util.UUID;
import org.apache.skywalking.apm.collector.analysis.metric.define.service.IInstanceHeartBeatService;
import org.apache.skywalking.apm.collector.analysis.register.define.service.IInstanceIDService;
import org.apache.skywalking.apm.collector.core.module.MockModule;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
import org.apache.skywalking.apm.network.proto.*;
import org.apache.skywalking.apm.network.proto.ApplicationInstance;
import org.apache.skywalking.apm.network.proto.ApplicationInstanceHeartbeat;
import org.apache.skywalking.apm.network.proto.ApplicationInstanceMapping;
import org.apache.skywalking.apm.network.proto.Downstream;
import org.apache.skywalking.apm.network.proto.OSInfo;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
......@@ -31,11 +36,9 @@ import org.mockito.Mock;
import org.mockito.internal.util.reflection.Whitebox;
import org.mockito.runners.MockitoJUnitRunner;
import java.util.UUID;
import static org.junit.Assert.*;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
import static org.powermock.api.mockito.PowerMockito.when;
......@@ -65,22 +68,22 @@ public class InstanceDiscoveryServiceHandlerTest {
@Test
public void registerInstance() {
ApplicationInstance applicationInstance = ApplicationInstance.newBuilder()
.setAgentUUID(UUID.randomUUID().toString())
.setApplicationId(10)
.setRegisterTime(System.currentTimeMillis())
.setOsinfo(
OSInfo.newBuilder()
.setOsName("MAC OS")
.setHostname("test")
.addIpv4S("127.0.0.1")
.setProcessNo(123456)
.build()
).build();
when(instanceIDService.getOrCreateByAgentUUID(anyInt(), anyString(), anyLong(), anyString())).thenReturn(100);
.setAgentUUID(UUID.randomUUID().toString())
.setApplicationId(10)
.setRegisterTime(System.currentTimeMillis())
.setOsinfo(
OSInfo.newBuilder()
.setOsName("MAC OS")
.setHostname("test")
.addIpv4S("127.0.0.1")
.setProcessNo(123456)
.build()
).build();
when(instanceIDService.getOrCreateByAgentUUID(anyInt(), anyString(), anyLong(), anyObject())).thenReturn(100);
instanceDiscoveryServiceHandler.registerInstance(applicationInstance, new StreamObserver<ApplicationInstanceMapping>() {
@Override
public void onNext(ApplicationInstanceMapping applicationInstanceMapping) {
Assert.assertEquals(100,applicationInstanceMapping.getApplicationInstanceId());
Assert.assertEquals(100, applicationInstanceMapping.getApplicationInstanceId());
}
@Override
......@@ -98,13 +101,13 @@ public class InstanceDiscoveryServiceHandlerTest {
@Test
public void heartbeat() {
ApplicationInstanceHeartbeat heartbeat = ApplicationInstanceHeartbeat.newBuilder()
.setApplicationInstanceId(100)
.setHeartbeatTime(System.currentTimeMillis())
.build();
.setApplicationInstanceId(100)
.setHeartbeatTime(System.currentTimeMillis())
.build();
instanceDiscoveryServiceHandler.heartbeat(heartbeat, new StreamObserver<Downstream>() {
@Override
public void onNext(Downstream downstream) {
Assert.assertEquals(Downstream.getDefaultInstance(),downstream);
Assert.assertEquals(Downstream.getDefaultInstance(), downstream);
}
@Override
......@@ -118,4 +121,4 @@ public class InstanceDiscoveryServiceHandlerTest {
}
});
}
}
\ No newline at end of file
}
......@@ -28,14 +28,14 @@ import org.apache.skywalking.apm.collector.analysis.register.define.AnalysisRegi
import org.apache.skywalking.apm.collector.analysis.register.define.service.IApplicationIDService;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
import org.apache.skywalking.apm.collector.server.jetty.ArgumentsParseException;
import org.apache.skywalking.apm.collector.server.jetty.JettyHandler;
import org.apache.skywalking.apm.collector.server.jetty.JettyJsonHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class ApplicationRegisterServletHandler extends JettyHandler {
public class ApplicationRegisterServletHandler extends JettyJsonHandler {
private static final Logger logger = LoggerFactory.getLogger(ApplicationRegisterServletHandler.class);
......
......@@ -24,17 +24,18 @@ import com.google.gson.JsonObject;
import java.io.IOException;
import javax.servlet.http.HttpServletRequest;
import org.apache.skywalking.apm.collector.analysis.register.define.AnalysisRegisterModule;
import org.apache.skywalking.apm.collector.analysis.register.define.service.AgentOsInfo;
import org.apache.skywalking.apm.collector.analysis.register.define.service.IInstanceIDService;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
import org.apache.skywalking.apm.collector.server.jetty.ArgumentsParseException;
import org.apache.skywalking.apm.collector.server.jetty.JettyHandler;
import org.apache.skywalking.apm.collector.server.jetty.JettyJsonHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class InstanceDiscoveryServletHandler extends JettyHandler {
public class InstanceDiscoveryServletHandler extends JettyJsonHandler {
private static final Logger logger = LoggerFactory.getLogger(InstanceDiscoveryServletHandler.class);
......@@ -66,9 +67,10 @@ public class InstanceDiscoveryServletHandler extends JettyHandler {
int applicationId = instance.get(APPLICATION_ID).getAsInt();
String agentUUID = instance.get(AGENT_UUID).getAsString();
long registerTime = instance.get(REGISTER_TIME).getAsLong();
JsonObject osInfo = instance.get(OS_INFO).getAsJsonObject();
JsonObject osInfoJson = instance.get(OS_INFO).getAsJsonObject();
AgentOsInfo osInfo = gson.fromJson(osInfoJson, AgentOsInfo.class);
int instanceId = instanceIDService.getOrCreateByAgentUUID(applicationId, agentUUID, registerTime, osInfo.toString());
int instanceId = instanceIDService.getOrCreateByAgentUUID(applicationId, agentUUID, registerTime, osInfo);
responseJson.addProperty(APPLICATION_ID, applicationId);
responseJson.addProperty(INSTANCE_ID, instanceId);
} catch (IOException e) {
......
......@@ -27,14 +27,14 @@ import org.apache.skywalking.apm.collector.analysis.metric.define.AnalysisMetric
import org.apache.skywalking.apm.collector.analysis.metric.define.service.IInstanceHeartBeatService;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
import org.apache.skywalking.apm.collector.server.jetty.ArgumentsParseException;
import org.apache.skywalking.apm.collector.server.jetty.JettyHandler;
import org.apache.skywalking.apm.collector.server.jetty.JettyJsonHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class InstanceHeartBeatServletHandler extends JettyHandler {
public class InstanceHeartBeatServletHandler extends JettyJsonHandler {
private static final Logger logger = LoggerFactory.getLogger(InstanceHeartBeatServletHandler.class);
......
......@@ -28,14 +28,14 @@ import org.apache.skywalking.apm.collector.analysis.register.define.AnalysisRegi
import org.apache.skywalking.apm.collector.analysis.register.define.service.INetworkAddressIDService;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
import org.apache.skywalking.apm.collector.server.jetty.ArgumentsParseException;
import org.apache.skywalking.apm.collector.server.jetty.JettyHandler;
import org.apache.skywalking.apm.collector.server.jetty.JettyJsonHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class NetworkAddressRegisterServletHandler extends JettyHandler {
public class NetworkAddressRegisterServletHandler extends JettyJsonHandler {
private static final Logger logger = LoggerFactory.getLogger(NetworkAddressRegisterServletHandler.class);
......
......@@ -28,14 +28,14 @@ import org.apache.skywalking.apm.collector.analysis.register.define.AnalysisRegi
import org.apache.skywalking.apm.collector.analysis.register.define.service.IServiceNameService;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
import org.apache.skywalking.apm.collector.server.jetty.ArgumentsParseException;
import org.apache.skywalking.apm.collector.server.jetty.JettyHandler;
import org.apache.skywalking.apm.collector.server.jetty.JettyJsonHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class ServiceNameDiscoveryServiceHandler extends JettyHandler {
public class ServiceNameDiscoveryServiceHandler extends JettyJsonHandler {
private static final Logger logger = LoggerFactory.getLogger(ServiceNameDiscoveryServiceHandler.class);
......
......@@ -29,14 +29,14 @@ import org.apache.skywalking.apm.collector.analysis.segment.parser.define.Analys
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.service.ISegmentParseService;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
import org.apache.skywalking.apm.collector.server.jetty.ArgumentsParseException;
import org.apache.skywalking.apm.collector.server.jetty.JettyHandler;
import org.apache.skywalking.apm.collector.server.jetty.JettyJsonHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class TraceSegmentServletHandler extends JettyHandler {
public class TraceSegmentServletHandler extends JettyJsonHandler {
private static final Logger logger = LoggerFactory.getLogger(TraceSegmentServletHandler.class);
......
......@@ -24,12 +24,12 @@ import com.google.gson.JsonElement;
import java.util.Set;
import javax.servlet.http.HttpServletRequest;
import org.apache.skywalking.apm.collector.server.jetty.ArgumentsParseException;
import org.apache.skywalking.apm.collector.server.jetty.JettyHandler;
import org.apache.skywalking.apm.collector.server.jetty.JettyJsonHandler;
/**
* @author peng-yongsheng
*/
public class AgentJettyNamingHandler extends JettyHandler {
public class AgentJettyNamingHandler extends JettyJsonHandler {
private final AgentJettyNamingListener namingListener;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.analysis.register.define.service;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import java.util.List;
/**
* @author wusheng
*/
public class AgentOsInfo {
private String osName;
private String hostname;
private int processNo;
private List<String> ipv4s;
public AgentOsInfo() {
}
public String getOsName() {
return osName;
}
public void setOsName(String osName) {
this.osName = osName;
}
public String getHostname() {
return hostname;
}
public void setHostname(String hostname) {
this.hostname = hostname;
}
public int getProcessNo() {
return processNo;
}
public void setProcessNo(int processNo) {
this.processNo = processNo;
}
public List<String> getIpv4s() {
return ipv4s;
}
public void setIpv4s(List<String> ipv4s) {
this.ipv4s = ipv4s;
}
@Override public String toString() {
JsonObject osInfoJson = new JsonObject();
osInfoJson.addProperty("osName", this.getOsName());
osInfoJson.addProperty("hostName", this.getHostname());
osInfoJson.addProperty("processId", this.getProcessNo());
JsonArray ipv4Array = new JsonArray();
if (this.getIpv4s() != null) {
for (String ipv4 : this.getIpv4s()) {
ipv4Array.add(ipv4);
}
}
osInfoJson.add("ipv4s", ipv4Array);
return osInfoJson.toString();
}
}
......@@ -24,7 +24,7 @@ import org.apache.skywalking.apm.collector.core.module.Service;
* @author peng-yongsheng
*/
public interface IInstanceIDService extends Service {
int getOrCreateByAgentUUID(int applicationId, String agentUUID, long registerTime, String osInfo);
int getOrCreateByAgentUUID(int applicationId, String agentUUID, long registerTime, AgentOsInfo osInfo);
int getOrCreateByAddressId(int applicationId, int addressId, long registerTime);
}
......@@ -19,6 +19,7 @@
package org.apache.skywalking.apm.collector.analysis.register.provider.service;
import org.apache.skywalking.apm.collector.analysis.register.define.graph.GraphIdDefine;
import org.apache.skywalking.apm.collector.analysis.register.define.service.AgentOsInfo;
import org.apache.skywalking.apm.collector.analysis.register.define.service.IInstanceIDService;
import org.apache.skywalking.apm.collector.cache.CacheModule;
import org.apache.skywalking.apm.collector.cache.service.ApplicationCacheService;
......@@ -71,7 +72,7 @@ public class InstanceIDService implements IInstanceIDService {
return applicationCacheService;
}
@Override public int getOrCreateByAgentUUID(int applicationId, String agentUUID, long registerTime, String osInfo) {
@Override public int getOrCreateByAgentUUID(int applicationId, String agentUUID, long registerTime, AgentOsInfo osInfo) {
logger.debug("get or getOrCreate instance id by agent UUID, application id: {}, agentUUID: {}, registerTime: {}, osInfo: {}", applicationId, agentUUID, registerTime, osInfo);
int instanceId = getInstanceCacheService().getInstanceIdByAgentUUID(applicationId, agentUUID);
......@@ -84,7 +85,7 @@ public class InstanceIDService implements IInstanceIDService {
instance.setRegisterTime(registerTime);
instance.setHeartBeatTime(registerTime);
instance.setInstanceId(0);
instance.setOsInfo(osInfo);
instance.setOsInfo(osInfo.toString());
instance.setIsAddress(BooleanUtils.FALSE);
instance.setAddressId(Const.NONE);
......
......@@ -19,20 +19,32 @@
package org.apache.skywalking.apm.collector.analysis.segment.parser.provider.parser;
import com.google.protobuf.InvalidProtocolBufferException;
import java.util.*;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.decorator.*;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.decorator.ReferenceDecorator;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.decorator.SegmentCoreInfo;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.decorator.SegmentDecorator;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.decorator.SpanDecorator;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.graph.GraphIdDefine;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.listener.*;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.service.ISegmentParseService;
import org.apache.skywalking.apm.collector.analysis.segment.parser.provider.parser.standardization.*;
import org.apache.skywalking.apm.collector.analysis.segment.parser.provider.parser.standardization.ReferenceIdExchanger;
import org.apache.skywalking.apm.collector.analysis.segment.parser.provider.parser.standardization.SegmentStandardization;
import org.apache.skywalking.apm.collector.analysis.segment.parser.provider.parser.standardization.SpanIdExchanger;
import org.apache.skywalking.apm.collector.core.UnexpectedException;
import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric;
import org.apache.skywalking.apm.collector.core.graph.*;
import org.apache.skywalking.apm.collector.core.graph.Graph;
import org.apache.skywalking.apm.collector.core.graph.GraphManager;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.apache.skywalking.apm.collector.storage.table.segment.Segment;
import org.apache.skywalking.apm.network.proto.*;
import org.slf4j.*;
import org.apache.skywalking.apm.network.proto.SpanType;
import org.apache.skywalking.apm.network.proto.TraceSegmentObject;
import org.apache.skywalking.apm.network.proto.UniqueId;
import org.apache.skywalking.apm.network.proto.UpstreamSegment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.LinkedList;
import java.util.List;
/**
* @author peng-yongsheng
......@@ -193,7 +205,7 @@ public class SegmentParse {
private void notifyExitListener(SpanDecorator spanDecorator) {
spanListeners.forEach(listener -> {
if (listener.containsPoint(SpanListener.Point.Exit)) {
((ExitSpanListener)listener).parseExit(spanDecorator, segmentCoreInfo);
((ExitSpanListener) listener).parseExit(spanDecorator, segmentCoreInfo);
}
});
}
......@@ -202,7 +214,7 @@ public class SegmentParse {
private void notifyEntryListener(SpanDecorator spanDecorator) {
spanListeners.forEach(listener -> {
if (listener.containsPoint(SpanListener.Point.Entry)) {
((EntrySpanListener)listener).parseEntry(spanDecorator, segmentCoreInfo);
((EntrySpanListener) listener).parseEntry(spanDecorator, segmentCoreInfo);
}
});
}
......@@ -211,7 +223,7 @@ public class SegmentParse {
private void notifyLocalListener(SpanDecorator spanDecorator) {
spanListeners.forEach(listener -> {
if (listener.containsPoint(SpanListener.Point.Local)) {
((LocalSpanListener)listener).parseLocal(spanDecorator, segmentCoreInfo);
((LocalSpanListener) listener).parseLocal(spanDecorator, segmentCoreInfo);
}
});
}
......@@ -220,7 +232,7 @@ public class SegmentParse {
private void notifyFirstListener(SpanDecorator spanDecorator) {
spanListeners.forEach(listener -> {
if (listener.containsPoint(SpanListener.Point.First)) {
((FirstSpanListener)listener).parseFirst(spanDecorator, segmentCoreInfo);
((FirstSpanListener) listener).parseFirst(spanDecorator, segmentCoreInfo);
}
});
}
......@@ -229,7 +241,7 @@ public class SegmentParse {
private void notifyGlobalsListener(UniqueId uniqueId) {
spanListeners.forEach(listener -> {
if (listener.containsPoint(SpanListener.Point.GlobalTraceIds)) {
((GlobalTraceIdsListener)listener).parseGlobalTraceId(uniqueId, segmentCoreInfo);
((GlobalTraceIdsListener) listener).parseGlobalTraceId(uniqueId, segmentCoreInfo);
}
});
}
......@@ -238,4 +250,5 @@ public class SegmentParse {
private void createSpanListeners() {
listenerManager.getSpanListenerFactories().forEach(spanListenerFactory -> spanListeners.add(spanListenerFactory.create(moduleManager)));
}
}
......@@ -185,6 +185,12 @@
<version>${project.version}</version>
</dependency>
<!-- alarm provider -->
<!-- zipkin receiver-->
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>receiver-zipkin-provider</artifactId>
<version>${project.version}</version>
</dependency>
<!-- instrument provided dependency -->
<dependency>
......@@ -246,4 +252,4 @@
</plugin>
</plugins>
</build>
</project>
\ No newline at end of file
</project>
......@@ -101,4 +101,10 @@ configuration:
thermodynamicResponseTimeStep: 50
thermodynamicCountOfResponseTimeSteps: 40
# max collection's size of worker cache collection, setting it smaller when collector OutOfMemory crashed.
workerCacheMaxSize: 10000
\ No newline at end of file
workerCacheMaxSize: 10000
#receiver_zipkin:
# default:
# host: localhost
# port: 9411
# contextPath: /
#
\ No newline at end of file
......@@ -18,167 +18,12 @@
package org.apache.skywalking.apm.collector.server.jetty;
import com.google.gson.JsonElement;
import org.apache.skywalking.apm.collector.server.ServerHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.servlet.*;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Enumeration;
import static java.util.Objects.nonNull;
import org.apache.skywalking.apm.collector.server.ServerHandler;
/**
* @author peng-yongsheng
*/
public abstract class JettyHandler extends HttpServlet implements ServerHandler {
private static final Logger logger = LoggerFactory.getLogger(JettyHandler.class);
public abstract String pathSpec();
@Override
protected final void doGet(HttpServletRequest req, HttpServletResponse resp) {
try {
reply(resp, doGet(req));
} catch (ArgumentsParseException | IOException e) {
try {
replyError(resp, e.getMessage(), HttpServletResponse.SC_BAD_REQUEST);
} catch (IOException replyException) {
logger.error(replyException.getMessage(), e);
}
}
}
protected abstract JsonElement doGet(HttpServletRequest req) throws ArgumentsParseException;
@Override
protected final void doPost(HttpServletRequest req, HttpServletResponse resp) {
try {
reply(resp, doPost(req));
} catch (ArgumentsParseException | IOException e) {
try {
replyError(resp, e.getMessage(), HttpServletResponse.SC_BAD_REQUEST);
} catch (IOException replyException) {
logger.error(replyException.getMessage(), e);
}
}
}
protected abstract JsonElement doPost(HttpServletRequest req) throws ArgumentsParseException, IOException;
@Override
protected final void doHead(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
super.doHead(req, resp);
}
@Override protected final long getLastModified(HttpServletRequest req) {
return super.getLastModified(req);
}
@Override
protected final void doPut(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
super.doPut(req, resp);
}
@Override
protected final void doDelete(HttpServletRequest req,
HttpServletResponse resp) throws ServletException, IOException {
super.doDelete(req, resp);
}
@Override
protected final void doOptions(HttpServletRequest req,
HttpServletResponse resp) throws ServletException, IOException {
super.doOptions(req, resp);
}
@Override
protected final void doTrace(HttpServletRequest req,
HttpServletResponse resp) throws ServletException, IOException {
super.doTrace(req, resp);
}
@Override
protected final void service(HttpServletRequest req,
HttpServletResponse resp) throws ServletException, IOException {
super.service(req, resp);
}
@Override public final void service(ServletRequest req, ServletResponse res) throws ServletException, IOException {
super.service(req, res);
}
@Override public final void destroy() {
super.destroy();
}
@Override public final String getInitParameter(String name) {
return super.getInitParameter(name);
}
@Override public final Enumeration<String> getInitParameterNames() {
return super.getInitParameterNames();
}
@Override public final ServletConfig getServletConfig() {
return super.getServletConfig();
}
@Override public final ServletContext getServletContext() {
return super.getServletContext();
}
@Override public final String getServletInfo() {
return super.getServletInfo();
}
@Override public final void init(ServletConfig config) throws ServletException {
super.init(config);
}
@Override public final void init() throws ServletException {
super.init();
}
@Override public final void log(String msg) {
super.log(msg);
}
@Override public final void log(String message, Throwable t) {
super.log(message, t);
}
@Override public final String getServletName() {
return super.getServletName();
}
private void reply(HttpServletResponse response, JsonElement resJson) throws IOException {
response.setContentType("application/json");
response.setCharacterEncoding("utf-8");
response.setStatus(HttpServletResponse.SC_OK);
PrintWriter out = response.getWriter();
if (nonNull(resJson)) {
out.print(resJson);
}
out.flush();
out.close();
}
private void replyError(HttpServletResponse response, String errorMessage, int status) throws IOException {
response.setContentType("application/json");
response.setCharacterEncoding("utf-8");
response.setStatus(status);
response.setHeader("error-message", errorMessage);
PrintWriter out = response.getWriter();
out.flush();
out.close();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.server.jetty;
import com.google.gson.JsonElement;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Enumeration;
import javax.servlet.ServletConfig;
import javax.servlet.ServletContext;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static java.util.Objects.nonNull;
/**
* @author wusheng
*/
public abstract class JettyJsonHandler extends JettyHandler {
private static final Logger logger = LoggerFactory.getLogger(JettyHandler.class);
@Override
protected final void doGet(HttpServletRequest req, HttpServletResponse resp) {
try {
reply(resp, doGet(req));
} catch (ArgumentsParseException | IOException e) {
try {
replyError(resp, e.getMessage(), HttpServletResponse.SC_BAD_REQUEST);
} catch (IOException replyException) {
logger.error(replyException.getMessage(), e);
}
}
}
protected abstract JsonElement doGet(HttpServletRequest req) throws ArgumentsParseException;
@Override
protected final void doPost(HttpServletRequest req, HttpServletResponse resp) {
try {
reply(resp, doPost(req));
} catch (ArgumentsParseException | IOException e) {
try {
replyError(resp, e.getMessage(), HttpServletResponse.SC_BAD_REQUEST);
} catch (IOException replyException) {
logger.error(replyException.getMessage(), e);
}
}
}
protected abstract JsonElement doPost(HttpServletRequest req) throws ArgumentsParseException, IOException;
@Override
protected final void doHead(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
super.doHead(req, resp);
}
@Override protected final long getLastModified(HttpServletRequest req) {
return super.getLastModified(req);
}
@Override
protected final void doPut(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
super.doPut(req, resp);
}
@Override
protected final void doDelete(HttpServletRequest req,
HttpServletResponse resp) throws ServletException, IOException {
super.doDelete(req, resp);
}
@Override
protected final void doOptions(HttpServletRequest req,
HttpServletResponse resp) throws ServletException, IOException {
super.doOptions(req, resp);
}
@Override
protected final void doTrace(HttpServletRequest req,
HttpServletResponse resp) throws ServletException, IOException {
super.doTrace(req, resp);
}
@Override
protected final void service(HttpServletRequest req,
HttpServletResponse resp) throws ServletException, IOException {
super.service(req, resp);
}
@Override public final void service(ServletRequest req, ServletResponse res) throws ServletException, IOException {
super.service(req, res);
}
@Override public final void destroy() {
super.destroy();
}
@Override public final String getInitParameter(String name) {
return super.getInitParameter(name);
}
@Override public final Enumeration<String> getInitParameterNames() {
return super.getInitParameterNames();
}
@Override public final ServletConfig getServletConfig() {
return super.getServletConfig();
}
@Override public final ServletContext getServletContext() {
return super.getServletContext();
}
@Override public final String getServletInfo() {
return super.getServletInfo();
}
@Override public final void init(ServletConfig config) throws ServletException {
super.init(config);
}
@Override public final void init() throws ServletException {
super.init();
}
@Override public final void log(String msg) {
super.log(msg);
}
@Override public final void log(String message, Throwable t) {
super.log(message, t);
}
@Override public final String getServletName() {
return super.getServletName();
}
private void reply(HttpServletResponse response, JsonElement resJson) throws IOException {
response.setContentType("application/json");
response.setCharacterEncoding("utf-8");
response.setStatus(HttpServletResponse.SC_OK);
PrintWriter out = response.getWriter();
if (nonNull(resJson)) {
out.print(resJson);
}
out.flush();
out.close();
}
private void replyError(HttpServletResponse response, String errorMessage, int status) throws IOException {
response.setContentType("application/json");
response.setCharacterEncoding("utf-8");
response.setStatus(status);
response.setHeader("error-message", errorMessage);
PrintWriter out = response.getWriter();
out.flush();
out.close();
}
}
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm-collector</artifactId>
<groupId>org.apache.skywalking</groupId>
<version>5.0.0-beta2-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>apm-collector-thirdparty-receiver</artifactId>
<packaging>pom</packaging>
<modules>
<module>receiver-zipkin</module>
</modules>
</project>
# Zipkin receiver
[Zipkin](http://zipkin.io/) receiver provides the feature to receive span data in Zipkin formats. SkyWalking backend provides
analysis, aggregation and visualization. So the user will not need to learn how SkyWalking auto instrumentation
agents(Java, .NET, node.js) work, or they don't want to change for some reasons, such as Zipkin integration has been completed.
Zipkin receiver is only an optional features in SkyWalking, even now it is [an incubating feature](../../../../docs/en/Incubating/Abstract.md).
## Limits
As an incubating feature, it is a prototype. So it has following limits:
1. Don't try to use SkyWalking native agents and Zipkin's libs in the same distributed system. Considering HEADERs of Zipkin and SkyWalking aren't shared/interoperable, their two will not propagate context for each other. Trace will not continue.
1. Don't support cluster mode.
1. Analysis based on trace will be finished in the certain and given duration. The default assumption is 2 min most. SkyWalking used more complex header and context to avoid this in analysis stage.
## Open Zipkin receiver
Zipkin receiver is an optional module, and default closed. For open it, add these settings in your `application.yml` in collector
```yaml
receiver_zipkin:
default:
host: localhost
port: 9411
contextPath: /
expireTime: 20 # Unit is seconds
maxCacheSize: 1000000 # The number of traces in buffer
```
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm-collector-thirdparty-receiver</artifactId>
<groupId>org.apache.skywalking</groupId>
<version>5.0.0-beta2-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>receiver-zipkin</artifactId>
<packaging>pom</packaging>
<modules>
<module>receiver-zipkin-define</module>
<module>receiver-zipkin-provider</module>
</modules>
<dependencies>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>apm-collector-core</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>receiver-zipkin</artifactId>
<groupId>org.apache.skywalking</groupId>
<version>5.0.0-beta2-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>receiver-zipkin-define</artifactId>
</project>
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.receiver.zipkin.define;
import org.apache.skywalking.apm.collector.core.module.ModuleDefine;
/**
* Zipkin receiver module provides the HTTP, protoc serve for any SDK or agent by following Zipkin format.
*
* At this moment, Zipkin format is not compatible with SkyWalking, especially HEADERs.
* Please don't consider this as a Zipkin-SkyWalking integration,
* it is provided for adding analysis, aggregation and visualization capabilities to zipkin backend.
*
* @author wusheng
*/
public class ZipkinReceiverModule extends ModuleDefine {
public static final String NAME = "receiver_zipkin";
@Override public String name() {
return NAME;
}
@Override public Class[] services() {
return new Class[0];
}
}
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
org.apache.skywalking.apm.collector.receiver.zipkin.define.ZipkinReceiverModule
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>receiver-zipkin</artifactId>
<groupId>org.apache.skywalking</groupId>
<version>5.0.0-beta2-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>receiver-zipkin-provider</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>receiver-zipkin-define</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>collector-jetty-manager-define</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>segment-parser-define</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>register-define</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>metric-define</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
</dependency>
<dependency>
<groupId>io.zipkin.zipkin2</groupId>
<artifactId>zipkin</artifactId>
</dependency>
</dependencies>
</project>
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.receiver.zipkin.provider;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.service.ISegmentParseService;
import org.apache.skywalking.apm.collector.receiver.zipkin.provider.data.SkyWalkingTrace;
import org.apache.skywalking.apm.collector.receiver.zipkin.provider.transform.SegmentListener;
import org.apache.skywalking.apm.collector.receiver.zipkin.provider.transform.Zipkin2SkyWalkingTransfer;
/**
* Send the segments to Analysis module, like receiving segments from native SkyWalking agents.
*/
public class Receiver2AnalysisBridge implements SegmentListener {
private ISegmentParseService segmentParseService;
public Receiver2AnalysisBridge(ISegmentParseService segmentParseService) {
this.segmentParseService = segmentParseService;
}
/**
* Add this bridge as listener to Zipkin span transfer.
*/
public void build() {
Zipkin2SkyWalkingTransfer.INSTANCE.addListener(this);
}
@Override
public void notify(SkyWalkingTrace trace) {
trace.toUpstreamSegment().forEach(upstream -> segmentParseService.parse(upstream.build(), ISegmentParseService.Source.Agent));
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.receiver.zipkin.provider;
import org.apache.skywalking.apm.collector.analysis.register.define.service.AgentOsInfo;
import org.apache.skywalking.apm.collector.analysis.register.define.service.IApplicationIDService;
import org.apache.skywalking.apm.collector.analysis.register.define.service.IInstanceIDService;
import org.apache.skywalking.apm.collector.analysis.register.define.service.INetworkAddressIDService;
import org.apache.skywalking.apm.collector.analysis.register.define.service.IServiceNameService;
/**
* @author wusheng
*/
public class RegisterServices {
private IApplicationIDService applicationIDService;
private IInstanceIDService instanceIDService;
private INetworkAddressIDService networkAddressIDService;
private IServiceNameService serviceNameService;
public RegisterServices(
IApplicationIDService applicationIDService,
IInstanceIDService instanceIDService,
INetworkAddressIDService networkAddressIDService,
IServiceNameService serviceNameService) {
this.applicationIDService = applicationIDService;
this.instanceIDService = instanceIDService;
this.networkAddressIDService = networkAddressIDService;
this.serviceNameService = serviceNameService;
}
public IApplicationIDService getApplicationIDService() {
return applicationIDService;
}
public IInstanceIDService getInstanceIDService() {
return instanceIDService;
}
public INetworkAddressIDService getNetworkAddressIDService() {
return networkAddressIDService;
}
public IServiceNameService getServiceNameService() {
return serviceNameService;
}
/**
* @param applicationId
* @param agentUUID in zipkin translation, always means application code. Because no UUID for each process.
* @return
*/
public int getOrCreateApplicationInstanceId(int applicationId, String agentUUID) {
AgentOsInfo agentOsInfo = new AgentOsInfo();
agentOsInfo.setHostname("N/A");
agentOsInfo.setOsName("N/A");
agentOsInfo.setProcessNo(-1);
return getInstanceIDService().getOrCreateByAgentUUID(applicationId, agentUUID, System.currentTimeMillis(), agentOsInfo);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.receiver.zipkin.provider;
import org.apache.skywalking.apm.collector.server.jetty.JettyServerConfig;
/**
* @author wusheng
*/
public class ZipkinReceiverConfig extends JettyServerConfig {
private int expireTime = 20;
private int maxCacheSize = 1_000_000;
public int getExpireTime() {
return expireTime;
}
public void setExpireTime(int expireTime) {
this.expireTime = expireTime;
}
public int getMaxCacheSize() {
return maxCacheSize;
}
public void setMaxCacheSize(int maxCacheSize) {
this.maxCacheSize = maxCacheSize;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.receiver.zipkin.provider;
import org.apache.skywalking.apm.collector.analysis.metric.define.AnalysisMetricModule;
import org.apache.skywalking.apm.collector.analysis.metric.define.service.IInstanceHeartBeatService;
import org.apache.skywalking.apm.collector.analysis.register.define.AnalysisRegisterModule;
import org.apache.skywalking.apm.collector.analysis.register.define.service.IApplicationIDService;
import org.apache.skywalking.apm.collector.analysis.register.define.service.IInstanceIDService;
import org.apache.skywalking.apm.collector.analysis.register.define.service.INetworkAddressIDService;
import org.apache.skywalking.apm.collector.analysis.register.define.service.IServiceNameService;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.AnalysisSegmentParserModule;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.service.ISegmentParseService;
import org.apache.skywalking.apm.collector.core.module.*;
import org.apache.skywalking.apm.collector.jetty.manager.JettyManagerModule;
import org.apache.skywalking.apm.collector.jetty.manager.service.JettyManagerService;
import org.apache.skywalking.apm.collector.receiver.zipkin.define.ZipkinReceiverModule;
import org.apache.skywalking.apm.collector.receiver.zipkin.provider.handler.SpanV2JettyHandler;
import org.apache.skywalking.apm.collector.receiver.zipkin.provider.transform.Zipkin2SkyWalkingTransfer;
import org.apache.skywalking.apm.collector.server.jetty.JettyServer;
/**
* @author wusheng
*/
public class ZipkinReceiverProvider extends ModuleProvider {
public static final String NAME = "default";
private ZipkinReceiverConfig config;
public ZipkinReceiverProvider() {
config = new ZipkinReceiverConfig();
}
@Override public String name() {
return NAME;
}
@Override public Class<? extends ModuleDefine> module() {
return ZipkinReceiverModule.class;
}
@Override public ModuleConfig createConfigBeanIfAbsent() {
return config;
}
@Override public void prepare() throws ServiceNotProvidedException {
}
@Override public void start() throws ServiceNotProvidedException, ModuleStartException {
ModuleDefine moduleDefine = getManager().find(AnalysisRegisterModule.NAME);
RegisterServices registerServices = new RegisterServices(moduleDefine.getService(IApplicationIDService.class),
moduleDefine.getService(IInstanceIDService.class),
moduleDefine.getService(INetworkAddressIDService.class),
moduleDefine.getService(IServiceNameService.class));
IInstanceHeartBeatService instanceHeartBeatService = getManager().find(AnalysisMetricModule.NAME).getService(IInstanceHeartBeatService.class);
Zipkin2SkyWalkingTransfer.INSTANCE.setRegisterServices(registerServices);
Zipkin2SkyWalkingTransfer.INSTANCE.setInstanceHeartBeatService(instanceHeartBeatService);
JettyManagerService managerService = getManager().find(JettyManagerModule.NAME).getService(JettyManagerService.class);
JettyServer jettyServer = managerService.createIfAbsent(config.getHost(), config.getPort(), config.getContextPath());
jettyServer.addHandler(new SpanV2JettyHandler(config, registerServices));
ISegmentParseService segmentParseService = getManager().find(AnalysisSegmentParserModule.NAME).getService(ISegmentParseService.class);
Receiver2AnalysisBridge bridge = new Receiver2AnalysisBridge(segmentParseService);
Zipkin2SkyWalkingTransfer.INSTANCE.addListener(bridge);
}
@Override public void notifyAfterCompleted() throws ServiceNotProvidedException {
}
@Override public String[] requiredModules() {
return new String[] {JettyManagerModule.NAME, AnalysisSegmentParserModule.NAME, AnalysisMetricModule.NAME};
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.receiver.zipkin.provider.cache;
import org.apache.skywalking.apm.collector.receiver.zipkin.provider.ZipkinReceiverConfig;
import org.apache.skywalking.apm.collector.receiver.zipkin.provider.cache.caffeine.CaffeineSpanCache;
/**
* @author wusheng
*/
public class CacheFactory {
public static final CacheFactory INSTANCE = new CacheFactory();
private ISpanCache implementor;
private CacheFactory() {
}
public ISpanCache get(ZipkinReceiverConfig config) {
if (implementor == null) {
synchronized (INSTANCE) {
if (implementor == null) {
implementor = new CaffeineSpanCache(config);
}
}
}
return implementor;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.receiver.zipkin.provider.cache;
import zipkin2.Span;
/**
* @author wusheng
*/
public interface ISpanCache {
void addSpan(Span span);
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.receiver.zipkin.provider.cache.caffeine;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalCause;
import com.github.benmanes.caffeine.cache.RemovalListener;
import org.apache.skywalking.apm.collector.receiver.zipkin.provider.ZipkinReceiverConfig;
import org.apache.skywalking.apm.collector.receiver.zipkin.provider.cache.ISpanCache;
import org.apache.skywalking.apm.collector.receiver.zipkin.provider.data.ZipkinTrace;
import org.apache.skywalking.apm.collector.receiver.zipkin.provider.transform.Zipkin2SkyWalkingTransfer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import zipkin2.Span;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
/**
* NOTICE: FROM my test, Caffeine cache triggers/checks expire only face write/read op.
* In order to make trace finish in time, I have to set a timer to write a meaningless trace, for active expire.
*
* @author wusheng
*/
public class CaffeineSpanCache implements ISpanCache, RemovalListener<String, ZipkinTrace> {
private static final Logger logger = LoggerFactory.getLogger(CaffeineSpanCache.class);
private Cache<String, ZipkinTrace> inProcessSpanCache;
private ReentrantLock newTraceLock;
public CaffeineSpanCache(ZipkinReceiverConfig config) {
newTraceLock = new ReentrantLock();
inProcessSpanCache = Caffeine.newBuilder()
.expireAfterWrite(config.getExpireTime(), TimeUnit.SECONDS)
.maximumSize(config.getMaxCacheSize())
.removalListener(this)
.build();
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {
inProcessSpanCache.put("ACTIVE", new ZipkinTrace.TriggerTrace());
}, 2, 3, TimeUnit.SECONDS);
}
/**
* Zipkin trace finished by the expired rule.
*
* @param key
* @param trace
* @param cause
*/
@Override
public void onRemoval(@Nullable String key, @Nullable ZipkinTrace trace, @Nonnull RemovalCause cause) {
if (trace instanceof ZipkinTrace.TriggerTrace) {
return;
}
try {
Zipkin2SkyWalkingTransfer.INSTANCE.transfer(trace);
} catch (Exception e) {
logger.error(e.getMessage(), e);
logger.warn("Zipkin trace:" + trace);
}
}
@Override
public void addSpan(Span span) {
ZipkinTrace trace = inProcessSpanCache.getIfPresent(span.traceId());
if (trace == null) {
newTraceLock.lock();
try {
trace = inProcessSpanCache.getIfPresent(span.traceId());
if (trace == null) {
trace = new ZipkinTrace();
inProcessSpanCache.put(span.traceId(), trace);
}
} finally {
newTraceLock.unlock();
}
}
trace.addSpan(span);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.receiver.zipkin.provider.data;
import org.apache.skywalking.apm.network.proto.TraceSegmentObject;
import org.apache.skywalking.apm.network.proto.UniqueId;
import org.apache.skywalking.apm.network.proto.UpstreamSegment;
import java.util.LinkedList;
import java.util.List;
/**
* Each SkyWalkingTrace consists of segments in each application, original from {@link ZipkinTrace}s
*/
public class SkyWalkingTrace {
private UniqueId globalTraceId;
private List<TraceSegmentObject.Builder> segmentList;
public SkyWalkingTrace(UniqueId globalTraceId, List<TraceSegmentObject.Builder> segmentList) {
this.globalTraceId = globalTraceId;
this.segmentList = segmentList;
}
public List<UpstreamSegment.Builder> toUpstreamSegment() {
List<UpstreamSegment.Builder> newUpstreamList = new LinkedList<>();
segmentList.forEach(segment -> {
UpstreamSegment.Builder builder = UpstreamSegment.newBuilder();
builder.addGlobalTraceIds(globalTraceId);
builder.setSegment(segment.build().toByteString());
newUpstreamList.add(builder);
});
return newUpstreamList;
}
public UniqueId getGlobalTraceId() {
return globalTraceId;
}
public List<TraceSegmentObject.Builder> getSegmentList() {
return segmentList;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.receiver.zipkin.provider.data;
import zipkin2.Span;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;
/**
* @author wusheng
*/
public class ZipkinTrace {
private List<Span> spans;
private ReentrantLock spanWriteLock;
public ZipkinTrace() {
spans = new LinkedList<>();
spanWriteLock = new ReentrantLock();
}
public void addSpan(Span span) {
spanWriteLock.lock();
try {
spans.add(span);
} finally {
spanWriteLock.unlock();
}
}
public List<Span> getSpans() {
return spans;
}
@Override
public String toString() {
return "ZipkinTrace{" +
"spans=" + spans +
'}';
}
public static class TriggerTrace extends ZipkinTrace {
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.receiver.zipkin.provider.handler;
import org.apache.skywalking.apm.collector.receiver.zipkin.provider.RegisterServices;
import org.apache.skywalking.apm.collector.receiver.zipkin.provider.ZipkinReceiverConfig;
import org.apache.skywalking.apm.collector.receiver.zipkin.provider.cache.CacheFactory;
import zipkin2.Span;
import zipkin2.codec.SpanBytesDecoder;
import javax.servlet.ServletInputStream;
import javax.servlet.http.HttpServletRequest;
import java.io.IOException;
import java.util.List;
public class SpanProcessor {
void convert(ZipkinReceiverConfig config, SpanBytesDecoder decoder, HttpServletRequest request, RegisterServices registerServices) throws IOException {
int len = request.getContentLength();
ServletInputStream iii = request.getInputStream();
byte[] buffer = new byte[len];
iii.read(buffer, 0, len);
List<Span> spanList = decoder.decodeList(buffer);
spanList.forEach(span -> {
// In Zipkin, the local service name represents the application owner.
String applicationCode = span.localServiceName();
if (applicationCode != null) {
int applicationId = registerServices.getApplicationIDService().getOrCreateForApplicationCode(applicationCode);
if (applicationId != 0) {
registerServices.getOrCreateApplicationInstanceId(applicationId, applicationCode);
}
}
CacheFactory.INSTANCE.get(config).addSpan(span);
});
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.receiver.zipkin.provider.handler;
import org.apache.skywalking.apm.collector.receiver.zipkin.provider.RegisterServices;
import org.apache.skywalking.apm.collector.receiver.zipkin.provider.ZipkinReceiverConfig;
import org.apache.skywalking.apm.collector.server.jetty.JettyHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import zipkin2.codec.SpanBytesDecoder;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
public class SpanV1JettyHandler extends JettyHandler {
private static final Logger logger = LoggerFactory.getLogger(SpanV2JettyHandler.class);
private ZipkinReceiverConfig config;
private RegisterServices registerServices;
public SpanV1JettyHandler(ZipkinReceiverConfig config,
RegisterServices registerServices) {
this.config = config;
this.registerServices = registerServices;
}
@Override
public String pathSpec() {
return "/api/v1/spans";
}
@Override
protected void doPost(HttpServletRequest request, HttpServletResponse response) {
response.setContentType("application/json");
response.setCharacterEncoding("utf-8");
try {
String type = request.getHeader("Content-Type");
SpanBytesDecoder decoder = type != null && type.contains("/x-thrift")
? SpanBytesDecoder.THRIFT
: SpanBytesDecoder.JSON_V1;
SpanProcessor processor = new SpanProcessor();
processor.convert(config, decoder, request, registerServices);
response.setStatus(202);
} catch (Exception e) {
response.setStatus(500);
logger.error(e.getMessage(), e);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.receiver.zipkin.provider.handler;
import org.apache.skywalking.apm.collector.receiver.zipkin.provider.RegisterServices;
import org.apache.skywalking.apm.collector.receiver.zipkin.provider.ZipkinReceiverConfig;
import org.apache.skywalking.apm.collector.server.jetty.JettyHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import zipkin2.codec.SpanBytesDecoder;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
/**
* @author wusheng
*/
public class SpanV2JettyHandler extends JettyHandler {
private static final Logger logger = LoggerFactory.getLogger(SpanV2JettyHandler.class);
private ZipkinReceiverConfig config;
private RegisterServices registerServices;
public SpanV2JettyHandler(ZipkinReceiverConfig config,
RegisterServices registerServices) {
this.config = config;
this.registerServices = registerServices;
}
@Override
public String pathSpec() {
return "/api/v2/spans";
}
@Override
protected void doPost(HttpServletRequest request, HttpServletResponse response) {
response.setContentType("application/json");
response.setCharacterEncoding("utf-8");
try {
String type = request.getHeader("Content-Type");
SpanBytesDecoder decoder = type != null && type.contains("/x-protobuf")
? SpanBytesDecoder.PROTO3
: SpanBytesDecoder.JSON_V2;
SpanProcessor processor = new SpanProcessor();
processor.convert(config, decoder, request, registerServices);
response.setStatus(202);
} catch (Exception e) {
response.setStatus(500);
logger.error(e.getMessage(), e);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.receiver.zipkin.provider.transform;
import org.apache.skywalking.apm.collector.analysis.metric.define.service.IInstanceHeartBeatService;
import org.apache.skywalking.apm.collector.core.util.StringUtils;
import org.apache.skywalking.apm.collector.receiver.zipkin.provider.RegisterServices;
import org.apache.skywalking.apm.collector.receiver.zipkin.provider.data.SkyWalkingTrace;
import org.apache.skywalking.apm.network.proto.*;
import org.eclipse.jetty.util.StringUtil;
import zipkin2.Endpoint;
import zipkin2.Span;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
/**
* @author wusheng
*/
public class SegmentBuilder {
private Context context;
private LinkedList<Segment> segments;
private Map<String, ClientSideSpan> clientPartSpan;
private SegmentBuilder() {
segments = new LinkedList<>();
context = new Context();
clientPartSpan = new HashMap<>();
}
public static SkyWalkingTrace build(List<Span> traceSpans,
RegisterServices registerServices,
IInstanceHeartBeatService instanceHeartBeatService) throws Exception {
SegmentBuilder builder = new SegmentBuilder();
// This map groups the spans by their parent id, in order to assist to build tree.
// key: parentId
// value: span
Map<String, List<Span>> childSpanMap = new HashMap<>();
AtomicReference<Span> root = new AtomicReference<>();
traceSpans.forEach(span -> {
if (span.parentId() == null) {
root.set(span);
}
List<Span> spanList = childSpanMap.get(span.parentId());
if (spanList == null) {
spanList = new LinkedList<>();
spanList.add(span);
childSpanMap.put(span.parentId(), spanList);
} else {
spanList.add(span);
}
});
Span rootSpan = root.get();
if (rootSpan != null) {
String applicationCode = rootSpan.localServiceName();
// If root span doesn't include applicationCode, a.k.a local service name,
// Segment can't be built
// Ignore the whole trace.
// :P Hope anyone could provide better solution.
// Wu Sheng.
if (StringUtils.isNotEmpty(applicationCode)) {
builder.context.addApp(applicationCode, registerServices);
SpanObject.Builder rootSpanBuilder = builder.initSpan(null, null, rootSpan, true);
builder.context.currentSegment().addSpan(rootSpanBuilder);
builder.scanSpansFromRoot(rootSpanBuilder, rootSpan, childSpanMap, registerServices);
builder.segments.add(builder.context.removeApp());
}
}
List<TraceSegmentObject.Builder> segmentBuilders = new LinkedList<>();
builder.segments.forEach(segment -> {
TraceSegmentObject.Builder traceSegmentBuilder = segment.freeze();
segmentBuilders.add(traceSegmentBuilder);
instanceHeartBeatService.heartBeat(traceSegmentBuilder.getApplicationInstanceId(), segment.getEndTime());
});
return new SkyWalkingTrace(builder.generateTraceOrSegmentId(), segmentBuilders);
}
private void scanSpansFromRoot(SpanObject.Builder parentSegmentSpan, Span parent,
Map<String, List<Span>> childSpanMap,
RegisterServices registerServices) throws Exception {
String parentId = parent.id();
// get child spans by parent span id
List<Span> spanList = childSpanMap.get(parentId);
if (spanList == null) {
return;
}
for (Span childSpan : spanList) {
String localServiceName = childSpan.localServiceName();
boolean isNewApp = false;
if (StringUtil.isNotBlank(localServiceName)) {
if (context.isAppChanged(localServiceName)) {
isNewApp = true;
}
}
try {
if (isNewApp) {
context.addApp(localServiceName, registerServices);
}
SpanObject.Builder childSpanBuilder = initSpan(parentSegmentSpan, parent, childSpan, isNewApp);
context.currentSegment().addSpan(childSpanBuilder);
scanSpansFromRoot(childSpanBuilder, childSpan, childSpanMap, registerServices);
} finally {
if (isNewApp) {
segments.add(context.removeApp());
}
}
}
}
private SpanObject.Builder initSpan(SpanObject.Builder parentSegmentSpan, Span parentSpan, Span span,
boolean isSegmentRoot) {
SpanObject.Builder spanBuilder = SpanObject.newBuilder();
spanBuilder.setSpanId(context.currentIDs().nextSpanId());
if (isSegmentRoot) {
// spanId = -1, means no parent span
// spanId is considered unique, and from a positive sequence in each segment.
spanBuilder.setParentSpanId(-1);
}
if (!isSegmentRoot && parentSegmentSpan != null) {
spanBuilder.setParentSpanId(parentSegmentSpan.getSpanId());
}
Span.Kind kind = span.kind();
spanBuilder.setOperationName(span.name());
ClientSideSpan clientSideSpan;
switch (kind) {
case CLIENT:
spanBuilder.setSpanType(SpanType.Exit);
String peer = endpoint2Peer(span.remoteEndpoint());
if (peer != null) {
spanBuilder.setPeer(peer);
}
clientSideSpan = new ClientSideSpan(span, spanBuilder);
clientPartSpan.put(span.id(), clientSideSpan);
break;
case SERVER:
spanBuilder.setSpanType(SpanType.Entry);
this.buildRef(spanBuilder, span, parentSegmentSpan, parentSpan);
break;
case CONSUMER:
spanBuilder.setSpanType(SpanType.Entry);
this.buildRef(spanBuilder, span, parentSegmentSpan, parentSpan);
break;
case PRODUCER:
spanBuilder.setSpanType(SpanType.Exit);
peer = endpoint2Peer(span.remoteEndpoint());
if (peer != null) {
spanBuilder.setPeer(peer);
}
clientSideSpan = new ClientSideSpan(span, spanBuilder);
clientPartSpan.put(span.id(), clientSideSpan);
break;
default:
spanBuilder.setSpanType(SpanType.Local);
}
// microseconds in Zipkin -> milliseconds in SkyWalking
long startTime = span.timestamp() / 1000;
long duration = span.duration() / 1000;
spanBuilder.setStartTime(startTime);
spanBuilder.setEndTime(startTime + duration);
span.tags().forEach((tagKey, tagValue) -> spanBuilder.addTags(
KeyWithStringValue.newBuilder().setKey(tagKey).setValue(tagValue).build())
);
span.annotations().forEach(annotation ->
spanBuilder.addLogs(LogMessage.newBuilder().setTime(annotation.timestamp() / 1000).addData(
KeyWithStringValue.newBuilder().setKey("zipkin.annotation").setValue(annotation.value()).build()
))
);
return spanBuilder;
}
private String endpoint2Peer(Endpoint endpoint) {
if (endpoint == null) {
return null;
}
String ip = null;
if (StringUtils.isNotEmpty(endpoint.ipv4())) {
ip = endpoint.ipv4();
} else if (StringUtils.isNotEmpty(endpoint.ipv6())) {
ip = endpoint.ipv6();
}
if (StringUtils.isEmpty(ip)) {
return null;
}
int port = endpoint.port();
return port == 0 ? ip : ip + ":" + port;
}
private void buildRef(SpanObject.Builder spanBuilder, Span span, SpanObject.Builder parentSegmentSpan,
Span parentSpan) {
Segment parentSegment = context.parentSegment();
if (parentSegment == null) {
return;
}
Segment rootSegment = context.rootSegment();
if (rootSegment == null) {
return;
}
if (span.shared() != null && span.shared()) {
// using same span id in client and server for RPC
// SkyWalking will build both sides of span
ClientSideSpan clientSideSpan = clientPartSpan.get(span.id());
parentSegmentSpan = clientSideSpan.getBuilder();
parentSpan = clientSideSpan.getSpan();
}
String ip = null;
int port = 0;
Endpoint serverEndpoint = span.localEndpoint();
Endpoint clientEndpoint = parentSpan.remoteEndpoint();
if (clientEndpoint != null) {
if (StringUtil.isBlank(ip)) {
if (StringUtils.isNotEmpty(clientEndpoint.ipv4())) {
ip = clientEndpoint.ipv4();
} else if (StringUtils.isNotEmpty(clientEndpoint.ipv6())) {
ip = clientEndpoint.ipv6();
}
port = clientEndpoint.port();
}
}
if (serverEndpoint != null) {
if (StringUtils.isNotEmpty(serverEndpoint.ipv4())) {
ip = serverEndpoint.ipv4();
} else if (StringUtils.isNotEmpty(serverEndpoint.ipv6())) {
ip = serverEndpoint.ipv6();
}
}
if (StringUtil.isBlank(ip)) {
//The IP is the most important for building the ref at both sides.
return;
}
TraceSegmentReference.Builder refBuilder = TraceSegmentReference.newBuilder();
refBuilder.setEntryApplicationInstanceId(rootSegment.builder().getApplicationInstanceId());
int serviceId = rootSegment.getEntryServiceId();
if (serviceId == 0) {
refBuilder.setEntryServiceName(rootSegment.getEntryServiceName());
} else {
refBuilder.setEntryServiceId(serviceId);
}
refBuilder.setEntryApplicationInstanceId(rootSegment.builder().getApplicationInstanceId());
// parent ref info
refBuilder.setNetworkAddress(port == 0 ? ip : ip + ":" + port);
parentSegmentSpan.setPeer(refBuilder.getNetworkAddress());
refBuilder.setParentApplicationInstanceId(parentSegment.builder().getApplicationInstanceId());
refBuilder.setParentSpanId(parentSegmentSpan.getSpanId());
refBuilder.setParentTraceSegmentId(parentSegment.builder().getTraceSegmentId());
int parentServiceId = parentSegment.getEntryServiceId();
if (parentServiceId == 0) {
refBuilder.setParentServiceName(parentSegment.getEntryServiceName());
} else {
refBuilder.setParentServiceId(parentServiceId);
}
refBuilder.setRefType(RefType.CrossProcess);
spanBuilder.addRefs(refBuilder);
}
/**
* Context holds the values in build process.
*/
private class Context {
private LinkedList<Segment> segmentsStack = new LinkedList<>();
private boolean isAppChanged(String applicationCode) {
return StringUtils.isNotEmpty(applicationCode) && !applicationCode.equals(currentIDs().applicationCode);
}
private Segment addApp(String applicationCode,
RegisterServices registerServices) throws Exception {
int applicationId = waitForExchange(() ->
registerServices.getApplicationIDService().getOrCreateForApplicationCode(applicationCode),
10
);
int appInstanceId = waitForExchange(() ->
registerServices.getOrCreateApplicationInstanceId(applicationId, applicationCode),
10
);
Segment segment = new Segment(applicationCode, applicationId, appInstanceId);
segmentsStack.add(segment);
return segment;
}
private IDCollection currentIDs() {
return segmentsStack.getLast().ids;
}
private Segment currentSegment() {
return segmentsStack.getLast();
}
private Segment parentSegment() {
if (segmentsStack.size() < 2) {
return null;
} else {
return segmentsStack.get(segmentsStack.size() - 2);
}
}
private Segment rootSegment() {
if (segmentsStack.size() < 2) {
return null;
} else {
return segmentsStack.getFirst();
}
}
private Segment removeApp() {
return segmentsStack.removeLast();
}
private int waitForExchange(Callable<Integer> callable, int retry) throws Exception {
for (int i = 0; i < retry; i++) {
Integer id = callable.call();
if (id == 0) {
Thread.sleep(1000L);
} else {
return id;
}
}
throw new TimeoutException("ID exchange costs more than expected.");
}
}
private class Segment {
private TraceSegmentObject.Builder segmentBuilder;
private IDCollection ids;
private int entryServiceId = 0;
private String entryServiceName = null;
private List<SpanObject.Builder> spans;
private long endTime = 0;
private Segment(String applicationCode, int applicationId, int appInstanceId) {
ids = new IDCollection(applicationCode, applicationId, appInstanceId);
spans = new LinkedList<>();
segmentBuilder = TraceSegmentObject.newBuilder();
segmentBuilder.setApplicationId(applicationId);
segmentBuilder.setApplicationInstanceId(appInstanceId);
segmentBuilder.setTraceSegmentId(generateTraceOrSegmentId());
}
private TraceSegmentObject.Builder builder() {
return segmentBuilder;
}
private void addSpan(SpanObject.Builder spanBuilder) {
String operationName = spanBuilder.getOperationName();
if (entryServiceId == 0 && StringUtils.isNotEmpty(operationName)) {
if (SpanType.Entry == spanBuilder.getSpanType()) {
if (StringUtils.isNotEmpty(operationName)) {
entryServiceName = operationName;
} else {
entryServiceId = spanBuilder.getOperationNameId();
}
}
}
// init by root span
if (spanBuilder.getSpanId() == 1 && entryServiceId == 0) {
if (StringUtils.isNotEmpty(operationName)) {
entryServiceName = operationName;
} else {
entryServiceId = spanBuilder.getOperationNameId();
}
}
spans.add(spanBuilder);
if (spanBuilder.getEndTime() > endTime) {
endTime = spanBuilder.getEndTime();
}
}
public int getEntryServiceId() {
return entryServiceId;
}
public String getEntryServiceName() {
return entryServiceName;
}
private IDCollection ids() {
return ids;
}
public TraceSegmentObject.Builder freeze() {
for (SpanObject.Builder span : spans) {
segmentBuilder.addSpans(span);
}
return segmentBuilder;
}
public long getEndTime() {
return endTime;
}
}
private class IDCollection {
private String applicationCode;
private int appId;
private int instanceId;
private int spanIdSeq;
private IDCollection(String applicationCode, int appId, int instanceId) {
this.applicationCode = applicationCode;
this.appId = appId;
this.instanceId = instanceId;
this.spanIdSeq = 0;
}
private int nextSpanId() {
return spanIdSeq++;
}
}
private UniqueId generateTraceOrSegmentId() {
return UniqueId.newBuilder()
.addIdParts(ThreadLocalRandom.current().nextLong())
.addIdParts(ThreadLocalRandom.current().nextLong())
.addIdParts(ThreadLocalRandom.current().nextLong())
.build();
}
private class ClientSideSpan {
private Span span;
private SpanObject.Builder builder;
public ClientSideSpan(Span span, SpanObject.Builder builder) {
this.span = span;
this.builder = builder;
}
public Span getSpan() {
return span;
}
public SpanObject.Builder getBuilder() {
return builder;
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.receiver.zipkin.provider.transform;
import org.apache.skywalking.apm.collector.receiver.zipkin.provider.data.SkyWalkingTrace;
public interface SegmentListener {
void notify(SkyWalkingTrace trace);
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.receiver.zipkin.provider.transform;
import org.apache.skywalking.apm.collector.analysis.metric.define.service.IInstanceHeartBeatService;
import org.apache.skywalking.apm.collector.receiver.zipkin.provider.RegisterServices;
import org.apache.skywalking.apm.collector.receiver.zipkin.provider.data.SkyWalkingTrace;
import org.apache.skywalking.apm.collector.receiver.zipkin.provider.data.ZipkinTrace;
import zipkin2.Span;
import java.util.LinkedList;
import java.util.List;
/**
* @author wusheng
*/
public class Zipkin2SkyWalkingTransfer {
public static Zipkin2SkyWalkingTransfer INSTANCE = new Zipkin2SkyWalkingTransfer();
private RegisterServices registerServices;
private IInstanceHeartBeatService instanceHeartBeatService;
private List<SegmentListener> listeners = new LinkedList<>();
private Zipkin2SkyWalkingTransfer() {
}
public void setRegisterServices(
RegisterServices registerServices) {
this.registerServices = registerServices;
}
public void setInstanceHeartBeatService(IInstanceHeartBeatService instanceHeartBeatService) {
this.instanceHeartBeatService = instanceHeartBeatService;
}
public void addListener(SegmentListener listener) {
listeners.add(listener);
}
public void transfer(ZipkinTrace trace) throws Exception {
List<Span> traceSpans = trace.getSpans();
if (traceSpans.size() > 0) {
SkyWalkingTrace skyWalkingTrace = SegmentBuilder.build(traceSpans, registerServices, instanceHeartBeatService);
listeners.forEach(listener ->
listener.notify(skyWalkingTrace)
);
}
}
}
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
org.apache.skywalking.apm.collector.receiver.zipkin.provider.ZipkinReceiverProvider
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.receiver.zipkin.provider.transform;
import org.apache.skywalking.apm.collector.analysis.metric.define.service.IInstanceHeartBeatService;
import org.apache.skywalking.apm.collector.analysis.register.define.service.AgentOsInfo;
import org.apache.skywalking.apm.collector.analysis.register.define.service.IApplicationIDService;
import org.apache.skywalking.apm.collector.analysis.register.define.service.IInstanceIDService;
import org.apache.skywalking.apm.collector.receiver.zipkin.provider.RegisterServices;
import org.apache.skywalking.apm.collector.receiver.zipkin.provider.data.SkyWalkingTrace;
import org.apache.skywalking.apm.collector.receiver.zipkin.provider.data.ZipkinTrace;
import org.apache.skywalking.apm.network.proto.SpanObject;
import org.apache.skywalking.apm.network.proto.SpanType;
import org.apache.skywalking.apm.network.proto.TraceSegmentObject;
import org.apache.skywalking.apm.network.proto.TraceSegmentReference;
import org.junit.Assert;
import org.junit.Test;
import zipkin2.Span;
import zipkin2.codec.SpanBytesDecoder;
import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
/**
* @author wusheng
*/
public class SpringSleuthSegmentBuilderTest implements SegmentListener {
private Map<String, Integer> applicationInstRegister = new HashMap<>();
private Map<String, Integer> applicationRegister = new HashMap<>();
private int appIdSeg = 1;
private int appInstIdSeq = 1;
@Test
public void testTransform() throws Exception {
IApplicationIDService applicationIDService = new IApplicationIDService() {
@Override
public int getOrCreateForApplicationCode(String applicationCode) {
String key = "AppCode:" + applicationCode;
if (applicationRegister.containsKey(key)) {
return applicationRegister.get(key);
} else {
int id = appIdSeg++;
applicationRegister.put(key, id);
return id;
}
}
@Override
public int getOrCreateForAddressId(int addressId, String networkAddress) {
String key = "Address:" + networkAddress;
if (applicationRegister.containsKey(key)) {
return applicationRegister.get(key);
} else {
int id = appIdSeg++;
applicationRegister.put(key, id);
return id;
}
}
};
IInstanceIDService instanceIDService = new IInstanceIDService() {
@Override
public int getOrCreateByAgentUUID(int applicationId, String agentUUID, long registerTime, AgentOsInfo osInfo) {
String key = "AppCode:" + applicationId + ",UUID:" + agentUUID;
if (applicationInstRegister.containsKey(key)) {
return applicationInstRegister.get(key);
} else {
int id = appInstIdSeq++;
applicationInstRegister.put(key, id);
return id;
}
}
@Override
public int getOrCreateByAddressId(int applicationId, int addressId, long registerTime) {
String key = "VitualAppCode:" + applicationId + ",address:" + addressId;
if (applicationInstRegister.containsKey(key)) {
return applicationInstRegister.get(key);
} else {
int id = appInstIdSeq++;
applicationInstRegister.put(key, id);
return id;
}
}
};
RegisterServices services = new RegisterServices(applicationIDService, instanceIDService, null, null);
Zipkin2SkyWalkingTransfer.INSTANCE.addListener(this);
Zipkin2SkyWalkingTransfer.INSTANCE.setRegisterServices(services);
Zipkin2SkyWalkingTransfer.INSTANCE.setInstanceHeartBeatService(new IInstanceHeartBeatService() {
@Override
public void heartBeat(int instanceId, long heartBeatTime) {
}
});
List<Span> spanList = buildSpringSleuthExampleTrace();
Assert.assertEquals(3, spanList.size());
ZipkinTrace trace = new ZipkinTrace();
spanList.forEach(span -> trace.addSpan(span));
Zipkin2SkyWalkingTransfer.INSTANCE.transfer(trace);
}
private List<Span> buildSpringSleuthExampleTrace() throws UnsupportedEncodingException {
List<Span> spans = new LinkedList<>();
String span = "{\"traceId\":\"5b0e64354eea4fa71a8a1b5bdd791b8a\",\"id\":\"1a8a1b5bdd791b8a\",\"kind\":\"SERVER\",\"name\":\"get /\",\"timestamp\":1527669813700123,\"duration\":11295,\"localEndpoint\":{\"serviceName\":\"frontend\",\"ipv4\":\"192.168.72.220\"},\"remoteEndpoint\":{\"ipv6\":\"::1\",\"port\":55146},\"tags\":{\"http.method\":\"GET\",\"http.path\":\"/\",\"mvc.controller.class\":\"Frontend\",\"mvc.controller.method\":\"callBackend\"}}";
spans.add(SpanBytesDecoder.JSON_V2.decodeOne(span.getBytes("UTF-8")));
span = "{\"traceId\":\"5b0e64354eea4fa71a8a1b5bdd791b8a\",\"parentId\":\"1a8a1b5bdd791b8a\",\"id\":\"d7d5b93dcda767c8\",\"kind\":\"CLIENT\",\"name\":\"get\",\"timestamp\":1527669813702456,\"duration\":6672,\"localEndpoint\":{\"serviceName\":\"frontend\",\"ipv4\":\"192.168.72.220\"},\"tags\":{\"http.method\":\"GET\",\"http.path\":\"/api\"}}";
spans.add(SpanBytesDecoder.JSON_V2.decodeOne(span.getBytes("UTF-8")));
span = "{\"traceId\":\"5b0e64354eea4fa71a8a1b5bdd791b8a\",\"parentId\":\"1a8a1b5bdd791b8a\",\"id\":\"d7d5b93dcda767c8\",\"kind\":\"SERVER\",\"name\":\"get /api\",\"timestamp\":1527669813705106,\"duration\":4802,\"localEndpoint\":{\"serviceName\":\"backend\",\"ipv4\":\"192.168.72.220\"},\"remoteEndpoint\":{\"ipv4\":\"127.0.0.1\",\"port\":55147},\"tags\":{\"http.method\":\"GET\",\"http.path\":\"/api\",\"mvc.controller.class\":\"Backend\",\"mvc.controller.method\":\"printDate\"},\"shared\":true}";
spans.add(SpanBytesDecoder.JSON_V2.decodeOne(span.getBytes("UTF-8")));
return SpanBytesDecoder.JSON_V2.decodeList(spans.toString().getBytes("UTF-8"));
}
@Override
public void notify(SkyWalkingTrace trace) {
List<TraceSegmentObject.Builder> segments = trace.getSegmentList();
Assert.assertEquals(2, segments.size());
TraceSegmentObject.Builder builder = segments.get(0);
TraceSegmentObject.Builder builder1 = segments.get(1);
TraceSegmentObject.Builder front, end;
if (builder.getApplicationId() == applicationRegister.get("AppCode:frontend")) {
front = builder;
end = builder1;
Assert.assertEquals(applicationRegister.get("AppCode:backend").longValue(), builder1.getApplicationId());
} else if (builder.getApplicationId() == applicationRegister.get("AppCode:backend")) {
end = builder;
front = builder1;
Assert.assertEquals(applicationRegister.get("AppCode:frontend").longValue(), builder1.getApplicationId());
} else {
Assert.fail("Can't find frontend and backend applications. ");
return;
}
Assert.assertEquals(2, front.getSpansCount());
Assert.assertEquals(1, end.getSpansCount());
front.getSpansList().forEach(spanObject -> {
if (spanObject.getSpanId() == 0) {
// span id = 1, means incoming http of frontend
Assert.assertEquals(SpanType.Entry, spanObject.getSpanType());
Assert.assertEquals("get /", spanObject.getOperationName());
Assert.assertEquals(0, spanObject.getSpanId());
Assert.assertEquals(-1, spanObject.getParentSpanId());
} else if (spanObject.getSpanId() == 1) {
Assert.assertEquals("192.168.72.220", spanObject.getPeer());
Assert.assertEquals(SpanType.Exit, spanObject.getSpanType());
Assert.assertEquals(1, spanObject.getSpanId());
Assert.assertEquals(0, spanObject.getParentSpanId());
} else {
Assert.fail("Only two spans expected");
}
Assert.assertTrue(spanObject.getTagsCount() > 0);
});
SpanObject spanObject = end.getSpans(0);
Assert.assertEquals(1, spanObject.getRefsCount());
TraceSegmentReference spanObjectRef = spanObject.getRefs(0);
Assert.assertEquals("get", spanObjectRef.getEntryServiceName());
Assert.assertEquals("get", spanObjectRef.getParentServiceName());
//Assert.assertEquals("192.168.72.220", spanObjectRef.getNetworkAddress());
Assert.assertEquals(1, spanObjectRef.getParentSpanId());
Assert.assertEquals(front.getTraceSegmentId(), spanObjectRef.getParentTraceSegmentId());
Assert.assertTrue(spanObject.getTagsCount() > 0);
Assert.assertEquals("get /api", spanObject.getOperationName());
Assert.assertEquals(0, spanObject.getSpanId());
Assert.assertEquals(-1, spanObject.getParentSpanId());
Assert.assertEquals(SpanType.Entry, spanObject.getSpanType());
}
}
......@@ -39,7 +39,7 @@ import javax.servlet.http.HttpServletRequest;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
import org.apache.skywalking.apm.collector.core.util.CollectionUtils;
import org.apache.skywalking.apm.collector.server.jetty.ArgumentsParseException;
import org.apache.skywalking.apm.collector.server.jetty.JettyHandler;
import org.apache.skywalking.apm.collector.server.jetty.JettyJsonHandler;
import org.apache.skywalking.apm.collector.storage.ui.application.ApplicationNode;
import org.apache.skywalking.apm.collector.storage.ui.application.ConjecturalNode;
import org.apache.skywalking.apm.collector.storage.ui.common.VisualUserNode;
......@@ -60,7 +60,7 @@ import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class GraphQLHandler extends JettyHandler {
public class GraphQLHandler extends JettyJsonHandler {
private static final Logger logger = LoggerFactory.getLogger(GraphQLHandler.class);
......
......@@ -24,12 +24,12 @@ import com.google.gson.JsonElement;
import java.util.Set;
import javax.servlet.http.HttpServletRequest;
import org.apache.skywalking.apm.collector.server.jetty.ArgumentsParseException;
import org.apache.skywalking.apm.collector.server.jetty.JettyHandler;
import org.apache.skywalking.apm.collector.server.jetty.JettyJsonHandler;
/**
* @author peng-yongsheng
*/
public class UIJettyNamingHandler extends JettyHandler {
public class UIJettyNamingHandler extends JettyJsonHandler {
private final UIJettyNamingListener namingListener;
......
......@@ -45,6 +45,7 @@
<module>apm-collector-configuration</module>
<module>apm-collector-agent</module>
<module>apm-collector-analysis</module>
<module>apm-collector-thirdparty-receiver</module>
</modules>
<properties>
......@@ -63,6 +64,7 @@
<jedis.version>2.9.0</jedis.version>
<zookeeper.version>3.4.10</zookeeper.version>
<elasticsearch.client.version>5.5.0</elasticsearch.client.version>
<zipkin.version>2.9.1</zipkin.version>
<shardingjdbc.version>2.0.3</shardingjdbc.version>
<commons-dbcp.version>1.4</commons-dbcp.version>
</properties>
......@@ -248,6 +250,11 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.zipkin.zipkin2</groupId>
<artifactId>zipkin</artifactId>
<version>${zipkin.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
</project>
......@@ -289,6 +289,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
validation-api 1.1.0.Final: http://beanvalidation.org/licensing/, Apache 2.0
zuul-core 1.3.0: https://github.com/Netflix/zuul, Apache 2.0
ben-manes caffeine 2.6.2: https://github.com/ben-manes/caffeine, Apache 2.0
zipkin 2.9.1: https://github.com/openzipkin/zipkin, Apache 2.0
sharding-jdbc-core 2.0.3: https://github.com/sharding-sphere/sharding-sphere, Apache 2.0
========================================================================
......
......@@ -21,6 +21,7 @@
* Incubating Features
* [Why are some features in **Incubating**?](en/Incubating/Abstract.md)
* [Use Sharding JDBC as storage implementor](en/Use-ShardingJDBC-as-storage-implementor.md)
* [Receive Zipkin span data format](../apm-collector/apm-collector-thirdparty-receiver/receiver-zipkin/docs/README.md)
* Application Toolkit
* [Overview](en/Applicaton-toolkit.md)
* [Use SkyWalking OpenTracing compatible tracer](en/Opentracing.md)
......
......@@ -10,3 +10,4 @@ List some typical incubating features
1. New storage implementor in collector, such as: ElasticSearch HTTP, MySQL...
1. New module provided in collector.
1. New optional plugins in agent.
1. New features/services in collector, as the integration.
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册