diff --git a/apm-protocol/apm-network/pom.xml b/apm-protocol/apm-network/pom.xml index 1a4c9975ca9fc8187e775bd10496f6791d51b9d9..935d638ddcb52eb518286f0ef4ad8a70b83af035 100644 --- a/apm-protocol/apm-network/pom.xml +++ b/apm-protocol/apm-network/pom.xml @@ -42,21 +42,25 @@ io.grpc grpc-netty ${grpc.version} + provided io.grpc grpc-protobuf ${grpc.version} + provided io.grpc grpc-stub ${grpc.version} + provided io.netty netty-tcnative-boringssl-static ${netty-tcnative-boringssl-static.version} + provided diff --git a/apm-sniffer/apm-agent-core/pom.xml b/apm-sniffer/apm-agent-core/pom.xml index 95721fdcfe5eef742cc2a483a251c0078bf00c9e..cc95d79bff4722c8e1d648a72a3df21aa5e6dd12 100644 --- a/apm-sniffer/apm-agent-core/pom.xml +++ b/apm-sniffer/apm-agent-core/pom.xml @@ -16,7 +16,8 @@ ~ --> - + 4.0.0 @@ -38,8 +39,8 @@ 20.0 1.9.2 3.3.6 - 4.5.3 2.6.0 + 2.0.7.Final 1.4.1.Final org.apache.skywalking.apm.dependencies com.lmax.disruptor @@ -56,7 +57,8 @@ org.apache.http ${shade.package}.${shade.org.apache.http.source} org.apache.commons - ${shade.package}.${shade.org.apache.http.source} + ${shade.package}.${shade.org.apache.http.source} + @@ -64,7 +66,6 @@ org.apache.skywalking apm-network ${project.version} - org.apache.skywalking @@ -76,21 +77,36 @@ byte-buddy ${bytebuddy.version} - - net.bytebuddy - byte-buddy-agent - ${bytebuddy.version} - test - com.lmax disruptor ${disruptor.version} - org.apache.httpcomponents - httpclient - ${apache-httpclient.version} + io.grpc + grpc-netty + ${grpc.version} + + + io.grpc + grpc-protobuf + ${grpc.version} + + + io.grpc + grpc-stub + ${grpc.version} + + + io.netty + netty-tcnative-boringssl-static + ${netty-tcnative-boringssl-static.version} + + + net.bytebuddy + byte-buddy-agent + ${bytebuddy.version} + test org.eclipse.jetty @@ -109,7 +125,6 @@ wiremock ${wiremock.version} test - io.grpc @@ -130,16 +145,15 @@ - - - com.google.guava - guava - ${guava.version} - - + + + com.google.guava + guava + ${guava.version} + + - kr.motd.maven @@ -185,10 +199,6 @@ ${shade.io.opencensus.source} ${shade.io.opencensus.target} - - ${shade.org.apache.http.source} - ${shade.org.apache.http.target} - ${shade.org.apache.commons.source} ${shade.org.apache.commons.target} @@ -204,7 +214,8 @@ - + @@ -220,7 +231,7 @@ - + @@ -229,14 +240,22 @@ package - - - - - - - - + + + + + + + + diff --git a/apm-sniffer/apm-agent/pom.xml b/apm-sniffer/apm-agent/pom.xml index 9094e3e80e5d3d676c7dbc1667bc85d7a8304a1f..dd63db85d7b1b8177ca989ce3046ca6b08131eb1 100644 --- a/apm-sniffer/apm-agent/pom.xml +++ b/apm-sniffer/apm-agent/pom.xml @@ -79,6 +79,7 @@ *:gson io.grpc:* io.netty:* + io.opencensus:* com.google.*:* com.google.guava:guava diff --git a/oap-server/pom.xml b/oap-server/pom.xml index 8652efab298714e6d5f39bd633d9475c0712ed48..a722022a419f4a639708492f9dc682a7adf46656 100644 --- a/oap-server/pom.xml +++ b/oap-server/pom.xml @@ -51,6 +51,7 @@ 8.0 3.4.10 1.10.0 + 2.0.7.Final 9.4.2.v20170220 1.18.0 1.4.196 @@ -98,6 +99,11 @@ + + org.apache.skywalking + apm-network + ${project.version} + org.apache.skywalking apm-util @@ -220,6 +226,11 @@ grpc-stub ${grpc.version} + + io.netty + netty-tcnative-boringssl-static + ${netty-tcnative-boringssl-static.version} + io.grpc grpc-testing diff --git a/oap-server/server-library/library-buffer/pom.xml b/oap-server/server-library/library-buffer/pom.xml index 8a87b1e41d2236866616e28da956eab045130498..852e31632eb0ecd4423e64c4502fdf53f27d34b4 100644 --- a/oap-server/server-library/library-buffer/pom.xml +++ b/oap-server/server-library/library-buffer/pom.xml @@ -38,7 +38,22 @@ org.apache.skywalking apm-network - ${project.version} + + + io.grpc + grpc-netty + + + io.grpc + grpc-protobuf + + + io.grpc + grpc-stub + + + io.netty + netty-tcnative-boringssl-static \ No newline at end of file diff --git a/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/TelemetryDataDispatcher.java b/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/TelemetryDataDispatcher.java index c42fce1af236de51c8b40f33af9e296e82204298..7c9e749bcd9dbdf6340c5118f6870ef76ae7f097 100644 --- a/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/TelemetryDataDispatcher.java +++ b/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/TelemetryDataDispatcher.java @@ -18,12 +18,16 @@ package org.apache.skywalking.aop.server.receiver.mesh; +import java.util.Objects; import org.apache.logging.log4j.util.Strings; import org.apache.skywalking.apm.network.servicemesh.Protocol; import org.apache.skywalking.apm.network.servicemesh.ServiceMeshMetric; import org.apache.skywalking.oap.server.core.CoreModule; import org.apache.skywalking.oap.server.core.cache.ServiceInstanceInventoryCache; import org.apache.skywalking.oap.server.core.cache.ServiceInventoryCache; +import org.apache.skywalking.oap.server.core.register.ServiceInstanceInventory; +import org.apache.skywalking.oap.server.core.register.service.IServiceInstanceInventoryRegister; +import org.apache.skywalking.oap.server.core.register.service.IServiceInventoryRegister; import org.apache.skywalking.oap.server.core.source.All; import org.apache.skywalking.oap.server.core.source.DetectPoint; import org.apache.skywalking.oap.server.core.source.Endpoint; @@ -35,6 +39,8 @@ import org.apache.skywalking.oap.server.core.source.ServiceRelation; import org.apache.skywalking.oap.server.core.source.SourceReceiver; import org.apache.skywalking.oap.server.library.module.ModuleManager; import org.apache.skywalking.oap.server.library.util.TimeBucketUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * TelemetryDataDispatcher processes the {@link ServiceMeshMetric} format telemetry data, transfers it to source @@ -43,10 +49,14 @@ import org.apache.skywalking.oap.server.library.util.TimeBucketUtils; * @author wusheng */ public class TelemetryDataDispatcher { + private static final Logger logger = LoggerFactory.getLogger(TelemetryDataDispatcher.class); + private static MeshDataBufferFileCache CACHE; private static ServiceInventoryCache SERVICE_CACHE; private static ServiceInstanceInventoryCache SERVICE_INSTANCE_CACHE; private static SourceReceiver SOURCE_RECEIVER; + private static IServiceInstanceInventoryRegister SERVICE_INSTANCE_INVENTORY_REGISTER; + private static IServiceInventoryRegister SERVICE_INVENTORY_REGISTER; private TelemetryDataDispatcher() { @@ -57,6 +67,8 @@ public class TelemetryDataDispatcher { SERVICE_CACHE = moduleManager.find(CoreModule.NAME).getService(ServiceInventoryCache.class); SERVICE_INSTANCE_CACHE = moduleManager.find(CoreModule.NAME).getService(ServiceInstanceInventoryCache.class); SOURCE_RECEIVER = moduleManager.find(CoreModule.NAME).getService(SourceReceiver.class); + SERVICE_INSTANCE_INVENTORY_REGISTER = moduleManager.find(CoreModule.NAME).getService(IServiceInstanceInventoryRegister.class); + SERVICE_INVENTORY_REGISTER = moduleManager.find(CoreModule.NAME).getService(IServiceInventoryRegister.class); } public static void preProcess(ServiceMeshMetric data) { @@ -77,6 +89,7 @@ public class TelemetryDataDispatcher { ServiceMeshMetric metric = decorator.getMetric(); long minuteTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(metric.getStartTime()); + heartbeat(decorator, minuteTimeBucket); if (org.apache.skywalking.apm.network.common.DetectPoint.server.equals(metric.getDetectPoint())) { toAll(decorator, minuteTimeBucket); toService(decorator, minuteTimeBucket); @@ -87,6 +100,30 @@ public class TelemetryDataDispatcher { toServiceInstanceRelation(decorator, minuteTimeBucket); } + private static void heartbeat(ServiceMeshMetricDataDecorator decorator, long minuteTimeBucket) { + ServiceMeshMetric metric = decorator.getMetric(); + + // source + SERVICE_INSTANCE_INVENTORY_REGISTER.heartbeat(metric.getSourceServiceInstanceId(), metric.getEndTime()); + int instanceId = metric.getSourceServiceInstanceId(); + ServiceInstanceInventory serviceInstanceInventory = SERVICE_INSTANCE_CACHE.get(instanceId); + if (Objects.nonNull(serviceInstanceInventory)) { + SERVICE_INVENTORY_REGISTER.heartbeat(serviceInstanceInventory.getServiceId(), metric.getEndTime()); + } else { + logger.warn("Can't found service by service instance id from cache, service instance id is: {}", instanceId); + } + + // dest + SERVICE_INSTANCE_INVENTORY_REGISTER.heartbeat(metric.getDestServiceInstanceId(), metric.getEndTime()); + instanceId = metric.getDestServiceInstanceId(); + serviceInstanceInventory = SERVICE_INSTANCE_CACHE.get(instanceId); + if (Objects.nonNull(serviceInstanceInventory)) { + SERVICE_INVENTORY_REGISTER.heartbeat(serviceInstanceInventory.getServiceId(), metric.getEndTime()); + } else { + logger.warn("Can't found service by service instance id from cache, service instance id is: {}", instanceId); + } + } + private static void toAll(ServiceMeshMetricDataDecorator decorator, long minuteTimeBucket) { ServiceMeshMetric metric = decorator.getMetric(); All all = new All(); @@ -143,7 +180,7 @@ public class TelemetryDataDispatcher { ServiceMeshMetric metric = decorator.getMetric(); ServiceInstance serviceInstance = new ServiceInstance(); serviceInstance.setTimeBucket(minuteTimeBucket); - serviceInstance.setId(metric.getDestServiceId()); + serviceInstance.setId(metric.getDestServiceInstanceId()); serviceInstance.setName(getServiceInstanceName(metric.getDestServiceInstanceId(), metric.getDestServiceInstance())); serviceInstance.setServiceId(metric.getDestServiceId()); serviceInstance.setServiceName(getServiceName(metric.getDestServiceId(), metric.getDestServiceName())); diff --git a/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v5/grpc/InstanceDiscoveryServiceHandler.java b/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v5/grpc/InstanceDiscoveryServiceHandler.java index 378162e9b7a5d075b5e0cb8dbfe6d9bfd0897c4f..5da0182126697ef58f364bcbab75e8e5b26cd223 100644 --- a/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v5/grpc/InstanceDiscoveryServiceHandler.java +++ b/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v5/grpc/InstanceDiscoveryServiceHandler.java @@ -88,7 +88,7 @@ public class InstanceDiscoveryServiceHandler extends InstanceDiscoveryServiceGrp if (Objects.nonNull(serviceInstanceInventory)) { serviceInventoryRegister.heartbeat(serviceInstanceInventory.getServiceId(), heartBeatTime); } else { - logger.warn("Can't found service instance by service instance id from cache, service instance id is: {}", serviceInstanceId); + logger.warn("Can't found service by service instance id from cache, service instance id is: {}", serviceInstanceId); } responseObserver.onNext(Downstream.getDefaultInstance()); diff --git a/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v5/rest/InstanceHeartBeatServletHandler.java b/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v5/rest/InstanceHeartBeatServletHandler.java index 1c112c74c2b65d84f28946b8a56a5ce151a0ad26..df2a7fe45b68cd6362cbf40556e2628de8add862 100644 --- a/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v5/rest/InstanceHeartBeatServletHandler.java +++ b/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v5/rest/InstanceHeartBeatServletHandler.java @@ -20,9 +20,13 @@ package org.apache.skywalking.oap.server.receiver.register.provider.handler.v5.r import com.google.gson.*; import java.io.IOException; +import java.util.Objects; import javax.servlet.http.HttpServletRequest; import org.apache.skywalking.oap.server.core.CoreModule; +import org.apache.skywalking.oap.server.core.cache.ServiceInstanceInventoryCache; +import org.apache.skywalking.oap.server.core.register.ServiceInstanceInventory; import org.apache.skywalking.oap.server.core.register.service.IServiceInstanceInventoryRegister; +import org.apache.skywalking.oap.server.core.register.service.IServiceInventoryRegister; import org.apache.skywalking.oap.server.library.module.ModuleManager; import org.apache.skywalking.oap.server.library.server.jetty.*; import org.slf4j.*; @@ -35,6 +39,8 @@ public class InstanceHeartBeatServletHandler extends JettyJsonHandler { private static final Logger logger = LoggerFactory.getLogger(InstanceHeartBeatServletHandler.class); private final IServiceInstanceInventoryRegister serviceInstanceInventoryRegister; + private final ServiceInstanceInventoryCache serviceInstanceInventoryCache; + private final IServiceInventoryRegister serviceInventoryRegister; private final Gson gson = new Gson(); private static final String INSTANCE_ID = "ii"; @@ -42,6 +48,8 @@ public class InstanceHeartBeatServletHandler extends JettyJsonHandler { public InstanceHeartBeatServletHandler(ModuleManager moduleManager) { this.serviceInstanceInventoryRegister = moduleManager.find(CoreModule.NAME).getService(IServiceInstanceInventoryRegister.class); + this.serviceInstanceInventoryCache = moduleManager.find(CoreModule.NAME).getService(ServiceInstanceInventoryCache.class); + this.serviceInventoryRegister = moduleManager.find(CoreModule.NAME).getService(IServiceInventoryRegister.class); } @Override public String pathSpec() { @@ -60,6 +68,12 @@ public class InstanceHeartBeatServletHandler extends JettyJsonHandler { long heartBeatTime = heartBeat.get(HEARTBEAT_TIME).getAsLong(); serviceInstanceInventoryRegister.heartbeat(instanceId, heartBeatTime); + ServiceInstanceInventory serviceInstanceInventory = serviceInstanceInventoryCache.get(instanceId); + if (Objects.nonNull(serviceInstanceInventory)) { + serviceInventoryRegister.heartbeat(serviceInstanceInventory.getServiceId(), heartBeatTime); + } else { + logger.warn("Can't found service by service instance id from cache, service instance id is: {}", instanceId); + } } catch (IOException e) { logger.error(e.getMessage(), e); } diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java index ab8c7277279ab94e1a564909a31f4308219d0642..442381a72248299120c57f51b0d23668d8af4029 100644 --- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java +++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java @@ -61,7 +61,6 @@ public class MetadataQueryEsDAO extends EsDAO implements IMetadataQueryDAO { SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource(); BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery(); - boolQueryBuilder.must().add(timeRangeQueryBuild(startTimestamp, endTimestamp)); boolQueryBuilder.must().add(QueryBuilders.termQuery(EndpointInventory.DETECT_POINT, DetectPoint.SERVER.ordinal()));