提交 0eb26f1d 编写于 作者: P peng-yongsheng

Service reference metric pyramid aggregate test successful.

上级 92bdb739
......@@ -59,7 +59,7 @@ public class ServiceMetricAlarmGraph {
.addNext(new ServiceMetricAlarmToListNodeProcessor())
.addNext(new ServiceMetricAlarmListPersistenceWorker.Factory(moduleManager).create(workerCreateListener));
link(graph);
// link(graph);
}
private void link(Graph<ServiceMetric> graph) {
......
......@@ -59,7 +59,7 @@ public class ServiceReferenceMetricAlarmGraph {
.addNext(new ServiceReferenceMetricAlarmToListNodeProcessor())
.addNext(new ServiceReferenceMetricAlarmListPersistenceWorker.Factory(moduleManager).create(workerCreateListener));
link(graph);
// link(graph);
}
private void link(Graph<ServiceReferenceMetric> graph) {
......
......@@ -36,6 +36,7 @@ import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.segme
import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.segment.SegmentCostSpanListener;
import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.service.metric.ServiceMetricGraph;
import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.service.refmetric.ServiceReferenceMetricGraph;
import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.service.refmetric.ServiceReferenceMetricSpanListener;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.AnalysisSegmentParserModule;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.service.ISegmentParserListenerRegister;
import org.apache.skywalking.apm.collector.analysis.worker.model.base.WorkerCreateListener;
......@@ -83,7 +84,7 @@ public class AnalysisMetricModuleProvider extends ModuleProvider {
private void segmentParserListenerRegister() {
ISegmentParserListenerRegister segmentParserListenerRegister = getManager().find(AnalysisSegmentParserModule.NAME).getService(ISegmentParserListenerRegister.class);
// segmentParserListenerRegister.register(new ServiceReferenceMetricSpanListener.Factory());
segmentParserListenerRegister.register(new ServiceReferenceMetricSpanListener.Factory());
segmentParserListenerRegister.register(new ApplicationComponentSpanListener.Factory());
segmentParserListenerRegister.register(new ApplicationMappingSpanListener.Factory());
segmentParserListenerRegister.register(new InstanceMappingSpanListener.Factory());
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.analysis.metric.provider.worker.application.component;
import org.apache.skywalking.apm.collector.storage.table.application.ApplicationComponent;
/**
* @author peng-yongsheng
*/
public class ApplicationComponentCopy {
public static ApplicationComponent copy(ApplicationComponent applicationComponent) {
ApplicationComponent newApplicationComponent = new ApplicationComponent();
newApplicationComponent.setId(applicationComponent.getId());
newApplicationComponent.setMetricId(applicationComponent.getMetricId());
newApplicationComponent.setComponentId(applicationComponent.getComponentId());
newApplicationComponent.setPeerId(applicationComponent.getPeerId());
newApplicationComponent.setTimeBucket(newApplicationComponent.getTimeBucket());
return newApplicationComponent;
}
}
......@@ -38,12 +38,8 @@ public class ApplicationComponentDayTransformNode implements NodeProcessor<Appli
public void process(ApplicationComponent applicationComponent, Next<ApplicationComponent> next) {
long timeBucket = TimeBucketUtils.INSTANCE.minuteToDay(applicationComponent.getTimeBucket());
ApplicationComponent newApplicationComponent = new ApplicationComponent();
ApplicationComponent newApplicationComponent = ApplicationComponentCopy.copy(applicationComponent);
newApplicationComponent.setId(String.valueOf(timeBucket) + Const.ID_SPLIT + applicationComponent.getMetricId());
newApplicationComponent.setMetricId(applicationComponent.getMetricId());
newApplicationComponent.setComponentId(applicationComponent.getComponentId());
newApplicationComponent.setPeerId(applicationComponent.getPeerId());
newApplicationComponent.setTimeBucket(timeBucket);
next.execute(newApplicationComponent);
}
......
......@@ -38,12 +38,8 @@ public class ApplicationComponentHourTransformNode implements NodeProcessor<Appl
public void process(ApplicationComponent applicationComponent, Next<ApplicationComponent> next) {
long timeBucket = TimeBucketUtils.INSTANCE.minuteToHour(applicationComponent.getTimeBucket());
ApplicationComponent newApplicationComponent = new ApplicationComponent();
ApplicationComponent newApplicationComponent = ApplicationComponentCopy.copy(applicationComponent);
newApplicationComponent.setId(String.valueOf(timeBucket) + Const.ID_SPLIT + applicationComponent.getMetricId());
newApplicationComponent.setMetricId(applicationComponent.getMetricId());
newApplicationComponent.setComponentId(applicationComponent.getComponentId());
newApplicationComponent.setPeerId(applicationComponent.getPeerId());
newApplicationComponent.setTimeBucket(timeBucket);
next.execute(newApplicationComponent);
}
......
......@@ -38,12 +38,8 @@ public class ApplicationComponentMonthTransformNode implements NodeProcessor<App
public void process(ApplicationComponent applicationComponent, Next<ApplicationComponent> next) {
long timeBucket = TimeBucketUtils.INSTANCE.minuteToMonth(applicationComponent.getTimeBucket());
ApplicationComponent newApplicationComponent = new ApplicationComponent();
ApplicationComponent newApplicationComponent = ApplicationComponentCopy.copy(applicationComponent);
newApplicationComponent.setId(String.valueOf(timeBucket) + Const.ID_SPLIT + applicationComponent.getMetricId());
newApplicationComponent.setMetricId(applicationComponent.getMetricId());
newApplicationComponent.setComponentId(applicationComponent.getComponentId());
newApplicationComponent.setPeerId(applicationComponent.getPeerId());
newApplicationComponent.setTimeBucket(timeBucket);
next.execute(newApplicationComponent);
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.analysis.metric.provider.worker.application.mapping;
import org.apache.skywalking.apm.collector.storage.table.application.ApplicationMapping;
/**
* @author peng-yongsheng
*/
public class ApplicationMappingCopy {
public static ApplicationMapping copy(ApplicationMapping applicationMapping) {
ApplicationMapping newApplicationMapping = new ApplicationMapping();
newApplicationMapping.setId(applicationMapping.getId());
newApplicationMapping.setMetricId(applicationMapping.getMetricId());
newApplicationMapping.setTimeBucket(applicationMapping.getTimeBucket());
newApplicationMapping.setApplicationId(applicationMapping.getApplicationId());
newApplicationMapping.setAddressId(applicationMapping.getAddressId());
return newApplicationMapping;
}
}
......@@ -38,14 +38,9 @@ public class ApplicationMappingDayTransformNode implements NodeProcessor<Applica
public void process(ApplicationMapping applicationMapping, Next<ApplicationMapping> next) {
long timeBucket = TimeBucketUtils.INSTANCE.minuteToDay(applicationMapping.getTimeBucket());
ApplicationMapping newApplicationMapping = new ApplicationMapping();
ApplicationMapping newApplicationMapping = ApplicationMappingCopy.copy(applicationMapping);
newApplicationMapping.setId(String.valueOf(timeBucket) + Const.ID_SPLIT + applicationMapping.getMetricId());
newApplicationMapping.setMetricId(applicationMapping.getMetricId());
newApplicationMapping.setTimeBucket(timeBucket);
newApplicationMapping.setApplicationId(applicationMapping.getApplicationId());
newApplicationMapping.setAddressId(applicationMapping.getAddressId());
next.execute(newApplicationMapping);
}
}
......@@ -38,13 +38,9 @@ public class ApplicationMappingHourTransformNode implements NodeProcessor<Applic
public void process(ApplicationMapping applicationMapping, Next<ApplicationMapping> next) {
long timeBucket = TimeBucketUtils.INSTANCE.minuteToHour(applicationMapping.getTimeBucket());
ApplicationMapping newApplicationMapping = new ApplicationMapping();
ApplicationMapping newApplicationMapping = ApplicationMappingCopy.copy(applicationMapping);
newApplicationMapping.setId(String.valueOf(timeBucket) + Const.ID_SPLIT + applicationMapping.getMetricId());
newApplicationMapping.setMetricId(applicationMapping.getMetricId());
newApplicationMapping.setTimeBucket(timeBucket);
newApplicationMapping.setApplicationId(applicationMapping.getApplicationId());
newApplicationMapping.setAddressId(applicationMapping.getAddressId());
next.execute(newApplicationMapping);
}
}
......@@ -38,14 +38,9 @@ public class ApplicationMappingMonthTransformNode implements NodeProcessor<Appli
public void process(ApplicationMapping applicationMapping, Next<ApplicationMapping> next) {
long timeBucket = TimeBucketUtils.INSTANCE.minuteToMonth(applicationMapping.getTimeBucket());
ApplicationMapping newApplicationMapping = new ApplicationMapping();
ApplicationMapping newApplicationMapping = ApplicationMappingCopy.copy(applicationMapping);
newApplicationMapping.setId(String.valueOf(timeBucket) + Const.ID_SPLIT + applicationMapping.getMetricId());
newApplicationMapping.setMetricId(applicationMapping.getMetricId());
newApplicationMapping.setTimeBucket(timeBucket);
newApplicationMapping.setApplicationId(applicationMapping.getApplicationId());
newApplicationMapping.setAddressId(applicationMapping.getAddressId());
next.execute(newApplicationMapping);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.analysis.metric.provider.worker.instance.mapping;
import org.apache.skywalking.apm.collector.storage.table.instance.InstanceMapping;
/**
* @author peng-yongsheng
*/
public class InstanceMappingCopy {
public static InstanceMapping copy(InstanceMapping instanceMapping) {
InstanceMapping newInstanceMapping = new InstanceMapping();
newInstanceMapping.setId(instanceMapping.getId());
newInstanceMapping.setMetricId(instanceMapping.getMetricId());
newInstanceMapping.setApplicationId(instanceMapping.getApplicationId());
newInstanceMapping.setInstanceId(instanceMapping.getInstanceId());
newInstanceMapping.setAddressId(instanceMapping.getAddressId());
newInstanceMapping.setTimeBucket(instanceMapping.getTimeBucket());
return newInstanceMapping;
}
}
......@@ -37,15 +37,9 @@ public class InstanceMappingDayTransformNode implements NodeProcessor<InstanceMa
@Override public void process(InstanceMapping instanceMapping, Next<InstanceMapping> next) {
long timeBucket = TimeBucketUtils.INSTANCE.minuteToDay(instanceMapping.getTimeBucket());
InstanceMapping newInstanceMapping = new InstanceMapping();
InstanceMapping newInstanceMapping = InstanceMappingCopy.copy(instanceMapping);
newInstanceMapping.setId(String.valueOf(timeBucket) + Const.ID_SPLIT + instanceMapping.getMetricId());
newInstanceMapping.setMetricId(instanceMapping.getMetricId());
newInstanceMapping.setApplicationId(instanceMapping.getApplicationId());
newInstanceMapping.setInstanceId(instanceMapping.getInstanceId());
newInstanceMapping.setAddressId(instanceMapping.getAddressId());
newInstanceMapping.setTimeBucket(timeBucket);
next.execute(newInstanceMapping);
}
}
......@@ -37,15 +37,9 @@ public class InstanceMappingHourTransformNode implements NodeProcessor<InstanceM
@Override public void process(InstanceMapping instanceMapping, Next<InstanceMapping> next) {
long timeBucket = TimeBucketUtils.INSTANCE.minuteToHour(instanceMapping.getTimeBucket());
InstanceMapping newInstanceMapping = new InstanceMapping();
InstanceMapping newInstanceMapping = InstanceMappingCopy.copy(instanceMapping);
newInstanceMapping.setId(String.valueOf(timeBucket) + Const.ID_SPLIT + instanceMapping.getMetricId());
newInstanceMapping.setMetricId(instanceMapping.getMetricId());
newInstanceMapping.setApplicationId(instanceMapping.getApplicationId());
newInstanceMapping.setInstanceId(instanceMapping.getInstanceId());
newInstanceMapping.setAddressId(instanceMapping.getAddressId());
newInstanceMapping.setTimeBucket(timeBucket);
next.execute(newInstanceMapping);
}
}
......@@ -37,15 +37,9 @@ public class InstanceMappingMonthTransformNode implements NodeProcessor<Instance
@Override public void process(InstanceMapping instanceMapping, Next<InstanceMapping> next) {
long timeBucket = TimeBucketUtils.INSTANCE.minuteToMonth(instanceMapping.getTimeBucket());
InstanceMapping newInstanceMapping = new InstanceMapping();
InstanceMapping newInstanceMapping = InstanceMappingCopy.copy(instanceMapping);
newInstanceMapping.setId(String.valueOf(timeBucket) + Const.ID_SPLIT + instanceMapping.getMetricId());
newInstanceMapping.setMetricId(instanceMapping.getMetricId());
newInstanceMapping.setApplicationId(instanceMapping.getApplicationId());
newInstanceMapping.setInstanceId(instanceMapping.getInstanceId());
newInstanceMapping.setAddressId(instanceMapping.getAddressId());
newInstanceMapping.setTimeBucket(timeBucket);
next.execute(newInstanceMapping);
}
}
......@@ -64,7 +64,7 @@ public class InstanceReferenceMetricGraph {
remoteNode.addNext(new InstanceReferenceMonthMetricTransformNode())
.addNext(new InstanceReferenceMonthMetricPersistenceWorker.Factory(moduleManager).create(workerCreateListener));
link(graph);
// link(graph);
}
private void link(Graph<ServiceReferenceMetric> graph) {
......
......@@ -64,7 +64,7 @@ public class ServiceMetricGraph {
remoteNode.addNext(new ServiceMonthMetricTransformNode())
.addNext(new ServiceMonthMetricPersistenceWorker.Factory(moduleManager).create(workerCreateListener));
link(graph);
// link(graph);
}
private void link(Graph<ServiceReferenceMetric> graph) {
......
......@@ -36,9 +36,11 @@ public class ServiceReferenceDayMetricTransformNode implements NodeProcessor<Ser
@Override public void process(ServiceReferenceMetric serviceReferenceMetric, Next<ServiceReferenceMetric> next) {
long timeBucket = TimeBucketUtils.INSTANCE.minuteToDay(serviceReferenceMetric.getTimeBucket());
serviceReferenceMetric.setId(String.valueOf(timeBucket) + Const.ID_SPLIT + serviceReferenceMetric.getMetricId());
serviceReferenceMetric.setTimeBucket(timeBucket);
next.execute(serviceReferenceMetric);
ServiceReferenceMetric newServiceReferenceMetric = ServiceReferenceMetricCopy.copy(serviceReferenceMetric);
newServiceReferenceMetric.setId(String.valueOf(timeBucket) + Const.ID_SPLIT + serviceReferenceMetric.getMetricId());
newServiceReferenceMetric.setTimeBucket(timeBucket);
next.execute(newServiceReferenceMetric);
}
}
......@@ -36,9 +36,11 @@ public class ServiceReferenceHourMetricTransformNode implements NodeProcessor<Se
@Override public void process(ServiceReferenceMetric serviceReferenceMetric, Next<ServiceReferenceMetric> next) {
long timeBucket = TimeBucketUtils.INSTANCE.minuteToHour(serviceReferenceMetric.getTimeBucket());
serviceReferenceMetric.setId(String.valueOf(timeBucket) + Const.ID_SPLIT + serviceReferenceMetric.getMetricId());
serviceReferenceMetric.setTimeBucket(timeBucket);
next.execute(serviceReferenceMetric);
ServiceReferenceMetric newServiceReferenceMetric = ServiceReferenceMetricCopy.copy(serviceReferenceMetric);
newServiceReferenceMetric.setId(String.valueOf(timeBucket) + Const.ID_SPLIT + serviceReferenceMetric.getMetricId());
newServiceReferenceMetric.setTimeBucket(timeBucket);
next.execute(newServiceReferenceMetric);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.analysis.metric.provider.worker.service.refmetric;
import org.apache.skywalking.apm.collector.storage.table.service.ServiceReferenceMetric;
/**
* @author peng-yongsheng
*/
public class ServiceReferenceMetricCopy {
public static ServiceReferenceMetric copy(ServiceReferenceMetric serviceReferenceMetric) {
ServiceReferenceMetric newServiceReferenceMetric = new ServiceReferenceMetric();
newServiceReferenceMetric.setId(serviceReferenceMetric.getId());
newServiceReferenceMetric.setMetricId(serviceReferenceMetric.getMetricId());
newServiceReferenceMetric.setSourceValue(serviceReferenceMetric.getSourceValue());
newServiceReferenceMetric.setFrontApplicationId(serviceReferenceMetric.getFrontApplicationId());
newServiceReferenceMetric.setFrontInstanceId(serviceReferenceMetric.getFrontInstanceId());
newServiceReferenceMetric.setFrontServiceId(serviceReferenceMetric.getFrontServiceId());
newServiceReferenceMetric.setBehindApplicationId(serviceReferenceMetric.getBehindApplicationId());
newServiceReferenceMetric.setBehindInstanceId(serviceReferenceMetric.getBehindInstanceId());
newServiceReferenceMetric.setBehindServiceId(serviceReferenceMetric.getBehindServiceId());
newServiceReferenceMetric.setTransactionCalls(serviceReferenceMetric.getTransactionCalls());
newServiceReferenceMetric.setTransactionDurationSum(serviceReferenceMetric.getTransactionDurationSum());
newServiceReferenceMetric.setTransactionErrorCalls(serviceReferenceMetric.getTransactionErrorCalls());
newServiceReferenceMetric.setTransactionErrorDurationSum(serviceReferenceMetric.getTransactionErrorDurationSum());
newServiceReferenceMetric.setBusinessTransactionCalls(serviceReferenceMetric.getBusinessTransactionCalls());
newServiceReferenceMetric.setBusinessTransactionDurationSum(serviceReferenceMetric.getBusinessTransactionDurationSum());
newServiceReferenceMetric.setBusinessTransactionErrorCalls(serviceReferenceMetric.getBusinessTransactionErrorCalls());
newServiceReferenceMetric.setBusinessTransactionErrorDurationSum(serviceReferenceMetric.getBusinessTransactionErrorDurationSum());
newServiceReferenceMetric.setMqTransactionCalls(serviceReferenceMetric.getMqTransactionCalls());
newServiceReferenceMetric.setMqTransactionDurationSum(serviceReferenceMetric.getMqTransactionDurationSum());
newServiceReferenceMetric.setMqTransactionErrorCalls(serviceReferenceMetric.getMqTransactionErrorCalls());
newServiceReferenceMetric.setMqTransactionErrorDurationSum(serviceReferenceMetric.getMqTransactionErrorDurationSum());
newServiceReferenceMetric.setTimeBucket(serviceReferenceMetric.getTimeBucket());
return newServiceReferenceMetric;
}
}
......@@ -44,8 +44,8 @@ public class ServiceReferenceMetricGraph {
RemoteSenderService remoteSenderService = moduleManager.find(RemoteModule.NAME).getService(RemoteSenderService.class);
Node<ServiceReferenceMetric, ServiceReferenceMetric> remoteNode = GraphManager.INSTANCE.createIfAbsent(MetricGraphIdDefine.SERVICE_REFERENCE_METRIC_GRAPH_ID, ServiceReferenceMetric.class)
.addNode(new ServiceReferenceMetricAggregationWorker.Factory(moduleManager).create(workerCreateListener))
.addNext(new ServiceReferenceMetricRemoteWorker.Factory(moduleManager, remoteSenderService, MetricGraphIdDefine.SERVICE_REFERENCE_METRIC_GRAPH_ID).create(workerCreateListener));
.addNode(new ServiceReferenceMinuteMetricAggregationWorker.Factory(moduleManager).create(workerCreateListener))
.addNext(new ServiceReferenceMinuteMetricRemoteWorker.Factory(moduleManager, remoteSenderService, MetricGraphIdDefine.SERVICE_REFERENCE_METRIC_GRAPH_ID).create(workerCreateListener));
remoteNode.addNext(new ServiceReferenceMinuteMetricPersistenceWorker.Factory(moduleManager).create(workerCreateListener));
......
......@@ -152,9 +152,11 @@ public class ServiceReferenceMetricSpanListener implements FirstSpanListener, En
logger.debug("service reference listener build");
Graph<ServiceReferenceMetric> graph = GraphManager.INSTANCE.findGraph(MetricGraphIdDefine.SERVICE_REFERENCE_METRIC_GRAPH_ID, ServiceReferenceMetric.class);
entryReferenceMetric.forEach(serviceReferenceMetric -> {
String id = timeBucket + Const.ID_SPLIT + serviceReferenceMetric.getFrontServiceId() + Const.ID_SPLIT + serviceReferenceMetric.getBehindServiceId() + Const.ID_SPLIT + serviceReferenceMetric.getSourceValue();
String metricId = serviceReferenceMetric.getFrontServiceId() + Const.ID_SPLIT + serviceReferenceMetric.getBehindServiceId() + Const.ID_SPLIT + serviceReferenceMetric.getSourceValue();
String id = timeBucket + Const.ID_SPLIT + metricId;
serviceReferenceMetric.setId(id);
serviceReferenceMetric.setMetricId(metricId);
serviceReferenceMetric.setTimeBucket(timeBucket);
logger.debug("push to service reference aggregation worker, id: {}", serviceReferenceMetric.getId());
......@@ -168,8 +170,10 @@ public class ServiceReferenceMetricSpanListener implements FirstSpanListener, En
serviceReferenceMetric.setFrontServiceId(Const.NONE_SERVICE_ID);
}
String id = timeBucket + Const.ID_SPLIT + serviceReferenceMetric.getFrontServiceId() + Const.ID_SPLIT + serviceReferenceMetric.getBehindServiceId() + Const.ID_SPLIT + serviceReferenceMetric.getSourceValue();
String metricId = serviceReferenceMetric.getFrontServiceId() + Const.ID_SPLIT + serviceReferenceMetric.getBehindServiceId() + Const.ID_SPLIT + serviceReferenceMetric.getSourceValue();
String id = timeBucket + Const.ID_SPLIT + metricId;
serviceReferenceMetric.setId(id);
serviceReferenceMetric.setMetricId(metricId);
serviceReferenceMetric.setTimeBucket(timeBucket);
graph.start(serviceReferenceMetric);
......
......@@ -27,9 +27,9 @@ import org.apache.skywalking.apm.collector.storage.table.service.ServiceReferenc
/**
* @author peng-yongsheng
*/
public class ServiceReferenceMetricAggregationWorker extends AggregationWorker<ServiceReferenceMetric, ServiceReferenceMetric> {
public class ServiceReferenceMinuteMetricAggregationWorker extends AggregationWorker<ServiceReferenceMetric, ServiceReferenceMetric> {
public ServiceReferenceMetricAggregationWorker(ModuleManager moduleManager) {
public ServiceReferenceMinuteMetricAggregationWorker(ModuleManager moduleManager) {
super(moduleManager);
}
......@@ -37,14 +37,14 @@ public class ServiceReferenceMetricAggregationWorker extends AggregationWorker<S
return MetricWorkerIdDefine.SERVICE_REFERENCE_MINUTE_METRIC_AGGREGATION_WORKER_ID;
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<ServiceReferenceMetric, ServiceReferenceMetric, ServiceReferenceMetricAggregationWorker> {
public static class Factory extends AbstractLocalAsyncWorkerProvider<ServiceReferenceMetric, ServiceReferenceMetric, ServiceReferenceMinuteMetricAggregationWorker> {
public Factory(ModuleManager moduleManager) {
super(moduleManager);
}
@Override public ServiceReferenceMetricAggregationWorker workerInstance(ModuleManager moduleManager) {
return new ServiceReferenceMetricAggregationWorker(moduleManager);
@Override public ServiceReferenceMinuteMetricAggregationWorker workerInstance(ModuleManager moduleManager) {
return new ServiceReferenceMinuteMetricAggregationWorker(moduleManager);
}
@Override
......
......@@ -30,9 +30,9 @@ import org.apache.skywalking.apm.collector.storage.table.service.ServiceReferenc
/**
* @author peng-yongsheng
*/
public class ServiceReferenceMetricRemoteWorker extends AbstractRemoteWorker<ServiceReferenceMetric, ServiceReferenceMetric> {
public class ServiceReferenceMinuteMetricRemoteWorker extends AbstractRemoteWorker<ServiceReferenceMetric, ServiceReferenceMetric> {
public ServiceReferenceMetricRemoteWorker(ModuleManager moduleManager) {
public ServiceReferenceMinuteMetricRemoteWorker(ModuleManager moduleManager) {
super(moduleManager);
}
......@@ -48,14 +48,14 @@ public class ServiceReferenceMetricRemoteWorker extends AbstractRemoteWorker<Ser
return Selector.HashCode;
}
public static class Factory extends AbstractRemoteWorkerProvider<ServiceReferenceMetric, ServiceReferenceMetric, ServiceReferenceMetricRemoteWorker> {
public static class Factory extends AbstractRemoteWorkerProvider<ServiceReferenceMetric, ServiceReferenceMetric, ServiceReferenceMinuteMetricRemoteWorker> {
public Factory(ModuleManager moduleManager, RemoteSenderService remoteSenderService, int graphId) {
super(moduleManager, remoteSenderService, graphId);
}
@Override public ServiceReferenceMetricRemoteWorker workerInstance(ModuleManager moduleManager) {
return new ServiceReferenceMetricRemoteWorker(moduleManager);
@Override public ServiceReferenceMinuteMetricRemoteWorker workerInstance(ModuleManager moduleManager) {
return new ServiceReferenceMinuteMetricRemoteWorker(moduleManager);
}
}
}
......@@ -36,9 +36,11 @@ public class ServiceReferenceMonthMetricTransformNode implements NodeProcessor<S
@Override public void process(ServiceReferenceMetric serviceReferenceMetric, Next<ServiceReferenceMetric> next) {
long timeBucket = TimeBucketUtils.INSTANCE.minuteToMonth(serviceReferenceMetric.getTimeBucket());
serviceReferenceMetric.setId(String.valueOf(timeBucket) + Const.ID_SPLIT + serviceReferenceMetric.getMetricId());
serviceReferenceMetric.setTimeBucket(timeBucket);
next.execute(serviceReferenceMetric);
ServiceReferenceMetric newServiceReferenceMetric = ServiceReferenceMetricCopy.copy(serviceReferenceMetric);
newServiceReferenceMetric.setId(String.valueOf(timeBucket) + Const.ID_SPLIT + serviceReferenceMetric.getMetricId());
newServiceReferenceMetric.setTimeBucket(timeBucket);
next.execute(newServiceReferenceMetric);
}
}
......@@ -80,7 +80,10 @@ import org.apache.skywalking.apm.collector.storage.dao.register.IInstanceRegiste
import org.apache.skywalking.apm.collector.storage.dao.register.INetworkAddressRegisterDAO;
import org.apache.skywalking.apm.collector.storage.dao.register.IServiceNameRegisterDAO;
import org.apache.skywalking.apm.collector.storage.dao.smp.IServiceMinuteMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.srmp.IServiceReferenceDayMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.srmp.IServiceReferenceHourMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.srmp.IServiceReferenceMinuteMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.srmp.IServiceReferenceMonthMetricPersistenceDAO;
/**
* @author peng-yongsheng
......@@ -146,8 +149,16 @@ public class StorageModule extends Module {
classes.add(IApplicationReferenceMinuteMetricPersistenceDAO.class);
classes.add(ISegmentCostPersistenceDAO.class);
classes.add(ISegmentPersistenceDAO.class);
classes.add(IServiceMinuteMetricPersistenceDAO.class);
// classes.add(IServiceHourMetricPersistenceDAO.class);
// classes.add(IServiceDayMetricPersistenceDAO.class);
// classes.add(IServiceMonthMetricPersistenceDAO.class);
classes.add(IServiceReferenceMinuteMetricPersistenceDAO.class);
classes.add(IServiceReferenceHourMetricPersistenceDAO.class);
classes.add(IServiceReferenceDayMetricPersistenceDAO.class);
classes.add(IServiceReferenceMonthMetricPersistenceDAO.class);
classes.add(IInstanceMinuteMetricPersistenceDAO.class);
classes.add(IInstanceReferenceMinuteMetricPersistenceDAO.class);
......
......@@ -89,7 +89,10 @@ import org.apache.skywalking.apm.collector.storage.dao.register.IInstanceRegiste
import org.apache.skywalking.apm.collector.storage.dao.register.INetworkAddressRegisterDAO;
import org.apache.skywalking.apm.collector.storage.dao.register.IServiceNameRegisterDAO;
import org.apache.skywalking.apm.collector.storage.dao.smp.IServiceMinuteMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.srmp.IServiceReferenceDayMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.srmp.IServiceReferenceHourMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.srmp.IServiceReferenceMinuteMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.srmp.IServiceReferenceMonthMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.es.base.dao.BatchEsDAO;
import org.apache.skywalking.apm.collector.storage.es.base.define.ElasticSearchStorageInstaller;
import org.apache.skywalking.apm.collector.storage.es.dao.ApplicationAlarmEsPersistenceDAO;
......@@ -150,7 +153,10 @@ import org.apache.skywalking.apm.collector.storage.es.dao.register.InstanceEsReg
import org.apache.skywalking.apm.collector.storage.es.dao.register.NetworkAddressRegisterEsDAO;
import org.apache.skywalking.apm.collector.storage.es.dao.register.ServiceNameEsRegisterDAO;
import org.apache.skywalking.apm.collector.storage.es.dao.smp.ServiceMinuteMetricEsPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.es.dao.srmp.ServiceReferenceDayMetricEsPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.es.dao.srmp.ServiceReferenceHourMetricEsPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.es.dao.srmp.ServiceReferenceMinuteMetricEsPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.es.dao.srmp.ServiceReferenceMonthMetricEsPersistenceDAO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -263,11 +269,17 @@ public class StorageModuleEsProvider extends ModuleProvider {
this.registerServiceImplementation(IGlobalTracePersistenceDAO.class, new GlobalTraceEsPersistenceDAO(elasticSearchClient));
this.registerServiceImplementation(IApplicationMinuteMetricPersistenceDAO.class, new ApplicationMinuteMetricEsPersistenceDAO(elasticSearchClient));
this.registerServiceImplementation(IApplicationReferenceMinuteMetricPersistenceDAO.class, new ApplicationReferenceMinuteMetricEsPersistenceDAO(elasticSearchClient));
this.registerServiceImplementation(ISegmentCostPersistenceDAO.class, new SegmentCostEsPersistenceDAO(elasticSearchClient));
this.registerServiceImplementation(ISegmentPersistenceDAO.class, new SegmentEsPersistenceDAO(elasticSearchClient));
this.registerServiceImplementation(IServiceMinuteMetricPersistenceDAO.class, new ServiceMinuteMetricEsPersistenceDAO(elasticSearchClient));
this.registerServiceImplementation(IServiceReferenceMinuteMetricPersistenceDAO.class, new ServiceReferenceMinuteMetricEsPersistenceDAO(elasticSearchClient));
this.registerServiceImplementation(IServiceReferenceHourMetricPersistenceDAO.class, new ServiceReferenceHourMetricEsPersistenceDAO(elasticSearchClient));
this.registerServiceImplementation(IServiceReferenceDayMetricPersistenceDAO.class, new ServiceReferenceDayMetricEsPersistenceDAO(elasticSearchClient));
this.registerServiceImplementation(IServiceReferenceMonthMetricPersistenceDAO.class, new ServiceReferenceMonthMetricEsPersistenceDAO(elasticSearchClient));
this.registerServiceImplementation(IInstanceMinuteMetricPersistenceDAO.class, new InstanceMinuteMetricEsPersistenceDAO(elasticSearchClient));
this.registerServiceImplementation(IInstanceReferenceMinuteMetricPersistenceDAO.class, new InstanceReferenceMinuteMetricEsPersistenceDAO(elasticSearchClient));
......
......@@ -16,7 +16,7 @@
*
*/
package org.apache.skywalking.apm.collector.storage.es.define;
package org.apache.skywalking.apm.collector.storage.es.define.srmp;
import org.apache.skywalking.apm.collector.storage.es.base.define.ElasticSearchColumnDefine;
import org.apache.skywalking.apm.collector.storage.es.base.define.ElasticSearchTableDefine;
......@@ -25,17 +25,15 @@ import org.apache.skywalking.apm.collector.storage.table.service.ServiceReferenc
/**
* @author peng-yongsheng
*/
public class ServiceReferenceMetricEsTableDefine extends ElasticSearchTableDefine {
public abstract class AbstractServiceReferenceMetricEsTableDefine extends ElasticSearchTableDefine {
public ServiceReferenceMetricEsTableDefine() {
super(ServiceReferenceMetricTable.TABLE);
public AbstractServiceReferenceMetricEsTableDefine(String name) {
super(name);
}
@Override public int refreshInterval() {
return 2;
}
@Override public final void initialize() {
addColumn(new ElasticSearchColumnDefine(ServiceReferenceMetricTable.COLUMN_METRIC_ID, ElasticSearchColumnDefine.Type.Keyword.name()));
@Override public void initialize() {
addColumn(new ElasticSearchColumnDefine(ServiceReferenceMetricTable.COLUMN_FRONT_SERVICE_ID, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(ServiceReferenceMetricTable.COLUMN_BEHIND_SERVICE_ID, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(ServiceReferenceMetricTable.COLUMN_SOURCE_VALUE, ElasticSearchColumnDefine.Type.Integer.name()));
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.storage.es.define.srmp;
import org.apache.skywalking.apm.collector.core.storage.TimePyramid;
import org.apache.skywalking.apm.collector.core.util.Const;
import org.apache.skywalking.apm.collector.storage.table.service.ServiceReferenceMetricTable;
/**
* @author peng-yongsheng
*/
public class ServiceReferenceDayMetricEsTableDefine extends AbstractServiceReferenceMetricEsTableDefine {
public ServiceReferenceDayMetricEsTableDefine() {
super(ServiceReferenceMetricTable.TABLE + Const.ID_SPLIT + TimePyramid.Day.getName());
}
@Override public int refreshInterval() {
return 2;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.storage.es.define.srmp;
import org.apache.skywalking.apm.collector.core.storage.TimePyramid;
import org.apache.skywalking.apm.collector.core.util.Const;
import org.apache.skywalking.apm.collector.storage.table.service.ServiceReferenceMetricTable;
/**
* @author peng-yongsheng
*/
public class ServiceReferenceHourMetricEsTableDefine extends AbstractServiceReferenceMetricEsTableDefine {
public ServiceReferenceHourMetricEsTableDefine() {
super(ServiceReferenceMetricTable.TABLE + Const.ID_SPLIT + TimePyramid.Hour.getName());
}
@Override public int refreshInterval() {
return 2;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.storage.es.define.srmp;
import org.apache.skywalking.apm.collector.core.storage.TimePyramid;
import org.apache.skywalking.apm.collector.core.util.Const;
import org.apache.skywalking.apm.collector.storage.table.service.ServiceReferenceMetricTable;
/**
* @author peng-yongsheng
*/
public class ServiceReferenceMinuteMetricEsTableDefine extends AbstractServiceReferenceMetricEsTableDefine {
public ServiceReferenceMinuteMetricEsTableDefine() {
super(ServiceReferenceMetricTable.TABLE + Const.ID_SPLIT + TimePyramid.Minute.getName());
}
@Override public int refreshInterval() {
return 2;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.storage.es.define.srmp;
import org.apache.skywalking.apm.collector.core.storage.TimePyramid;
import org.apache.skywalking.apm.collector.core.util.Const;
import org.apache.skywalking.apm.collector.storage.table.service.ServiceReferenceMetricTable;
/**
* @author peng-yongsheng
*/
public class ServiceReferenceMonthMetricEsTableDefine extends AbstractServiceReferenceMetricEsTableDefine {
public ServiceReferenceMonthMetricEsTableDefine() {
super(ServiceReferenceMetricTable.TABLE + Const.ID_SPLIT + TimePyramid.Month.getName());
}
@Override public int refreshInterval() {
return 2;
}
}
......@@ -18,6 +18,11 @@ org.apache.skywalking.apm.collector.storage.es.define.instmapping.InstanceMappin
org.apache.skywalking.apm.collector.storage.es.define.instmapping.InstanceMappingDayEsTableDefine
org.apache.skywalking.apm.collector.storage.es.define.instmapping.InstanceMappingMonthEsTableDefine
org.apache.skywalking.apm.collector.storage.es.define.srmp.ServiceReferenceMinuteMetricEsTableDefine
org.apache.skywalking.apm.collector.storage.es.define.srmp.ServiceReferenceHourMetricEsTableDefine
org.apache.skywalking.apm.collector.storage.es.define.srmp.ServiceReferenceDayMetricEsTableDefine
org.apache.skywalking.apm.collector.storage.es.define.srmp.ServiceReferenceMonthMetricEsTableDefine
org.apache.skywalking.apm.collector.storage.es.define.GlobalTraceEsTableDefine
org.apache.skywalking.apm.collector.storage.es.define.SegmentEsTableDefine
org.apache.skywalking.apm.collector.storage.es.define.SegmentCostEsTableDefine
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册