From 6de3e55280507397fa695e1968a5286de18d8d2b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BD=AD=E5=8B=87=E5=8D=87=20pengys?= Date: Sun, 21 Jul 2019 00:26:17 +0800 Subject: [PATCH] Too many EndOfBatchContext instance created. (#3129) --- .../core/analysis/data/EndOfBatchContext.java | 39 ------------------- .../server/core/analysis/data/QueueData.java | 6 ++- .../worker/MetricsAggregateWorker.java | 8 ++-- .../worker/MetricsPersistentWorker.java | 6 +-- .../worker/RegisterDistinctWorker.java | 7 ++-- .../worker/RegisterPersistentWorker.java | 27 ++++--------- .../server/core/remote/data/StreamData.java | 16 +++++--- .../SegmentStandardization.java | 16 +++++--- 8 files changed, 42 insertions(+), 83 deletions(-) delete mode 100644 oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/EndOfBatchContext.java diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/EndOfBatchContext.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/EndOfBatchContext.java deleted file mode 100644 index 528ace957..000000000 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/EndOfBatchContext.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.skywalking.oap.server.core.analysis.data; - -/** - * @author peng-yongsheng - */ -public class EndOfBatchContext { - - private boolean isEndOfBatch; - - public EndOfBatchContext(boolean isEndOfBatch) { - this.isEndOfBatch = isEndOfBatch; - } - - public boolean isEndOfBatch() { - return isEndOfBatch; - } - - public void setEndOfBatch(boolean endOfBatch) { - isEndOfBatch = endOfBatch; - } -} diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/QueueData.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/QueueData.java index 858e62de3..494d34874 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/QueueData.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/QueueData.java @@ -23,7 +23,9 @@ package org.apache.skywalking.oap.server.core.analysis.data; */ public interface QueueData { - EndOfBatchContext getEndOfBatchContext(); + void resetEndOfBatch(); - void setEndOfBatchContext(EndOfBatchContext context); + void asEndOfBatch(); + + boolean isEndOfBatch(); } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java index 59c75bb59..ff3d0d0a8 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.java @@ -22,7 +22,7 @@ import java.util.*; import org.apache.skywalking.apm.commons.datacarrier.DataCarrier; import org.apache.skywalking.apm.commons.datacarrier.consumer.*; import org.apache.skywalking.oap.server.core.UnexpectedException; -import org.apache.skywalking.oap.server.core.analysis.data.*; +import org.apache.skywalking.oap.server.core.analysis.data.MergeDataCache; import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics; import org.apache.skywalking.oap.server.core.worker.AbstractWorker; import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; @@ -63,7 +63,7 @@ public class MetricsAggregateWorker extends AbstractWorker { } @Override public final void in(Metrics metrics) { - metrics.setEndOfBatchContext(new EndOfBatchContext(false)); + metrics.resetEndOfBatch(); dataCarrier.produce(metrics); } @@ -71,7 +71,7 @@ public class MetricsAggregateWorker extends AbstractWorker { aggregationCounter.inc(); aggregate(metrics); - if (metrics.getEndOfBatchContext().isEndOfBatch()) { + if (metrics.isEndOfBatch()) { sendToNext(); } } @@ -127,7 +127,7 @@ public class MetricsAggregateWorker extends AbstractWorker { Metrics metrics = inputIterator.next(); i++; if (i == data.size()) { - metrics.getEndOfBatchContext().setEndOfBatch(true); + metrics.asEndOfBatch(); } aggregator.onWork(metrics); } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java index 4e819b40e..066f70fdb 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/MetricsPersistentWorker.java @@ -22,7 +22,7 @@ import java.util.*; import org.apache.skywalking.apm.commons.datacarrier.DataCarrier; import org.apache.skywalking.apm.commons.datacarrier.consumer.*; import org.apache.skywalking.oap.server.core.UnexpectedException; -import org.apache.skywalking.oap.server.core.analysis.data.*; +import org.apache.skywalking.oap.server.core.analysis.data.MergeDataCache; import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics; import org.apache.skywalking.oap.server.core.exporter.ExportEvent; import org.apache.skywalking.oap.server.core.storage.IMetricsDAO; @@ -76,7 +76,7 @@ public class MetricsPersistentWorker extends PersistenceWorker { } @Override public final void in(RegisterSource source) { - source.setEndOfBatchContext(new EndOfBatchContext(false)); + source.resetEndOfBatch(); dataCarrier.produce(source); } @@ -73,7 +72,7 @@ public class RegisterDistinctWorker extends AbstractWorker { sources.get(source).combine(source); } - if (messageNum >= 1000 || source.getEndOfBatchContext().isEndOfBatch()) { + if (messageNum >= 1000 || source.isEndOfBatch()) { sources.values().forEach(nextWorker::in); sources.clear(); messageNum = 0; @@ -99,7 +98,7 @@ public class RegisterDistinctWorker extends AbstractWorker { RegisterSource source = sourceIterator.next(); i++; if (i == sources.size()) { - source.getEndOfBatchContext().setEndOfBatch(true); + source.asEndOfBatch(); } aggregator.onWork(source); } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java index 2fb3101fc..3a481e948 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java @@ -18,27 +18,16 @@ package org.apache.skywalking.oap.server.core.register.worker; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Objects; +import java.util.*; import org.apache.skywalking.apm.commons.datacarrier.DataCarrier; -import org.apache.skywalking.apm.commons.datacarrier.consumer.BulkConsumePool; -import org.apache.skywalking.apm.commons.datacarrier.consumer.ConsumerPoolFactory; -import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer; -import org.apache.skywalking.oap.server.core.Const; -import org.apache.skywalking.oap.server.core.UnexpectedException; -import org.apache.skywalking.oap.server.core.analysis.data.EndOfBatchContext; +import org.apache.skywalking.apm.commons.datacarrier.consumer.*; +import org.apache.skywalking.oap.server.core.*; import org.apache.skywalking.oap.server.core.register.RegisterSource; import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine; -import org.apache.skywalking.oap.server.core.storage.IRegisterDAO; -import org.apache.skywalking.oap.server.core.storage.IRegisterLockDAO; -import org.apache.skywalking.oap.server.core.storage.StorageModule; +import org.apache.skywalking.oap.server.core.storage.*; import org.apache.skywalking.oap.server.core.worker.AbstractWorker; import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.slf4j.*; /** * @author peng-yongsheng @@ -80,7 +69,7 @@ public class RegisterPersistentWorker extends AbstractWorker { } @Override public final void in(RegisterSource registerSource) { - registerSource.setEndOfBatchContext(new EndOfBatchContext(false)); + registerSource.resetEndOfBatch(); dataCarrier.produce(registerSource); } @@ -91,7 +80,7 @@ public class RegisterPersistentWorker extends AbstractWorker { sources.get(registerSource).combine(registerSource); } - if (sources.size() > 1000 || registerSource.getEndOfBatchContext().isEndOfBatch()) { + if (sources.size() > 1000 || registerSource.isEndOfBatch()) { sources.values().forEach(source -> { try { RegisterSource dbSource = registerDAO.get(modelName, source.id()); @@ -147,7 +136,7 @@ public class RegisterPersistentWorker extends AbstractWorker { RegisterSource registerSource = sourceIterator.next(); i++; if (i == data.size()) { - registerSource.getEndOfBatchContext().setEndOfBatch(true); + registerSource.asEndOfBatch(); } persistent.onWork(registerSource); } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/data/StreamData.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/data/StreamData.java index 532130f1a..d2875eb23 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/data/StreamData.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/data/StreamData.java @@ -18,7 +18,7 @@ package org.apache.skywalking.oap.server.core.remote.data; -import org.apache.skywalking.oap.server.core.analysis.data.*; +import org.apache.skywalking.oap.server.core.analysis.data.QueueData; import org.apache.skywalking.oap.server.core.remote.*; /** @@ -26,14 +26,18 @@ import org.apache.skywalking.oap.server.core.remote.*; */ public abstract class StreamData implements QueueData, Serializable, Deserializable { - private EndOfBatchContext endOfBatchContext; + private boolean endOfBatch = false; - @Override public final EndOfBatchContext getEndOfBatchContext() { - return this.endOfBatchContext; + @Override public void resetEndOfBatch() { + this.endOfBatch = false; } - @Override public final void setEndOfBatchContext(EndOfBatchContext context) { - this.endOfBatchContext = context; + @Override public void asEndOfBatch() { + this.endOfBatch = true; + } + + @Override public boolean isEndOfBatch() { + return this.endOfBatch; } public abstract int remoteHashCode(); diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SegmentStandardization.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SegmentStandardization.java index 6e23e47ef..d1e308432 100644 --- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SegmentStandardization.java +++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SegmentStandardization.java @@ -19,7 +19,7 @@ package org.apache.skywalking.oap.server.receiver.trace.provider.parser.standardization; import org.apache.skywalking.apm.network.language.agent.UpstreamSegment; -import org.apache.skywalking.oap.server.core.analysis.data.*; +import org.apache.skywalking.oap.server.core.analysis.data.QueueData; /** * @author peng-yongsheng @@ -36,14 +36,18 @@ public class SegmentStandardization implements QueueData { return id; } - private EndOfBatchContext context; + private boolean endOfBatch = false; - @Override public EndOfBatchContext getEndOfBatchContext() { - return this.context; + @Override public void resetEndOfBatch() { + this.endOfBatch = false; } - @Override public void setEndOfBatchContext(EndOfBatchContext context) { - this.context = context; + @Override public void asEndOfBatch() { + this.endOfBatch = true; + } + + @Override public boolean isEndOfBatch() { + return this.endOfBatch; } private UpstreamSegment upstreamSegment; -- GitLab