From 8e8029d008d0ce85d01fbe3bfa92f7f6a1da332b Mon Sep 17 00:00:00 2001 From: peng-yongsheng <8082209@qq.com> Date: Wed, 8 Nov 2017 21:32:19 +0800 Subject: [PATCH] no message --- .../handler/JVMMetricsServiceHandler.java | 133 ++++++++---------- .../collector-agent-stream/pom.xml | 2 +- .../stream/graph/JvmMetricStreamGraph.java | 94 +++++++++++++ .../jvm/CpuMetricPersistenceWorker.java | 64 +++++++++ .../jvm/GCMetricPersistenceWorker.java} | 18 +-- .../jvm/InstHeartBeatPersistenceWorker.java} | 20 ++- .../jvm/MemoryMetricPersistenceWorker.java} | 16 ++- .../MemoryPoolMetricPersistenceWorker.java | 37 +++++ .../apm/collector/core/data/Data.java | 39 +++++ .../collector/queue/base/QueueExecutor.java | 5 +- .../collector/storage/base/dao/IBatchDAO.java | 2 +- .../storage/base/dao/IPersistenceDAO.java | 10 +- .../storage/dao/ICpuMetricStreamDAO.java | 4 +- .../collector/storage/service/DAOService.java | 3 + .../storage/table/jvm/CpuMetric.java | 12 ++ .../collector/storage/table/jvm/GCMetric.java | 20 +++ .../storage/table/jvm/MemoryMetric.java | 28 ++++ .../storage/table/jvm/MemoryPoolMetric.java | 28 ++++ .../storage/es/dao/CpuMetricEsStreamDAO.java | 3 +- .../worker/base/AbstractLocalAsyncWorker.java | 33 +---- .../AbstractLocalAsyncWorkerProvider.java | 16 +-- .../stream/worker/base/AbstractWorker.java | 30 ++-- .../worker/base/AbstractWorkerProvider.java | 17 +-- .../worker/base/LocalAsyncWorkerRef.java | 3 +- .../stream/worker/base/WorkerContext.java | 91 ------------ .../stream/worker/base/WorkerRef.java | 10 -- .../stream/worker/impl/PersistenceWorker.java | 32 ++--- 27 files changed, 459 insertions(+), 311 deletions(-) create mode 100644 apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/graph/JvmMetricStreamGraph.java create mode 100644 apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/jvm/CpuMetricPersistenceWorker.java rename apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/{graph/GCMetricStreamGraph.java => worker/jvm/GCMetricPersistenceWorker.java} (67%) rename apm-collector/{apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/ClusterWorkerContext.java => apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/jvm/InstHeartBeatPersistenceWorker.java} (62%) rename apm-collector/{apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/Role.java => apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/jvm/MemoryMetricPersistenceWorker.java} (60%) create mode 100644 apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/jvm/MemoryPoolMetricPersistenceWorker.java delete mode 100644 apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/WorkerContext.java diff --git a/apm-collector/apm-collector-agent/collector-agent-grpc-provider/src/main/java/org/skywalking/apm/collector/agent/grpc/handler/JVMMetricsServiceHandler.java b/apm-collector/apm-collector-agent/collector-agent-grpc-provider/src/main/java/org/skywalking/apm/collector/agent/grpc/handler/JVMMetricsServiceHandler.java index 927c18ec60..d9490f144c 100644 --- a/apm-collector/apm-collector-agent/collector-agent-grpc-provider/src/main/java/org/skywalking/apm/collector/agent/grpc/handler/JVMMetricsServiceHandler.java +++ b/apm-collector/apm-collector-agent/collector-agent-grpc-provider/src/main/java/org/skywalking/apm/collector/agent/grpc/handler/JVMMetricsServiceHandler.java @@ -20,24 +20,17 @@ package org.skywalking.apm.collector.agent.grpc.handler; import io.grpc.stub.StreamObserver; import java.util.List; -import org.skywalking.apm.collector.agentjvm.worker.cpu.CpuMetricPersistenceWorker; -import org.skywalking.apm.collector.agentjvm.worker.gc.GCMetricPersistenceWorker; -import org.skywalking.apm.collector.agentjvm.worker.heartbeat.InstHeartBeatPersistenceWorker; -import org.skywalking.apm.collector.agentjvm.worker.heartbeat.define.InstanceHeartBeatDataDefine; -import org.skywalking.apm.collector.agentjvm.worker.memory.MemoryMetricPersistenceWorker; -import org.skywalking.apm.collector.agentjvm.worker.memorypool.MemoryPoolMetricPersistenceWorker; -import org.skywalking.apm.collector.core.framework.CollectorContextHelper; +import org.skywalking.apm.collector.agent.stream.graph.JvmMetricStreamGraph; +import org.skywalking.apm.collector.core.graph.Graph; +import org.skywalking.apm.collector.core.graph.GraphManager; import org.skywalking.apm.collector.core.util.Const; import org.skywalking.apm.collector.core.util.TimeBucketUtils; import org.skywalking.apm.collector.server.grpc.GRPCHandler; -import org.skywalking.apm.collector.storage.define.jvm.CpuMetricDataDefine; -import org.skywalking.apm.collector.storage.define.jvm.GCMetricDataDefine; -import org.skywalking.apm.collector.storage.define.jvm.MemoryMetricDataDefine; -import org.skywalking.apm.collector.storage.define.jvm.MemoryPoolMetricDataDefine; -import org.skywalking.apm.collector.stream.StreamModuleContext; -import org.skywalking.apm.collector.stream.StreamModuleGroupDefine; -import org.skywalking.apm.collector.stream.worker.WorkerInvokeException; -import org.skywalking.apm.collector.stream.worker.WorkerNotFoundException; +import org.skywalking.apm.collector.storage.table.jvm.CpuMetric; +import org.skywalking.apm.collector.storage.table.jvm.GCMetric; +import org.skywalking.apm.collector.storage.table.jvm.MemoryMetric; +import org.skywalking.apm.collector.storage.table.jvm.MemoryPoolMetric; +import org.skywalking.apm.collector.storage.table.register.Instance; import org.skywalking.apm.network.proto.CPU; import org.skywalking.apm.network.proto.Downstream; import org.skywalking.apm.network.proto.GC; @@ -55,81 +48,77 @@ public class JVMMetricsServiceHandler extends JVMMetricsServiceGrpc.JVMMetricsSe private final Logger logger = LoggerFactory.getLogger(JVMMetricsServiceHandler.class); + private final Graph memoryMetricGraph; + private final Graph memoryPoolMetricGraph; + private final Graph gcMetricGraph; + private final Graph cpuMetricGraph; + private final Graph heartBeatGraph; + + public JVMMetricsServiceHandler() { + memoryPoolMetricGraph = GraphManager.INSTANCE.findGraph(JvmMetricStreamGraph.MEMORY_POOL_METRIC_GRAPH_ID); + memoryMetricGraph = GraphManager.INSTANCE.findGraph(JvmMetricStreamGraph.MEMORY_METRIC_GRAPH_ID); + gcMetricGraph = GraphManager.INSTANCE.findGraph(JvmMetricStreamGraph.GC_METRIC_GRAPH_ID); + cpuMetricGraph = GraphManager.INSTANCE.findGraph(JvmMetricStreamGraph.CPU_METRIC_GRAPH_ID); + heartBeatGraph = GraphManager.INSTANCE.findGraph(JvmMetricStreamGraph.INST_HEART_BEAT_GRAPH_ID); + } + @Override public void collect(JVMMetrics request, StreamObserver responseObserver) { int instanceId = request.getApplicationInstanceId(); logger.debug("receive the jvm metric from application instance, id: {}", instanceId); - StreamModuleContext context = (StreamModuleContext)CollectorContextHelper.INSTANCE.getContext(StreamModuleGroupDefine.GROUP_NAME); request.getMetricsList().forEach(metric -> { long time = TimeBucketUtils.INSTANCE.getSecondTimeBucket(metric.getTime()); - senToInstanceHeartBeatPersistenceWorker(context, instanceId, metric.getTime()); - sendToCpuMetricPersistenceWorker(context, instanceId, time, metric.getCpu()); - sendToMemoryMetricPersistenceWorker(context, instanceId, time, metric.getMemoryList()); - sendToMemoryPoolMetricPersistenceWorker(context, instanceId, time, metric.getMemoryPoolList()); - sendToGCMetricPersistenceWorker(context, instanceId, time, metric.getGcList()); + senToInstanceHeartBeatPersistenceWorker(instanceId, metric.getTime()); + sendToCpuMetricPersistenceWorker(instanceId, time, metric.getCpu()); + sendToMemoryMetricPersistenceWorker(instanceId, time, metric.getMemoryList()); + sendToMemoryPoolMetricPersistenceWorker(instanceId, time, metric.getMemoryPoolList()); + sendToGCMetricPersistenceWorker(instanceId, time, metric.getGcList()); }); responseObserver.onNext(Downstream.newBuilder().build()); responseObserver.onCompleted(); } - private void senToInstanceHeartBeatPersistenceWorker(StreamModuleContext context, int instanceId, - long heartBeatTime) { - InstanceHeartBeatDataDefine.InstanceHeartBeat heartBeat = new InstanceHeartBeatDataDefine.InstanceHeartBeat(); - heartBeat.setId(String.valueOf(instanceId)); - heartBeat.setHeartBeatTime(TimeBucketUtils.INSTANCE.getSecondTimeBucket(heartBeatTime)); - heartBeat.setInstanceId(instanceId); - try { - logger.debug("send to instance heart beat persistence worker, id: {}", heartBeat.getId()); - context.getClusterWorkerContext().lookup(InstHeartBeatPersistenceWorker.WorkerRole.INSTANCE).tell(heartBeat.toData()); - } catch (WorkerInvokeException | WorkerNotFoundException e) { - logger.error(e.getMessage(), e); - } + private void senToInstanceHeartBeatPersistenceWorker(int instanceId, long heartBeatTime) { + Instance instance = new Instance(String.valueOf(instanceId)); + instance.setHeartBeatTime(TimeBucketUtils.INSTANCE.getSecondTimeBucket(heartBeatTime)); + instance.setInstanceId(instanceId); + + logger.debug("send to instance heart beat persistence worker, id: {}", instance.getId()); + heartBeatGraph.start(instance); } - private void sendToCpuMetricPersistenceWorker(StreamModuleContext context, int instanceId, - long timeBucket, CPU cpu) { - CpuMetricDataDefine.CpuMetric cpuMetric = new CpuMetricDataDefine.CpuMetric(); - cpuMetric.setId(timeBucket + Const.ID_SPLIT + instanceId); + private void sendToCpuMetricPersistenceWorker(int instanceId, long timeBucket, CPU cpu) { + CpuMetric cpuMetric = new CpuMetric(timeBucket + Const.ID_SPLIT + instanceId); cpuMetric.setInstanceId(instanceId); cpuMetric.setUsagePercent(cpu.getUsagePercent()); cpuMetric.setTimeBucket(timeBucket); - try { - logger.debug("send to cpu metric persistence worker, id: {}", cpuMetric.getId()); - context.getClusterWorkerContext().lookup(CpuMetricPersistenceWorker.WorkerRole.INSTANCE).tell(cpuMetric.toData()); - } catch (WorkerInvokeException | WorkerNotFoundException e) { - logger.error(e.getMessage(), e); - } - } - private void sendToMemoryMetricPersistenceWorker(StreamModuleContext context, int instanceId, - long timeBucket, List memories) { + logger.debug("send to cpu metric graph, id: {}", cpuMetric.getId()); + cpuMetricGraph.start(cpuMetric); + } + private void sendToMemoryMetricPersistenceWorker(int instanceId, long timeBucket, List memories) { memories.forEach(memory -> { - MemoryMetricDataDefine.MemoryMetric memoryMetric = new MemoryMetricDataDefine.MemoryMetric(); - memoryMetric.setId(timeBucket + Const.ID_SPLIT + instanceId + Const.ID_SPLIT + String.valueOf(memory.getIsHeap())); - memoryMetric.setApplicationInstanceId(instanceId); - memoryMetric.setHeap(memory.getIsHeap()); + MemoryMetric memoryMetric = new MemoryMetric(timeBucket + Const.ID_SPLIT + instanceId + Const.ID_SPLIT + String.valueOf(memory.getIsHeap())); + memoryMetric.setInstanceId(instanceId); + memoryMetric.setIsHeap(memory.getIsHeap()); memoryMetric.setInit(memory.getInit()); memoryMetric.setMax(memory.getMax()); memoryMetric.setUsed(memory.getUsed()); memoryMetric.setCommitted(memory.getCommitted()); memoryMetric.setTimeBucket(timeBucket); - try { - logger.debug("send to memory metric persistence worker, id: {}", memoryMetric.getId()); - context.getClusterWorkerContext().lookup(MemoryMetricPersistenceWorker.WorkerRole.INSTANCE).tell(memoryMetric.toData()); - } catch (WorkerInvokeException | WorkerNotFoundException e) { - logger.error(e.getMessage(), e); - } + + logger.debug("send to memory metric graph, id: {}", memoryMetric.getId()); + memoryMetricGraph.start(memoryMetric); }); } - private void sendToMemoryPoolMetricPersistenceWorker(StreamModuleContext context, int instanceId, - long timeBucket, List memoryPools) { + private void sendToMemoryPoolMetricPersistenceWorker(int instanceId, long timeBucket, + List memoryPools) { memoryPools.forEach(memoryPool -> { - MemoryPoolMetricDataDefine.MemoryPoolMetric memoryPoolMetric = new MemoryPoolMetricDataDefine.MemoryPoolMetric(); - memoryPoolMetric.setId(timeBucket + Const.ID_SPLIT + instanceId + Const.ID_SPLIT + String.valueOf(memoryPool.getType().getNumber())); + MemoryPoolMetric memoryPoolMetric = new MemoryPoolMetric(timeBucket + Const.ID_SPLIT + instanceId + Const.ID_SPLIT + String.valueOf(memoryPool.getType().getNumber())); memoryPoolMetric.setInstanceId(instanceId); memoryPoolMetric.setPoolType(memoryPool.getType().getNumber()); memoryPoolMetric.setInit(memoryPool.getInit()); @@ -137,31 +126,23 @@ public class JVMMetricsServiceHandler extends JVMMetricsServiceGrpc.JVMMetricsSe memoryPoolMetric.setUsed(memoryPool.getUsed()); memoryPoolMetric.setCommitted(memoryPool.getCommited()); memoryPoolMetric.setTimeBucket(timeBucket); - try { - logger.debug("send to memory pool metric persistence worker, id: {}", memoryPoolMetric.getId()); - context.getClusterWorkerContext().lookup(MemoryPoolMetricPersistenceWorker.WorkerRole.INSTANCE).tell(memoryPoolMetric.toData()); - } catch (WorkerInvokeException | WorkerNotFoundException e) { - logger.error(e.getMessage(), e); - } + + logger.debug("send to memory pool metric graph, id: {}", memoryPoolMetric.getId()); + memoryPoolMetricGraph.start(memoryPoolMetric); }); } - private void sendToGCMetricPersistenceWorker(StreamModuleContext context, int instanceId, - long timeBucket, List gcs) { + private void sendToGCMetricPersistenceWorker(int instanceId, long timeBucket, List gcs) { gcs.forEach(gc -> { - GCMetricDataDefine.GCMetric gcMetric = new GCMetricDataDefine.GCMetric(); - gcMetric.setId(timeBucket + Const.ID_SPLIT + instanceId + Const.ID_SPLIT + String.valueOf(gc.getPhraseValue())); + GCMetric gcMetric = new GCMetric(timeBucket + Const.ID_SPLIT + instanceId + Const.ID_SPLIT + String.valueOf(gc.getPhraseValue())); gcMetric.setInstanceId(instanceId); gcMetric.setPhrase(gc.getPhraseValue()); gcMetric.setCount(gc.getCount()); gcMetric.setTime(gc.getTime()); gcMetric.setTimeBucket(timeBucket); - try { - logger.debug("send to gc metric persistence worker, id: {}", gcMetric.getId()); - context.getClusterWorkerContext().lookup(GCMetricPersistenceWorker.WorkerRole.INSTANCE).tell(gcMetric.toData()); - } catch (WorkerInvokeException | WorkerNotFoundException e) { - logger.error(e.getMessage(), e); - } + + logger.debug("send to gc metric graph, id: {}", gcMetric.getId()); + gcMetricGraph.start(gcMetric); }); } } diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/pom.xml b/apm-collector/apm-collector-agent/collector-agent-stream/pom.xml index c0228e283f..5c1285c3ff 100644 --- a/apm-collector/apm-collector-agent/collector-agent-stream/pom.xml +++ b/apm-collector/apm-collector-agent/collector-agent-stream/pom.xml @@ -33,7 +33,7 @@ org.skywalking - collector-stream-define + collector-stream-provider ${project.version} diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/graph/JvmMetricStreamGraph.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/graph/JvmMetricStreamGraph.java new file mode 100644 index 0000000000..de5ab76fa6 --- /dev/null +++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/graph/JvmMetricStreamGraph.java @@ -0,0 +1,94 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.agent.stream.graph; + +import org.skywalking.apm.collector.agent.stream.worker.jvm.CpuMetricPersistenceWorker; +import org.skywalking.apm.collector.agent.stream.worker.jvm.GCMetricPersistenceWorker; +import org.skywalking.apm.collector.agent.stream.worker.jvm.InstHeartBeatPersistenceWorker; +import org.skywalking.apm.collector.agent.stream.worker.jvm.MemoryMetricPersistenceWorker; +import org.skywalking.apm.collector.agent.stream.worker.jvm.MemoryPoolMetricPersistenceWorker; +import org.skywalking.apm.collector.core.graph.Graph; +import org.skywalking.apm.collector.core.graph.GraphManager; +import org.skywalking.apm.collector.core.graph.Next; +import org.skywalking.apm.collector.core.graph.NodeProcessor; +import org.skywalking.apm.collector.storage.table.jvm.CpuMetric; +import org.skywalking.apm.collector.storage.table.jvm.GCMetric; +import org.skywalking.apm.collector.storage.table.jvm.MemoryMetric; +import org.skywalking.apm.collector.storage.table.jvm.MemoryPoolMetric; +import org.skywalking.apm.collector.storage.table.register.Instance; +import org.skywalking.apm.collector.stream.worker.base.ProviderNotFoundException; +import org.skywalking.apm.collector.stream.worker.base.WorkerInvokeException; +import org.skywalking.apm.collector.stream.worker.base.WorkerRef; + +/** + * @author peng-yongsheng + */ +public class JvmMetricStreamGraph { + + public static final int GC_METRIC_GRAPH_ID = 100; + public static final int MEMORY_METRIC_GRAPH_ID = 101; + public static final int MEMORY_POOL_METRIC_GRAPH_ID = 102; + public static final int CPU_METRIC_GRAPH_ID = 103; + public static final int INST_HEART_BEAT_GRAPH_ID = 104; + + public Graph createGCMetricGraph() { + Graph graph = GraphManager.INSTANCE.createIfAbsent(GC_METRIC_GRAPH_ID, GCMetric.class); + graph.addNode(new GCMetricPersistenceWorker()); + return graph; + } + + public Graph createCPUMetricGraph() throws ProviderNotFoundException { + CpuMetricPersistenceWorker.Factory factory = new CpuMetricPersistenceWorker.Factory(null, null); + final WorkerRef workerRef = factory.create(null); + + Graph graph = GraphManager.INSTANCE.createIfAbsent(CPU_METRIC_GRAPH_ID, CpuMetric.class); + graph.addNode(new NodeProcessor() { + @Override public int id() { + return 0; + } + + @Override public void process(CpuMetric INPUT, Next next) { + try { + workerRef.tell(INPUT); + } catch (WorkerInvokeException e) { + e.printStackTrace(); + } + } + }); + return graph; + } + + public Graph createMemoryMetricGraph() { + Graph graph = GraphManager.INSTANCE.createIfAbsent(MEMORY_METRIC_GRAPH_ID, MemoryMetric.class); + graph.addNode(new MemoryMetricPersistenceWorker()); + return graph; + } + + public Graph createMemoryPoolMetricGraph() { + Graph graph = GraphManager.INSTANCE.createIfAbsent(MEMORY_POOL_METRIC_GRAPH_ID, MemoryPoolMetric.class); + graph.addNode(new MemoryPoolMetricPersistenceWorker()); + return graph; + } + + public Graph createHeartBeatGraph() { + Graph graph = GraphManager.INSTANCE.createIfAbsent(INST_HEART_BEAT_GRAPH_ID, Instance.class); + graph.addNode(new InstHeartBeatPersistenceWorker()); + return graph; + } +} diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/jvm/CpuMetricPersistenceWorker.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/jvm/CpuMetricPersistenceWorker.java new file mode 100644 index 0000000000..788f398f9e --- /dev/null +++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/jvm/CpuMetricPersistenceWorker.java @@ -0,0 +1,64 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.agent.stream.worker.jvm; + +import org.skywalking.apm.collector.queue.service.QueueCreatorService; +import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO; +import org.skywalking.apm.collector.storage.dao.ICpuMetricStreamDAO; +import org.skywalking.apm.collector.storage.service.DAOService; +import org.skywalking.apm.collector.stream.worker.base.AbstractLocalAsyncWorkerProvider; +import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker; + +/** + * @author peng-yongsheng + */ +public class CpuMetricPersistenceWorker extends PersistenceWorker { + + private final DAOService daoService; + + public CpuMetricPersistenceWorker(DAOService daoService) { + super(daoService); + this.daoService = daoService; + } + + @Override protected boolean needMergeDBData() { + return false; + } + + @Override protected IPersistenceDAO persistenceDAO() { + return daoService.getPersistenceDAO(ICpuMetricStreamDAO.class); + } + + public static class Factory extends AbstractLocalAsyncWorkerProvider { + + public Factory(DAOService daoService, QueueCreatorService queueCreatorService) { + super(daoService, queueCreatorService); + } + + @Override + public CpuMetricPersistenceWorker workerInstance(DAOService daoService) { + return new CpuMetricPersistenceWorker(daoService); + } + + @Override + public int queueSize() { + return 1024; + } + } +} diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/graph/GCMetricStreamGraph.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/jvm/GCMetricPersistenceWorker.java similarity index 67% rename from apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/graph/GCMetricStreamGraph.java rename to apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/jvm/GCMetricPersistenceWorker.java index f30a0acc25..ff44b352e7 100644 --- a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/graph/GCMetricStreamGraph.java +++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/jvm/GCMetricPersistenceWorker.java @@ -16,20 +16,22 @@ * Project repository: https://github.com/OpenSkywalking/skywalking */ -package org.skywalking.apm.collector.agent.stream.graph; +package org.skywalking.apm.collector.agent.stream.worker.jvm; -import org.skywalking.apm.collector.core.graph.Graph; -import org.skywalking.apm.collector.core.graph.GraphManager; +import org.skywalking.apm.collector.core.graph.Next; +import org.skywalking.apm.collector.core.graph.NodeProcessor; import org.skywalking.apm.collector.storage.table.jvm.GCMetric; /** * @author peng-yongsheng */ -public class GCMetricStreamGraph { +public class GCMetricPersistenceWorker implements NodeProcessor { + + @Override public int id() { + return 1; + } + + @Override public void process(GCMetric INPUT, Next next) { - public Graph createIfAbsent() { - Graph graph = GraphManager.INSTANCE.createIfAbsent(1, GCMetric.class); - graph.addNode(); - return graph; } } diff --git a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/ClusterWorkerContext.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/jvm/InstHeartBeatPersistenceWorker.java similarity index 62% rename from apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/ClusterWorkerContext.java rename to apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/jvm/InstHeartBeatPersistenceWorker.java index 33cee82593..133531e4fa 100644 --- a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/ClusterWorkerContext.java +++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/jvm/InstHeartBeatPersistenceWorker.java @@ -16,24 +16,22 @@ * Project repository: https://github.com/OpenSkywalking/skywalking */ -package org.skywalking.apm.collector.stream.worker.base; +package org.skywalking.apm.collector.agent.stream.worker.jvm; -import java.util.ArrayList; -import java.util.List; +import org.skywalking.apm.collector.core.graph.Next; +import org.skywalking.apm.collector.core.graph.NodeProcessor; +import org.skywalking.apm.collector.storage.table.register.Instance; /** * @author peng-yongsheng */ -public class ClusterWorkerContext extends WorkerContext { +public class InstHeartBeatPersistenceWorker implements NodeProcessor { - private List providers = new ArrayList<>(); - - public List getProviders() { - return providers; + @Override public int id() { + return 0; } - @Override - public void putProvider(AbstractRemoteWorkerProvider provider) { - providers.add(provider); + @Override public void process(Instance INPUT, Next next) { + } } diff --git a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/Role.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/jvm/MemoryMetricPersistenceWorker.java similarity index 60% rename from apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/Role.java rename to apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/jvm/MemoryMetricPersistenceWorker.java index 59c7e4239e..d97efa560a 100644 --- a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/Role.java +++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/jvm/MemoryMetricPersistenceWorker.java @@ -16,16 +16,22 @@ * Project repository: https://github.com/OpenSkywalking/skywalking */ -package org.skywalking.apm.collector.stream.worker.base; +package org.skywalking.apm.collector.agent.stream.worker.jvm; -import org.skywalking.apm.collector.remote.RoutingRule; +import org.skywalking.apm.collector.core.graph.Next; +import org.skywalking.apm.collector.core.graph.NodeProcessor; +import org.skywalking.apm.collector.storage.table.jvm.MemoryMetric; /** * @author peng-yongsheng */ -public interface Role { +public class MemoryMetricPersistenceWorker implements NodeProcessor { - String roleName(); + @Override public int id() { + return 0; + } - RoutingRule routingRule(); + @Override public void process(MemoryMetric INPUT, Next next) { + + } } diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/jvm/MemoryPoolMetricPersistenceWorker.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/jvm/MemoryPoolMetricPersistenceWorker.java new file mode 100644 index 0000000000..8546696cf7 --- /dev/null +++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/jvm/MemoryPoolMetricPersistenceWorker.java @@ -0,0 +1,37 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.agent.stream.worker.jvm; + +import org.skywalking.apm.collector.core.graph.Next; +import org.skywalking.apm.collector.core.graph.NodeProcessor; +import org.skywalking.apm.collector.storage.table.jvm.MemoryPoolMetric; + +/** + * @author peng-yongsheng + */ +public class MemoryPoolMetricPersistenceWorker implements NodeProcessor { + + @Override public int id() { + return 0; + } + + @Override public void process(MemoryPoolMetric INPUT, Next next) { + + } +} diff --git a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/data/Data.java b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/data/Data.java index d30523b7a4..29f569dfca 100644 --- a/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/data/Data.java +++ b/apm-collector/apm-collector-core/src/main/java/org/skywalking/apm/collector/core/data/Data.java @@ -28,6 +28,12 @@ public abstract class Data extends AbstractHashMessage { private Integer[] dataIntegers; private Boolean[] dataBooleans; private byte[][] dataBytes; + private final Column[] stringColumns; + private final Column[] longColumns; + private final Column[] doubleColumns; + private final Column[] integerColumns; + private final Column[] booleanColumns; + private final Column[] byteColumns; public Data(String id, Column[] stringColumns, Column[] longColumns, Column[] doubleColumns, Column[] integerColumns, Column[] booleanColumns, Column[] byteColumns) { @@ -39,6 +45,12 @@ public abstract class Data extends AbstractHashMessage { this.dataIntegers = new Integer[integerColumns.length]; this.dataBooleans = new Boolean[booleanColumns.length]; this.dataBytes = new byte[byteColumns.length][]; + this.stringColumns = stringColumns; + this.longColumns = longColumns; + this.doubleColumns = doubleColumns; + this.integerColumns = integerColumns; + this.booleanColumns = booleanColumns; + this.byteColumns = byteColumns; } public int getDataStringsCount() { @@ -117,6 +129,33 @@ public abstract class Data extends AbstractHashMessage { return dataStrings[0]; } + public void mergeData(Data newData) { + for (int i = 0; i < stringColumns.length; i++) { + String stringData = stringColumns[i].getOperation().operate(newData.getDataString(i), this.dataStrings[i]); + this.dataStrings[i] = stringData; + } + for (int i = 0; i < longColumns.length; i++) { + Long longData = longColumns[i].getOperation().operate(newData.getDataLong(i), this.dataLongs[i]); + this.dataLongs[i] = longData; + } + for (int i = 0; i < doubleColumns.length; i++) { + Double doubleData = doubleColumns[i].getOperation().operate(newData.getDataDouble(i), this.dataDoubles[i]); + this.dataDoubles[i] = doubleData; + } + for (int i = 0; i < integerColumns.length; i++) { + Integer integerData = integerColumns[i].getOperation().operate(newData.getDataInteger(i), this.dataIntegers[i]); + this.dataIntegers[i] = integerData; + } + for (int i = 0; i < booleanColumns.length; i++) { + Boolean booleanData = booleanColumns[i].getOperation().operate(newData.getDataBoolean(i), this.dataBooleans[i]); + this.dataBooleans[i] = booleanData; + } + for (int i = 0; i < byteColumns.length; i++) { + byte[] byteData = byteColumns[i].getOperation().operate(newData.getDataBytes(i), this.dataBytes[i]); + this.dataBytes[i] = byteData; + } + } + @Override public String toString() { StringBuilder dataStr = new StringBuilder(); dataStr.append("string: ["); diff --git a/apm-collector/apm-collector-queue/collector-queue-define/src/main/java/org/skywalking/apm/collector/queue/base/QueueExecutor.java b/apm-collector/apm-collector-queue/collector-queue-define/src/main/java/org/skywalking/apm/collector/queue/base/QueueExecutor.java index eabf2c415a..9ff1134af2 100644 --- a/apm-collector/apm-collector-queue/collector-queue-define/src/main/java/org/skywalking/apm/collector/queue/base/QueueExecutor.java +++ b/apm-collector/apm-collector-queue/collector-queue-define/src/main/java/org/skywalking/apm/collector/queue/base/QueueExecutor.java @@ -18,9 +18,10 @@ package org.skywalking.apm.collector.queue.base; +import org.skywalking.apm.collector.core.framework.Executor; + /** * @author peng-yongsheng */ -public interface QueueExecutor { - void execute(Object message); +public interface QueueExecutor extends Executor { } diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/base/dao/IBatchDAO.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/base/dao/IBatchDAO.java index 10c92b195e..435420dfc4 100644 --- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/base/dao/IBatchDAO.java +++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/base/dao/IBatchDAO.java @@ -23,6 +23,6 @@ import java.util.List; /** * @author peng-yongsheng */ -public interface IBatchDAO { +public interface IBatchDAO extends DAO { void batchPersistence(List batchCollection); } diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/base/dao/IPersistenceDAO.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/base/dao/IPersistenceDAO.java index 6ec2d44964..7fef53189b 100644 --- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/base/dao/IPersistenceDAO.java +++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/base/dao/IPersistenceDAO.java @@ -18,13 +18,15 @@ package org.skywalking.apm.collector.storage.base.dao; +import org.skywalking.apm.collector.core.data.Data; + /** * @author peng-yongsheng */ -public interface IPersistenceDAO { - Data get(String id); +public interface IPersistenceDAO { + DataImpl get(String id); - Insert prepareBatchInsert(Data data); + Insert prepareBatchInsert(DataImpl data); - Update prepareBatchUpdate(Data data); + Update prepareBatchUpdate(DataImpl data); } diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/dao/ICpuMetricStreamDAO.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/dao/ICpuMetricStreamDAO.java index e5bb15e093..cca21be352 100644 --- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/dao/ICpuMetricStreamDAO.java +++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/dao/ICpuMetricStreamDAO.java @@ -18,8 +18,10 @@ package org.skywalking.apm.collector.storage.dao; +import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO; + /** * @author peng-yongsheng */ -public interface ICpuMetricStreamDAO { +public interface ICpuMetricStreamDAO extends IPersistenceDAO { } diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/service/DAOService.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/service/DAOService.java index fe29f579cf..2c7105e150 100644 --- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/service/DAOService.java +++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/service/DAOService.java @@ -20,10 +20,13 @@ package org.skywalking.apm.collector.storage.service; import org.skywalking.apm.collector.core.module.Service; import org.skywalking.apm.collector.storage.base.dao.DAO; +import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO; /** * @author peng-yongsheng */ public interface DAOService extends Service { DAO get(Class daoInterfaceClass); + + IPersistenceDAO getPersistenceDAO(Class daoInterfaceClass); } diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/table/jvm/CpuMetric.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/table/jvm/CpuMetric.java index 17e4a98077..7abcf5a766 100644 --- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/table/jvm/CpuMetric.java +++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/table/jvm/CpuMetric.java @@ -56,11 +56,23 @@ public class CpuMetric extends Data { return getDataInteger(0); } + public void setInstanceId(Integer instanceId) { + setDataInteger(0, instanceId); + } + public Double getUsagePercent() { return getDataDouble(0); } + public void setUsagePercent(Double usagePercent) { + setDataDouble(0, usagePercent); + } + public Long getTimeBucket() { return getDataLong(0); } + + public void setTimeBucket(Long timeBucket) { + setDataLong(0, timeBucket); + } } diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/table/jvm/GCMetric.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/table/jvm/GCMetric.java index e7eebb60b6..3744ee0a7c 100644 --- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/table/jvm/GCMetric.java +++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/table/jvm/GCMetric.java @@ -57,19 +57,39 @@ public class GCMetric extends Data { return getDataLong(0); } + public void setCount(Long count) { + setDataLong(0, count); + } + public Long getTime() { return getDataLong(1); } + public void setTime(Long time) { + setDataLong(1, time); + } + public Long getTimeBucket() { return getDataLong(2); } + public void setTimeBucket(Long timeBucket) { + setDataLong(2, timeBucket); + } + public Integer getInstanceId() { return getDataInteger(0); } + public void setInstanceId(Integer instanceId) { + setDataInteger(0, instanceId); + } + public Integer getPhrase() { return getDataInteger(1); } + + public void setPhrase(Integer phrase) { + setDataInteger(1, phrase); + } } diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/table/jvm/MemoryMetric.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/table/jvm/MemoryMetric.java index ce93ab765f..63579f27f4 100644 --- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/table/jvm/MemoryMetric.java +++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/table/jvm/MemoryMetric.java @@ -60,27 +60,55 @@ public class MemoryMetric extends Data { return getDataLong(0); } + public void setInit(Long init) { + setDataLong(0, init); + } + public Long getMax() { return getDataLong(1); } + public void setMax(Long max) { + setDataLong(1, max); + } + public Long getUsed() { return getDataLong(2); } + public void setUsed(Long used) { + setDataLong(2, used); + } + public Long getCommitted() { return getDataLong(3); } + public void setCommitted(Long committed) { + setDataLong(3, committed); + } + public Long getTimeBucket() { return getDataLong(4); } + public void setTimeBucket(Long timeBucket) { + setDataLong(4, timeBucket); + } + public Boolean getIsHeap() { return getDataBoolean(0); } + public void setIsHeap(Boolean isHeap) { + setDataBoolean(0, isHeap); + } + public Integer getInstanceId() { return getDataInteger(0); } + + public void setInstanceId(Integer instanceId) { + setDataInteger(0, instanceId); + } } diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/table/jvm/MemoryPoolMetric.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/table/jvm/MemoryPoolMetric.java index 6694993d49..77cca51802 100644 --- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/table/jvm/MemoryPoolMetric.java +++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/table/jvm/MemoryPoolMetric.java @@ -59,27 +59,55 @@ public class MemoryPoolMetric extends Data { return getDataLong(0); } + public void setInit(Long init) { + setDataLong(0, init); + } + public Long getMax() { return getDataLong(1); } + public void setMax(Long max) { + setDataLong(1, max); + } + public Long getUsed() { return getDataLong(2); } + public void setUsed(Long used) { + setDataLong(2, used); + } + public Long getCommitted() { return getDataLong(3); } + public void setCommitted(Long committed) { + setDataLong(3, committed); + } + public Long getTimeBucket() { return getDataLong(4); } + public void setTimeBucket(Long timeBucket) { + setDataLong(4, timeBucket); + } + public Integer getInstanceId() { return getDataInteger(0); } + public void setInstanceId(Integer instanceId) { + setDataInteger(0, instanceId); + } + public Integer getPoolType() { return getDataInteger(1); } + + public void setPoolType(Integer poolType) { + setDataInteger(1, poolType); + } } diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/skywalking/apm/collector/storage/es/dao/CpuMetricEsStreamDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/skywalking/apm/collector/storage/es/dao/CpuMetricEsStreamDAO.java index 10182b7199..3a0d9e4dce 100644 --- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/skywalking/apm/collector/storage/es/dao/CpuMetricEsStreamDAO.java +++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/skywalking/apm/collector/storage/es/dao/CpuMetricEsStreamDAO.java @@ -22,7 +22,6 @@ import java.util.HashMap; import java.util.Map; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.update.UpdateRequestBuilder; -import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO; import org.skywalking.apm.collector.storage.dao.ICpuMetricStreamDAO; import org.skywalking.apm.collector.storage.es.base.dao.EsDAO; import org.skywalking.apm.collector.storage.table.jvm.CpuMetric; @@ -33,7 +32,7 @@ import org.slf4j.LoggerFactory; /** * @author peng-yongsheng */ -public class CpuMetricEsStreamDAO extends EsDAO implements ICpuMetricStreamDAO, IPersistenceDAO { +public class CpuMetricEsStreamDAO extends EsDAO implements ICpuMetricStreamDAO { private final Logger logger = LoggerFactory.getLogger(CpuMetricEsStreamDAO.class); diff --git a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/AbstractLocalAsyncWorker.java b/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/AbstractLocalAsyncWorker.java index f08747498a..6b57ad558c 100644 --- a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/AbstractLocalAsyncWorker.java +++ b/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/AbstractLocalAsyncWorker.java @@ -27,38 +27,7 @@ import org.skywalking.apm.collector.queue.base.QueueExecutor; * @author peng-yongsheng * @since v3.0-2017 */ -public abstract class AbstractLocalAsyncWorker extends AbstractWorker implements QueueExecutor { - - private LocalAsyncWorkerRef workerRef; - - /** - * Construct an AbstractLocalAsyncWorker with the worker role and context. - * - * @param role The responsibility of worker in cluster, more than one workers can have same responsibility which use - * to provide load balancing ability. - * @param clusterContext See {@link ClusterWorkerContext} - */ - public AbstractLocalAsyncWorker(Role role, ClusterWorkerContext clusterContext) { - super(role, clusterContext); - } - - /** - * The asynchronous worker always use to persistence data into db, this is the end of the streaming, - * so usually no need to create the next worker instance at the time of this worker instance create. - * - * @throws ProviderNotFoundException When worker provider not found, it will be throw this exception. - */ - @Override - public void preStart() throws ProviderNotFoundException { - } - - @Override protected final LocalAsyncWorkerRef getSelf() { - return workerRef; - } - - @Override protected final void putSelfRef(LocalAsyncWorkerRef workerRef) { - this.workerRef = workerRef; - } +public abstract class AbstractLocalAsyncWorker extends AbstractWorker implements QueueExecutor { /** * Receive message diff --git a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/AbstractLocalAsyncWorkerProvider.java b/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/AbstractLocalAsyncWorkerProvider.java index 7192cce36d..324cba6f6f 100644 --- a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/AbstractLocalAsyncWorkerProvider.java +++ b/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/AbstractLocalAsyncWorkerProvider.java @@ -21,6 +21,7 @@ package org.skywalking.apm.collector.stream.worker.base; import org.skywalking.apm.collector.queue.base.QueueEventHandler; import org.skywalking.apm.collector.queue.base.QueueExecutor; import org.skywalking.apm.collector.queue.service.QueueCreatorService; +import org.skywalking.apm.collector.storage.service.DAOService; /** * @author peng-yongsheng @@ -29,24 +30,19 @@ public abstract class AbstractLocalAsyncWorkerProvider { +public abstract class AbstractWorker implements Executor { private final Logger logger = LoggerFactory.getLogger(AbstractWorker.class); - private final Role role; - - private final ClusterWorkerContext clusterContext; - - public AbstractWorker(Role role, ClusterWorkerContext clusterContext) { - this.role = role; - this.clusterContext = clusterContext; - } - /** * The data process logic in this method. * @@ -45,17 +37,11 @@ public abstract class AbstractWorker { */ protected abstract void onWork(Object message) throws WorkerException; - public abstract void preStart() throws ProviderNotFoundException; - - final public ClusterWorkerContext getClusterContext() { - return clusterContext; + @Override public final void execute(Object message) { + try { + onWork(message); + } catch (WorkerException e) { + logger.error(e.getMessage(), e); + } } - - final public Role getRole() { - return role; - } - - protected abstract S getSelf(); - - protected abstract void putSelfRef(S workerRef); } diff --git a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/AbstractWorkerProvider.java b/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/AbstractWorkerProvider.java index f73a305323..46023c159f 100644 --- a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/AbstractWorkerProvider.java +++ b/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/AbstractWorkerProvider.java @@ -18,22 +18,11 @@ package org.skywalking.apm.collector.stream.worker.base; +import org.skywalking.apm.collector.storage.service.DAOService; + /** * @author peng-yongsheng */ public abstract class AbstractWorkerProvider implements Provider { - - private ClusterWorkerContext clusterContext; - - public abstract Role role(); - - public abstract T workerInstance(ClusterWorkerContext clusterContext); - - final public void setClusterContext(ClusterWorkerContext clusterContext) { - this.clusterContext = clusterContext; - } - - final protected ClusterWorkerContext getClusterContext() { - return clusterContext; - } + public abstract T workerInstance(DAOService daoService); } diff --git a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/LocalAsyncWorkerRef.java b/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/LocalAsyncWorkerRef.java index 380d68814f..61d51ba1c4 100644 --- a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/LocalAsyncWorkerRef.java +++ b/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/LocalAsyncWorkerRef.java @@ -27,8 +27,7 @@ public class LocalAsyncWorkerRef extends WorkerRef { private QueueEventHandler queueEventHandler; - public LocalAsyncWorkerRef(Role role, QueueEventHandler queueEventHandler) { - super(role); + public LocalAsyncWorkerRef(QueueEventHandler queueEventHandler) { this.queueEventHandler = queueEventHandler; } diff --git a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/WorkerContext.java b/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/WorkerContext.java deleted file mode 100644 index e2bfe911bb..0000000000 --- a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/WorkerContext.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Copyright 2017, OpenSkywalking Organization All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * Project repository: https://github.com/OpenSkywalking/skywalking - */ - -package org.skywalking.apm.collector.stream.worker.base; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @author peng-yongsheng - */ -public abstract class WorkerContext implements Context { - - private final Logger logger = LoggerFactory.getLogger(WorkerContext.class); - - private Map remoteWorkerRefs; - private Map> roleWorkers; - private Map roles; - - WorkerContext() { - this.roleWorkers = new HashMap<>(); - this.roles = new HashMap<>(); - this.remoteWorkerRefs = new HashMap<>(); - } - - private Map> getRoleWorkers() { - return this.roleWorkers; - } - - @Override final public WorkerRefs lookup(Role role) throws WorkerNotFoundException { - if (getRoleWorkers().containsKey(role.roleName())) { - return new WorkerRefs(getRoleWorkers().get(role.roleName()), role.workerSelector()); - } else { - throw new WorkerNotFoundException("role=" + role.roleName() + ", no available worker."); - } - } - - @Override final public RemoteWorkerRef lookupInSide(String roleName) throws WorkerNotFoundException { - if (remoteWorkerRefs.containsKey(roleName)) { - return remoteWorkerRefs.get(roleName); - } else { - throw new WorkerNotFoundException("role=" + roleName + ", no available worker."); - } - } - - public final void putRole(Role role) { - roles.put(role.roleName(), role); - } - - public final Role getRole(String roleName) { - return roles.get(roleName); - } - - @Override final public void put(WorkerRef workerRef) { - logger.debug("put worker reference into context, role name: {}", workerRef.getRole().roleName()); - if (!getRoleWorkers().containsKey(workerRef.getRole().roleName())) { - getRoleWorkers().putIfAbsent(workerRef.getRole().roleName(), new ArrayList<>()); - } - getRoleWorkers().get(workerRef.getRole().roleName()).add(workerRef); - - if (workerRef instanceof RemoteWorkerRef) { - RemoteWorkerRef remoteWorkerRef = (RemoteWorkerRef)workerRef; - if (!remoteWorkerRef.isAcrossJVM()) { - remoteWorkerRefs.put(workerRef.getRole().roleName(), remoteWorkerRef); - } - } - } - - @Override final public void remove(WorkerRef workerRef) { - getRoleWorkers().remove(workerRef.getRole().roleName()); - } -} diff --git a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/WorkerRef.java b/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/WorkerRef.java index 22440358d6..1db18bc90f 100644 --- a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/WorkerRef.java +++ b/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/WorkerRef.java @@ -22,15 +22,5 @@ package org.skywalking.apm.collector.stream.worker.base; * @author peng-yongsheng */ public abstract class WorkerRef { - private Role role; - - public WorkerRef(Role role) { - this.role = role; - } - - final public Role getRole() { - return role; - } - public abstract void tell(Object message) throws WorkerInvokeException; } diff --git a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/impl/PersistenceWorker.java b/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/impl/PersistenceWorker.java index 09bcb23e4a..bf7d8cee00 100644 --- a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/impl/PersistenceWorker.java +++ b/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/impl/PersistenceWorker.java @@ -24,13 +24,10 @@ import java.util.Map; import org.skywalking.apm.collector.core.data.Data; import org.skywalking.apm.collector.core.util.ObjectUtils; import org.skywalking.apm.collector.queue.base.EndOfBatchCommand; -import org.skywalking.apm.collector.storage.base.dao.DAOContainer; import org.skywalking.apm.collector.storage.base.dao.IBatchDAO; import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO; +import org.skywalking.apm.collector.storage.service.DAOService; import org.skywalking.apm.collector.stream.worker.base.AbstractLocalAsyncWorker; -import org.skywalking.apm.collector.stream.worker.base.ClusterWorkerContext; -import org.skywalking.apm.collector.stream.worker.base.ProviderNotFoundException; -import org.skywalking.apm.collector.stream.worker.base.Role; import org.skywalking.apm.collector.stream.worker.base.WorkerException; import org.skywalking.apm.collector.stream.worker.impl.data.DataCache; import org.slf4j.Logger; @@ -43,15 +40,12 @@ public abstract class PersistenceWorker extends AbstractLocalAsyncWorker { private final Logger logger = LoggerFactory.getLogger(PersistenceWorker.class); - private DataCache dataCache; + private final DAOService daoService; + private final DataCache dataCache; - public PersistenceWorker(Role role, ClusterWorkerContext clusterContext) { - super(role, clusterContext); - dataCache = new DataCache(); - } - - @Override public void preStart() throws ProviderNotFoundException { - super.preStart(); + public PersistenceWorker(DAOService daoService) { + this.dataCache = new DataCache(); + this.daoService = daoService; } @Override protected final void onWork(Object message) throws WorkerException { @@ -71,7 +65,7 @@ public abstract class PersistenceWorker extends AbstractLocalAsyncWorker { dataCache.switchPointer(); List collection = buildBatchCollection(); - IBatchDAO dao = (IBatchDAO)DAOContainer.INSTANCE.get(IBatchDAO.class.getName()); + IBatchDAO dao = (IBatchDAO)daoService.get(IBatchDAO.class); dao.batchPersistence(collection); } } finally { @@ -107,9 +101,9 @@ public abstract class PersistenceWorker extends AbstractLocalAsyncWorker { List updateBatchCollection = new LinkedList<>(); dataMap.forEach((id, data) -> { if (needMergeDBData()) { - Data dbData = persistenceDAO().get(id, getRole().dataDefine()); + Data dbData = persistenceDAO().get(id); if (ObjectUtils.isNotEmpty(dbData)) { - getRole().dataDefine().mergeData(data, dbData); + dbData.mergeData(data); try { updateBatchCollection.add(persistenceDAO().prepareBatchUpdate(data)); } catch (Throwable t) { @@ -137,12 +131,12 @@ public abstract class PersistenceWorker extends AbstractLocalAsyncWorker { private void aggregate(Object message) { dataCache.writing(); - Data data = (Data)message; + Data newData = (Data)message; - if (dataCache.containsKey(data.getId())) { - getRole().dataDefine().mergeData(dataCache.get(data.getId()), data); + if (dataCache.containsKey(newData.getId())) { + dataCache.get(newData.getId()).mergeData(newData); } else { - dataCache.put(data.getId(), data); + dataCache.put(newData.getId(), newData); } dataCache.finishWriting(); -- GitLab