From 78f93492f16d594599e60358a7062f1debd27ca0 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=BD=AD=E5=8B=87=E5=8D=87=20pengys?= <8082209@qq.com>
Date: Thu, 26 Jul 2018 16:24:01 +0800
Subject: [PATCH] Aggregator and Indicator define (#1498)
* Sample operator code.
* Indicator aggregator framework.
* Provide some annotation for OAL.
---
.../core/analysis/AbstractAggregator.java | 5 +-
.../core/analysis/data/MergeDataCache.java | 14 +++---
.../analysis/endpoint/EndpointDispatcher.java | 11 ++---
...java => EndpointLatencyAvgAggregator.java} | 6 +--
....java => EndpointLatencyAvgIndicator.java} | 26 ++++-------
.../core/analysis/indicator/AvgIndicator.java | 46 +++++++++++++++++++
.../Indicator.java} | 8 ++--
.../annotation/ConstOne.java} | 20 ++------
.../indicator/annotation/Entrance.java | 29 ++++++++++++
.../indicator/annotation/IndicatorType.java | 29 ++++++++++++
.../indicator/annotation/SourceFrom.java | 29 ++++++++++++
11 files changed, 170 insertions(+), 53 deletions(-)
rename oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/{EndpointAvgAggregator.java => EndpointLatencyAvgAggregator.java} (83%)
rename oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/{EndpointAvgIndicate.java => EndpointLatencyAvgIndicator.java} (69%)
create mode 100644 oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/AvgIndicator.java
rename oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/{Indicate.java => indicator/Indicator.java} (82%)
rename oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/{AvgIndicate.java => indicator/annotation/ConstOne.java} (69%)
create mode 100644 oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/Entrance.java
create mode 100644 oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorType.java
create mode 100644 oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/SourceFrom.java
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/AbstractAggregator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/AbstractAggregator.java
index ea8a692cef..f4e769ade6 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/AbstractAggregator.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/AbstractAggregator.java
@@ -22,12 +22,13 @@ import java.util.*;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
import org.apache.skywalking.oap.server.core.analysis.data.*;
+import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
import org.slf4j.*;
/**
* @author peng-yongsheng
*/
-public abstract class AbstractAggregator {
+public abstract class AbstractAggregator {
private static final Logger logger = LoggerFactory.getLogger(AbstractAggregator.class);
@@ -81,7 +82,7 @@ public abstract class AbstractAggregator {
private void aggregate(INPUT message) {
mergeDataCache.writing();
if (mergeDataCache.containsKey(message)) {
-// mergeDataCache.get(message).mergeAndFormulaCalculateData(message);
+ mergeDataCache.get(message).combine(message);
} else {
mergeDataCache.put(message);
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/MergeDataCache.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/MergeDataCache.java
index 8ef0354eed..b5de443613 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/MergeDataCache.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/MergeDataCache.java
@@ -18,26 +18,28 @@
package org.apache.skywalking.oap.server.core.analysis.data;
+import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
+
/**
* @author peng-yongsheng
*/
-public class MergeDataCache extends Window> implements DataCache {
+public class MergeDataCache extends Window> implements DataCache {
- private MergeDataCollection lockedMergeDataCollection;
+ private MergeDataCollection lockedMergeDataCollection;
- @Override public MergeDataCollection collectionInstance() {
+ @Override public MergeDataCollection collectionInstance() {
return new MergeDataCollection<>();
}
- public boolean containsKey(STREAM_DATA key) {
+ public boolean containsKey(INDICATOR key) {
return lockedMergeDataCollection.containsKey(key);
}
- public StreamData get(STREAM_DATA key) {
+ public Indicator get(INDICATOR key) {
return lockedMergeDataCollection.get(key);
}
- public void put(STREAM_DATA data) {
+ public void put(INDICATOR data) {
lockedMergeDataCollection.put(data);
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointDispatcher.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointDispatcher.java
index 8d6c065de5..152ac5eb93 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointDispatcher.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointDispatcher.java
@@ -26,10 +26,10 @@ import org.apache.skywalking.oap.server.core.receiver.Endpoint;
*/
public class EndpointDispatcher implements SourceDispatcher {
- private final EndpointAvgAggregator avgAggregator;
+ private final EndpointLatencyAvgAggregator avgAggregator;
public EndpointDispatcher() {
- this.avgAggregator = new EndpointAvgAggregator();
+ this.avgAggregator = new EndpointLatencyAvgAggregator();
}
@Override public void dispatch(Endpoint source) {
@@ -37,9 +37,8 @@ public class EndpointDispatcher implements SourceDispatcher {
}
private void avg(Endpoint source) {
- EndpointAvgIndicate indicate = new EndpointAvgIndicate(source.getId(), source.getTimeBucket());
- indicate.setLatency(source.getLatency());
-
- avgAggregator.in(indicate);
+ EndpointLatencyAvgIndicator indicator = new EndpointLatencyAvgIndicator(source.getTimeBucket(), source.getId());
+ indicator.combine(source.getLatency(), 1);
+ avgAggregator.in(indicator);
}
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointAvgAggregator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgAggregator.java
similarity index 83%
rename from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointAvgAggregator.java
rename to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgAggregator.java
index 526135a8e2..f40990976c 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointAvgAggregator.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgAggregator.java
@@ -24,11 +24,11 @@ import org.slf4j.*;
/**
* @author peng-yongsheng
*/
-public class EndpointAvgAggregator extends AbstractAggregator {
+public class EndpointLatencyAvgAggregator extends AbstractAggregator {
- private static final Logger logger = LoggerFactory.getLogger(EndpointAvgAggregator.class);
+ private static final Logger logger = LoggerFactory.getLogger(EndpointLatencyAvgAggregator.class);
- @Override protected void onNext(EndpointAvgIndicate data) {
+ @Override protected void onNext(EndpointLatencyAvgIndicator data) {
}
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointAvgIndicate.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgIndicator.java
similarity index 69%
rename from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointAvgIndicate.java
rename to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgIndicator.java
index 159728c386..a200aeba7e 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointAvgIndicate.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/endpoint/EndpointLatencyAvgIndicator.java
@@ -18,34 +18,24 @@
package org.apache.skywalking.oap.server.core.analysis.endpoint;
-import lombok.Getter;
-import org.apache.skywalking.oap.server.core.analysis.AvgIndicate;
+import org.apache.skywalking.oap.server.core.analysis.indicator.AvgIndicator;
/**
* @author peng-yongsheng
*/
-public class EndpointAvgIndicate extends AvgIndicate {
+public class EndpointLatencyAvgIndicator extends AvgIndicator {
- @Getter private final int id;
+ private final int id;
- public EndpointAvgIndicate(int id, long timeBucket) {
+ public EndpointLatencyAvgIndicator(long timeBucket, int id) {
super(timeBucket);
this.id = id;
}
- public void setLatency(long latency) {
- setValue(latency);
- }
-
- public long getLatency() {
- return getValue();
- }
-
@Override public int hashCode() {
int result = 17;
result = 31 * result + id;
- //TODO How?
-// result = 31 * result + getTimeBucket();
+ result = 31 * result + (int)getTimeBucket();
return result;
}
@@ -57,10 +47,10 @@ public class EndpointAvgIndicate extends AvgIndicate {
if (getClass() != obj.getClass())
return false;
- EndpointAvgIndicate indicate = (EndpointAvgIndicate)obj;
- if (id != indicate.id)
+ EndpointLatencyAvgIndicator indicator = (EndpointLatencyAvgIndicator)obj;
+ if (id != indicator.id)
return false;
- if (getTimeBucket() != indicate.getTimeBucket())
+ if (getTimeBucket() != indicator.getTimeBucket())
return false;
return true;
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/AvgIndicator.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/AvgIndicator.java
new file mode 100644
index 0000000000..54ca9d68c2
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/AvgIndicator.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.indicator;
+
+import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.*;
+
+/**
+ * @author peng-yongsheng
+ */
+@IndicatorType
+public abstract class AvgIndicator extends Indicator {
+
+ private long summation;
+ private int count;
+
+ public AvgIndicator(long timeBucket) {
+ super(timeBucket);
+ }
+
+ @Entrance
+ public void combine(@SourceFrom long summation, @ConstOne int count) {
+ this.summation += summation;
+ this.count += count;
+ }
+
+ @Override public void combine(Indicator indicator) {
+ AvgIndicator avgIndicator = (AvgIndicator)indicator;
+ combine(avgIndicator.summation, avgIndicator.count);
+ }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/Indicate.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/Indicator.java
similarity index 82%
rename from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/Indicate.java
rename to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/Indicator.java
index e0185a3057..66bb57d586 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/Indicate.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/Indicator.java
@@ -16,7 +16,7 @@
*
*/
-package org.apache.skywalking.oap.server.core.analysis;
+package org.apache.skywalking.oap.server.core.analysis.indicator;
import lombok.Getter;
import org.apache.skywalking.oap.server.core.analysis.data.StreamData;
@@ -24,11 +24,13 @@ import org.apache.skywalking.oap.server.core.analysis.data.StreamData;
/**
* @author peng-yongsheng
*/
-public abstract class Indicate extends StreamData {
+public abstract class Indicator extends StreamData {
@Getter private final long timeBucket;
- public Indicate(long timeBucket) {
+ public Indicator(long timeBucket) {
this.timeBucket = timeBucket;
}
+
+ public abstract void combine(Indicator indicator);
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/AvgIndicate.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/ConstOne.java
similarity index 69%
rename from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/AvgIndicate.java
rename to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/ConstOne.java
index 4aee7b19dd..66139b1d6d 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/AvgIndicate.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/ConstOne.java
@@ -16,24 +16,14 @@
*
*/
-package org.apache.skywalking.oap.server.core.analysis;
+package org.apache.skywalking.oap.server.core.analysis.indicator.annotation;
-import lombok.*;
+import java.lang.annotation.*;
/**
* @author peng-yongsheng
*/
-public abstract class AvgIndicate extends Indicate {
-
- @Setter @Getter private long times;
- @Setter @Getter private long value;
-
- public AvgIndicate(long timeBucket) {
- super(timeBucket);
- this.times = 1;
- }
-
- public long getAvg() {
- return value / times;
- }
+@Target(ElementType.PARAMETER)
+@Retention(RetentionPolicy.SOURCE)
+public @interface ConstOne {
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/Entrance.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/Entrance.java
new file mode 100644
index 0000000000..6cd7db8d82
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/Entrance.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.indicator.annotation;
+
+import java.lang.annotation.*;
+
+/**
+ * @author peng-yongsheng
+ */
+@Target(ElementType.METHOD)
+@Retention(RetentionPolicy.SOURCE)
+public @interface Entrance {
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorType.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorType.java
new file mode 100644
index 0000000000..7e8b8e1a68
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/IndicatorType.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.indicator.annotation;
+
+import java.lang.annotation.*;
+
+/**
+ * @author peng-yongsheng
+ */
+@Target(ElementType.TYPE)
+@Retention(RetentionPolicy.SOURCE)
+public @interface IndicatorType {
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/SourceFrom.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/SourceFrom.java
new file mode 100644
index 0000000000..5830146970
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/indicator/annotation/SourceFrom.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.indicator.annotation;
+
+import java.lang.annotation.*;
+
+/**
+ * @author peng-yongsheng
+ */
+@Target(ElementType.PARAMETER)
+@Retention(RetentionPolicy.SOURCE)
+public @interface SourceFrom {
+}
--
GitLab