提交 6de3e552 编写于 作者: 彭勇升 pengys 提交者: wu-sheng

Too many EndOfBatchContext instance created. (#3129)

上级 d256fc34
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis.data;
/**
* @author peng-yongsheng
*/
public class EndOfBatchContext {
private boolean isEndOfBatch;
public EndOfBatchContext(boolean isEndOfBatch) {
this.isEndOfBatch = isEndOfBatch;
}
public boolean isEndOfBatch() {
return isEndOfBatch;
}
public void setEndOfBatch(boolean endOfBatch) {
isEndOfBatch = endOfBatch;
}
}
......@@ -23,7 +23,9 @@ package org.apache.skywalking.oap.server.core.analysis.data;
*/
public interface QueueData {
EndOfBatchContext getEndOfBatchContext();
void resetEndOfBatch();
void setEndOfBatchContext(EndOfBatchContext context);
void asEndOfBatch();
boolean isEndOfBatch();
}
......@@ -22,7 +22,7 @@ import java.util.*;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.consumer.*;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.data.*;
import org.apache.skywalking.oap.server.core.analysis.data.MergeDataCache;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
......@@ -63,7 +63,7 @@ public class MetricsAggregateWorker extends AbstractWorker<Metrics> {
}
@Override public final void in(Metrics metrics) {
metrics.setEndOfBatchContext(new EndOfBatchContext(false));
metrics.resetEndOfBatch();
dataCarrier.produce(metrics);
}
......@@ -71,7 +71,7 @@ public class MetricsAggregateWorker extends AbstractWorker<Metrics> {
aggregationCounter.inc();
aggregate(metrics);
if (metrics.getEndOfBatchContext().isEndOfBatch()) {
if (metrics.isEndOfBatch()) {
sendToNext();
}
}
......@@ -127,7 +127,7 @@ public class MetricsAggregateWorker extends AbstractWorker<Metrics> {
Metrics metrics = inputIterator.next();
i++;
if (i == data.size()) {
metrics.getEndOfBatchContext().setEndOfBatch(true);
metrics.asEndOfBatch();
}
aggregator.onWork(metrics);
}
......
......@@ -22,7 +22,7 @@ import java.util.*;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.consumer.*;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.data.*;
import org.apache.skywalking.oap.server.core.analysis.data.MergeDataCache;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.exporter.ExportEvent;
import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
......@@ -76,7 +76,7 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics, MergeDat
}
@Override public void in(Metrics metrics) {
metrics.setEndOfBatchContext(new EndOfBatchContext(false));
metrics.resetEndOfBatch();
dataCarrier.produce(metrics);
}
......@@ -193,7 +193,7 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics, MergeDat
Metrics metrics = inputIterator.next();
i++;
if (i == data.size()) {
metrics.getEndOfBatchContext().setEndOfBatch(true);
metrics.asEndOfBatch();
}
persistent.onWork(metrics);
}
......
......@@ -22,7 +22,6 @@ import java.util.*;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.consumer.*;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.data.EndOfBatchContext;
import org.apache.skywalking.oap.server.core.register.RegisterSource;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
......@@ -60,7 +59,7 @@ public class RegisterDistinctWorker extends AbstractWorker<RegisterSource> {
}
@Override public final void in(RegisterSource source) {
source.setEndOfBatchContext(new EndOfBatchContext(false));
source.resetEndOfBatch();
dataCarrier.produce(source);
}
......@@ -73,7 +72,7 @@ public class RegisterDistinctWorker extends AbstractWorker<RegisterSource> {
sources.get(source).combine(source);
}
if (messageNum >= 1000 || source.getEndOfBatchContext().isEndOfBatch()) {
if (messageNum >= 1000 || source.isEndOfBatch()) {
sources.values().forEach(nextWorker::in);
sources.clear();
messageNum = 0;
......@@ -99,7 +98,7 @@ public class RegisterDistinctWorker extends AbstractWorker<RegisterSource> {
RegisterSource source = sourceIterator.next();
i++;
if (i == sources.size()) {
source.getEndOfBatchContext().setEndOfBatch(true);
source.asEndOfBatch();
}
aggregator.onWork(source);
}
......
......@@ -18,27 +18,16 @@
package org.apache.skywalking.oap.server.core.register.worker;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.*;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.consumer.BulkConsumePool;
import org.apache.skywalking.apm.commons.datacarrier.consumer.ConsumerPoolFactory;
import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.data.EndOfBatchContext;
import org.apache.skywalking.apm.commons.datacarrier.consumer.*;
import org.apache.skywalking.oap.server.core.*;
import org.apache.skywalking.oap.server.core.register.RegisterSource;
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
import org.apache.skywalking.oap.server.core.storage.IRegisterDAO;
import org.apache.skywalking.oap.server.core.storage.IRegisterLockDAO;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
import org.apache.skywalking.oap.server.core.storage.*;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.*;
/**
* @author peng-yongsheng
......@@ -80,7 +69,7 @@ public class RegisterPersistentWorker extends AbstractWorker<RegisterSource> {
}
@Override public final void in(RegisterSource registerSource) {
registerSource.setEndOfBatchContext(new EndOfBatchContext(false));
registerSource.resetEndOfBatch();
dataCarrier.produce(registerSource);
}
......@@ -91,7 +80,7 @@ public class RegisterPersistentWorker extends AbstractWorker<RegisterSource> {
sources.get(registerSource).combine(registerSource);
}
if (sources.size() > 1000 || registerSource.getEndOfBatchContext().isEndOfBatch()) {
if (sources.size() > 1000 || registerSource.isEndOfBatch()) {
sources.values().forEach(source -> {
try {
RegisterSource dbSource = registerDAO.get(modelName, source.id());
......@@ -147,7 +136,7 @@ public class RegisterPersistentWorker extends AbstractWorker<RegisterSource> {
RegisterSource registerSource = sourceIterator.next();
i++;
if (i == data.size()) {
registerSource.getEndOfBatchContext().setEndOfBatch(true);
registerSource.asEndOfBatch();
}
persistent.onWork(registerSource);
}
......
......@@ -18,7 +18,7 @@
package org.apache.skywalking.oap.server.core.remote.data;
import org.apache.skywalking.oap.server.core.analysis.data.*;
import org.apache.skywalking.oap.server.core.analysis.data.QueueData;
import org.apache.skywalking.oap.server.core.remote.*;
/**
......@@ -26,14 +26,18 @@ import org.apache.skywalking.oap.server.core.remote.*;
*/
public abstract class StreamData implements QueueData, Serializable, Deserializable {
private EndOfBatchContext endOfBatchContext;
private boolean endOfBatch = false;
@Override public final EndOfBatchContext getEndOfBatchContext() {
return this.endOfBatchContext;
@Override public void resetEndOfBatch() {
this.endOfBatch = false;
}
@Override public final void setEndOfBatchContext(EndOfBatchContext context) {
this.endOfBatchContext = context;
@Override public void asEndOfBatch() {
this.endOfBatch = true;
}
@Override public boolean isEndOfBatch() {
return this.endOfBatch;
}
public abstract int remoteHashCode();
......
......@@ -19,7 +19,7 @@
package org.apache.skywalking.oap.server.receiver.trace.provider.parser.standardization;
import org.apache.skywalking.apm.network.language.agent.UpstreamSegment;
import org.apache.skywalking.oap.server.core.analysis.data.*;
import org.apache.skywalking.oap.server.core.analysis.data.QueueData;
/**
* @author peng-yongsheng
......@@ -36,14 +36,18 @@ public class SegmentStandardization implements QueueData {
return id;
}
private EndOfBatchContext context;
private boolean endOfBatch = false;
@Override public EndOfBatchContext getEndOfBatchContext() {
return this.context;
@Override public void resetEndOfBatch() {
this.endOfBatch = false;
}
@Override public void setEndOfBatchContext(EndOfBatchContext context) {
this.context = context;
@Override public void asEndOfBatch() {
this.endOfBatch = true;
}
@Override public boolean isEndOfBatch() {
return this.endOfBatch;
}
private UpstreamSegment upstreamSegment;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册