提交 f74a5a9c 编写于 作者: P peng-yongsheng

Parse and listener

上级 863933af
...@@ -36,4 +36,8 @@ public abstract class AbstractHashMessage { ...@@ -36,4 +36,8 @@ public abstract class AbstractHashMessage {
public int getHashCode() { public int getHashCode() {
return hashCode; return hashCode;
} }
public void setKey(String key) {
this.hashCode = key.hashCode();
}
} }
...@@ -129,6 +129,11 @@ public abstract class Data extends EndOfBatchQueueMessage { ...@@ -129,6 +129,11 @@ public abstract class Data extends EndOfBatchQueueMessage {
return dataStrings[0]; return dataStrings[0];
} }
public void setId(String id) {
setKey(id);
this.dataStrings[0] = id;
}
public void mergeData(Data newData) { public void mergeData(Data newData) {
for (int i = 0; i < stringColumns.length; i++) { for (int i = 0; i < stringColumns.length; i++) {
String stringData = stringColumns[i].getOperation().operate(newData.getDataString(i), this.dataStrings[i]); String stringData = stringColumns[i].getOperation().operate(newData.getDataString(i), this.dataStrings[i]);
......
...@@ -18,11 +18,9 @@ ...@@ -18,11 +18,9 @@
package org.skywalking.apm.collector.queue.base; package org.skywalking.apm.collector.queue.base;
import org.skywalking.apm.collector.core.data.Data;
/** /**
* @author peng-yongsheng * @author peng-yongsheng
*/ */
public interface QueueEventHandler<MESSAGE extends Data> { public interface QueueEventHandler<MESSAGE> {
void tell(MESSAGE message); void tell(MESSAGE message);
} }
...@@ -18,11 +18,10 @@ ...@@ -18,11 +18,10 @@
package org.skywalking.apm.collector.queue.base; package org.skywalking.apm.collector.queue.base;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.core.framework.Executor; import org.skywalking.apm.collector.core.framework.Executor;
/** /**
* @author peng-yongsheng * @author peng-yongsheng
*/ */
public interface QueueExecutor<MESSAGE extends Data> extends Executor<MESSAGE> { public interface QueueExecutor<MESSAGE> extends Executor<MESSAGE> {
} }
...@@ -18,7 +18,6 @@ ...@@ -18,7 +18,6 @@
package org.skywalking.apm.collector.queue.service; package org.skywalking.apm.collector.queue.service;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.core.module.Service; import org.skywalking.apm.collector.core.module.Service;
import org.skywalking.apm.collector.queue.base.QueueEventHandler; import org.skywalking.apm.collector.queue.base.QueueEventHandler;
import org.skywalking.apm.collector.queue.base.QueueExecutor; import org.skywalking.apm.collector.queue.base.QueueExecutor;
...@@ -26,6 +25,6 @@ import org.skywalking.apm.collector.queue.base.QueueExecutor; ...@@ -26,6 +25,6 @@ import org.skywalking.apm.collector.queue.base.QueueExecutor;
/** /**
* @author peng-yongsheng * @author peng-yongsheng
*/ */
public interface QueueCreatorService<MESSAGE extends Data> extends Service { public interface QueueCreatorService<MESSAGE> extends Service {
QueueEventHandler<MESSAGE> create(int queueSize, QueueExecutor<MESSAGE> executor); QueueEventHandler<MESSAGE> create(int queueSize, QueueExecutor<MESSAGE> executor);
} }
...@@ -52,11 +52,23 @@ public class GlobalTrace extends Data { ...@@ -52,11 +52,23 @@ public class GlobalTrace extends Data {
return getDataString(1); return getDataString(1);
} }
public void setSegmentId(String segmentId) {
setDataString(1, segmentId);
}
public String getGlobalTraceId() { public String getGlobalTraceId() {
return getDataString(2); return getDataString(2);
} }
public void setGlobalTraceId(String globalTraceId) {
setDataString(2, globalTraceId);
}
public Long getTimeBucket() { public Long getTimeBucket() {
return getDataLong(0); return getDataLong(0);
} }
public void setTimeBucket(long timeBucket) {
setDataLong(0, timeBucket);
}
} }
...@@ -50,4 +50,8 @@ public class Segment extends Data { ...@@ -50,4 +50,8 @@ public class Segment extends Data {
public byte[] getDataBinary() { public byte[] getDataBinary() {
return getDataBytes(0); return getDataBytes(0);
} }
public void setDataBinary(byte[] dataBinary) {
setDataBytes(0, dataBinary);
}
} }
...@@ -58,31 +58,63 @@ public class SegmentCost extends Data { ...@@ -58,31 +58,63 @@ public class SegmentCost extends Data {
return getDataString(1); return getDataString(1);
} }
public void setSegmentId(String segmentId) {
setDataString(1, segmentId);
}
public String getServiceName() { public String getServiceName() {
return getDataString(2); return getDataString(2);
} }
public void setServiceName(String serviceName) {
setDataString(2, serviceName);
}
public Long getCost() { public Long getCost() {
return getDataLong(0); return getDataLong(0);
} }
public void setCost(Long cost) {
setDataLong(0, cost);
}
public Long getStartTime() { public Long getStartTime() {
return getDataLong(1); return getDataLong(1);
} }
public void setStartTime(Long startTime) {
setDataLong(1, startTime);
}
public Long getEndTime() { public Long getEndTime() {
return getDataLong(2); return getDataLong(2);
} }
public void setEndTime(Long endTime) {
setDataLong(2, endTime);
}
public Long getTimeBucket() { public Long getTimeBucket() {
return getDataLong(3); return getDataLong(3);
} }
public void setTimeBucket(Long timeBucket) {
setDataLong(3, timeBucket);
}
public Integer getApplicationId() { public Integer getApplicationId() {
return getDataInteger(0); return getDataInteger(0);
} }
public void setApplicationId(Integer applicationId) {
setDataInteger(0, applicationId);
}
public Boolean getIsError() { public Boolean getIsError() {
return getDataBoolean(0); return getDataBoolean(0);
} }
public void setIsError(Boolean isError) {
setDataBoolean(0, isError);
}
} }
...@@ -19,7 +19,6 @@ ...@@ -19,7 +19,6 @@
package org.skywalking.apm.collector.stream.worker.base; package org.skywalking.apm.collector.stream.worker.base;
import org.skywalking.apm.collector.cache.CacheServiceManager; import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.queue.base.QueueExecutor; import org.skywalking.apm.collector.queue.base.QueueExecutor;
import org.skywalking.apm.collector.storage.service.DAOService; import org.skywalking.apm.collector.storage.service.DAOService;
...@@ -30,7 +29,7 @@ import org.skywalking.apm.collector.storage.service.DAOService; ...@@ -30,7 +29,7 @@ import org.skywalking.apm.collector.storage.service.DAOService;
* @author peng-yongsheng * @author peng-yongsheng
* @since v3.0-2017 * @since v3.0-2017
*/ */
public abstract class AbstractLocalAsyncWorker<INPUT extends Data, OUTPUT extends Data> extends AbstractWorker<INPUT, OUTPUT> implements QueueExecutor<INPUT> { public abstract class AbstractLocalAsyncWorker<INPUT, OUTPUT> extends AbstractWorker<INPUT, OUTPUT> implements QueueExecutor<INPUT> {
public AbstractLocalAsyncWorker(DAOService daoService, CacheServiceManager cacheServiceManager) { public AbstractLocalAsyncWorker(DAOService daoService, CacheServiceManager cacheServiceManager) {
super(daoService, cacheServiceManager); super(daoService, cacheServiceManager);
......
...@@ -19,7 +19,6 @@ ...@@ -19,7 +19,6 @@
package org.skywalking.apm.collector.stream.worker.base; package org.skywalking.apm.collector.stream.worker.base;
import org.skywalking.apm.collector.cache.CacheServiceManager; import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.queue.base.QueueEventHandler; import org.skywalking.apm.collector.queue.base.QueueEventHandler;
import org.skywalking.apm.collector.queue.base.QueueExecutor; import org.skywalking.apm.collector.queue.base.QueueExecutor;
import org.skywalking.apm.collector.queue.service.QueueCreatorService; import org.skywalking.apm.collector.queue.service.QueueCreatorService;
...@@ -28,7 +27,7 @@ import org.skywalking.apm.collector.storage.service.DAOService; ...@@ -28,7 +27,7 @@ import org.skywalking.apm.collector.storage.service.DAOService;
/** /**
* @author peng-yongsheng * @author peng-yongsheng
*/ */
public abstract class AbstractLocalAsyncWorkerProvider<INPUT extends Data, OUTPUT extends Data, WORKER_TYPE extends AbstractLocalAsyncWorker<INPUT, OUTPUT> & QueueExecutor<INPUT>> extends AbstractWorkerProvider<INPUT, OUTPUT, WORKER_TYPE> { public abstract class AbstractLocalAsyncWorkerProvider<INPUT, OUTPUT, WORKER_TYPE extends AbstractLocalAsyncWorker<INPUT, OUTPUT> & QueueExecutor<INPUT>> extends AbstractWorkerProvider<INPUT, OUTPUT, WORKER_TYPE> {
public abstract int queueSize(); public abstract int queueSize();
...@@ -41,12 +40,8 @@ public abstract class AbstractLocalAsyncWorkerProvider<INPUT extends Data, OUTPU ...@@ -41,12 +40,8 @@ public abstract class AbstractLocalAsyncWorkerProvider<INPUT extends Data, OUTPU
} }
@Override @Override
final public WorkerRef create(WorkerCreateListener workerCreateListener) throws ProviderNotFoundException { public final WorkerRef create(WorkerCreateListener workerCreateListener) throws ProviderNotFoundException {
<<<<<<< HEAD WORKER_TYPE localAsyncWorker = workerInstance(getDaoService(), getCacheServiceManager());
WorkerType localAsyncWorker = workerInstance(getDaoService(), getCacheServiceManager());
=======
WORKER_TYPE localAsyncWorker = workerInstance(daoService);
>>>>>>> 0c17906c3c1c41752e1ec38b37d9e0dec22503ca
workerCreateListener.addWorker(localAsyncWorker); workerCreateListener.addWorker(localAsyncWorker);
QueueEventHandler<INPUT> queueEventHandler = queueCreatorService.create(queueSize(), localAsyncWorker); QueueEventHandler<INPUT> queueEventHandler = queueCreatorService.create(queueSize(), localAsyncWorker);
return new LocalAsyncWorkerRef<>(localAsyncWorker, queueEventHandler); return new LocalAsyncWorkerRef<>(localAsyncWorker, queueEventHandler);
......
...@@ -49,11 +49,7 @@ public abstract class AbstractRemoteWorkerProvider<INPUT extends Data, OUTPUT ex ...@@ -49,11 +49,7 @@ public abstract class AbstractRemoteWorkerProvider<INPUT extends Data, OUTPUT ex
* worker instance, when the worker provider not find then Throw this Exception. * worker instance, when the worker provider not find then Throw this Exception.
*/ */
@Override final public WorkerRef create(WorkerCreateListener workerCreateListener) { @Override final public WorkerRef create(WorkerCreateListener workerCreateListener) {
<<<<<<< HEAD WORKER_TYPE remoteWorker = workerInstance(getDaoService(), getCacheServiceManager());
WorkerType remoteWorker = workerInstance(getDaoService(), getCacheServiceManager());
=======
WORKER_TYPE remoteWorker = workerInstance(daoService);
>>>>>>> 0c17906c3c1c41752e1ec38b37d9e0dec22503ca
workerCreateListener.addWorker(remoteWorker); workerCreateListener.addWorker(remoteWorker);
RemoteWorkerRef<INPUT, OUTPUT> workerRef = new RemoteWorkerRef<>(remoteWorker); RemoteWorkerRef<INPUT, OUTPUT> workerRef = new RemoteWorkerRef<>(remoteWorker);
return workerRef; return workerRef;
......
...@@ -19,7 +19,6 @@ ...@@ -19,7 +19,6 @@
package org.skywalking.apm.collector.stream.worker.base; package org.skywalking.apm.collector.stream.worker.base;
import org.skywalking.apm.collector.cache.CacheServiceManager; import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.core.graph.Next; import org.skywalking.apm.collector.core.graph.Next;
import org.skywalking.apm.collector.core.graph.NodeProcessor; import org.skywalking.apm.collector.core.graph.NodeProcessor;
import org.skywalking.apm.collector.storage.service.DAOService; import org.skywalking.apm.collector.storage.service.DAOService;
...@@ -29,7 +28,7 @@ import org.slf4j.LoggerFactory; ...@@ -29,7 +28,7 @@ import org.slf4j.LoggerFactory;
/** /**
* @author peng-yongsheng * @author peng-yongsheng
*/ */
public abstract class AbstractWorker<INPUT extends Data, OUTPUT extends Data> implements NodeProcessor<INPUT, OUTPUT> { public abstract class AbstractWorker<INPUT, OUTPUT> implements NodeProcessor<INPUT, OUTPUT> {
private final Logger logger = LoggerFactory.getLogger(AbstractWorker.class); private final Logger logger = LoggerFactory.getLogger(AbstractWorker.class);
......
...@@ -19,14 +19,12 @@ ...@@ -19,14 +19,12 @@
package org.skywalking.apm.collector.stream.worker.base; package org.skywalking.apm.collector.stream.worker.base;
import org.skywalking.apm.collector.cache.CacheServiceManager; import org.skywalking.apm.collector.cache.CacheServiceManager;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.storage.service.DAOService; import org.skywalking.apm.collector.storage.service.DAOService;
/** /**
* @author peng-yongsheng * @author peng-yongsheng
*/ */
<<<<<<< HEAD public abstract class AbstractWorkerProvider<INPUT, OUTPUT, WORKER_TYPE extends AbstractWorker<INPUT, OUTPUT>> implements Provider {
public abstract class AbstractWorkerProvider<INPUT extends Data, OUTPUT extends Data, WorkerType extends AbstractWorker<INPUT, OUTPUT>> implements Provider {
private final DAOService daoService; private final DAOService daoService;
private final CacheServiceManager cacheServiceManager; private final CacheServiceManager cacheServiceManager;
...@@ -44,9 +42,5 @@ public abstract class AbstractWorkerProvider<INPUT extends Data, OUTPUT extends ...@@ -44,9 +42,5 @@ public abstract class AbstractWorkerProvider<INPUT extends Data, OUTPUT extends
return cacheServiceManager; return cacheServiceManager;
} }
public abstract WorkerType workerInstance(DAOService daoService, CacheServiceManager cacheServiceManager); public abstract WORKER_TYPE workerInstance(DAOService daoService, CacheServiceManager cacheServiceManager);
=======
public abstract class AbstractWorkerProvider<INPUT extends Data, OUTPUT extends Data, WORKER_TYPE extends AbstractWorker<INPUT, OUTPUT>> implements Provider {
public abstract WORKER_TYPE workerInstance(DAOService daoService);
>>>>>>> 0c17906c3c1c41752e1ec38b37d9e0dec22503ca
} }
...@@ -18,19 +18,17 @@ ...@@ -18,19 +18,17 @@
package org.skywalking.apm.collector.stream.worker.base; package org.skywalking.apm.collector.stream.worker.base;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.core.graph.NodeProcessor; import org.skywalking.apm.collector.core.graph.NodeProcessor;
import org.skywalking.apm.collector.queue.base.QueueEventHandler; import org.skywalking.apm.collector.queue.base.QueueEventHandler;
/** /**
* @author peng-yongsheng * @author peng-yongsheng
*/ */
public class LocalAsyncWorkerRef<INPUT extends Data, OUTPUT extends Data> extends WorkerRef<INPUT, OUTPUT> { public class LocalAsyncWorkerRef<INPUT, OUTPUT> extends WorkerRef<INPUT, OUTPUT> {
private final QueueEventHandler<INPUT> queueEventHandler; private final QueueEventHandler<INPUT> queueEventHandler;
LocalAsyncWorkerRef(NodeProcessor<INPUT, OUTPUT> destinationHandler, LocalAsyncWorkerRef(NodeProcessor<INPUT, OUTPUT> destinationHandler, QueueEventHandler<INPUT> queueEventHandler) {
QueueEventHandler<INPUT> queueEventHandler) {
super(destinationHandler); super(destinationHandler);
this.queueEventHandler = queueEventHandler; this.queueEventHandler = queueEventHandler;
} }
......
...@@ -18,14 +18,13 @@ ...@@ -18,14 +18,13 @@
package org.skywalking.apm.collector.stream.worker.base; package org.skywalking.apm.collector.stream.worker.base;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.core.graph.NodeProcessor; import org.skywalking.apm.collector.core.graph.NodeProcessor;
import org.skywalking.apm.collector.core.graph.WayToNode; import org.skywalking.apm.collector.core.graph.WayToNode;
/** /**
* @author peng-yongsheng * @author peng-yongsheng
*/ */
public abstract class WorkerRef<INPUT extends Data, OUTPUT extends Data> extends WayToNode<INPUT, OUTPUT> { public abstract class WorkerRef<INPUT, OUTPUT> extends WayToNode<INPUT, OUTPUT> {
WorkerRef(NodeProcessor<INPUT, OUTPUT> destinationHandler) { WorkerRef(NodeProcessor<INPUT, OUTPUT> destinationHandler) {
super(destinationHandler); super(destinationHandler);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册