提交 800a5353 编写于 作者: 彭勇升 pengys 提交者: wu-sheng

Add generate indicator and dispatcher about service topology but not test. (#1638)

* Add generate indicator and dispatcher about service topology but not test.

* Delete call type from service relation.
上级 f0f4cde4
......@@ -29,6 +29,7 @@ import org.apache.skywalking.oap.server.core.analysis.generated.serviceinstancej
import org.apache.skywalking.oap.server.core.analysis.generated.serviceinstancejvmmemorypool.ServiceInstanceJVMMemoryPoolDispatcher;
import org.apache.skywalking.oap.server.core.analysis.generated.serviceinstancerelation.ServiceInstanceRelationDispatcher;
import org.apache.skywalking.oap.server.core.analysis.generated.servicerelation.ServiceRelationDispatcher;
import org.apache.skywalking.oap.server.core.analysis.manual.service.*;
import org.apache.skywalking.oap.server.core.source.Scope;
import org.slf4j.*;
......@@ -48,6 +49,9 @@ public class DispatcherManager {
this.dispatcherMap.put(Scope.ServiceInstance, new ServiceInstanceDispatcher());
this.dispatcherMap.put(Scope.Endpoint, new EndpointDispatcher());
this.dispatcherMap.put(Scope.ServiceComponent, new ServiceComponentDispatcher());
this.dispatcherMap.put(Scope.ServiceMapping, new ServiceMappingDispatcher());
this.dispatcherMap.put(Scope.ServiceRelation, new ServiceRelationDispatcher());
this.dispatcherMap.put(Scope.ServiceInstanceRelation, new ServiceInstanceRelationDispatcher());
this.dispatcherMap.put(Scope.EndpointRelation, new EndpointRelationDispatcher());
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis.generated.service;
import java.util.*;
import lombok.*;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.alarm.*;
import org.apache.skywalking.oap.server.core.analysis.indicator.SumIndicator;
import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.IndicatorType;
import org.apache.skywalking.oap.server.core.remote.annotation.StreamData;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
import org.apache.skywalking.oap.server.core.source.Scope;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.annotation.*;
/**
* This class is auto generated. Please don't change this class manually.
*
* @author Observability Analysis Language code generator
*/
@IndicatorType
@StreamData
@StorageEntity(name = "service_calls_sum", builder = ServiceCallsSumIndicator.Builder.class)
public class ServiceCallsSumIndicator extends SumIndicator implements AlarmSupported {
@Setter @Getter @Column(columnName = "id") private int id;
@Override public String id() {
String splitJointId = String.valueOf(getTimeBucket());
splitJointId += Const.ID_SPLIT + String.valueOf(id);
return splitJointId;
}
@Override public int hashCode() {
int result = 17;
result = 31 * result + id;
result = 31 * result + (int)getTimeBucket();
return result;
}
@Override public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
ServiceCallsSumIndicator indicator = (ServiceCallsSumIndicator)obj;
if (id != indicator.id)
return false;
if (getTimeBucket() != indicator.getTimeBucket())
return false;
return true;
}
@Override public RemoteData.Builder serialize() {
RemoteData.Builder remoteBuilder = RemoteData.newBuilder();
remoteBuilder.setDataLongs(0, getValue());
remoteBuilder.setDataLongs(1, getTimeBucket());
remoteBuilder.setDataIntegers(0, getId());
return remoteBuilder;
}
@Override public void deserialize(RemoteData remoteData) {
setValue(remoteData.getDataLongs(0));
setTimeBucket(remoteData.getDataLongs(1));
setId(remoteData.getDataIntegers(0));
}
@Override public AlarmMeta getAlarmMeta() {
return new AlarmMeta("Service_Calls_Sum", Scope.Service, id);
}
public static class Builder implements StorageBuilder<ServiceCallsSumIndicator> {
@Override public Map<String, Object> data2Map(ServiceCallsSumIndicator storageData) {
Map<String, Object> map = new HashMap<>();
map.put("id", storageData.getId());
map.put("value", storageData.getValue());
map.put("time_bucket", storageData.getTimeBucket());
return map;
}
@Override public ServiceCallsSumIndicator map2Data(Map<String, Object> dbMap) {
ServiceCallsSumIndicator indicator = new ServiceCallsSumIndicator();
indicator.setId(((Number)dbMap.get("id")).intValue());
indicator.setValue(((Number)dbMap.get("value")).longValue());
indicator.setTimeBucket(((Number)dbMap.get("time_bucket")).longValue());
return indicator;
}
}
}
......@@ -20,7 +20,7 @@ package org.apache.skywalking.oap.server.core.analysis.generated.service;
import org.apache.skywalking.oap.server.core.analysis.SourceDispatcher;
import org.apache.skywalking.oap.server.core.analysis.worker.IndicatorProcess;
import org.apache.skywalking.oap.server.core.source.*;
import org.apache.skywalking.oap.server.core.source.Service;
/**
* This class is auto generated. Please don't change this class manually.
......@@ -31,6 +31,7 @@ public class ServiceDispatcher implements SourceDispatcher<Service> {
@Override public void dispatch(Service source) {
doServiceAvg(source);
doServiceCallsSum(source);
}
private void doServiceAvg(Service source) {
......@@ -42,4 +43,13 @@ public class ServiceDispatcher implements SourceDispatcher<Service> {
indicator.combine(source.getLatency(), 1);
IndicatorProcess.INSTANCE.in(indicator);
}
private void doServiceCallsSum(Service source) {
ServiceCallsSumIndicator indicator = new ServiceCallsSumIndicator();
indicator.setTimeBucket(source.getTimeBucket());
indicator.setId(source.getId());
indicator.combine(1);
IndicatorProcess.INSTANCE.in(indicator);
}
}
......@@ -21,15 +21,14 @@ package org.apache.skywalking.oap.server.core.analysis.generated.servicerelation
import java.util.*;
import lombok.*;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.alarm.AlarmMeta;
import org.apache.skywalking.oap.server.core.alarm.AlarmSupported;
import org.apache.skywalking.oap.server.core.analysis.indicator.*;
import org.apache.skywalking.oap.server.core.alarm.*;
import org.apache.skywalking.oap.server.core.analysis.indicator.LongAvgIndicator;
import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.IndicatorType;
import org.apache.skywalking.oap.server.core.remote.annotation.StreamData;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
import org.apache.skywalking.oap.server.core.storage.annotation.*;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.source.Scope;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.annotation.*;
/**
* This class is auto generated. Please don't change this class manually.
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis.generated.servicerelation;
import java.util.*;
import lombok.*;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.alarm.*;
import org.apache.skywalking.oap.server.core.analysis.indicator.SumIndicator;
import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.IndicatorType;
import org.apache.skywalking.oap.server.core.remote.annotation.StreamData;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
import org.apache.skywalking.oap.server.core.source.Scope;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.annotation.*;
/**
* This class is auto generated. Please don't change this class manually.
*
* @author Observability Analysis Language code generator
*/
@IndicatorType
@StreamData
@StorageEntity(name = "service_relation_client_calls_sum", builder = ServiceRelationClientCallsSumIndicator.Builder.class)
public class ServiceRelationClientCallsSumIndicator extends SumIndicator implements AlarmSupported {
@Setter @Getter @Column(columnName = "source_service_id") private int sourceServiceId;
@Setter @Getter @Column(columnName = "dest_service_id") private int destServiceId;
@Override public String id() {
String splitJointId = String.valueOf(getTimeBucket());
splitJointId += Const.ID_SPLIT + String.valueOf(sourceServiceId);
splitJointId += Const.ID_SPLIT + String.valueOf(destServiceId);
return splitJointId;
}
@Override public int hashCode() {
int result = 17;
result = 31 * result + sourceServiceId;
result = 31 * result + destServiceId;
result = 31 * result + (int)getTimeBucket();
return result;
}
@Override public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
ServiceRelationClientCallsSumIndicator indicator = (ServiceRelationClientCallsSumIndicator)obj;
if (sourceServiceId != indicator.sourceServiceId)
return false;
if (destServiceId != indicator.destServiceId)
return false;
if (getTimeBucket() != indicator.getTimeBucket())
return false;
return true;
}
@Override public RemoteData.Builder serialize() {
RemoteData.Builder remoteBuilder = RemoteData.newBuilder();
remoteBuilder.setDataLongs(0, getValue());
remoteBuilder.setDataLongs(1, getTimeBucket());
remoteBuilder.setDataIntegers(0, getSourceServiceId());
remoteBuilder.setDataIntegers(1, getDestServiceId());
return remoteBuilder;
}
@Override public void deserialize(RemoteData remoteData) {
setValue(remoteData.getDataLongs(0));
setTimeBucket(remoteData.getDataLongs(1));
setSourceServiceId(remoteData.getDataIntegers(0));
setDestServiceId(remoteData.getDataIntegers(1));
}
@Override public AlarmMeta getAlarmMeta() {
return new AlarmMeta("Service_Relation_Client_Calls_Sum", Scope.ServiceRelation, sourceServiceId, destServiceId);
}
public static class Builder implements StorageBuilder<ServiceRelationClientCallsSumIndicator> {
@Override public Map<String, Object> data2Map(ServiceRelationClientCallsSumIndicator storageData) {
Map<String, Object> map = new HashMap<>();
map.put("source_service_id", storageData.getSourceServiceId());
map.put("dest_service_id", storageData.getDestServiceId());
map.put("value", storageData.getValue());
map.put("time_bucket", storageData.getTimeBucket());
return map;
}
@Override public ServiceRelationClientCallsSumIndicator map2Data(Map<String, Object> dbMap) {
ServiceRelationClientCallsSumIndicator indicator = new ServiceRelationClientCallsSumIndicator();
indicator.setSourceServiceId(((Number)dbMap.get("source_service_id")).intValue());
indicator.setDestServiceId(((Number)dbMap.get("dest_service_id")).intValue());
indicator.setValue(((Number)dbMap.get("value")).longValue());
indicator.setTimeBucket(((Number)dbMap.get("time_bucket")).longValue());
return indicator;
}
}
}
......@@ -19,6 +19,7 @@
package org.apache.skywalking.oap.server.core.analysis.generated.servicerelation;
import org.apache.skywalking.oap.server.core.analysis.SourceDispatcher;
import org.apache.skywalking.oap.server.core.analysis.indicator.expression.EqualMatch;
import org.apache.skywalking.oap.server.core.analysis.worker.IndicatorProcess;
import org.apache.skywalking.oap.server.core.source.*;
......@@ -30,9 +31,37 @@ import org.apache.skywalking.oap.server.core.source.*;
public class ServiceRelationDispatcher implements SourceDispatcher<ServiceRelation> {
@Override public void dispatch(ServiceRelation source) {
doServiceRelationClientCallsSum(source);
doServiceRelationServerCallsSum(source);
doServiceRelationAvg(source);
}
private void doServiceRelationClientCallsSum(ServiceRelation source) {
ServiceRelationClientCallsSumIndicator indicator = new ServiceRelationClientCallsSumIndicator();
if (!new EqualMatch().setLeft(source.getDetectPoint()).setRight(DetectPoint.CLIENT).match()) {
return;
}
indicator.setTimeBucket(source.getTimeBucket());
indicator.setSourceServiceId(source.getSourceServiceId());
indicator.setDestServiceId(source.getDestServiceId());
indicator.combine(1);
IndicatorProcess.INSTANCE.in(indicator);
}
private void doServiceRelationServerCallsSum(ServiceRelation source) {
ServiceRelationServerCallsSumIndicator indicator = new ServiceRelationServerCallsSumIndicator();
if (!new EqualMatch().setLeft(source.getDetectPoint()).setRight(DetectPoint.SERVER).match()) {
return;
}
indicator.setTimeBucket(source.getTimeBucket());
indicator.setSourceServiceId(source.getSourceServiceId());
indicator.setDestServiceId(source.getDestServiceId());
indicator.combine(1);
IndicatorProcess.INSTANCE.in(indicator);
}
private void doServiceRelationAvg(ServiceRelation source) {
ServiceRelationAvgIndicator indicator = new ServiceRelationAvgIndicator();
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis.generated.servicerelation;
import java.util.*;
import lombok.*;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.alarm.*;
import org.apache.skywalking.oap.server.core.analysis.indicator.SumIndicator;
import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.IndicatorType;
import org.apache.skywalking.oap.server.core.remote.annotation.StreamData;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
import org.apache.skywalking.oap.server.core.source.Scope;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.annotation.*;
/**
* This class is auto generated. Please don't change this class manually.
*
* @author Observability Analysis Language code generator
*/
@IndicatorType
@StreamData
@StorageEntity(name = "service_relation_server_calls_sum", builder = ServiceRelationServerCallsSumIndicator.Builder.class)
public class ServiceRelationServerCallsSumIndicator extends SumIndicator implements AlarmSupported {
@Setter @Getter @Column(columnName = "source_service_id") private int sourceServiceId;
@Setter @Getter @Column(columnName = "dest_service_id") private int destServiceId;
@Override public String id() {
String splitJointId = String.valueOf(getTimeBucket());
splitJointId += Const.ID_SPLIT + String.valueOf(sourceServiceId);
splitJointId += Const.ID_SPLIT + String.valueOf(destServiceId);
return splitJointId;
}
@Override public int hashCode() {
int result = 17;
result = 31 * result + sourceServiceId;
result = 31 * result + destServiceId;
result = 31 * result + (int)getTimeBucket();
return result;
}
@Override public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
ServiceRelationServerCallsSumIndicator indicator = (ServiceRelationServerCallsSumIndicator)obj;
if (sourceServiceId != indicator.sourceServiceId)
return false;
if (destServiceId != indicator.destServiceId)
return false;
if (getTimeBucket() != indicator.getTimeBucket())
return false;
return true;
}
@Override public RemoteData.Builder serialize() {
RemoteData.Builder remoteBuilder = RemoteData.newBuilder();
remoteBuilder.setDataLongs(0, getValue());
remoteBuilder.setDataLongs(1, getTimeBucket());
remoteBuilder.setDataIntegers(0, getSourceServiceId());
remoteBuilder.setDataIntegers(1, getDestServiceId());
return remoteBuilder;
}
@Override public void deserialize(RemoteData remoteData) {
setValue(remoteData.getDataLongs(0));
setTimeBucket(remoteData.getDataLongs(1));
setSourceServiceId(remoteData.getDataIntegers(0));
setDestServiceId(remoteData.getDataIntegers(1));
}
@Override public AlarmMeta getAlarmMeta() {
return new AlarmMeta("Service_Relation_Server_Calls_Sum", Scope.ServiceRelation, sourceServiceId, destServiceId);
}
public static class Builder implements StorageBuilder<ServiceRelationServerCallsSumIndicator> {
@Override public Map<String, Object> data2Map(ServiceRelationServerCallsSumIndicator storageData) {
Map<String, Object> map = new HashMap<>();
map.put("source_service_id", storageData.getSourceServiceId());
map.put("dest_service_id", storageData.getDestServiceId());
map.put("value", storageData.getValue());
map.put("time_bucket", storageData.getTimeBucket());
return map;
}
@Override public ServiceRelationServerCallsSumIndicator map2Data(Map<String, Object> dbMap) {
ServiceRelationServerCallsSumIndicator indicator = new ServiceRelationServerCallsSumIndicator();
indicator.setSourceServiceId(((Number)dbMap.get("source_service_id")).intValue());
indicator.setDestServiceId(((Number)dbMap.get("dest_service_id")).intValue());
indicator.setValue(((Number)dbMap.get("value")).longValue());
indicator.setTimeBucket(((Number)dbMap.get("time_bucket")).longValue());
return indicator;
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis.indicator;
import lombok.*;
import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.*;
import org.apache.skywalking.oap.server.core.storage.annotation.Column;
/**
* @author peng-yongsheng
*/
@IndicatorOperator
public abstract class SumIndicator extends Indicator implements LongValueHolder {
protected static final String VALUE = "value";
@Getter @Setter @Column(columnName = VALUE) private long value;
@Entrance
public final void combine(@ConstOne long count) {
this.value += count;
}
@Override public final void combine(Indicator indicator) {
SumIndicator sumIndicator = (SumIndicator)indicator;
combine(sumIndicator.value);
}
@Override public void calculate() {
}
@Override public long getValue() {
return value;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis.manual.service;
import org.apache.skywalking.oap.server.core.analysis.SourceDispatcher;
import org.apache.skywalking.oap.server.core.analysis.worker.IndicatorProcess;
import org.apache.skywalking.oap.server.core.source.ServiceComponent;
/**
* @author peng-yongsheng
*/
public class ServiceComponentDispatcher implements SourceDispatcher<ServiceComponent> {
@Override public void dispatch(ServiceComponent source) {
doDispatch(source);
}
private void doDispatch(ServiceComponent source) {
ServiceComponentIndicator indicator = new ServiceComponentIndicator();
indicator.setTimeBucket(source.getTimeBucket());
indicator.setServiceId(source.getServiceId());
indicator.setComponentId(source.getComponentId());
IndicatorProcess.INSTANCE.in(indicator);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis.manual.service;
import java.util.*;
import lombok.*;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.IndicatorType;
import org.apache.skywalking.oap.server.core.remote.annotation.StreamData;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.annotation.*;
/**
* @author peng-yongsheng
*/
@IndicatorType
@StreamData
@StorageEntity(name = "service_component", builder = ServiceComponentIndicator.Builder.class)
public class ServiceComponentIndicator extends Indicator {
private static final String SERVICE_ID = "service_id";
private static final String COMPONENT_ID = "component_id";
@Setter @Getter @Column(columnName = SERVICE_ID) private int serviceId;
@Setter @Getter @Column(columnName = COMPONENT_ID) private int componentId;
@Override public String id() {
String splitJointId = String.valueOf(getTimeBucket());
splitJointId += Const.ID_SPLIT + String.valueOf(serviceId);
splitJointId += Const.ID_SPLIT + String.valueOf(componentId);
return splitJointId;
}
@Override public int hashCode() {
int result = 17;
result = 31 * result + serviceId;
result = 31 * result + componentId;
result = 31 * result + (int)getTimeBucket();
return result;
}
@Override public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
ServiceComponentIndicator indicator = (ServiceComponentIndicator)obj;
if (serviceId != indicator.serviceId)
return false;
if (componentId != indicator.componentId)
return false;
if (getTimeBucket() != indicator.getTimeBucket())
return false;
return true;
}
@Override public RemoteData.Builder serialize() {
RemoteData.Builder remoteBuilder = RemoteData.newBuilder();
remoteBuilder.setDataLongs(0, getTimeBucket());
remoteBuilder.setDataIntegers(0, getServiceId());
remoteBuilder.setDataIntegers(1, getComponentId());
return remoteBuilder;
}
@Override public void deserialize(RemoteData remoteData) {
setTimeBucket(remoteData.getDataLongs(0));
setServiceId(remoteData.getDataIntegers(0));
setComponentId(remoteData.getDataIntegers(1));
}
@Override public void calculate() {
}
@Override public final void combine(Indicator indicator) {
}
public static class Builder implements StorageBuilder<ServiceComponentIndicator> {
@Override public Map<String, Object> data2Map(ServiceComponentIndicator storageData) {
Map<String, Object> map = new HashMap<>();
map.put(SERVICE_ID, storageData.getServiceId());
map.put(COMPONENT_ID, storageData.getComponentId());
map.put(TIME_BUCKET, storageData.getTimeBucket());
return map;
}
@Override public ServiceComponentIndicator map2Data(Map<String, Object> dbMap) {
ServiceComponentIndicator indicator = new ServiceComponentIndicator();
indicator.setServiceId(((Number)dbMap.get(SERVICE_ID)).intValue());
indicator.setComponentId(((Number)dbMap.get(COMPONENT_ID)).intValue());
indicator.setTimeBucket(((Number)dbMap.get(TIME_BUCKET)).longValue());
return indicator;
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis.manual.service;
import org.apache.skywalking.oap.server.core.analysis.SourceDispatcher;
import org.apache.skywalking.oap.server.core.analysis.worker.IndicatorProcess;
import org.apache.skywalking.oap.server.core.source.ServiceMapping;
/**
* @author peng-yongsheng
*/
public class ServiceMappingDispatcher implements SourceDispatcher<ServiceMapping> {
@Override public void dispatch(ServiceMapping source) {
doDispatch(source);
}
private void doDispatch(ServiceMapping source) {
ServiceMappingIndicator indicator = new ServiceMappingIndicator();
indicator.setTimeBucket(source.getTimeBucket());
indicator.setServiceId(source.getServiceId());
indicator.setMappingServiceId(source.getMappingServiceId());
IndicatorProcess.INSTANCE.in(indicator);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis.manual.service;
import java.util.*;
import lombok.*;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.IndicatorType;
import org.apache.skywalking.oap.server.core.remote.annotation.StreamData;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
import org.apache.skywalking.oap.server.core.storage.annotation.*;
/**
* @author peng-yongsheng
*/
@IndicatorType
@StreamData
@StorageEntity(name = "service_mapping", builder = ServiceMappingIndicator.Builder.class)
public class ServiceMappingIndicator extends Indicator {
private static final String SERVICE_ID = "service_id";
private static final String MAPPING_SERVICE_ID = "mapping_service_id";
@Setter @Getter @Column(columnName = SERVICE_ID) private int serviceId;
@Setter @Getter @Column(columnName = MAPPING_SERVICE_ID) private int mappingServiceId;
@Override public String id() {
String splitJointId = String.valueOf(getTimeBucket());
splitJointId += Const.ID_SPLIT + String.valueOf(serviceId);
splitJointId += Const.ID_SPLIT + String.valueOf(mappingServiceId);
return splitJointId;
}
@Override public int hashCode() {
int result = 17;
result = 31 * result + serviceId;
result = 31 * result + mappingServiceId;
result = 31 * result + (int)getTimeBucket();
return result;
}
@Override public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
ServiceMappingIndicator indicator = (ServiceMappingIndicator)obj;
if (serviceId != indicator.serviceId)
return false;
if (mappingServiceId != indicator.mappingServiceId)
return false;
if (getTimeBucket() != indicator.getTimeBucket())
return false;
return true;
}
@Override public RemoteData.Builder serialize() {
RemoteData.Builder remoteBuilder = RemoteData.newBuilder();
remoteBuilder.setDataLongs(0, getTimeBucket());
remoteBuilder.setDataIntegers(0, getServiceId());
remoteBuilder.setDataIntegers(1, getMappingServiceId());
return remoteBuilder;
}
@Override public void deserialize(RemoteData remoteData) {
setTimeBucket(remoteData.getDataLongs(0));
setServiceId(remoteData.getDataIntegers(0));
setMappingServiceId(remoteData.getDataIntegers(1));
}
@Override public void calculate() {
}
@Override public final void combine(Indicator indicator) {
}
public static class Builder implements StorageBuilder<ServiceMappingIndicator> {
@Override public Map<String, Object> data2Map(ServiceMappingIndicator storageData) {
Map<String, Object> map = new HashMap<>();
map.put(SERVICE_ID, storageData.getServiceId());
map.put(MAPPING_SERVICE_ID, storageData.getMappingServiceId());
map.put(TIME_BUCKET, storageData.getTimeBucket());
return map;
}
@Override public ServiceMappingIndicator map2Data(Map<String, Object> dbMap) {
ServiceMappingIndicator indicator = new ServiceMappingIndicator();
indicator.setServiceId(((Number)dbMap.get(SERVICE_ID)).intValue());
indicator.setMappingServiceId(((Number)dbMap.get(MAPPING_SERVICE_ID)).intValue());
indicator.setTimeBucket(((Number)dbMap.get(TIME_BUCKET)).longValue());
return indicator;
}
}
}
......@@ -23,5 +23,6 @@ package org.apache.skywalking.oap.server.core.source;
*/
public enum Scope {
All, Service, ServiceInstance, Endpoint, ServiceRelation, ServiceInstanceRelation, EndpointRelation, NetworkAddress,
ServiceInstanceJVMCPU, ServiceInstanceJVMMemory, ServiceInstanceJVMMemoryPool, ServiceInstanceJVMGC
ServiceInstanceJVMCPU, ServiceInstanceJVMMemory, ServiceInstanceJVMMemoryPool, ServiceInstanceJVMGC,
ServiceComponent, ServiceMapping
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.source;
import lombok.*;
import org.apache.skywalking.oap.server.core.source.annotation.SourceType;
/**
* @author peng-yongsheng
*/
@SourceType
public class ServiceComponent extends Source {
@Override public Scope scope() {
return Scope.ServiceComponent;
}
@Getter @Setter private int serviceId;
@Getter @Setter private int componentId;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.source;
import lombok.*;
import org.apache.skywalking.oap.server.core.source.annotation.SourceType;
/**
* @author peng-yongsheng
*/
@SourceType
public class ServiceMapping extends Source {
@Override public Scope scope() {
return Scope.ServiceMapping;
}
@Getter @Setter private int serviceId;
@Getter @Setter private int mappingServiceId;
}
Subproject commit 149251420c887552df93d5c76f67ccdd47cb71ce
Subproject commit c6ee5d15f38024c3be61be579eac4035ffbfd4bf
......@@ -26,6 +26,7 @@ import org.apache.skywalking.oap.server.receiver.trace.module.TraceModule;
import org.apache.skywalking.oap.server.receiver.trace.provider.handler.TraceSegmentServiceHandler;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.SegmentParserListenerManager;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.endpoint.MultiScopesSpanListener;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.service.*;
/**
* @author peng-yongsheng
......@@ -50,6 +51,8 @@ public class TraceModuleProvider extends ModuleProvider {
@Override public void start() throws ModuleStartException {
SegmentParserListenerManager listenerManager = new SegmentParserListenerManager();
listenerManager.add(new MultiScopesSpanListener.Factory());
listenerManager.add(new ServiceComponentSpanListener.Factory());
listenerManager.add(new ServiceMappingSpanListener.Factory());
GRPCHandlerRegister grpcHandlerRegister = getManager().find(CoreModule.NAME).getService(GRPCHandlerRegister.class);
try {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.service;
import java.util.*;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.cache.ServiceInventoryCache;
import org.apache.skywalking.oap.server.core.source.*;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.*;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.*;
/**
* @author peng-yongsheng
*/
public class ServiceComponentSpanListener implements EntrySpanListener, ExitSpanListener {
private final SourceReceiver sourceReceiver;
private final ServiceInventoryCache serviceInventoryCache;
private final List<ServiceComponent> serviceComponents = new LinkedList<>();
private ServiceComponentSpanListener(ModuleManager moduleManager) {
this.sourceReceiver = moduleManager.find(CoreModule.NAME).getService(SourceReceiver.class);
this.serviceInventoryCache = moduleManager.find(CoreModule.NAME).getService(ServiceInventoryCache.class);
}
@Override public boolean containsPoint(Point point) {
return Point.Entry.equals(point) || Point.Exit.equals(point);
}
@Override
public void parseExit(SpanDecorator spanDecorator, SegmentCoreInfo segmentCoreInfo) {
int serviceIdByPeerId = serviceInventoryCache.getServiceId(spanDecorator.getPeerId());
ServiceComponent serviceComponent = new ServiceComponent();
serviceComponent.setServiceId(serviceIdByPeerId);
serviceComponent.setComponentId(spanDecorator.getComponentId());
serviceComponent.setTimeBucket(segmentCoreInfo.getMinuteTimeBucket());
serviceComponents.add(serviceComponent);
}
@Override
public void parseEntry(SpanDecorator spanDecorator, SegmentCoreInfo segmentCoreInfo) {
ServiceComponent serviceComponent = new ServiceComponent();
serviceComponent.setServiceId(segmentCoreInfo.getApplicationId());
serviceComponent.setComponentId(spanDecorator.getComponentId());
serviceComponent.setTimeBucket(segmentCoreInfo.getMinuteTimeBucket());
serviceComponents.add(serviceComponent);
}
@Override public void build() {
serviceComponents.forEach(sourceReceiver::receive);
}
public static class Factory implements SpanListenerFactory {
@Override public SpanListener create(ModuleManager moduleManager) {
return new ServiceComponentSpanListener(moduleManager);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.service;
import java.util.*;
import org.apache.skywalking.apm.network.language.agent.SpanLayer;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.cache.ServiceInventoryCache;
import org.apache.skywalking.oap.server.core.source.*;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.*;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.*;
import org.slf4j.*;
/**
* @author peng-yongsheng
*/
public class ServiceMappingSpanListener implements EntrySpanListener {
private static final Logger logger = LoggerFactory.getLogger(ServiceMappingSpanListener.class);
private final SourceReceiver sourceReceiver;
private final ServiceInventoryCache serviceInventoryCache;
private List<ServiceMapping> serviceMappings = new LinkedList<>();
private ServiceMappingSpanListener(ModuleManager moduleManager) {
this.sourceReceiver = moduleManager.find(CoreModule.NAME).getService(SourceReceiver.class);
this.serviceInventoryCache = moduleManager.find(CoreModule.NAME).getService(ServiceInventoryCache.class);
}
@Override public boolean containsPoint(Point point) {
return Point.Entry.equals(point);
}
@Override public void parseEntry(SpanDecorator spanDecorator, SegmentCoreInfo segmentCoreInfo) {
if (logger.isDebugEnabled()) {
logger.debug("service mapping listener parse reference");
}
if (!spanDecorator.getSpanLayer().equals(SpanLayer.MQ)) {
if (spanDecorator.getRefsCount() > 0) {
for (int i = 0; i < spanDecorator.getRefsCount(); i++) {
ServiceMapping serviceMapping = new ServiceMapping();
serviceMapping.setServiceId(segmentCoreInfo.getApplicationId());
int addressId = spanDecorator.getRefs(i).getNetworkAddressId();
int mappingServiceId = serviceInventoryCache.getServiceId(addressId);
serviceMapping.setMappingServiceId(mappingServiceId);
serviceMapping.setTimeBucket(segmentCoreInfo.getMinuteTimeBucket());
serviceMappings.add(serviceMapping);
}
}
}
}
@Override public void build() {
if (logger.isDebugEnabled()) {
logger.debug("service mapping listener build");
}
serviceMappings.forEach(sourceReceiver::receive);
}
public static class Factory implements SpanListenerFactory {
@Override public SpanListener create(ModuleManager moduleManager) {
return new ServiceMappingSpanListener(moduleManager);
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册