未验证 提交 d97af96c 编写于 作者: wu-sheng's avatar wu-sheng 提交者: GitHub

New ReadWriteSafeCache. Rewrite the whole Window and Collection. Simplify codes (#4733)

* Rewrite the whole window and collection to new ReadWriteSafeCache.

* Remove redundant fields.
上级 d51c7881
......@@ -18,9 +18,25 @@
package org.apache.skywalking.oap.server.core.analysis.data;
public interface DataCache {
import java.util.List;
void writing();
/**
* BufferedData represents a data collection in the memory. Data could be accepted and be drain to other collection.
*
* {@link #accept(Object)} and {@link #read()} wouldn't be required to thread-safe. BufferedData usually hosts by {@link
* ReadWriteSafeCache}.
*/
public interface BufferedData<T> {
/**
* Accept the data into the cache if it fits the conditions. The implementation maybe wouldn't accept the new input
* data.
*
* @param data to be added potentially.
*/
void accept(T data);
void finishWriting();
/**
* Read all existing buffered data, and clear the memory.
*/
List<T> read();
}
......@@ -19,111 +19,61 @@
package org.apache.skywalking.oap.server.core.analysis.data;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import org.apache.skywalking.oap.server.core.storage.ComparableStorageData;
import org.apache.skywalking.oap.server.core.storage.StorageData;
public class LimitedSizeDataCollection<STORAGE_DATA extends ComparableStorageData> implements SWCollection<STORAGE_DATA> {
private final HashMap<STORAGE_DATA, LinkedList<STORAGE_DATA>> data;
/**
* LimitedSizeBufferedData is a thread no safe implementation of {@link BufferedData}. It collects limited records of
* each {@link StorageData#id()}.
*/
public class LimitedSizeBufferedData<STORAGE_DATA extends ComparableStorageData & StorageData> implements BufferedData<STORAGE_DATA> {
private final HashMap<String, LinkedList<STORAGE_DATA>> data;
private final int limitedSize;
private volatile boolean writing;
private volatile boolean reading;
LimitedSizeDataCollection(int limitedSize) {
public LimitedSizeBufferedData(int limitedSize) {
this.data = new HashMap<>();
this.writing = false;
this.reading = false;
this.limitedSize = limitedSize;
}
@Override
public void finishWriting() {
writing = false;
}
@Override
public void writing() {
writing = true;
}
@Override
public boolean isWriting() {
return writing;
}
@Override
public void finishReading() {
reading = false;
}
@Override
public void reading() {
reading = true;
}
@Override
public boolean isReading() {
return reading;
}
@Override
public int size() {
return data.size();
}
@Override
public void clear() {
data.clear();
}
@Override
public boolean containsKey(STORAGE_DATA key) {
throw new UnsupportedOperationException("Limited size data collection doesn't support containsKey operation.");
}
@Override
public STORAGE_DATA get(STORAGE_DATA key) {
throw new UnsupportedOperationException("Limited size data collection doesn't support get operation.");
}
@Override
public void put(STORAGE_DATA value) {
LinkedList<STORAGE_DATA> storageDataList = this.data.get(value);
public void accept(final STORAGE_DATA data) {
final String id = data.id();
LinkedList<STORAGE_DATA> storageDataList = this.data.get(id);
if (storageDataList == null) {
storageDataList = new LinkedList<>();
data.put(value, storageDataList);
this.data.put(id, storageDataList);
}
if (storageDataList.size() < limitedSize) {
storageDataList.add(value);
storageDataList.add(data);
return;
}
for (int i = 0; i < storageDataList.size(); i++) {
STORAGE_DATA storageData = storageDataList.get(i);
if (value.compareTo(storageData) <= 0) {
if (data.compareTo(storageData) <= 0) {
if (i == 0) {
// input value is less than the smallest in top N list, ignore
// input data is less than the smallest in top N list, ignore
} else {
// Remove the smallest in top N list
// add the current value into the right position
storageDataList.add(i, value);
// add the current data into the right position
storageDataList.add(i, data);
storageDataList.removeFirst();
}
return;
}
}
// Add the value as biggest in top N list
storageDataList.addLast(value);
// Add the data as biggest in top N list
storageDataList.addLast(data);
storageDataList.removeFirst();
}
@Override
public Collection<STORAGE_DATA> collection() {
public List<STORAGE_DATA> read() {
List<STORAGE_DATA> collection = new ArrayList<>();
data.values().forEach(e -> e.forEach(collection::add));
return collection;
......
......@@ -18,37 +18,45 @@
package org.apache.skywalking.oap.server.core.analysis.data;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
public class MergeDataCache<METRICS extends Metrics> extends Window<METRICS> implements DataCache {
private SWCollection<METRICS> lockedMergeDataCollection;
@Override
public SWCollection<METRICS> collectionInstance() {
return new MergeDataCollection<>();
}
public boolean containsKey(METRICS key) {
return lockedMergeDataCollection.containsKey(key);
}
public Metrics get(METRICS key) {
return lockedMergeDataCollection.get(key);
}
/**
* MergableBufferedData is a thread no safe implementation of {@link BufferedData}. {@link Metrics} in this cache would
* be {@link Metrics#combine(Metrics)} if their {@link Metrics#id()}s are same.
*
* Concurrency {@link #accept(Metrics)}s and {@link #read()} while {@link #accept(Metrics)} are both not recommended.
*/
public class MergableBufferedData<METRICS extends Metrics> implements BufferedData<METRICS> {
private Map<String, METRICS> buffer;
public void put(METRICS data) {
lockedMergeDataCollection.put(data);
public MergableBufferedData() {
buffer = new HashMap<>();
}
/**
* Accept the data into the cache and merge with the existing value.
*
* This method is not thread safe, should avoid concurrency calling.
*
* @param data to be added potentially.
*/
@Override
public void writing() {
lockedMergeDataCollection = getCurrentAndWriting();
public void accept(final METRICS data) {
final String id = data.id();
final METRICS existed = buffer.get(id);
if (existed == null) {
buffer.put(id, data);
} else {
existed.combine(data);
}
}
@Override
public void finishWriting() {
lockedMergeDataCollection.finishWriting();
lockedMergeDataCollection = null;
public List<METRICS> read() {
return buffer.values().stream().collect(Collectors.toList());
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis.data;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.skywalking.oap.server.core.remote.data.StreamData;
public class MergeDataCollection<STREAM_DATA extends StreamData> implements SWCollection<STREAM_DATA> {
private Map<STREAM_DATA, STREAM_DATA> collection;
private volatile boolean writing;
private volatile boolean reading;
MergeDataCollection() {
this.collection = new HashMap<>();
this.writing = false;
this.reading = false;
}
@Override
public void finishWriting() {
writing = false;
}
@Override
public void writing() {
writing = true;
}
@Override
public boolean isWriting() {
return writing;
}
@Override
public void finishReading() {
reading = false;
}
@Override
public void reading() {
reading = true;
}
@Override
public boolean isReading() {
return reading;
}
@Override
public boolean containsKey(STREAM_DATA key) {
return collection.containsKey(key);
}
@Override
public void put(STREAM_DATA value) {
collection.put(value, value);
}
@Override
public STREAM_DATA get(STREAM_DATA key) {
return collection.get(key);
}
@Override
public int size() {
return collection.size();
}
@Override
public void clear() {
collection.clear();
}
@Override
public Collection<STREAM_DATA> collection() {
return collection.values();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis.data;
import org.apache.skywalking.oap.server.core.storage.StorageData;
public class NonMergeDataCache<STORAGE_DATA extends StorageData> extends Window<STORAGE_DATA> implements DataCache {
private SWCollection<STORAGE_DATA> lockedMergeDataCollection;
@Override
public SWCollection<STORAGE_DATA> collectionInstance() {
return new NonMergeDataCollection<>();
}
public void add(STORAGE_DATA data) {
lockedMergeDataCollection.put(data);
}
@Override
public void writing() {
lockedMergeDataCollection = getCurrentAndWriting();
}
@Override
public void finishWriting() {
lockedMergeDataCollection.finishWriting();
lockedMergeDataCollection = null;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis.data;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.skywalking.oap.server.core.storage.StorageData;
public class NonMergeDataCollection<STORAGE_DATA extends StorageData> implements SWCollection<STORAGE_DATA> {
private final List<STORAGE_DATA> data;
private volatile boolean writing;
private volatile boolean reading;
NonMergeDataCollection() {
this.data = new ArrayList<>();
this.writing = false;
this.reading = false;
}
@Override
public void finishWriting() {
writing = false;
}
@Override
public void writing() {
writing = true;
}
@Override
public boolean isWriting() {
return writing;
}
@Override
public void finishReading() {
reading = false;
}
@Override
public void reading() {
reading = true;
}
@Override
public boolean isReading() {
return reading;
}
@Override
public int size() {
return data.size();
}
@Override
public void clear() {
data.clear();
}
@Override
public boolean containsKey(STORAGE_DATA key) {
throw new UnsupportedOperationException("Non merge data collection doesn't support containsKey operation.");
}
@Override
public STORAGE_DATA get(STORAGE_DATA key) {
throw new UnsupportedOperationException("Non merge data collection doesn't support get operation.");
}
@Override
public void put(STORAGE_DATA value) {
data.add(value);
}
@Override
public Collection<STORAGE_DATA> collection() {
return data;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis.data;
public interface QueueData {
void resetEndOfBatch();
void asEndOfBatch();
boolean isEndOfBatch();
}
......@@ -18,37 +18,58 @@
package org.apache.skywalking.oap.server.core.analysis.data;
import org.apache.skywalking.oap.server.core.storage.ComparableStorageData;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;
public class LimitedSizeDataCache<STORAGE_DATA extends ComparableStorageData> extends Window<STORAGE_DATA> implements DataCache {
private SWCollection<STORAGE_DATA> limitedSizeDataCollection;
private final int limitSize;
public LimitedSizeDataCache(int limitSize) {
super(false);
this.limitSize = limitSize;
init();
}
/**
* ReadWriteSafeCache provides a read/write isolated cache.
*/
public class ReadWriteSafeCache<T> {
/**
* Pointer of read buffer.
*/
private volatile BufferedData<T> readBufferPointer;
/**
* Pointer of write buffer.
*/
private volatile BufferedData<T> writeBufferPointer;
/**
* Read/Write lock.
*/
private final ReentrantLock lock;
@Override
public SWCollection<STORAGE_DATA> collectionInstance() {
return new LimitedSizeDataCollection<>(limitSize);
/**
* Build the Cache through two given buffer instances.
*
* @param buffer1 read/write switchable buffer
* @param buffer2 read/write switchable buffer. It is the write buffer at the beginning.
*/
public ReadWriteSafeCache(BufferedData<T> buffer1, BufferedData<T> buffer2) {
readBufferPointer = buffer1;
writeBufferPointer = buffer2;
lock = new ReentrantLock();
}
public void add(STORAGE_DATA data) {
limitedSizeDataCollection.put(data);
public void write(T data) {
lock.lock();
try {
writeBufferPointer.accept(data);
} finally {
lock.unlock();
}
}
@Override
public void writing() {
limitedSizeDataCollection = getCurrentAndWriting();
}
public List<T> read() {
lock.lock();
try {
// Switch the read and write pointers, when there is no writing.
BufferedData<T> tempPointer = writeBufferPointer;
writeBufferPointer = readBufferPointer;
readBufferPointer = tempPointer;
@Override
public void finishWriting() {
limitedSizeDataCollection.finishWriting();
limitedSizeDataCollection = null;
return readBufferPointer.read();
} finally {
lock.unlock();
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis.data;
import java.util.Collection;
public interface SWCollection<DATA> {
void reading();
boolean isReading();
void writing();
boolean isWriting();
void clear();
int size();
void finishReading();
void finishWriting();
Collection<DATA> collection();
boolean containsKey(DATA key);
DATA get(DATA key);
void put(DATA value);
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis.data;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Data cache window. Window holds two data collections(A and B). They are switchable based on outside command. In any
* time, one collection is accepting the input data, and the other is immutable.
*
* This window makes sure there is not concurrency read-write situation.
*
* @param <DATA> type in the Window and internal collection.
*/
public abstract class Window<DATA> {
private AtomicInteger windowSwitch = new AtomicInteger(0);
private SWCollection<DATA> pointer;
private SWCollection<DATA> windowDataA;
private SWCollection<DATA> windowDataB;
Window() {
this(true);
}
Window(boolean autoInit) {
if (autoInit) {
init();
}
}
protected void init() {
this.windowDataA = collectionInstance();
this.windowDataB = collectionInstance();
this.pointer = windowDataA;
}
public abstract SWCollection<DATA> collectionInstance();
public boolean trySwitchPointer() {
return windowSwitch.incrementAndGet() == 1 && !getLast().isReading();
}
public void trySwitchPointerFinally() {
windowSwitch.addAndGet(-1);
}
public void switchPointer() {
if (pointer == windowDataA) {
pointer = windowDataB;
} else {
pointer = windowDataA;
}
getLast().reading();
}
SWCollection<DATA> getCurrentAndWriting() {
if (pointer == windowDataA) {
windowDataA.writing();
return windowDataA;
} else {
windowDataB.writing();
return windowDataB;
}
}
private SWCollection<DATA> getCurrent() {
return pointer;
}
public int currentCollectionSize() {
return getCurrent().size();
}
public SWCollection<DATA> getLast() {
if (pointer == windowDataA) {
return windowDataB;
} else {
return windowDataA;
}
}
public void finishReadingLast() {
getLast().clear();
getLast().finishReading();
}
}
......@@ -20,12 +20,14 @@ package org.apache.skywalking.oap.server.core.analysis.worker;
import java.util.Iterator;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.consumer.BulkConsumePool;
import org.apache.skywalking.apm.commons.datacarrier.consumer.ConsumerPoolFactory;
import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.data.MergeDataCache;
import org.apache.skywalking.oap.server.core.analysis.data.MergableBufferedData;
import org.apache.skywalking.oap.server.core.analysis.data.ReadWriteSafeCache;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
......@@ -33,8 +35,6 @@ import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* MetricsAggregateWorker provides an in-memory metrics merging capability. This aggregation is called L1 aggregation,
......@@ -42,20 +42,18 @@ import org.slf4j.LoggerFactory;
* bucket, the L1 aggregation will merge them into one metrics object to reduce the unnecessary memory and network
* payload.
*/
@Slf4j
public class MetricsAggregateWorker extends AbstractWorker<Metrics> {
private static final Logger logger = LoggerFactory.getLogger(MetricsAggregateWorker.class);
private AbstractWorker<Metrics> nextWorker;
private final DataCarrier<Metrics> dataCarrier;
private final MergeDataCache<Metrics> mergeDataCache;
private final ReadWriteSafeCache<Metrics> mergeDataCache;
private CounterMetrics aggregationCounter;
MetricsAggregateWorker(ModuleDefineHolder moduleDefineHolder, AbstractWorker<Metrics> nextWorker,
String modelName) {
super(moduleDefineHolder);
this.nextWorker = nextWorker;
this.mergeDataCache = new MergeDataCache<>();
this.mergeDataCache = new ReadWriteSafeCache<>(new MergableBufferedData(), new MergableBufferedData());
String name = "METRICS_L1_AGGREGATION";
this.dataCarrier = new DataCarrier<>("MetricsAggregateWorker." + modelName, name, 2, 10000);
......@@ -85,42 +83,18 @@ public class MetricsAggregateWorker extends AbstractWorker<Metrics> {
private void onWork(Metrics metrics) {
aggregationCounter.inc();
aggregate(metrics);
mergeDataCache.write(metrics);
if (metrics.isEndOfBatch()) {
sendToNext();
}
}
private void sendToNext() {
mergeDataCache.switchPointer();
while (mergeDataCache.getLast().isWriting()) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
}
}
mergeDataCache.getLast().collection().forEach(data -> {
if (logger.isDebugEnabled()) {
logger.debug(data.toString());
}
nextWorker.in(data);
});
mergeDataCache.finishReadingLast();
}
private void aggregate(Metrics metrics) {
mergeDataCache.writing();
if (mergeDataCache.containsKey(metrics)) {
mergeDataCache.get(metrics).combine(metrics);
} else {
mergeDataCache.put(metrics);
mergeDataCache.read().forEach(
data -> {
if (log.isDebugEnabled()) {
log.debug(data.toString());
}
nextWorker.in(data);
}
);
}
mergeDataCache.finishWriting();
}
private class AggregatorConsumer implements IConsumer<Metrics> {
......@@ -153,7 +127,7 @@ public class MetricsAggregateWorker extends AbstractWorker<Metrics> {
@Override
public void onError(List<Metrics> data, Throwable t) {
logger.error(t.getMessage(), t);
log.error(t.getMessage(), t);
}
@Override
......
......@@ -32,7 +32,8 @@ import org.apache.skywalking.apm.commons.datacarrier.consumer.BulkConsumePool;
import org.apache.skywalking.apm.commons.datacarrier.consumer.ConsumerPoolFactory;
import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.data.MergeDataCache;
import org.apache.skywalking.oap.server.core.analysis.data.MergableBufferedData;
import org.apache.skywalking.oap.server.core.analysis.data.ReadWriteSafeCache;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.exporter.ExportEvent;
import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
......@@ -45,10 +46,9 @@ import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
* MetricsPersistentWorker is an extension of {@link PersistenceWorker} and focuses on the Metrics data persistent.
*/
@Slf4j
public class MetricsPersistentWorker extends PersistenceWorker<Metrics, MergeDataCache<Metrics>> {
public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
private final Model model;
private final Map<Metrics, Metrics> context;
private final MergeDataCache<Metrics> mergeDataCache;
private final IMetricsDAO metricsDAO;
private final Optional<AbstractWorker<Metrics>> nextAlarmWorker;
private final Optional<AbstractWorker<ExportEvent>> nextExportWorker;
......@@ -60,11 +60,10 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics, MergeDat
MetricsPersistentWorker(ModuleDefineHolder moduleDefineHolder, Model model, IMetricsDAO metricsDAO,
AbstractWorker<Metrics> nextAlarmWorker, AbstractWorker<ExportEvent> nextExportWorker,
MetricsTransWorker transWorker, boolean enableDatabaseSession, boolean supportUpdate) {
super(moduleDefineHolder);
super(moduleDefineHolder, new ReadWriteSafeCache<>(new MergableBufferedData(), new MergableBufferedData()));
this.model = model;
this.context = new HashMap<>(100);
this.enableDatabaseSession = enableDatabaseSession;
this.mergeDataCache = new MergeDataCache<>();
this.metricsDAO = metricsDAO;
this.nextAlarmWorker = Optional.ofNullable(nextAlarmWorker);
this.nextExportWorker = Optional.ofNullable(nextExportWorker);
......@@ -111,11 +110,6 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics, MergeDat
dataCarrier.produce(metrics);
}
@Override
public MergeDataCache<Metrics> getCache() {
return mergeDataCache;
}
@Override
public void prepareBatch(Collection<Metrics> lastCollection, List<PrepareRequest> prepareRequests) {
long start = System.currentTimeMillis();
......@@ -197,21 +191,6 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics, MergeDat
nextExportWorker -> nextExportWorker.in(new ExportEvent(metrics, ExportEvent.EventType.TOTAL)));
}
@Override
public void cacheData(Metrics input) {
mergeDataCache.writing();
if (mergeDataCache.containsKey(input)) {
Metrics metrics = mergeDataCache.get(input);
metrics.combine(input);
metrics.calculate();
} else {
input.calculate();
mergeDataCache.put(input);
}
mergeDataCache.finishWriting();
}
/**
* Load data from the storage, if {@link #enableDatabaseSession} == true, only load data when the id doesn't exist.
*/
......
......@@ -20,28 +20,29 @@ package org.apache.skywalking.oap.server.core.analysis.worker;
import java.util.Collection;
import java.util.List;
import org.apache.skywalking.oap.server.core.analysis.data.SWCollection;
import org.apache.skywalking.oap.server.core.analysis.data.Window;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.analysis.data.ReadWriteSafeCache;
import org.apache.skywalking.oap.server.core.storage.StorageData;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* PersistenceWorker take the responsibility to pushing data to the final storage. The target storage is based on the
* activate storage implementation. This worker controls the persistence flow.
*
* @param <INPUT> The type of worker input. All inputs will be merged and saved.
* @param <CACHE> Cache type to hold all input.
*/
public abstract class PersistenceWorker<INPUT extends StorageData, CACHE extends Window<INPUT>> extends AbstractWorker<INPUT> {
@Slf4j
public abstract class PersistenceWorker<INPUT extends StorageData> extends AbstractWorker<INPUT> {
@Getter(AccessLevel.PROTECTED)
private final ReadWriteSafeCache<INPUT> cache;
private static final Logger logger = LoggerFactory.getLogger(PersistenceWorker.class);
PersistenceWorker(ModuleDefineHolder moduleDefineHolder) {
PersistenceWorker(ModuleDefineHolder moduleDefineHolder, ReadWriteSafeCache<INPUT> cache) {
super(moduleDefineHolder);
this.cache = cache;
}
/**
......@@ -54,9 +55,9 @@ public abstract class PersistenceWorker<INPUT extends StorageData, CACHE extends
/**
* Cache data based on different strategies. See the implementations for more details.
*/
public abstract void cacheData(INPUT input);
public abstract CACHE getCache();
public void cacheData(INPUT input) {
cache.write(input);
}
/**
* The persistence process is driven by the {@link org.apache.skywalking.oap.server.core.storage.PersistenceTimer}.
......@@ -66,24 +67,6 @@ public abstract class PersistenceWorker<INPUT extends StorageData, CACHE extends
*/
public abstract void endOfRound(long tookTime);
/**
* For every cache implementation(see {@link Window}), there are two dataset, switch them when one persistence round
* is beginning, in order to make cached data immutable.
*
* @return true if switch successfully.
*/
public boolean flushAndSwitch() {
boolean isSwitch;
try {
if (isSwitch = getCache().trySwitchPointer()) {
getCache().switchPointer();
}
} finally {
getCache().trySwitchPointerFinally();
}
return isSwitch;
}
/**
* Prepare the batch persistence, transfer all prepared data to the executable data format based on the storage
* implementations.
......@@ -93,22 +76,8 @@ public abstract class PersistenceWorker<INPUT extends StorageData, CACHE extends
*/
public abstract void prepareBatch(Collection<INPUT> lastCollection, List<PrepareRequest> prepareRequests);
public final void buildBatchRequests(List<PrepareRequest> prepareRequests) {
try {
SWCollection<INPUT> last = getCache().getLast();
while (last.isWriting()) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
logger.warn("thread wake up");
}
}
if (last.collection() != null) {
prepareBatch(last.collection(), prepareRequests);
}
} finally {
getCache().finishReadingLast();
}
public void buildBatchRequests(List<PrepareRequest> prepareRequests) {
final List<INPUT> dataList = getCache().read();
prepareBatch(dataList, prepareRequests);
}
}
......@@ -22,7 +22,8 @@ import java.util.Collection;
import java.util.List;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
import org.apache.skywalking.oap.server.core.analysis.data.LimitedSizeDataCache;
import org.apache.skywalking.oap.server.core.analysis.data.LimitedSizeBufferedData;
import org.apache.skywalking.oap.server.core.analysis.data.ReadWriteSafeCache;
import org.apache.skywalking.oap.server.core.analysis.topn.TopN;
import org.apache.skywalking.oap.server.core.storage.IRecordDAO;
import org.apache.skywalking.oap.server.core.storage.model.Model;
......@@ -34,59 +35,43 @@ import org.slf4j.LoggerFactory;
/**
* Top N worker is a persistence worker. Cache and order the data, flush in longer period.
*/
public class TopNWorker extends PersistenceWorker<TopN, LimitedSizeDataCache<TopN>> {
public class TopNWorker extends PersistenceWorker<TopN> {
private static final Logger logger = LoggerFactory.getLogger(TopNWorker.class);
private final LimitedSizeDataCache<TopN> limitedSizeDataCache;
private final IRecordDAO recordDAO;
private final Model model;
private final DataCarrier<TopN> dataCarrier;
private long reportCycle;
private long reportPeriod;
private volatile long lastReportTimestamp;
TopNWorker(ModuleDefineHolder moduleDefineHolder, Model model, int topNSize, long reportCycle,
IRecordDAO recordDAO) {
super(moduleDefineHolder);
this.limitedSizeDataCache = new LimitedSizeDataCache<>(topNSize);
TopNWorker(ModuleDefineHolder moduleDefineHolder, Model model, int topNSize, long reportPeriod,
IRecordDAO recordDAO) {
super(
moduleDefineHolder,
new ReadWriteSafeCache<>(new LimitedSizeBufferedData<>(topNSize), new LimitedSizeBufferedData<>(topNSize))
);
this.recordDAO = recordDAO;
this.model = model;
this.dataCarrier = new DataCarrier<>("TopNWorker", 1, 1000);
this.dataCarrier.consume(new TopNWorker.TopNConsumer(), 1);
this.lastReportTimestamp = System.currentTimeMillis();
// Top N persistent works per 10 minutes default.
this.reportCycle = reportCycle;
}
@Override
public void cacheData(TopN data) {
limitedSizeDataCache.writing();
try {
limitedSizeDataCache.add(data);
} finally {
limitedSizeDataCache.finishWriting();
}
}
@Override
public LimitedSizeDataCache<TopN> getCache() {
return limitedSizeDataCache;
this.reportPeriod = reportPeriod;
}
/**
* The top N worker persistent cycle is much less than the others, override `flushAndSwitch` to extend the execute
* time windows.
* <p>
* Switch and persistent attempt happens based on reportCycle.
* Force overriding the parent buildBatchRequests. Use its own report period.
*/
@Override
public boolean flushAndSwitch() {
public void buildBatchRequests(final List<PrepareRequest> prepareRequests) {
long now = System.currentTimeMillis();
if (now - lastReportTimestamp <= reportCycle) {
return false;
if (now - lastReportTimestamp <= reportPeriod) {
// Only do report in its own report period.
return;
}
lastReportTimestamp = now;
return super.flushAndSwitch();
super.buildBatchRequests(prepareRequests);
}
@Override
......
......@@ -18,25 +18,20 @@
package org.apache.skywalking.oap.server.core.remote.data;
import org.apache.skywalking.oap.server.core.analysis.data.QueueData;
import org.apache.skywalking.oap.server.core.remote.Deserializable;
import org.apache.skywalking.oap.server.core.remote.Serializable;
public abstract class StreamData implements QueueData, Serializable, Deserializable {
public abstract class StreamData implements Serializable, Deserializable {
private boolean endOfBatch = false;
@Override
public void resetEndOfBatch() {
this.endOfBatch = false;
}
@Override
public void asEndOfBatch() {
this.endOfBatch = true;
}
@Override
public boolean isEndOfBatch() {
return this.endOfBatch;
}
......
......@@ -62,14 +62,26 @@ public enum PersistenceTimer {
MetricsCreator metricsCreator = moduleManager.find(TelemetryModule.NAME)
.provider()
.getService(MetricsCreator.class);
errorCounter = metricsCreator.createCounter("persistence_timer_bulk_error_count", "Error execution of the prepare stage in persistence timer", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
prepareLatency = metricsCreator.createHistogramMetric("persistence_timer_bulk_prepare_latency", "Latency of the prepare stage in persistence timer", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
executeLatency = metricsCreator.createHistogramMetric("persistence_timer_bulk_execute_latency", "Latency of the execute stage in persistence timer", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
errorCounter = metricsCreator.createCounter(
"persistence_timer_bulk_error_count", "Error execution of the prepare stage in persistence timer",
MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE
);
prepareLatency = metricsCreator.createHistogramMetric(
"persistence_timer_bulk_prepare_latency", "Latency of the prepare stage in persistence timer",
MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE
);
executeLatency = metricsCreator.createHistogramMetric(
"persistence_timer_bulk_execute_latency", "Latency of the execute stage in persistence timer",
MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE
);
if (!isStarted) {
Executors.newSingleThreadScheduledExecutor()
.scheduleWithFixedDelay(new RunnableWithExceptionProtection(() -> extractDataAndSave(batchDAO), t -> logger
.error("Extract data and save failure.", t)), 5, moduleConfig.getPersistentPeriod(), TimeUnit.SECONDS);
.scheduleWithFixedDelay(
new RunnableWithExceptionProtection(() -> extractDataAndSave(batchDAO), t -> logger
.error("Extract data and save failure.", t)), 5, moduleConfig.getPersistentPeriod(),
TimeUnit.SECONDS
);
this.isStarted = true;
}
......@@ -95,9 +107,7 @@ public enum PersistenceTimer {
logger.debug("extract {} worker data and save", worker.getClass().getName());
}
if (worker.flushAndSwitch()) {
worker.buildBatchRequests(prepareRequests);
}
worker.buildBatchRequests(prepareRequests);
worker.endOfRound(System.currentTimeMillis() - lastTime);
});
......
......@@ -23,18 +23,18 @@ import org.apache.skywalking.oap.server.core.storage.ComparableStorageData;
import org.junit.Assert;
import org.junit.Test;
public class LimitedSizeDataCollectionTest {
public class LimitedSizeBufferedDataTest {
@Test
public void testPut() {
LimitedSizeDataCollection<MockStorageData> collection = new LimitedSizeDataCollection<>(5);
collection.put(new MockStorageData(1));
collection.put(new MockStorageData(3));
collection.put(new MockStorageData(5));
collection.put(new MockStorageData(7));
collection.put(new MockStorageData(9));
LimitedSizeBufferedData<MockStorageData> collection = new LimitedSizeBufferedData<>(5);
collection.accept(new MockStorageData(1));
collection.accept(new MockStorageData(3));
collection.accept(new MockStorageData(5));
collection.accept(new MockStorageData(7));
collection.accept(new MockStorageData(9));
MockStorageData income = new MockStorageData(4);
collection.put(income);
collection.accept(income);
int[] expected = new int[] {
3,
......@@ -44,7 +44,7 @@ public class LimitedSizeDataCollectionTest {
9
};
int i = 0;
for (MockStorageData data : collection.collection()) {
for (MockStorageData data : collection.read()) {
Assert.assertEquals(expected[i++], data.latency);
}
}
......@@ -64,7 +64,7 @@ public class LimitedSizeDataCollectionTest {
@Override
public String id() {
return null;
return "id";
}
@Override
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册