提交 506c00b4 编写于 作者: P peng-yongsheng

Change the aggregation and persistence worker’s generic type definition.

上级 0a4171b9
......@@ -21,7 +21,7 @@ package org.apache.skywalking.apm.collector.analysis.worker.model.impl;
import org.apache.skywalking.apm.collector.analysis.worker.model.base.AbstractLocalAsyncWorker;
import org.apache.skywalking.apm.collector.analysis.worker.model.base.WorkerException;
import org.apache.skywalking.apm.collector.analysis.worker.model.impl.data.DataCache;
import org.apache.skywalking.apm.collector.core.data.AbstractData;
import org.apache.skywalking.apm.collector.core.data.StreamData;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -29,7 +29,7 @@ import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public abstract class AggregationWorker<INPUT extends AbstractData, OUTPUT extends AbstractData> extends AbstractLocalAsyncWorker<INPUT, OUTPUT> {
public abstract class AggregationWorker<INPUT extends StreamData, OUTPUT extends StreamData> extends AbstractLocalAsyncWorker<INPUT, OUTPUT> {
private final Logger logger = LoggerFactory.getLogger(AggregationWorker.class);
......@@ -47,9 +47,7 @@ public abstract class AggregationWorker<INPUT extends AbstractData, OUTPUT exten
}
@Override protected final void onWork(INPUT message) throws WorkerException {
boolean isEndOfBatch = message.isEndOfBatch();
OUTPUT output = transform(message);
output.setEndOfBatch(isEndOfBatch);
messageNum++;
aggregate(output);
......@@ -58,8 +56,7 @@ public abstract class AggregationWorker<INPUT extends AbstractData, OUTPUT exten
sendToNext();
messageNum = 0;
}
if (output.isEndOfBatch()) {
output.setEndOfBatch(false);
if (message.getEndOfBatchContext().isEndOfBatch()) {
sendToNext();
}
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.analysis.worker.model.impl;
/**
* @author peng-yongsheng
*/
public class FlushAndSwitch {
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.analysis.worker.model.impl;
import org.apache.skywalking.apm.collector.core.data.EndOfBatchQueueMessage;
/**
* @author peng-yongsheng
*/
public class MessageHolder<MESSAGE extends EndOfBatchQueueMessage> {
private MESSAGE message;
public MESSAGE getMessage() {
return message;
}
public void setMessage(MESSAGE message) {
this.message = message;
}
public void reset() {
message = null;
}
}
......@@ -24,7 +24,7 @@ import java.util.Map;
import org.apache.skywalking.apm.collector.analysis.worker.model.base.AbstractLocalAsyncWorker;
import org.apache.skywalking.apm.collector.analysis.worker.model.base.WorkerException;
import org.apache.skywalking.apm.collector.analysis.worker.model.impl.data.DataCache;
import org.apache.skywalking.apm.collector.core.data.AbstractData;
import org.apache.skywalking.apm.collector.core.data.StreamData;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
import org.apache.skywalking.apm.collector.core.util.ObjectUtils;
import org.apache.skywalking.apm.collector.storage.StorageModule;
......@@ -36,7 +36,7 @@ import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public abstract class PersistenceWorker<INPUT_AND_OUTPUT extends AbstractData> extends AbstractLocalAsyncWorker<INPUT_AND_OUTPUT, INPUT_AND_OUTPUT> {
public abstract class PersistenceWorker<INPUT_AND_OUTPUT extends StreamData> extends AbstractLocalAsyncWorker<INPUT_AND_OUTPUT, INPUT_AND_OUTPUT> {
private final Logger logger = LoggerFactory.getLogger(PersistenceWorker.class);
......
......@@ -19,13 +19,13 @@
package org.apache.skywalking.apm.collector.analysis.worker.model.impl;
import org.apache.skywalking.apm.collector.analysis.worker.model.base.AbstractLocalAsyncWorkerProvider;
import org.apache.skywalking.apm.collector.core.data.AbstractData;
import org.apache.skywalking.apm.collector.core.data.StreamData;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
/**
* @author peng-yongsheng
*/
public abstract class PersistenceWorkerProvider<INPUT_AND_OUTPUT extends AbstractData, WORKER_TYPE extends PersistenceWorker<INPUT_AND_OUTPUT>> extends AbstractLocalAsyncWorkerProvider<INPUT_AND_OUTPUT, INPUT_AND_OUTPUT, WORKER_TYPE> {
public abstract class PersistenceWorkerProvider<INPUT_AND_OUTPUT extends StreamData, WORKER_TYPE extends PersistenceWorker<INPUT_AND_OUTPUT>> extends AbstractLocalAsyncWorkerProvider<INPUT_AND_OUTPUT, INPUT_AND_OUTPUT, WORKER_TYPE> {
public PersistenceWorkerProvider(ModuleManager moduleManager) {
super(moduleManager);
......
......@@ -16,20 +16,19 @@
*
*/
package org.apache.skywalking.apm.collector.analysis.worker.model.impl.data;
import org.apache.skywalking.apm.collector.core.cache.Window;
import org.apache.skywalking.apm.collector.core.data.AbstractData;
import org.apache.skywalking.apm.collector.core.data.StreamData;
/**
* @author peng-yongsheng
*/
public class DataCache<DATA_IMPL extends AbstractData> extends Window<DataCollection<DATA_IMPL>> {
public class DataCache<STREAM_DATA extends StreamData> extends Window<DataCollection<STREAM_DATA>> {
private DataCollection<DATA_IMPL> lockedDataCollection;
private DataCollection<STREAM_DATA> lockedDataCollection;
@Override public DataCollection<DATA_IMPL> collectionInstance() {
@Override public DataCollection<STREAM_DATA> collectionInstance() {
return new DataCollection<>();
}
......@@ -37,11 +36,11 @@ public class DataCache<DATA_IMPL extends AbstractData> extends Window<DataCollec
return lockedDataCollection.containsKey(id);
}
public AbstractData get(String id) {
public StreamData get(String id) {
return lockedDataCollection.get(id);
}
public void put(String id, DATA_IMPL data) {
public void put(String id, STREAM_DATA data) {
lockedDataCollection.put(id, data);
}
......
......@@ -16,19 +16,18 @@
*
*/
package org.apache.skywalking.apm.collector.analysis.worker.model.impl.data;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.skywalking.apm.collector.core.cache.Collection;
import org.apache.skywalking.apm.collector.core.data.AbstractData;
import org.apache.skywalking.apm.collector.core.data.StreamData;
/**
* @author peng-yongsheng
*/
public class DataCollection<DATA_IMPL extends AbstractData> implements Collection<Map<String, DATA_IMPL>> {
private Map<String, DATA_IMPL> data;
public class DataCollection<STREAM_DATA extends StreamData> implements Collection<Map<String, STREAM_DATA>> {
private Map<String, STREAM_DATA> data;
private volatile boolean writing;
private volatile boolean reading;
......@@ -66,11 +65,11 @@ public class DataCollection<DATA_IMPL extends AbstractData> implements Collectio
return data.containsKey(key);
}
void put(String key, DATA_IMPL value) {
void put(String key, STREAM_DATA value) {
data.put(key, value);
}
public DATA_IMPL get(String key) {
public STREAM_DATA get(String key) {
return data.get(key);
}
......@@ -82,7 +81,7 @@ public class DataCollection<DATA_IMPL extends AbstractData> implements Collectio
data.clear();
}
public Map<String, DATA_IMPL> collection() {
public Map<String, STREAM_DATA> collection() {
return data;
}
}
......@@ -16,7 +16,6 @@
*
*/
package org.apache.skywalking.apm.collector.core.cache;
import java.util.concurrent.atomic.AtomicInteger;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册