提交 78f93492 编写于 作者: 彭勇升 pengys 提交者: wu-sheng

Aggregator and Indicator define (#1498)

* Sample operator code.

* Indicator aggregator framework.

* Provide some annotation for OAL.
上级 58b83e86
......@@ -22,12 +22,13 @@ import java.util.*;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
import org.apache.skywalking.oap.server.core.analysis.data.*;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
import org.slf4j.*;
/**
* @author peng-yongsheng
*/
public abstract class AbstractAggregator<INPUT extends StreamData> {
public abstract class AbstractAggregator<INPUT extends Indicator> {
private static final Logger logger = LoggerFactory.getLogger(AbstractAggregator.class);
......@@ -81,7 +82,7 @@ public abstract class AbstractAggregator<INPUT extends StreamData> {
private void aggregate(INPUT message) {
mergeDataCache.writing();
if (mergeDataCache.containsKey(message)) {
// mergeDataCache.get(message).mergeAndFormulaCalculateData(message);
mergeDataCache.get(message).combine(message);
} else {
mergeDataCache.put(message);
}
......
......@@ -18,26 +18,28 @@
package org.apache.skywalking.oap.server.core.analysis.data;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
/**
* @author peng-yongsheng
*/
public class MergeDataCache<STREAM_DATA extends StreamData> extends Window<MergeDataCollection<STREAM_DATA>> implements DataCache {
public class MergeDataCache<INDICATOR extends Indicator> extends Window<MergeDataCollection<INDICATOR>> implements DataCache {
private MergeDataCollection<STREAM_DATA> lockedMergeDataCollection;
private MergeDataCollection<INDICATOR> lockedMergeDataCollection;
@Override public MergeDataCollection<STREAM_DATA> collectionInstance() {
@Override public MergeDataCollection<INDICATOR> collectionInstance() {
return new MergeDataCollection<>();
}
public boolean containsKey(STREAM_DATA key) {
public boolean containsKey(INDICATOR key) {
return lockedMergeDataCollection.containsKey(key);
}
public StreamData get(STREAM_DATA key) {
public Indicator get(INDICATOR key) {
return lockedMergeDataCollection.get(key);
}
public void put(STREAM_DATA data) {
public void put(INDICATOR data) {
lockedMergeDataCollection.put(data);
}
......
......@@ -26,10 +26,10 @@ import org.apache.skywalking.oap.server.core.receiver.Endpoint;
*/
public class EndpointDispatcher implements SourceDispatcher<Endpoint> {
private final EndpointAvgAggregator avgAggregator;
private final EndpointLatencyAvgAggregator avgAggregator;
public EndpointDispatcher() {
this.avgAggregator = new EndpointAvgAggregator();
this.avgAggregator = new EndpointLatencyAvgAggregator();
}
@Override public void dispatch(Endpoint source) {
......@@ -37,9 +37,8 @@ public class EndpointDispatcher implements SourceDispatcher<Endpoint> {
}
private void avg(Endpoint source) {
EndpointAvgIndicate indicate = new EndpointAvgIndicate(source.getId(), source.getTimeBucket());
indicate.setLatency(source.getLatency());
avgAggregator.in(indicate);
EndpointLatencyAvgIndicator indicator = new EndpointLatencyAvgIndicator(source.getTimeBucket(), source.getId());
indicator.combine(source.getLatency(), 1);
avgAggregator.in(indicator);
}
}
......@@ -24,11 +24,11 @@ import org.slf4j.*;
/**
* @author peng-yongsheng
*/
public class EndpointAvgAggregator extends AbstractAggregator<EndpointAvgIndicate> {
public class EndpointLatencyAvgAggregator extends AbstractAggregator<EndpointLatencyAvgIndicator> {
private static final Logger logger = LoggerFactory.getLogger(EndpointAvgAggregator.class);
private static final Logger logger = LoggerFactory.getLogger(EndpointLatencyAvgAggregator.class);
@Override protected void onNext(EndpointAvgIndicate data) {
@Override protected void onNext(EndpointLatencyAvgIndicator data) {
}
}
......@@ -18,34 +18,24 @@
package org.apache.skywalking.oap.server.core.analysis.endpoint;
import lombok.Getter;
import org.apache.skywalking.oap.server.core.analysis.AvgIndicate;
import org.apache.skywalking.oap.server.core.analysis.indicator.AvgIndicator;
/**
* @author peng-yongsheng
*/
public class EndpointAvgIndicate extends AvgIndicate {
public class EndpointLatencyAvgIndicator extends AvgIndicator {
@Getter private final int id;
private final int id;
public EndpointAvgIndicate(int id, long timeBucket) {
public EndpointLatencyAvgIndicator(long timeBucket, int id) {
super(timeBucket);
this.id = id;
}
public void setLatency(long latency) {
setValue(latency);
}
public long getLatency() {
return getValue();
}
@Override public int hashCode() {
int result = 17;
result = 31 * result + id;
//TODO How?
// result = 31 * result + getTimeBucket();
result = 31 * result + (int)getTimeBucket();
return result;
}
......@@ -57,10 +47,10 @@ public class EndpointAvgIndicate extends AvgIndicate {
if (getClass() != obj.getClass())
return false;
EndpointAvgIndicate indicate = (EndpointAvgIndicate)obj;
if (id != indicate.id)
EndpointLatencyAvgIndicator indicator = (EndpointLatencyAvgIndicator)obj;
if (id != indicator.id)
return false;
if (getTimeBucket() != indicate.getTimeBucket())
if (getTimeBucket() != indicator.getTimeBucket())
return false;
return true;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis.indicator;
import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.*;
/**
* @author peng-yongsheng
*/
@IndicatorType
public abstract class AvgIndicator extends Indicator {
private long summation;
private int count;
public AvgIndicator(long timeBucket) {
super(timeBucket);
}
@Entrance
public void combine(@SourceFrom long summation, @ConstOne int count) {
this.summation += summation;
this.count += count;
}
@Override public void combine(Indicator indicator) {
AvgIndicator avgIndicator = (AvgIndicator)indicator;
combine(avgIndicator.summation, avgIndicator.count);
}
}
......@@ -16,7 +16,7 @@
*
*/
package org.apache.skywalking.oap.server.core.analysis;
package org.apache.skywalking.oap.server.core.analysis.indicator;
import lombok.Getter;
import org.apache.skywalking.oap.server.core.analysis.data.StreamData;
......@@ -24,11 +24,13 @@ import org.apache.skywalking.oap.server.core.analysis.data.StreamData;
/**
* @author peng-yongsheng
*/
public abstract class Indicate extends StreamData {
public abstract class Indicator extends StreamData {
@Getter private final long timeBucket;
public Indicate(long timeBucket) {
public Indicator(long timeBucket) {
this.timeBucket = timeBucket;
}
public abstract void combine(Indicator indicator);
}
......@@ -16,24 +16,14 @@
*
*/
package org.apache.skywalking.oap.server.core.analysis;
package org.apache.skywalking.oap.server.core.analysis.indicator.annotation;
import lombok.*;
import java.lang.annotation.*;
/**
* @author peng-yongsheng
*/
public abstract class AvgIndicate extends Indicate {
@Setter @Getter private long times;
@Setter @Getter private long value;
public AvgIndicate(long timeBucket) {
super(timeBucket);
this.times = 1;
}
public long getAvg() {
return value / times;
}
@Target(ElementType.PARAMETER)
@Retention(RetentionPolicy.SOURCE)
public @interface ConstOne {
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis.indicator.annotation;
import java.lang.annotation.*;
/**
* @author peng-yongsheng
*/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.SOURCE)
public @interface Entrance {
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis.indicator.annotation;
import java.lang.annotation.*;
/**
* @author peng-yongsheng
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.SOURCE)
public @interface IndicatorType {
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis.indicator.annotation;
import java.lang.annotation.*;
/**
* @author peng-yongsheng
*/
@Target(ElementType.PARAMETER)
@Retention(RetentionPolicy.SOURCE)
public @interface SourceFrom {
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册