From f74a5a9c27094ae5eb8ae6fb8e263922997ab7b4 Mon Sep 17 00:00:00 2001 From: peng-yongsheng <8082209@qq.com> Date: Sun, 12 Nov 2017 10:23:55 +0800 Subject: [PATCH] Parse and listener --- .../core/data/AbstractHashMessage.java | 4 +++ .../apm/collector/core/data/Data.java | 5 +++ .../queue/base/QueueEventHandler.java | 4 +-- .../collector/queue/base/QueueExecutor.java | 3 +- .../queue/service/QueueCreatorService.java | 3 +- .../storage/table/global/GlobalTrace.java | 12 +++++++ .../storage/table/segment/Segment.java | 4 +++ .../storage/table/segment/SegmentCost.java | 32 +++++++++++++++++++ .../worker/base/AbstractLocalAsyncWorker.java | 3 +- .../AbstractLocalAsyncWorkerProvider.java | 11 ++----- .../base/AbstractRemoteWorkerProvider.java | 6 +--- .../stream/worker/base/AbstractWorker.java | 3 +- .../worker/base/AbstractWorkerProvider.java | 10 ++---- .../worker/base/LocalAsyncWorkerRef.java | 6 ++-- .../stream/worker/base/WorkerRef.java | 3 +- 15 files changed, 71 insertions(+), 38 deletions(-) diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/data/AbstractHashMessage.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/data/AbstractHashMessage.java index 8e8978c28b..fbe4f7131f 100644 --- a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/data/AbstractHashMessage.java +++ b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/data/AbstractHashMessage.java @@ -36,4 +36,8 @@ public abstract class AbstractHashMessage { public int getHashCode() { return hashCode; } + + public void setKey(String key) { + this.hashCode = key.hashCode(); + } } diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/data/Data.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/data/Data.java index 86cd9bb888..6e5780fb15 100644 --- a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/data/Data.java +++ b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/data/Data.java @@ -129,6 +129,11 @@ public abstract class Data extends EndOfBatchQueueMessage { return dataStrings[0]; } + public void setId(String id) { + setKey(id); + this.dataStrings[0] = id; + } + public void mergeData(Data newData) { for (int i = 0; i < stringColumns.length; i++) { String stringData = stringColumns[i].getOperation().operate(newData.getDataString(i), this.dataStrings[i]); diff --git a/apm-collector/apm-collector-queue/collector-queue-define/src/main/java/org/skywalking/apm/collector/queue/base/QueueEventHandler.java b/apm-collector/apm-collector-queue/collector-queue-define/src/main/java/org/skywalking/apm/collector/queue/base/QueueEventHandler.java index 51c3ef46e6..fa8f751fef 100644 --- a/apm-collector/apm-collector-queue/collector-queue-define/src/main/java/org/skywalking/apm/collector/queue/base/QueueEventHandler.java +++ b/apm-collector/apm-collector-queue/collector-queue-define/src/main/java/org/skywalking/apm/collector/queue/base/QueueEventHandler.java @@ -18,11 +18,9 @@ package org.skywalking.apm.collector.queue.base; -import org.skywalking.apm.collector.core.data.Data; - /** * @author peng-yongsheng */ -public interface QueueEventHandler { +public interface QueueEventHandler { void tell(MESSAGE message); } diff --git a/apm-collector/apm-collector-queue/collector-queue-define/src/main/java/org/skywalking/apm/collector/queue/base/QueueExecutor.java b/apm-collector/apm-collector-queue/collector-queue-define/src/main/java/org/skywalking/apm/collector/queue/base/QueueExecutor.java index c02075f7c8..b6138e5696 100644 --- a/apm-collector/apm-collector-queue/collector-queue-define/src/main/java/org/skywalking/apm/collector/queue/base/QueueExecutor.java +++ b/apm-collector/apm-collector-queue/collector-queue-define/src/main/java/org/skywalking/apm/collector/queue/base/QueueExecutor.java @@ -18,11 +18,10 @@ package org.skywalking.apm.collector.queue.base; -import org.skywalking.apm.collector.core.data.Data; import org.skywalking.apm.collector.core.framework.Executor; /** * @author peng-yongsheng */ -public interface QueueExecutor extends Executor { +public interface QueueExecutor extends Executor { } diff --git a/apm-collector/apm-collector-queue/collector-queue-define/src/main/java/org/skywalking/apm/collector/queue/service/QueueCreatorService.java b/apm-collector/apm-collector-queue/collector-queue-define/src/main/java/org/skywalking/apm/collector/queue/service/QueueCreatorService.java index 0c59b8df9f..74621cb5c5 100644 --- a/apm-collector/apm-collector-queue/collector-queue-define/src/main/java/org/skywalking/apm/collector/queue/service/QueueCreatorService.java +++ b/apm-collector/apm-collector-queue/collector-queue-define/src/main/java/org/skywalking/apm/collector/queue/service/QueueCreatorService.java @@ -18,7 +18,6 @@ package org.skywalking.apm.collector.queue.service; -import org.skywalking.apm.collector.core.data.Data; import org.skywalking.apm.collector.core.module.Service; import org.skywalking.apm.collector.queue.base.QueueEventHandler; import org.skywalking.apm.collector.queue.base.QueueExecutor; @@ -26,6 +25,6 @@ import org.skywalking.apm.collector.queue.base.QueueExecutor; /** * @author peng-yongsheng */ -public interface QueueCreatorService extends Service { +public interface QueueCreatorService extends Service { QueueEventHandler create(int queueSize, QueueExecutor executor); } diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/table/global/GlobalTrace.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/table/global/GlobalTrace.java index 618ae8475f..bc90044184 100644 --- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/table/global/GlobalTrace.java +++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/table/global/GlobalTrace.java @@ -52,11 +52,23 @@ public class GlobalTrace extends Data { return getDataString(1); } + public void setSegmentId(String segmentId) { + setDataString(1, segmentId); + } + public String getGlobalTraceId() { return getDataString(2); } + public void setGlobalTraceId(String globalTraceId) { + setDataString(2, globalTraceId); + } + public Long getTimeBucket() { return getDataLong(0); } + + public void setTimeBucket(long timeBucket) { + setDataLong(0, timeBucket); + } } diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/table/segment/Segment.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/table/segment/Segment.java index e52127715e..01a959ae7a 100644 --- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/table/segment/Segment.java +++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/table/segment/Segment.java @@ -50,4 +50,8 @@ public class Segment extends Data { public byte[] getDataBinary() { return getDataBytes(0); } + + public void setDataBinary(byte[] dataBinary) { + setDataBytes(0, dataBinary); + } } diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/table/segment/SegmentCost.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/table/segment/SegmentCost.java index e5b6eab3c9..25c7d60447 100644 --- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/table/segment/SegmentCost.java +++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/table/segment/SegmentCost.java @@ -58,31 +58,63 @@ public class SegmentCost extends Data { return getDataString(1); } + public void setSegmentId(String segmentId) { + setDataString(1, segmentId); + } + public String getServiceName() { return getDataString(2); } + public void setServiceName(String serviceName) { + setDataString(2, serviceName); + } + public Long getCost() { return getDataLong(0); } + public void setCost(Long cost) { + setDataLong(0, cost); + } + public Long getStartTime() { return getDataLong(1); } + public void setStartTime(Long startTime) { + setDataLong(1, startTime); + } + public Long getEndTime() { return getDataLong(2); } + public void setEndTime(Long endTime) { + setDataLong(2, endTime); + } + public Long getTimeBucket() { return getDataLong(3); } + public void setTimeBucket(Long timeBucket) { + setDataLong(3, timeBucket); + } + public Integer getApplicationId() { return getDataInteger(0); } + public void setApplicationId(Integer applicationId) { + setDataInteger(0, applicationId); + } + public Boolean getIsError() { return getDataBoolean(0); } + + public void setIsError(Boolean isError) { + setDataBoolean(0, isError); + } } diff --git a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/AbstractLocalAsyncWorker.java b/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/AbstractLocalAsyncWorker.java index dcafca158e..590eebe783 100644 --- a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/AbstractLocalAsyncWorker.java +++ b/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/AbstractLocalAsyncWorker.java @@ -19,7 +19,6 @@ package org.skywalking.apm.collector.stream.worker.base; import org.skywalking.apm.collector.cache.CacheServiceManager; -import org.skywalking.apm.collector.core.data.Data; import org.skywalking.apm.collector.queue.base.QueueExecutor; import org.skywalking.apm.collector.storage.service.DAOService; @@ -30,7 +29,7 @@ import org.skywalking.apm.collector.storage.service.DAOService; * @author peng-yongsheng * @since v3.0-2017 */ -public abstract class AbstractLocalAsyncWorker extends AbstractWorker implements QueueExecutor { +public abstract class AbstractLocalAsyncWorker extends AbstractWorker implements QueueExecutor { public AbstractLocalAsyncWorker(DAOService daoService, CacheServiceManager cacheServiceManager) { super(daoService, cacheServiceManager); diff --git a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/AbstractLocalAsyncWorkerProvider.java b/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/AbstractLocalAsyncWorkerProvider.java index aade6648d4..17823c6eb4 100644 --- a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/AbstractLocalAsyncWorkerProvider.java +++ b/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/AbstractLocalAsyncWorkerProvider.java @@ -19,7 +19,6 @@ package org.skywalking.apm.collector.stream.worker.base; import org.skywalking.apm.collector.cache.CacheServiceManager; -import org.skywalking.apm.collector.core.data.Data; import org.skywalking.apm.collector.queue.base.QueueEventHandler; import org.skywalking.apm.collector.queue.base.QueueExecutor; import org.skywalking.apm.collector.queue.service.QueueCreatorService; @@ -28,7 +27,7 @@ import org.skywalking.apm.collector.storage.service.DAOService; /** * @author peng-yongsheng */ -public abstract class AbstractLocalAsyncWorkerProvider & QueueExecutor> extends AbstractWorkerProvider { +public abstract class AbstractLocalAsyncWorkerProvider & QueueExecutor> extends AbstractWorkerProvider { public abstract int queueSize(); @@ -41,12 +40,8 @@ public abstract class AbstractLocalAsyncWorkerProvider>>>>>> 0c17906c3c1c41752e1ec38b37d9e0dec22503ca + public final WorkerRef create(WorkerCreateListener workerCreateListener) throws ProviderNotFoundException { + WORKER_TYPE localAsyncWorker = workerInstance(getDaoService(), getCacheServiceManager()); workerCreateListener.addWorker(localAsyncWorker); QueueEventHandler queueEventHandler = queueCreatorService.create(queueSize(), localAsyncWorker); return new LocalAsyncWorkerRef<>(localAsyncWorker, queueEventHandler); diff --git a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/AbstractRemoteWorkerProvider.java b/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/AbstractRemoteWorkerProvider.java index 7810b9b7b5..d316edc94d 100644 --- a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/AbstractRemoteWorkerProvider.java +++ b/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/AbstractRemoteWorkerProvider.java @@ -49,11 +49,7 @@ public abstract class AbstractRemoteWorkerProvider>>>>>> 0c17906c3c1c41752e1ec38b37d9e0dec22503ca + WORKER_TYPE remoteWorker = workerInstance(getDaoService(), getCacheServiceManager()); workerCreateListener.addWorker(remoteWorker); RemoteWorkerRef workerRef = new RemoteWorkerRef<>(remoteWorker); return workerRef; diff --git a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/AbstractWorker.java b/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/AbstractWorker.java index 8367069ec3..328c5b09da 100644 --- a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/AbstractWorker.java +++ b/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/AbstractWorker.java @@ -19,7 +19,6 @@ package org.skywalking.apm.collector.stream.worker.base; import org.skywalking.apm.collector.cache.CacheServiceManager; -import org.skywalking.apm.collector.core.data.Data; import org.skywalking.apm.collector.core.graph.Next; import org.skywalking.apm.collector.core.graph.NodeProcessor; import org.skywalking.apm.collector.storage.service.DAOService; @@ -29,7 +28,7 @@ import org.slf4j.LoggerFactory; /** * @author peng-yongsheng */ -public abstract class AbstractWorker implements NodeProcessor { +public abstract class AbstractWorker implements NodeProcessor { private final Logger logger = LoggerFactory.getLogger(AbstractWorker.class); diff --git a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/AbstractWorkerProvider.java b/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/AbstractWorkerProvider.java index 378b886f50..29afdb0ce7 100644 --- a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/AbstractWorkerProvider.java +++ b/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/AbstractWorkerProvider.java @@ -19,14 +19,12 @@ package org.skywalking.apm.collector.stream.worker.base; import org.skywalking.apm.collector.cache.CacheServiceManager; -import org.skywalking.apm.collector.core.data.Data; import org.skywalking.apm.collector.storage.service.DAOService; /** * @author peng-yongsheng */ -<<<<<<< HEAD -public abstract class AbstractWorkerProvider> implements Provider { +public abstract class AbstractWorkerProvider> implements Provider { private final DAOService daoService; private final CacheServiceManager cacheServiceManager; @@ -44,9 +42,5 @@ public abstract class AbstractWorkerProvider> implements Provider { - public abstract WORKER_TYPE workerInstance(DAOService daoService); ->>>>>>> 0c17906c3c1c41752e1ec38b37d9e0dec22503ca + public abstract WORKER_TYPE workerInstance(DAOService daoService, CacheServiceManager cacheServiceManager); } diff --git a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/LocalAsyncWorkerRef.java b/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/LocalAsyncWorkerRef.java index b5a2b51ed5..7639c8bb5b 100644 --- a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/LocalAsyncWorkerRef.java +++ b/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/LocalAsyncWorkerRef.java @@ -18,19 +18,17 @@ package org.skywalking.apm.collector.stream.worker.base; -import org.skywalking.apm.collector.core.data.Data; import org.skywalking.apm.collector.core.graph.NodeProcessor; import org.skywalking.apm.collector.queue.base.QueueEventHandler; /** * @author peng-yongsheng */ -public class LocalAsyncWorkerRef extends WorkerRef { +public class LocalAsyncWorkerRef extends WorkerRef { private final QueueEventHandler queueEventHandler; - LocalAsyncWorkerRef(NodeProcessor destinationHandler, - QueueEventHandler queueEventHandler) { + LocalAsyncWorkerRef(NodeProcessor destinationHandler, QueueEventHandler queueEventHandler) { super(destinationHandler); this.queueEventHandler = queueEventHandler; } diff --git a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/WorkerRef.java b/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/WorkerRef.java index c7704adea6..0766af55e7 100644 --- a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/WorkerRef.java +++ b/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/WorkerRef.java @@ -18,14 +18,13 @@ package org.skywalking.apm.collector.stream.worker.base; -import org.skywalking.apm.collector.core.data.Data; import org.skywalking.apm.collector.core.graph.NodeProcessor; import org.skywalking.apm.collector.core.graph.WayToNode; /** * @author peng-yongsheng */ -public abstract class WorkerRef extends WayToNode { +public abstract class WorkerRef extends WayToNode { WorkerRef(NodeProcessor destinationHandler) { super(destinationHandler); } -- GitLab