提交 3f30e6dc 编写于 作者: P peng-yongsheng

Modify base worker modal’s generic type definition.

上级 b5444e7e
......@@ -18,7 +18,7 @@
package org.apache.skywalking.apm.collector.analysis.worker.model.base;
import org.apache.skywalking.apm.collector.core.data.EndOfBatchQueueMessage;
import org.apache.skywalking.apm.collector.core.data.QueueData;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
/**
......@@ -28,7 +28,7 @@ import org.apache.skywalking.apm.collector.core.module.ModuleManager;
* @author peng-yongsheng
* @since v3.0-2017
*/
public abstract class AbstractLocalAsyncWorker<INPUT extends EndOfBatchQueueMessage, OUTPUT extends EndOfBatchQueueMessage> extends AbstractWorker<INPUT, OUTPUT> {
public abstract class AbstractLocalAsyncWorker<INPUT extends QueueData, OUTPUT extends QueueData> extends AbstractWorker<INPUT, OUTPUT> {
public AbstractLocalAsyncWorker(ModuleManager moduleManager) {
super(moduleManager);
......
......@@ -18,14 +18,14 @@
package org.apache.skywalking.apm.collector.analysis.worker.model.base;
import org.apache.skywalking.apm.collector.core.data.EndOfBatchQueueMessage;
import org.apache.skywalking.apm.collector.core.data.QueueData;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
/**
* @author peng-yongsheng
*/
public abstract class AbstractLocalAsyncWorkerProvider<INPUT extends EndOfBatchQueueMessage, OUTPUT extends EndOfBatchQueueMessage, WORKER_TYPE extends AbstractLocalAsyncWorker<INPUT, OUTPUT>> extends AbstractWorkerProvider<INPUT, OUTPUT, WORKER_TYPE> {
public abstract class AbstractLocalAsyncWorkerProvider<INPUT extends QueueData, OUTPUT extends QueueData, WORKER_TYPE extends AbstractLocalAsyncWorker<INPUT, OUTPUT>> extends AbstractWorkerProvider<INPUT, OUTPUT, WORKER_TYPE> {
public abstract int queueSize();
......
......@@ -18,7 +18,7 @@
package org.apache.skywalking.apm.collector.analysis.worker.model.base;
import org.apache.skywalking.apm.collector.core.data.AbstractData;
import org.apache.skywalking.apm.collector.core.data.RemoteData;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
import org.apache.skywalking.apm.collector.remote.service.Selector;
......@@ -31,7 +31,7 @@ import org.apache.skywalking.apm.collector.remote.service.Selector;
* @author peng-yongsheng
* @since v3.0-2017
*/
public abstract class AbstractRemoteWorker<INPUT extends AbstractData, OUTPUT extends AbstractData> extends AbstractWorker<INPUT, OUTPUT> {
public abstract class AbstractRemoteWorker<INPUT extends RemoteData, OUTPUT extends RemoteData> extends AbstractWorker<INPUT, OUTPUT> {
public AbstractRemoteWorker(ModuleManager moduleManager) {
super(moduleManager);
......
......@@ -18,7 +18,7 @@
package org.apache.skywalking.apm.collector.analysis.worker.model.base;
import org.apache.skywalking.apm.collector.core.data.AbstractData;
import org.apache.skywalking.apm.collector.core.data.RemoteData;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
import org.apache.skywalking.apm.collector.remote.service.RemoteSenderService;
......@@ -30,7 +30,7 @@ import org.apache.skywalking.apm.collector.remote.service.RemoteSenderService;
* @author peng-yongsheng
* @since v3.0-2017
*/
public abstract class AbstractRemoteWorkerProvider<INPUT extends AbstractData, OUTPUT extends AbstractData, WORKER_TYPE extends AbstractRemoteWorker<INPUT, OUTPUT>> extends AbstractWorkerProvider<INPUT, OUTPUT, WORKER_TYPE> {
public abstract class AbstractRemoteWorkerProvider<INPUT extends RemoteData, OUTPUT extends RemoteData, WORKER_TYPE extends AbstractRemoteWorker<INPUT, OUTPUT>> extends AbstractWorkerProvider<INPUT, OUTPUT, WORKER_TYPE> {
private final RemoteSenderService remoteSenderService;
private final int graphId;
......
......@@ -18,7 +18,6 @@
package org.apache.skywalking.apm.collector.analysis.worker.model.base;
import org.apache.skywalking.apm.collector.core.data.EndOfBatchQueueMessage;
import org.apache.skywalking.apm.collector.core.graph.Next;
import org.apache.skywalking.apm.collector.core.graph.NodeProcessor;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
......@@ -28,13 +27,13 @@ import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public abstract class AbstractWorker<INPUT extends EndOfBatchQueueMessage, OUTPUT extends EndOfBatchQueueMessage> implements NodeProcessor<INPUT, OUTPUT> {
public abstract class AbstractWorker<INPUT, OUTPUT> implements NodeProcessor<INPUT, OUTPUT> {
private final Logger logger = LoggerFactory.getLogger(AbstractWorker.class);
private final ModuleManager moduleManager;
public AbstractWorker(ModuleManager moduleManager) {
AbstractWorker(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
}
......
......@@ -18,17 +18,16 @@
package org.apache.skywalking.apm.collector.analysis.worker.model.base;
import org.apache.skywalking.apm.collector.core.data.EndOfBatchQueueMessage;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
/**
* @author peng-yongsheng
*/
public abstract class AbstractWorkerProvider<INPUT extends EndOfBatchQueueMessage, OUTPUT extends EndOfBatchQueueMessage, WORKER_TYPE extends AbstractWorker<INPUT, OUTPUT>> implements Provider {
public abstract class AbstractWorkerProvider<INPUT, OUTPUT, WORKER_TYPE extends AbstractWorker<INPUT, OUTPUT>> implements Provider {
private final ModuleManager moduleManager;
public AbstractWorkerProvider(ModuleManager moduleManager) {
AbstractWorkerProvider(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
}
......
......@@ -20,8 +20,9 @@ package org.apache.skywalking.apm.collector.analysis.worker.model.base;
import java.util.Iterator;
import java.util.List;
import org.apache.skywalking.apm.collector.core.data.EndOfBatchQueueMessage;
import org.apache.skywalking.apm.collector.core.data.QueueData;
import org.apache.skywalking.apm.collector.core.graph.NodeProcessor;
import org.apache.skywalking.apm.collector.core.queue.EndOfBatchContext;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
import org.slf4j.Logger;
......@@ -30,7 +31,7 @@ import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class LocalAsyncWorkerRef<INPUT extends EndOfBatchQueueMessage, OUTPUT extends EndOfBatchQueueMessage> extends WorkerRef<INPUT, OUTPUT> implements IConsumer<INPUT> {
public class LocalAsyncWorkerRef<INPUT extends QueueData, OUTPUT extends QueueData> extends WorkerRef<INPUT, OUTPUT> implements IConsumer<INPUT> {
private final Logger logger = LoggerFactory.getLogger(LocalAsyncWorkerRef.class);
......@@ -40,7 +41,7 @@ public class LocalAsyncWorkerRef<INPUT extends EndOfBatchQueueMessage, OUTPUT ex
super(destinationHandler);
}
public void setQueueEventHandler(DataCarrier<INPUT> dataCarrier) {
void setQueueEventHandler(DataCarrier<INPUT> dataCarrier) {
this.dataCarrier = dataCarrier;
}
......@@ -52,7 +53,7 @@ public class LocalAsyncWorkerRef<INPUT extends EndOfBatchQueueMessage, OUTPUT ex
INPUT input = inputIterator.next();
i++;
if (i == data.size()) {
input.setEndOfBatch(true);
input.getEndOfBatchContext().setEndOfBatch(true);
}
out(input);
}
......@@ -69,6 +70,7 @@ public class LocalAsyncWorkerRef<INPUT extends EndOfBatchQueueMessage, OUTPUT ex
}
@Override protected void in(INPUT input) {
input.setEndOfBatchContext(new EndOfBatchContext(false));
dataCarrier.produce(input);
}
......
......@@ -18,7 +18,7 @@
package org.apache.skywalking.apm.collector.analysis.worker.model.base;
import org.apache.skywalking.apm.collector.core.data.AbstractData;
import org.apache.skywalking.apm.collector.core.data.RemoteData;
import org.apache.skywalking.apm.collector.remote.service.RemoteSenderService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -26,7 +26,7 @@ import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class RemoteWorkerRef<INPUT extends AbstractData, OUTPUT extends AbstractData> extends WorkerRef<INPUT, OUTPUT> {
public class RemoteWorkerRef<INPUT extends RemoteData, OUTPUT extends RemoteData> extends WorkerRef<INPUT, OUTPUT> {
private final Logger logger = LoggerFactory.getLogger(RemoteWorkerRef.class);
......
......@@ -33,7 +33,7 @@ public class WorkerCreateListener {
this.persistenceWorkers = new ArrayList<>();
}
public void addWorker(AbstractWorker worker) {
void addWorker(AbstractWorker worker) {
if (worker instanceof PersistenceWorker) {
persistenceWorkers.add((PersistenceWorker)worker);
}
......
......@@ -24,7 +24,7 @@ import org.apache.skywalking.apm.collector.core.graph.WayToNode;
/**
* @author peng-yongsheng
*/
public abstract class WorkerRef<INPUT, OUTPUT> extends WayToNode<INPUT, OUTPUT> {
abstract class WorkerRef<INPUT, OUTPUT> extends WayToNode<INPUT, OUTPUT> {
WorkerRef(NodeProcessor<INPUT, OUTPUT> destinationHandler) {
super(destinationHandler);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册