提交 7e84aedd 编写于 作者: P peng-yongsheng

Instance metric pyramid aggregate test successful.

上级 8745cb2b
......@@ -36,9 +36,10 @@ public class InstanceDayMetricTransformNode implements NodeProcessor<InstanceMet
@Override public void process(InstanceMetric instanceMetric, Next<InstanceMetric> next) {
long timeBucket = TimeBucketUtils.INSTANCE.minuteToDay(instanceMetric.getTimeBucket());
instanceMetric.setId(String.valueOf(timeBucket) + Const.ID_SPLIT + instanceMetric.getMetricId());
instanceMetric.setTimeBucket(timeBucket);
next.execute(instanceMetric);
InstanceMetric newInstanceMetric = InstanceMetricCopy.copy(instanceMetric);
newInstanceMetric.setId(String.valueOf(timeBucket) + Const.ID_SPLIT + instanceMetric.getMetricId());
newInstanceMetric.setTimeBucket(timeBucket);
next.execute(newInstanceMetric);
}
}
......@@ -36,9 +36,10 @@ public class InstanceHourMetricTransformNode implements NodeProcessor<InstanceMe
@Override public void process(InstanceMetric instanceMetric, Next<InstanceMetric> next) {
long timeBucket = TimeBucketUtils.INSTANCE.minuteToHour(instanceMetric.getTimeBucket());
instanceMetric.setId(String.valueOf(timeBucket) + Const.ID_SPLIT + instanceMetric.getMetricId());
instanceMetric.setTimeBucket(timeBucket);
next.execute(instanceMetric);
InstanceMetric newInstanceMetric = InstanceMetricCopy.copy(instanceMetric);
newInstanceMetric.setId(String.valueOf(timeBucket) + Const.ID_SPLIT + instanceMetric.getMetricId());
newInstanceMetric.setTimeBucket(timeBucket);
next.execute(newInstanceMetric);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.analysis.metric.provider.worker.instance.metric;
import org.apache.skywalking.apm.collector.storage.table.instance.InstanceMetric;
/**
* @author peng-yongsheng
*/
public class InstanceMetricCopy {
public static InstanceMetric copy(InstanceMetric instanceMetric) {
InstanceMetric newInstanceMetric = new InstanceMetric();
newInstanceMetric.setId(instanceMetric.getId());
newInstanceMetric.setMetricId(instanceMetric.getMetricId());
newInstanceMetric.setSourceValue(instanceMetric.getSourceValue());
newInstanceMetric.setApplicationId(instanceMetric.getApplicationId());
newInstanceMetric.setInstanceId(instanceMetric.getInstanceId());
newInstanceMetric.setTransactionCalls(instanceMetric.getTransactionCalls());
newInstanceMetric.setTransactionDurationSum(instanceMetric.getTransactionDurationSum());
newInstanceMetric.setTransactionErrorCalls(instanceMetric.getTransactionErrorCalls());
newInstanceMetric.setTransactionErrorDurationSum(instanceMetric.getTransactionErrorDurationSum());
newInstanceMetric.setBusinessTransactionCalls(instanceMetric.getBusinessTransactionCalls());
newInstanceMetric.setBusinessTransactionDurationSum(instanceMetric.getBusinessTransactionDurationSum());
newInstanceMetric.setBusinessTransactionErrorCalls(instanceMetric.getBusinessTransactionErrorCalls());
newInstanceMetric.setBusinessTransactionErrorDurationSum(instanceMetric.getBusinessTransactionErrorDurationSum());
newInstanceMetric.setMqTransactionCalls(instanceMetric.getMqTransactionCalls());
newInstanceMetric.setMqTransactionDurationSum(instanceMetric.getMqTransactionDurationSum());
newInstanceMetric.setMqTransactionErrorCalls(instanceMetric.getMqTransactionErrorCalls());
newInstanceMetric.setMqTransactionErrorDurationSum(instanceMetric.getMqTransactionErrorDurationSum());
newInstanceMetric.setTimeBucket(instanceMetric.getTimeBucket());
return newInstanceMetric;
}
}
......@@ -50,8 +50,8 @@ public class InstanceMetricGraph {
Graph<InstanceReferenceMetric> graph = GraphManager.INSTANCE.createIfAbsent(MetricGraphIdDefine.INSTANCE_METRIC_GRAPH_ID, InstanceReferenceMetric.class);
Node<InstanceMetric, InstanceMetric> remoteNode = graph.addNode(new InstanceMetricAggregationWorker.Factory(moduleManager).create(workerCreateListener))
.addNext(new InstanceMetricRemoteWorker.Factory(moduleManager, remoteSenderService, MetricGraphIdDefine.INSTANCE_METRIC_GRAPH_ID).create(workerCreateListener));
Node<InstanceMetric, InstanceMetric> remoteNode = graph.addNode(new InstanceMinuteMetricAggregationWorker.Factory(moduleManager).create(workerCreateListener))
.addNext(new InstanceMinuteMetricRemoteWorker.Factory(moduleManager, remoteSenderService, MetricGraphIdDefine.INSTANCE_METRIC_GRAPH_ID).create(workerCreateListener));
remoteNode.addNext(new InstanceMinuteMetricPersistenceWorker.Factory(moduleManager).create(workerCreateListener));
......@@ -64,7 +64,7 @@ public class InstanceMetricGraph {
remoteNode.addNext(new InstanceMonthMetricTransformNode())
.addNext(new InstanceMonthMetricPersistenceWorker.Factory(moduleManager).create(workerCreateListener));
// link(graph);
link(graph);
}
private void link(Graph<InstanceReferenceMetric> graph) {
......
......@@ -29,9 +29,9 @@ import org.apache.skywalking.apm.collector.storage.table.instance.InstanceRefere
/**
* @author peng-yongsheng
*/
public class InstanceMetricAggregationWorker extends AggregationWorker<InstanceReferenceMetric, InstanceMetric> {
public class InstanceMinuteMetricAggregationWorker extends AggregationWorker<InstanceReferenceMetric, InstanceMetric> {
public InstanceMetricAggregationWorker(ModuleManager moduleManager) {
public InstanceMinuteMetricAggregationWorker(ModuleManager moduleManager) {
super(moduleManager);
}
......@@ -70,14 +70,14 @@ public class InstanceMetricAggregationWorker extends AggregationWorker<InstanceR
return instanceMetric;
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<InstanceReferenceMetric, InstanceMetric, InstanceMetricAggregationWorker> {
public static class Factory extends AbstractLocalAsyncWorkerProvider<InstanceReferenceMetric, InstanceMetric, InstanceMinuteMetricAggregationWorker> {
public Factory(ModuleManager moduleManager) {
super(moduleManager);
}
@Override public InstanceMetricAggregationWorker workerInstance(ModuleManager moduleManager) {
return new InstanceMetricAggregationWorker(moduleManager);
@Override public InstanceMinuteMetricAggregationWorker workerInstance(ModuleManager moduleManager) {
return new InstanceMinuteMetricAggregationWorker(moduleManager);
}
@Override
......
......@@ -30,9 +30,9 @@ import org.apache.skywalking.apm.collector.storage.table.instance.InstanceMetric
/**
* @author peng-yongsheng
*/
public class InstanceMetricRemoteWorker extends AbstractRemoteWorker<InstanceMetric, InstanceMetric> {
public class InstanceMinuteMetricRemoteWorker extends AbstractRemoteWorker<InstanceMetric, InstanceMetric> {
public InstanceMetricRemoteWorker(ModuleManager moduleManager) {
public InstanceMinuteMetricRemoteWorker(ModuleManager moduleManager) {
super(moduleManager);
}
......@@ -48,14 +48,14 @@ public class InstanceMetricRemoteWorker extends AbstractRemoteWorker<InstanceMet
onNext(instanceMetric);
}
public static class Factory extends AbstractRemoteWorkerProvider<InstanceMetric, InstanceMetric, InstanceMetricRemoteWorker> {
public static class Factory extends AbstractRemoteWorkerProvider<InstanceMetric, InstanceMetric, InstanceMinuteMetricRemoteWorker> {
public Factory(ModuleManager moduleManager, RemoteSenderService remoteSenderService, int graphId) {
super(moduleManager, remoteSenderService, graphId);
}
@Override public InstanceMetricRemoteWorker workerInstance(ModuleManager moduleManager) {
return new InstanceMetricRemoteWorker(moduleManager);
@Override public InstanceMinuteMetricRemoteWorker workerInstance(ModuleManager moduleManager) {
return new InstanceMinuteMetricRemoteWorker(moduleManager);
}
}
}
......@@ -36,9 +36,10 @@ public class InstanceMonthMetricTransformNode implements NodeProcessor<InstanceM
@Override public void process(InstanceMetric instanceMetric, Next<InstanceMetric> next) {
long timeBucket = TimeBucketUtils.INSTANCE.minuteToMonth(instanceMetric.getTimeBucket());
instanceMetric.setId(String.valueOf(timeBucket) + Const.ID_SPLIT + instanceMetric.getMetricId());
instanceMetric.setTimeBucket(timeBucket);
next.execute(instanceMetric);
InstanceMetric newInstanceMetric = InstanceMetricCopy.copy(instanceMetric);
newInstanceMetric.setId(String.valueOf(timeBucket) + Const.ID_SPLIT + instanceMetric.getMetricId());
newInstanceMetric.setTimeBucket(timeBucket);
next.execute(newInstanceMetric);
}
}
......@@ -70,7 +70,10 @@ import org.apache.skywalking.apm.collector.storage.dao.armp.IApplicationReferenc
import org.apache.skywalking.apm.collector.storage.dao.armp.IApplicationReferenceMonthMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.cpump.ICpuSecondMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.gcmp.IGCSecondMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.imp.IInstanceDayMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.imp.IInstanceHourMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.imp.IInstanceMinuteMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.imp.IInstanceMonthMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.impp.IInstanceMappingDayPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.impp.IInstanceMappingHourPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.impp.IInstanceMappingMinutePersistenceDAO;
......@@ -175,6 +178,9 @@ public class StorageModule extends Module {
classes.add(IServiceReferenceMonthMetricPersistenceDAO.class);
classes.add(IInstanceMinuteMetricPersistenceDAO.class);
classes.add(IInstanceHourMetricPersistenceDAO.class);
classes.add(IInstanceDayMetricPersistenceDAO.class);
classes.add(IInstanceMonthMetricPersistenceDAO.class);
classes.add(IInstanceReferenceMinuteMetricPersistenceDAO.class);
classes.add(IInstanceReferenceHourMetricPersistenceDAO.class);
......
......@@ -61,6 +61,7 @@ public class InstanceMetric extends StreamData implements Metric {
};
private static final Column[] BOOLEAN_COLUMNS = {};
private static final Column[] BYTE_COLUMNS = {};
public InstanceMetric() {
......
......@@ -79,7 +79,10 @@ import org.apache.skywalking.apm.collector.storage.dao.armp.IApplicationReferenc
import org.apache.skywalking.apm.collector.storage.dao.armp.IApplicationReferenceMonthMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.cpump.ICpuSecondMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.gcmp.IGCSecondMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.imp.IInstanceDayMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.imp.IInstanceHourMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.imp.IInstanceMinuteMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.imp.IInstanceMonthMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.impp.IInstanceMappingDayPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.impp.IInstanceMappingHourPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.impp.IInstanceMappingMinutePersistenceDAO;
......@@ -152,7 +155,10 @@ import org.apache.skywalking.apm.collector.storage.es.dao.armp.ApplicationRefere
import org.apache.skywalking.apm.collector.storage.es.dao.armp.ApplicationReferenceMonthMetricEsPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.es.dao.cpump.CpuSecondMetricEsPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.es.dao.gcmp.GCSecondMetricEsPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.es.dao.imp.InstanceDayMetricEsPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.es.dao.imp.InstanceHourMetricEsPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.es.dao.imp.InstanceMinuteMetricEsPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.es.dao.imp.InstanceMonthMetricEsPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.es.dao.impp.InstanceMappingDayEsPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.es.dao.impp.InstanceMappingHourEsPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.es.dao.impp.InstanceMappingMinuteEsPersistenceDAO;
......@@ -307,6 +313,9 @@ public class StorageModuleEsProvider extends ModuleProvider {
this.registerServiceImplementation(IServiceReferenceMonthMetricPersistenceDAO.class, new ServiceReferenceMonthMetricEsPersistenceDAO(elasticSearchClient));
this.registerServiceImplementation(IInstanceMinuteMetricPersistenceDAO.class, new InstanceMinuteMetricEsPersistenceDAO(elasticSearchClient));
this.registerServiceImplementation(IInstanceHourMetricPersistenceDAO.class, new InstanceHourMetricEsPersistenceDAO(elasticSearchClient));
this.registerServiceImplementation(IInstanceDayMetricPersistenceDAO.class, new InstanceDayMetricEsPersistenceDAO(elasticSearchClient));
this.registerServiceImplementation(IInstanceMonthMetricPersistenceDAO.class, new InstanceMonthMetricEsPersistenceDAO(elasticSearchClient));
this.registerServiceImplementation(IInstanceReferenceMinuteMetricPersistenceDAO.class, new InstanceReferenceMinuteMetricEsPersistenceDAO(elasticSearchClient));
this.registerServiceImplementation(IInstanceReferenceHourMetricPersistenceDAO.class, new InstanceReferenceHourMetricEsPersistenceDAO(elasticSearchClient));
......
......@@ -16,7 +16,7 @@
*
*/
package org.apache.skywalking.apm.collector.storage.es.define;
package org.apache.skywalking.apm.collector.storage.es.define.imp;
import org.apache.skywalking.apm.collector.storage.es.base.define.ElasticSearchColumnDefine;
import org.apache.skywalking.apm.collector.storage.es.base.define.ElasticSearchTableDefine;
......@@ -25,19 +25,18 @@ import org.apache.skywalking.apm.collector.storage.table.instance.InstanceMetric
/**
* @author peng-yongsheng
*/
public class InstanceMetricEsTableDefine extends ElasticSearchTableDefine {
public abstract class AbstractInstanceMetricEsTableDefine extends ElasticSearchTableDefine {
public InstanceMetricEsTableDefine() {
super(InstanceMetricTable.TABLE);
public AbstractInstanceMetricEsTableDefine(String name) {
super(name);
}
@Override public int refreshInterval() {
return 2;
}
@Override public void initialize() {
@Override public final void initialize() {
addColumn(new ElasticSearchColumnDefine(InstanceMetricTable.COLUMN_ID, ElasticSearchColumnDefine.Type.Keyword.name()));
addColumn(new ElasticSearchColumnDefine(InstanceMetricTable.COLUMN_METRIC_ID, ElasticSearchColumnDefine.Type.Keyword.name()));
addColumn(new ElasticSearchColumnDefine(InstanceMetricTable.COLUMN_APPLICATION_ID, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(InstanceMetricTable.COLUMN_INSTANCE_ID, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(InstanceMetricTable.COLUMN_SOURCE_VALUE, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(InstanceMetricTable.COLUMN_TRANSACTION_CALLS, ElasticSearchColumnDefine.Type.Long.name()));
addColumn(new ElasticSearchColumnDefine(InstanceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS, ElasticSearchColumnDefine.Type.Long.name()));
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.storage.es.define.imp;
import org.apache.skywalking.apm.collector.core.storage.TimePyramid;
import org.apache.skywalking.apm.collector.core.util.Const;
import org.apache.skywalking.apm.collector.storage.table.instance.InstanceMetricTable;
/**
* @author peng-yongsheng
*/
public class InstanceDayMetricEsTableDefine extends AbstractInstanceMetricEsTableDefine {
public InstanceDayMetricEsTableDefine() {
super(InstanceMetricTable.TABLE + Const.ID_SPLIT + TimePyramid.Day.getName());
}
@Override public int refreshInterval() {
return 2;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.storage.es.define.imp;
import org.apache.skywalking.apm.collector.core.storage.TimePyramid;
import org.apache.skywalking.apm.collector.core.util.Const;
import org.apache.skywalking.apm.collector.storage.table.instance.InstanceMetricTable;
/**
* @author peng-yongsheng
*/
public class InstanceHourMetricEsTableDefine extends AbstractInstanceMetricEsTableDefine {
public InstanceHourMetricEsTableDefine() {
super(InstanceMetricTable.TABLE + Const.ID_SPLIT + TimePyramid.Hour.getName());
}
@Override public int refreshInterval() {
return 2;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.storage.es.define.imp;
import org.apache.skywalking.apm.collector.core.storage.TimePyramid;
import org.apache.skywalking.apm.collector.core.util.Const;
import org.apache.skywalking.apm.collector.storage.table.instance.InstanceMetricTable;
/**
* @author peng-yongsheng
*/
public class InstanceMinuteMetricEsTableDefine extends AbstractInstanceMetricEsTableDefine {
public InstanceMinuteMetricEsTableDefine() {
super(InstanceMetricTable.TABLE + Const.ID_SPLIT + TimePyramid.Minute.getName());
}
@Override public int refreshInterval() {
return 2;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.storage.es.define.imp;
import org.apache.skywalking.apm.collector.core.storage.TimePyramid;
import org.apache.skywalking.apm.collector.core.util.Const;
import org.apache.skywalking.apm.collector.storage.table.instance.InstanceMetricTable;
/**
* @author peng-yongsheng
*/
public class InstanceMonthMetricEsTableDefine extends AbstractInstanceMetricEsTableDefine {
public InstanceMonthMetricEsTableDefine() {
super(InstanceMetricTable.TABLE + Const.ID_SPLIT + TimePyramid.Month.getName());
}
@Override public int refreshInterval() {
return 2;
}
}
......@@ -33,6 +33,11 @@ org.apache.skywalking.apm.collector.storage.es.define.irmp.InstanceReferenceHour
org.apache.skywalking.apm.collector.storage.es.define.irmp.InstanceReferenceDayMetricEsTableDefine
org.apache.skywalking.apm.collector.storage.es.define.irmp.InstanceReferenceMonthMetricEsTableDefine
org.apache.skywalking.apm.collector.storage.es.define.imp.InstanceMinuteMetricEsTableDefine
org.apache.skywalking.apm.collector.storage.es.define.imp.InstanceHourMetricEsTableDefine
org.apache.skywalking.apm.collector.storage.es.define.imp.InstanceDayMetricEsTableDefine
org.apache.skywalking.apm.collector.storage.es.define.imp.InstanceMonthMetricEsTableDefine
org.apache.skywalking.apm.collector.storage.es.define.armp.ApplicationReferenceMinuteMetricEsTableDefine
org.apache.skywalking.apm.collector.storage.es.define.armp.ApplicationReferenceHourMetricEsTableDefine
org.apache.skywalking.apm.collector.storage.es.define.armp.ApplicationReferenceDayMetricEsTableDefine
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册