From 800a5353def28b37b7580f0c32de87534ae56f22 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BD=AD=E5=8B=87=E5=8D=87=20pengys?= <8082209@qq.com> Date: Fri, 7 Sep 2018 11:59:55 +0800 Subject: [PATCH] Add generate indicator and dispatcher about service topology but not test. (#1638) * Add generate indicator and dispatcher about service topology but not test. * Delete call type from service relation. --- .../core/analysis/DispatcherManager.java | 4 + .../service/ServiceCallsSumIndicator.java | 119 ++++++++++++++++ .../generated/service/ServiceDispatcher.java | 12 +- .../ServiceRelationAvgIndicator.java | 9 +- ...erviceRelationClientCallsSumIndicator.java | 128 ++++++++++++++++++ .../ServiceRelationDispatcher.java | 29 ++++ ...erviceRelationServerCallsSumIndicator.java | 128 ++++++++++++++++++ .../core/analysis/indicator/SumIndicator.java | 51 +++++++ .../service/ServiceComponentDispatcher.java | 42 ++++++ .../service/ServiceComponentIndicator.java | 122 +++++++++++++++++ .../service/ServiceMappingDispatcher.java | 42 ++++++ .../service/ServiceMappingIndicator.java | 122 +++++++++++++++++ .../oap/server/core/source/Scope.java | 3 +- .../server/core/source/ServiceComponent.java | 36 +++++ .../server/core/source/ServiceMapping.java | 36 +++++ .../src/main/resources/query-protocol | 2 +- .../trace/provider/TraceModuleProvider.java | 3 + .../service/ServiceComponentSpanListener.java | 77 +++++++++++ .../service/ServiceMappingSpanListener.java | 86 ++++++++++++ 19 files changed, 1043 insertions(+), 8 deletions(-) create mode 100644 oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/generated/service/ServiceCallsSumIndicator.java create mode 100644 oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/generated/servicerelation/ServiceRelationClientCallsSumIndicator.java create mode 100644 oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/generated/servicerelation/ServiceRelationServerCallsSumIndicator.java create mode 100644 oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/SumIndicator.java create mode 100644 oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/service/ServiceComponentDispatcher.java create mode 100644 oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/service/ServiceComponentIndicator.java create mode 100644 oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/service/ServiceMappingDispatcher.java create mode 100644 oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/service/ServiceMappingIndicator.java create mode 100644 oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/ServiceComponent.java create mode 100644 oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/ServiceMapping.java create mode 100644 oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/service/ServiceComponentSpanListener.java create mode 100644 oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/service/ServiceMappingSpanListener.java diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java index 0d0b4c01b6..6f059c70bf 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java @@ -29,6 +29,7 @@ import org.apache.skywalking.oap.server.core.analysis.generated.serviceinstancej import org.apache.skywalking.oap.server.core.analysis.generated.serviceinstancejvmmemorypool.ServiceInstanceJVMMemoryPoolDispatcher; import org.apache.skywalking.oap.server.core.analysis.generated.serviceinstancerelation.ServiceInstanceRelationDispatcher; import org.apache.skywalking.oap.server.core.analysis.generated.servicerelation.ServiceRelationDispatcher; +import org.apache.skywalking.oap.server.core.analysis.manual.service.*; import org.apache.skywalking.oap.server.core.source.Scope; import org.slf4j.*; @@ -48,6 +49,9 @@ public class DispatcherManager { this.dispatcherMap.put(Scope.ServiceInstance, new ServiceInstanceDispatcher()); this.dispatcherMap.put(Scope.Endpoint, new EndpointDispatcher()); + this.dispatcherMap.put(Scope.ServiceComponent, new ServiceComponentDispatcher()); + this.dispatcherMap.put(Scope.ServiceMapping, new ServiceMappingDispatcher()); + this.dispatcherMap.put(Scope.ServiceRelation, new ServiceRelationDispatcher()); this.dispatcherMap.put(Scope.ServiceInstanceRelation, new ServiceInstanceRelationDispatcher()); this.dispatcherMap.put(Scope.EndpointRelation, new EndpointRelationDispatcher()); diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/generated/service/ServiceCallsSumIndicator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/generated/service/ServiceCallsSumIndicator.java new file mode 100644 index 0000000000..202b8c8fc5 --- /dev/null +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/generated/service/ServiceCallsSumIndicator.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.core.analysis.generated.service; + +import java.util.*; +import lombok.*; +import org.apache.skywalking.oap.server.core.Const; +import org.apache.skywalking.oap.server.core.alarm.*; +import org.apache.skywalking.oap.server.core.analysis.indicator.SumIndicator; +import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.IndicatorType; +import org.apache.skywalking.oap.server.core.remote.annotation.StreamData; +import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData; +import org.apache.skywalking.oap.server.core.source.Scope; +import org.apache.skywalking.oap.server.core.storage.StorageBuilder; +import org.apache.skywalking.oap.server.core.storage.annotation.*; + +/** + * This class is auto generated. Please don't change this class manually. + * + * @author Observability Analysis Language code generator + */ +@IndicatorType +@StreamData +@StorageEntity(name = "service_calls_sum", builder = ServiceCallsSumIndicator.Builder.class) +public class ServiceCallsSumIndicator extends SumIndicator implements AlarmSupported { + + @Setter @Getter @Column(columnName = "id") private int id; + + @Override public String id() { + String splitJointId = String.valueOf(getTimeBucket()); + splitJointId += Const.ID_SPLIT + String.valueOf(id); + return splitJointId; + } + + @Override public int hashCode() { + int result = 17; + result = 31 * result + id; + result = 31 * result + (int)getTimeBucket(); + return result; + } + + @Override public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + + ServiceCallsSumIndicator indicator = (ServiceCallsSumIndicator)obj; + if (id != indicator.id) + return false; + + if (getTimeBucket() != indicator.getTimeBucket()) + return false; + + return true; + } + + @Override public RemoteData.Builder serialize() { + RemoteData.Builder remoteBuilder = RemoteData.newBuilder(); + + remoteBuilder.setDataLongs(0, getValue()); + remoteBuilder.setDataLongs(1, getTimeBucket()); + + + remoteBuilder.setDataIntegers(0, getId()); + + return remoteBuilder; + } + + @Override public void deserialize(RemoteData remoteData) { + + setValue(remoteData.getDataLongs(0)); + setTimeBucket(remoteData.getDataLongs(1)); + + + setId(remoteData.getDataIntegers(0)); + } + + @Override public AlarmMeta getAlarmMeta() { + return new AlarmMeta("Service_Calls_Sum", Scope.Service, id); + } + + public static class Builder implements StorageBuilder { + + @Override public Map data2Map(ServiceCallsSumIndicator storageData) { + Map map = new HashMap<>(); + map.put("id", storageData.getId()); + map.put("value", storageData.getValue()); + map.put("time_bucket", storageData.getTimeBucket()); + return map; + } + + @Override public ServiceCallsSumIndicator map2Data(Map dbMap) { + ServiceCallsSumIndicator indicator = new ServiceCallsSumIndicator(); + indicator.setId(((Number)dbMap.get("id")).intValue()); + indicator.setValue(((Number)dbMap.get("value")).longValue()); + indicator.setTimeBucket(((Number)dbMap.get("time_bucket")).longValue()); + return indicator; + } + } +} diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/generated/service/ServiceDispatcher.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/generated/service/ServiceDispatcher.java index cdef6a8e78..80cb30d40f 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/generated/service/ServiceDispatcher.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/generated/service/ServiceDispatcher.java @@ -20,7 +20,7 @@ package org.apache.skywalking.oap.server.core.analysis.generated.service; import org.apache.skywalking.oap.server.core.analysis.SourceDispatcher; import org.apache.skywalking.oap.server.core.analysis.worker.IndicatorProcess; -import org.apache.skywalking.oap.server.core.source.*; +import org.apache.skywalking.oap.server.core.source.Service; /** * This class is auto generated. Please don't change this class manually. @@ -31,6 +31,7 @@ public class ServiceDispatcher implements SourceDispatcher { @Override public void dispatch(Service source) { doServiceAvg(source); + doServiceCallsSum(source); } private void doServiceAvg(Service source) { @@ -42,4 +43,13 @@ public class ServiceDispatcher implements SourceDispatcher { indicator.combine(source.getLatency(), 1); IndicatorProcess.INSTANCE.in(indicator); } + private void doServiceCallsSum(Service source) { + ServiceCallsSumIndicator indicator = new ServiceCallsSumIndicator(); + + + indicator.setTimeBucket(source.getTimeBucket()); + indicator.setId(source.getId()); + indicator.combine(1); + IndicatorProcess.INSTANCE.in(indicator); + } } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/generated/servicerelation/ServiceRelationAvgIndicator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/generated/servicerelation/ServiceRelationAvgIndicator.java index a5e1c366fb..71be3fc728 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/generated/servicerelation/ServiceRelationAvgIndicator.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/generated/servicerelation/ServiceRelationAvgIndicator.java @@ -21,15 +21,14 @@ package org.apache.skywalking.oap.server.core.analysis.generated.servicerelation import java.util.*; import lombok.*; import org.apache.skywalking.oap.server.core.Const; -import org.apache.skywalking.oap.server.core.alarm.AlarmMeta; -import org.apache.skywalking.oap.server.core.alarm.AlarmSupported; -import org.apache.skywalking.oap.server.core.analysis.indicator.*; +import org.apache.skywalking.oap.server.core.alarm.*; +import org.apache.skywalking.oap.server.core.analysis.indicator.LongAvgIndicator; import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.IndicatorType; import org.apache.skywalking.oap.server.core.remote.annotation.StreamData; import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData; -import org.apache.skywalking.oap.server.core.storage.annotation.*; -import org.apache.skywalking.oap.server.core.storage.StorageBuilder; import org.apache.skywalking.oap.server.core.source.Scope; +import org.apache.skywalking.oap.server.core.storage.StorageBuilder; +import org.apache.skywalking.oap.server.core.storage.annotation.*; /** * This class is auto generated. Please don't change this class manually. diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/generated/servicerelation/ServiceRelationClientCallsSumIndicator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/generated/servicerelation/ServiceRelationClientCallsSumIndicator.java new file mode 100644 index 0000000000..0d1f03aa68 --- /dev/null +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/generated/servicerelation/ServiceRelationClientCallsSumIndicator.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.core.analysis.generated.servicerelation; + +import java.util.*; +import lombok.*; +import org.apache.skywalking.oap.server.core.Const; +import org.apache.skywalking.oap.server.core.alarm.*; +import org.apache.skywalking.oap.server.core.analysis.indicator.SumIndicator; +import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.IndicatorType; +import org.apache.skywalking.oap.server.core.remote.annotation.StreamData; +import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData; +import org.apache.skywalking.oap.server.core.source.Scope; +import org.apache.skywalking.oap.server.core.storage.StorageBuilder; +import org.apache.skywalking.oap.server.core.storage.annotation.*; + +/** + * This class is auto generated. Please don't change this class manually. + * + * @author Observability Analysis Language code generator + */ +@IndicatorType +@StreamData +@StorageEntity(name = "service_relation_client_calls_sum", builder = ServiceRelationClientCallsSumIndicator.Builder.class) +public class ServiceRelationClientCallsSumIndicator extends SumIndicator implements AlarmSupported { + + @Setter @Getter @Column(columnName = "source_service_id") private int sourceServiceId; + @Setter @Getter @Column(columnName = "dest_service_id") private int destServiceId; + + @Override public String id() { + String splitJointId = String.valueOf(getTimeBucket()); + splitJointId += Const.ID_SPLIT + String.valueOf(sourceServiceId); + splitJointId += Const.ID_SPLIT + String.valueOf(destServiceId); + return splitJointId; + } + + @Override public int hashCode() { + int result = 17; + result = 31 * result + sourceServiceId; + result = 31 * result + destServiceId; + result = 31 * result + (int)getTimeBucket(); + return result; + } + + @Override public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + + ServiceRelationClientCallsSumIndicator indicator = (ServiceRelationClientCallsSumIndicator)obj; + if (sourceServiceId != indicator.sourceServiceId) + return false; + if (destServiceId != indicator.destServiceId) + return false; + + if (getTimeBucket() != indicator.getTimeBucket()) + return false; + + return true; + } + + @Override public RemoteData.Builder serialize() { + RemoteData.Builder remoteBuilder = RemoteData.newBuilder(); + + remoteBuilder.setDataLongs(0, getValue()); + remoteBuilder.setDataLongs(1, getTimeBucket()); + + + remoteBuilder.setDataIntegers(0, getSourceServiceId()); + remoteBuilder.setDataIntegers(1, getDestServiceId()); + + return remoteBuilder; + } + + @Override public void deserialize(RemoteData remoteData) { + + setValue(remoteData.getDataLongs(0)); + setTimeBucket(remoteData.getDataLongs(1)); + + + setSourceServiceId(remoteData.getDataIntegers(0)); + setDestServiceId(remoteData.getDataIntegers(1)); + } + + @Override public AlarmMeta getAlarmMeta() { + return new AlarmMeta("Service_Relation_Client_Calls_Sum", Scope.ServiceRelation, sourceServiceId, destServiceId); + } + + public static class Builder implements StorageBuilder { + + @Override public Map data2Map(ServiceRelationClientCallsSumIndicator storageData) { + Map map = new HashMap<>(); + map.put("source_service_id", storageData.getSourceServiceId()); + map.put("dest_service_id", storageData.getDestServiceId()); + map.put("value", storageData.getValue()); + map.put("time_bucket", storageData.getTimeBucket()); + return map; + } + + @Override public ServiceRelationClientCallsSumIndicator map2Data(Map dbMap) { + ServiceRelationClientCallsSumIndicator indicator = new ServiceRelationClientCallsSumIndicator(); + indicator.setSourceServiceId(((Number)dbMap.get("source_service_id")).intValue()); + indicator.setDestServiceId(((Number)dbMap.get("dest_service_id")).intValue()); + indicator.setValue(((Number)dbMap.get("value")).longValue()); + indicator.setTimeBucket(((Number)dbMap.get("time_bucket")).longValue()); + return indicator; + } + } +} diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/generated/servicerelation/ServiceRelationDispatcher.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/generated/servicerelation/ServiceRelationDispatcher.java index 1c7a52c80d..dcad4d3c4d 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/generated/servicerelation/ServiceRelationDispatcher.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/generated/servicerelation/ServiceRelationDispatcher.java @@ -19,6 +19,7 @@ package org.apache.skywalking.oap.server.core.analysis.generated.servicerelation; import org.apache.skywalking.oap.server.core.analysis.SourceDispatcher; +import org.apache.skywalking.oap.server.core.analysis.indicator.expression.EqualMatch; import org.apache.skywalking.oap.server.core.analysis.worker.IndicatorProcess; import org.apache.skywalking.oap.server.core.source.*; @@ -30,9 +31,37 @@ import org.apache.skywalking.oap.server.core.source.*; public class ServiceRelationDispatcher implements SourceDispatcher { @Override public void dispatch(ServiceRelation source) { + doServiceRelationClientCallsSum(source); + doServiceRelationServerCallsSum(source); doServiceRelationAvg(source); } + private void doServiceRelationClientCallsSum(ServiceRelation source) { + ServiceRelationClientCallsSumIndicator indicator = new ServiceRelationClientCallsSumIndicator(); + + if (!new EqualMatch().setLeft(source.getDetectPoint()).setRight(DetectPoint.CLIENT).match()) { + return; + } + + indicator.setTimeBucket(source.getTimeBucket()); + indicator.setSourceServiceId(source.getSourceServiceId()); + indicator.setDestServiceId(source.getDestServiceId()); + indicator.combine(1); + IndicatorProcess.INSTANCE.in(indicator); + } + private void doServiceRelationServerCallsSum(ServiceRelation source) { + ServiceRelationServerCallsSumIndicator indicator = new ServiceRelationServerCallsSumIndicator(); + + if (!new EqualMatch().setLeft(source.getDetectPoint()).setRight(DetectPoint.SERVER).match()) { + return; + } + + indicator.setTimeBucket(source.getTimeBucket()); + indicator.setSourceServiceId(source.getSourceServiceId()); + indicator.setDestServiceId(source.getDestServiceId()); + indicator.combine(1); + IndicatorProcess.INSTANCE.in(indicator); + } private void doServiceRelationAvg(ServiceRelation source) { ServiceRelationAvgIndicator indicator = new ServiceRelationAvgIndicator(); diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/generated/servicerelation/ServiceRelationServerCallsSumIndicator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/generated/servicerelation/ServiceRelationServerCallsSumIndicator.java new file mode 100644 index 0000000000..ced9e3d627 --- /dev/null +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/generated/servicerelation/ServiceRelationServerCallsSumIndicator.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.core.analysis.generated.servicerelation; + +import java.util.*; +import lombok.*; +import org.apache.skywalking.oap.server.core.Const; +import org.apache.skywalking.oap.server.core.alarm.*; +import org.apache.skywalking.oap.server.core.analysis.indicator.SumIndicator; +import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.IndicatorType; +import org.apache.skywalking.oap.server.core.remote.annotation.StreamData; +import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData; +import org.apache.skywalking.oap.server.core.source.Scope; +import org.apache.skywalking.oap.server.core.storage.StorageBuilder; +import org.apache.skywalking.oap.server.core.storage.annotation.*; + +/** + * This class is auto generated. Please don't change this class manually. + * + * @author Observability Analysis Language code generator + */ +@IndicatorType +@StreamData +@StorageEntity(name = "service_relation_server_calls_sum", builder = ServiceRelationServerCallsSumIndicator.Builder.class) +public class ServiceRelationServerCallsSumIndicator extends SumIndicator implements AlarmSupported { + + @Setter @Getter @Column(columnName = "source_service_id") private int sourceServiceId; + @Setter @Getter @Column(columnName = "dest_service_id") private int destServiceId; + + @Override public String id() { + String splitJointId = String.valueOf(getTimeBucket()); + splitJointId += Const.ID_SPLIT + String.valueOf(sourceServiceId); + splitJointId += Const.ID_SPLIT + String.valueOf(destServiceId); + return splitJointId; + } + + @Override public int hashCode() { + int result = 17; + result = 31 * result + sourceServiceId; + result = 31 * result + destServiceId; + result = 31 * result + (int)getTimeBucket(); + return result; + } + + @Override public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + + ServiceRelationServerCallsSumIndicator indicator = (ServiceRelationServerCallsSumIndicator)obj; + if (sourceServiceId != indicator.sourceServiceId) + return false; + if (destServiceId != indicator.destServiceId) + return false; + + if (getTimeBucket() != indicator.getTimeBucket()) + return false; + + return true; + } + + @Override public RemoteData.Builder serialize() { + RemoteData.Builder remoteBuilder = RemoteData.newBuilder(); + + remoteBuilder.setDataLongs(0, getValue()); + remoteBuilder.setDataLongs(1, getTimeBucket()); + + + remoteBuilder.setDataIntegers(0, getSourceServiceId()); + remoteBuilder.setDataIntegers(1, getDestServiceId()); + + return remoteBuilder; + } + + @Override public void deserialize(RemoteData remoteData) { + + setValue(remoteData.getDataLongs(0)); + setTimeBucket(remoteData.getDataLongs(1)); + + + setSourceServiceId(remoteData.getDataIntegers(0)); + setDestServiceId(remoteData.getDataIntegers(1)); + } + + @Override public AlarmMeta getAlarmMeta() { + return new AlarmMeta("Service_Relation_Server_Calls_Sum", Scope.ServiceRelation, sourceServiceId, destServiceId); + } + + public static class Builder implements StorageBuilder { + + @Override public Map data2Map(ServiceRelationServerCallsSumIndicator storageData) { + Map map = new HashMap<>(); + map.put("source_service_id", storageData.getSourceServiceId()); + map.put("dest_service_id", storageData.getDestServiceId()); + map.put("value", storageData.getValue()); + map.put("time_bucket", storageData.getTimeBucket()); + return map; + } + + @Override public ServiceRelationServerCallsSumIndicator map2Data(Map dbMap) { + ServiceRelationServerCallsSumIndicator indicator = new ServiceRelationServerCallsSumIndicator(); + indicator.setSourceServiceId(((Number)dbMap.get("source_service_id")).intValue()); + indicator.setDestServiceId(((Number)dbMap.get("dest_service_id")).intValue()); + indicator.setValue(((Number)dbMap.get("value")).longValue()); + indicator.setTimeBucket(((Number)dbMap.get("time_bucket")).longValue()); + return indicator; + } + } +} diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/SumIndicator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/SumIndicator.java new file mode 100644 index 0000000000..0883dc1958 --- /dev/null +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/SumIndicator.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.core.analysis.indicator; + +import lombok.*; +import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.*; +import org.apache.skywalking.oap.server.core.storage.annotation.Column; + +/** + * @author peng-yongsheng + */ +@IndicatorOperator +public abstract class SumIndicator extends Indicator implements LongValueHolder { + + protected static final String VALUE = "value"; + + @Getter @Setter @Column(columnName = VALUE) private long value; + + @Entrance + public final void combine(@ConstOne long count) { + this.value += count; + } + + @Override public final void combine(Indicator indicator) { + SumIndicator sumIndicator = (SumIndicator)indicator; + combine(sumIndicator.value); + } + + @Override public void calculate() { + } + + @Override public long getValue() { + return value; + } +} diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/service/ServiceComponentDispatcher.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/service/ServiceComponentDispatcher.java new file mode 100644 index 0000000000..9b29042704 --- /dev/null +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/service/ServiceComponentDispatcher.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.core.analysis.manual.service; + +import org.apache.skywalking.oap.server.core.analysis.SourceDispatcher; +import org.apache.skywalking.oap.server.core.analysis.worker.IndicatorProcess; +import org.apache.skywalking.oap.server.core.source.ServiceComponent; + +/** + * @author peng-yongsheng + */ +public class ServiceComponentDispatcher implements SourceDispatcher { + + @Override public void dispatch(ServiceComponent source) { + doDispatch(source); + } + + private void doDispatch(ServiceComponent source) { + ServiceComponentIndicator indicator = new ServiceComponentIndicator(); + + indicator.setTimeBucket(source.getTimeBucket()); + indicator.setServiceId(source.getServiceId()); + indicator.setComponentId(source.getComponentId()); + IndicatorProcess.INSTANCE.in(indicator); + } +} diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/service/ServiceComponentIndicator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/service/ServiceComponentIndicator.java new file mode 100644 index 0000000000..406943cb61 --- /dev/null +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/service/ServiceComponentIndicator.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.core.analysis.manual.service; + +import java.util.*; +import lombok.*; +import org.apache.skywalking.oap.server.core.Const; +import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator; +import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.IndicatorType; +import org.apache.skywalking.oap.server.core.remote.annotation.StreamData; +import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData; +import org.apache.skywalking.oap.server.core.storage.StorageBuilder; +import org.apache.skywalking.oap.server.core.storage.annotation.*; + +/** + * @author peng-yongsheng + */ +@IndicatorType +@StreamData +@StorageEntity(name = "service_component", builder = ServiceComponentIndicator.Builder.class) +public class ServiceComponentIndicator extends Indicator { + + private static final String SERVICE_ID = "service_id"; + private static final String COMPONENT_ID = "component_id"; + + @Setter @Getter @Column(columnName = SERVICE_ID) private int serviceId; + @Setter @Getter @Column(columnName = COMPONENT_ID) private int componentId; + + @Override public String id() { + String splitJointId = String.valueOf(getTimeBucket()); + splitJointId += Const.ID_SPLIT + String.valueOf(serviceId); + splitJointId += Const.ID_SPLIT + String.valueOf(componentId); + return splitJointId; + } + + @Override public int hashCode() { + int result = 17; + result = 31 * result + serviceId; + result = 31 * result + componentId; + result = 31 * result + (int)getTimeBucket(); + return result; + } + + @Override public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + + ServiceComponentIndicator indicator = (ServiceComponentIndicator)obj; + if (serviceId != indicator.serviceId) + return false; + if (componentId != indicator.componentId) + return false; + + if (getTimeBucket() != indicator.getTimeBucket()) + return false; + + return true; + } + + @Override public RemoteData.Builder serialize() { + RemoteData.Builder remoteBuilder = RemoteData.newBuilder(); + + remoteBuilder.setDataLongs(0, getTimeBucket()); + + remoteBuilder.setDataIntegers(0, getServiceId()); + remoteBuilder.setDataIntegers(1, getComponentId()); + + return remoteBuilder; + } + + @Override public void deserialize(RemoteData remoteData) { + setTimeBucket(remoteData.getDataLongs(0)); + + setServiceId(remoteData.getDataIntegers(0)); + setComponentId(remoteData.getDataIntegers(1)); + } + + @Override public void calculate() { + } + + @Override public final void combine(Indicator indicator) { + } + + public static class Builder implements StorageBuilder { + + @Override public Map data2Map(ServiceComponentIndicator storageData) { + Map map = new HashMap<>(); + map.put(SERVICE_ID, storageData.getServiceId()); + map.put(COMPONENT_ID, storageData.getComponentId()); + map.put(TIME_BUCKET, storageData.getTimeBucket()); + return map; + } + + @Override public ServiceComponentIndicator map2Data(Map dbMap) { + ServiceComponentIndicator indicator = new ServiceComponentIndicator(); + indicator.setServiceId(((Number)dbMap.get(SERVICE_ID)).intValue()); + indicator.setComponentId(((Number)dbMap.get(COMPONENT_ID)).intValue()); + indicator.setTimeBucket(((Number)dbMap.get(TIME_BUCKET)).longValue()); + return indicator; + } + } +} diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/service/ServiceMappingDispatcher.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/service/ServiceMappingDispatcher.java new file mode 100644 index 0000000000..7efb503c4d --- /dev/null +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/service/ServiceMappingDispatcher.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.core.analysis.manual.service; + +import org.apache.skywalking.oap.server.core.analysis.SourceDispatcher; +import org.apache.skywalking.oap.server.core.analysis.worker.IndicatorProcess; +import org.apache.skywalking.oap.server.core.source.ServiceMapping; + +/** + * @author peng-yongsheng + */ +public class ServiceMappingDispatcher implements SourceDispatcher { + + @Override public void dispatch(ServiceMapping source) { + doDispatch(source); + } + + private void doDispatch(ServiceMapping source) { + ServiceMappingIndicator indicator = new ServiceMappingIndicator(); + + indicator.setTimeBucket(source.getTimeBucket()); + indicator.setServiceId(source.getServiceId()); + indicator.setMappingServiceId(source.getMappingServiceId()); + IndicatorProcess.INSTANCE.in(indicator); + } +} diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/service/ServiceMappingIndicator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/service/ServiceMappingIndicator.java new file mode 100644 index 0000000000..808fcaedc4 --- /dev/null +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/service/ServiceMappingIndicator.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.core.analysis.manual.service; + +import java.util.*; +import lombok.*; +import org.apache.skywalking.oap.server.core.Const; +import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator; +import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.IndicatorType; +import org.apache.skywalking.oap.server.core.remote.annotation.StreamData; +import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData; +import org.apache.skywalking.oap.server.core.storage.StorageBuilder; +import org.apache.skywalking.oap.server.core.storage.annotation.*; + +/** + * @author peng-yongsheng + */ +@IndicatorType +@StreamData +@StorageEntity(name = "service_mapping", builder = ServiceMappingIndicator.Builder.class) +public class ServiceMappingIndicator extends Indicator { + + private static final String SERVICE_ID = "service_id"; + private static final String MAPPING_SERVICE_ID = "mapping_service_id"; + + @Setter @Getter @Column(columnName = SERVICE_ID) private int serviceId; + @Setter @Getter @Column(columnName = MAPPING_SERVICE_ID) private int mappingServiceId; + + @Override public String id() { + String splitJointId = String.valueOf(getTimeBucket()); + splitJointId += Const.ID_SPLIT + String.valueOf(serviceId); + splitJointId += Const.ID_SPLIT + String.valueOf(mappingServiceId); + return splitJointId; + } + + @Override public int hashCode() { + int result = 17; + result = 31 * result + serviceId; + result = 31 * result + mappingServiceId; + result = 31 * result + (int)getTimeBucket(); + return result; + } + + @Override public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + + ServiceMappingIndicator indicator = (ServiceMappingIndicator)obj; + if (serviceId != indicator.serviceId) + return false; + if (mappingServiceId != indicator.mappingServiceId) + return false; + + if (getTimeBucket() != indicator.getTimeBucket()) + return false; + + return true; + } + + @Override public RemoteData.Builder serialize() { + RemoteData.Builder remoteBuilder = RemoteData.newBuilder(); + + remoteBuilder.setDataLongs(0, getTimeBucket()); + + remoteBuilder.setDataIntegers(0, getServiceId()); + remoteBuilder.setDataIntegers(1, getMappingServiceId()); + + return remoteBuilder; + } + + @Override public void deserialize(RemoteData remoteData) { + setTimeBucket(remoteData.getDataLongs(0)); + + setServiceId(remoteData.getDataIntegers(0)); + setMappingServiceId(remoteData.getDataIntegers(1)); + } + + @Override public void calculate() { + } + + @Override public final void combine(Indicator indicator) { + } + + public static class Builder implements StorageBuilder { + + @Override public Map data2Map(ServiceMappingIndicator storageData) { + Map map = new HashMap<>(); + map.put(SERVICE_ID, storageData.getServiceId()); + map.put(MAPPING_SERVICE_ID, storageData.getMappingServiceId()); + map.put(TIME_BUCKET, storageData.getTimeBucket()); + return map; + } + + @Override public ServiceMappingIndicator map2Data(Map dbMap) { + ServiceMappingIndicator indicator = new ServiceMappingIndicator(); + indicator.setServiceId(((Number)dbMap.get(SERVICE_ID)).intValue()); + indicator.setMappingServiceId(((Number)dbMap.get(MAPPING_SERVICE_ID)).intValue()); + indicator.setTimeBucket(((Number)dbMap.get(TIME_BUCKET)).longValue()); + return indicator; + } + } +} diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Scope.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Scope.java index 43c1ef30e8..396ecabdc9 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Scope.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Scope.java @@ -23,5 +23,6 @@ package org.apache.skywalking.oap.server.core.source; */ public enum Scope { All, Service, ServiceInstance, Endpoint, ServiceRelation, ServiceInstanceRelation, EndpointRelation, NetworkAddress, - ServiceInstanceJVMCPU, ServiceInstanceJVMMemory, ServiceInstanceJVMMemoryPool, ServiceInstanceJVMGC + ServiceInstanceJVMCPU, ServiceInstanceJVMMemory, ServiceInstanceJVMMemoryPool, ServiceInstanceJVMGC, + ServiceComponent, ServiceMapping } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/ServiceComponent.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/ServiceComponent.java new file mode 100644 index 0000000000..338dc6594f --- /dev/null +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/ServiceComponent.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.core.source; + +import lombok.*; +import org.apache.skywalking.oap.server.core.source.annotation.SourceType; + +/** + * @author peng-yongsheng + */ +@SourceType +public class ServiceComponent extends Source { + + @Override public Scope scope() { + return Scope.ServiceComponent; + } + + @Getter @Setter private int serviceId; + @Getter @Setter private int componentId; +} diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/ServiceMapping.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/ServiceMapping.java new file mode 100644 index 0000000000..e3257817c2 --- /dev/null +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/ServiceMapping.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.core.source; + +import lombok.*; +import org.apache.skywalking.oap.server.core.source.annotation.SourceType; + +/** + * @author peng-yongsheng + */ +@SourceType +public class ServiceMapping extends Source { + + @Override public Scope scope() { + return Scope.ServiceMapping; + } + + @Getter @Setter private int serviceId; + @Getter @Setter private int mappingServiceId; +} diff --git a/oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol b/oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol index 149251420c..c6ee5d15f3 160000 --- a/oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol +++ b/oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol @@ -1 +1 @@ -Subproject commit 149251420c887552df93d5c76f67ccdd47cb71ce +Subproject commit c6ee5d15f38024c3be61be579eac4035ffbfd4bf diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java index c40bdcf370..3adfd1ff82 100644 --- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java +++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java @@ -26,6 +26,7 @@ import org.apache.skywalking.oap.server.receiver.trace.module.TraceModule; import org.apache.skywalking.oap.server.receiver.trace.provider.handler.TraceSegmentServiceHandler; import org.apache.skywalking.oap.server.receiver.trace.provider.parser.SegmentParserListenerManager; import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.endpoint.MultiScopesSpanListener; +import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.service.*; /** * @author peng-yongsheng @@ -50,6 +51,8 @@ public class TraceModuleProvider extends ModuleProvider { @Override public void start() throws ModuleStartException { SegmentParserListenerManager listenerManager = new SegmentParserListenerManager(); listenerManager.add(new MultiScopesSpanListener.Factory()); + listenerManager.add(new ServiceComponentSpanListener.Factory()); + listenerManager.add(new ServiceMappingSpanListener.Factory()); GRPCHandlerRegister grpcHandlerRegister = getManager().find(CoreModule.NAME).getService(GRPCHandlerRegister.class); try { diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/service/ServiceComponentSpanListener.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/service/ServiceComponentSpanListener.java new file mode 100644 index 0000000000..cf296e5fd0 --- /dev/null +++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/service/ServiceComponentSpanListener.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.service; + +import java.util.*; +import org.apache.skywalking.oap.server.core.CoreModule; +import org.apache.skywalking.oap.server.core.cache.ServiceInventoryCache; +import org.apache.skywalking.oap.server.core.source.*; +import org.apache.skywalking.oap.server.library.module.ModuleManager; +import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.*; +import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.*; + +/** + * @author peng-yongsheng + */ +public class ServiceComponentSpanListener implements EntrySpanListener, ExitSpanListener { + + private final SourceReceiver sourceReceiver; + private final ServiceInventoryCache serviceInventoryCache; + private final List serviceComponents = new LinkedList<>(); + + private ServiceComponentSpanListener(ModuleManager moduleManager) { + this.sourceReceiver = moduleManager.find(CoreModule.NAME).getService(SourceReceiver.class); + this.serviceInventoryCache = moduleManager.find(CoreModule.NAME).getService(ServiceInventoryCache.class); + } + + @Override public boolean containsPoint(Point point) { + return Point.Entry.equals(point) || Point.Exit.equals(point); + } + + @Override + public void parseExit(SpanDecorator spanDecorator, SegmentCoreInfo segmentCoreInfo) { + int serviceIdByPeerId = serviceInventoryCache.getServiceId(spanDecorator.getPeerId()); + + ServiceComponent serviceComponent = new ServiceComponent(); + serviceComponent.setServiceId(serviceIdByPeerId); + serviceComponent.setComponentId(spanDecorator.getComponentId()); + serviceComponent.setTimeBucket(segmentCoreInfo.getMinuteTimeBucket()); + serviceComponents.add(serviceComponent); + } + + @Override + public void parseEntry(SpanDecorator spanDecorator, SegmentCoreInfo segmentCoreInfo) { + ServiceComponent serviceComponent = new ServiceComponent(); + serviceComponent.setServiceId(segmentCoreInfo.getApplicationId()); + serviceComponent.setComponentId(spanDecorator.getComponentId()); + serviceComponent.setTimeBucket(segmentCoreInfo.getMinuteTimeBucket()); + serviceComponents.add(serviceComponent); + } + + @Override public void build() { + serviceComponents.forEach(sourceReceiver::receive); + } + + public static class Factory implements SpanListenerFactory { + + @Override public SpanListener create(ModuleManager moduleManager) { + return new ServiceComponentSpanListener(moduleManager); + } + } +} diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/service/ServiceMappingSpanListener.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/service/ServiceMappingSpanListener.java new file mode 100644 index 0000000000..a9bc5a5dd8 --- /dev/null +++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/service/ServiceMappingSpanListener.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.service; + +import java.util.*; +import org.apache.skywalking.apm.network.language.agent.SpanLayer; +import org.apache.skywalking.oap.server.core.CoreModule; +import org.apache.skywalking.oap.server.core.cache.ServiceInventoryCache; +import org.apache.skywalking.oap.server.core.source.*; +import org.apache.skywalking.oap.server.library.module.ModuleManager; +import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.*; +import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.*; +import org.slf4j.*; + +/** + * @author peng-yongsheng + */ +public class ServiceMappingSpanListener implements EntrySpanListener { + + private static final Logger logger = LoggerFactory.getLogger(ServiceMappingSpanListener.class); + + private final SourceReceiver sourceReceiver; + private final ServiceInventoryCache serviceInventoryCache; + private List serviceMappings = new LinkedList<>(); + + private ServiceMappingSpanListener(ModuleManager moduleManager) { + this.sourceReceiver = moduleManager.find(CoreModule.NAME).getService(SourceReceiver.class); + this.serviceInventoryCache = moduleManager.find(CoreModule.NAME).getService(ServiceInventoryCache.class); + } + + @Override public boolean containsPoint(Point point) { + return Point.Entry.equals(point); + } + + @Override public void parseEntry(SpanDecorator spanDecorator, SegmentCoreInfo segmentCoreInfo) { + if (logger.isDebugEnabled()) { + logger.debug("service mapping listener parse reference"); + } + + if (!spanDecorator.getSpanLayer().equals(SpanLayer.MQ)) { + if (spanDecorator.getRefsCount() > 0) { + for (int i = 0; i < spanDecorator.getRefsCount(); i++) { + ServiceMapping serviceMapping = new ServiceMapping(); + serviceMapping.setServiceId(segmentCoreInfo.getApplicationId()); + + int addressId = spanDecorator.getRefs(i).getNetworkAddressId(); + int mappingServiceId = serviceInventoryCache.getServiceId(addressId); + serviceMapping.setMappingServiceId(mappingServiceId); + serviceMapping.setTimeBucket(segmentCoreInfo.getMinuteTimeBucket()); + serviceMappings.add(serviceMapping); + } + } + } + } + + @Override public void build() { + if (logger.isDebugEnabled()) { + logger.debug("service mapping listener build"); + } + + serviceMappings.forEach(sourceReceiver::receive); + } + + public static class Factory implements SpanListenerFactory { + + @Override public SpanListener create(ModuleManager moduleManager) { + return new ServiceMappingSpanListener(moduleManager); + } + } +} -- GitLab