diff --git a/apm-collector/apm-collector-agent/collector-agent-grpc-provider/src/main/java/org/skywalking/apm/collector/agent/grpc/AgentModuleGRPCProvider.java b/apm-collector/apm-collector-agent/collector-agent-grpc-provider/src/main/java/org/skywalking/apm/collector/agent/grpc/AgentModuleGRPCProvider.java index 8b13cfd82dc0d13671d98a9e3d49d8b2ba6a9c8d..863016d126ec6a3649237d1d2ed0bb378d7c6707 100644 --- a/apm-collector/apm-collector-agent/collector-agent-grpc-provider/src/main/java/org/skywalking/apm/collector/agent/grpc/AgentModuleGRPCProvider.java +++ b/apm-collector/apm-collector-agent/collector-agent-grpc-provider/src/main/java/org/skywalking/apm/collector/agent/grpc/AgentModuleGRPCProvider.java @@ -99,6 +99,6 @@ public class AgentModuleGRPCProvider extends ModuleProvider { gRPCServer.addHandler(new InstanceDiscoveryServiceHandler(getManager())); gRPCServer.addHandler(new ServiceNameDiscoveryServiceHandler(getManager())); gRPCServer.addHandler(new JVMMetricsServiceHandler()); - gRPCServer.addHandler(new TraceSegmentServiceHandler()); + gRPCServer.addHandler(new TraceSegmentServiceHandler(getManager())); } } diff --git a/apm-collector/apm-collector-agent/collector-agent-grpc-provider/src/main/java/org/skywalking/apm/collector/agent/grpc/handler/TraceSegmentServiceHandler.java b/apm-collector/apm-collector-agent/collector-agent-grpc-provider/src/main/java/org/skywalking/apm/collector/agent/grpc/handler/TraceSegmentServiceHandler.java index 66bdcf15bf6bbb2648d8cf0e66ee9af39e87f0fb..4c6b80a6b9bbc639a947a2907485fac660551349 100644 --- a/apm-collector/apm-collector-agent/collector-agent-grpc-provider/src/main/java/org/skywalking/apm/collector/agent/grpc/handler/TraceSegmentServiceHandler.java +++ b/apm-collector/apm-collector-agent/collector-agent-grpc-provider/src/main/java/org/skywalking/apm/collector/agent/grpc/handler/TraceSegmentServiceHandler.java @@ -19,6 +19,8 @@ package org.skywalking.apm.collector.agent.grpc.handler; import io.grpc.stub.StreamObserver; +import org.skywalking.apm.collector.agent.stream.parser.SegmentParse; +import org.skywalking.apm.collector.core.module.ModuleManager; import org.skywalking.apm.collector.server.grpc.GRPCHandler; import org.skywalking.apm.network.proto.Downstream; import org.skywalking.apm.network.proto.TraceSegmentServiceGrpc; @@ -33,12 +35,18 @@ public class TraceSegmentServiceHandler extends TraceSegmentServiceGrpc.TraceSeg private final Logger logger = LoggerFactory.getLogger(TraceSegmentServiceHandler.class); + private final ModuleManager moduleManager; + + public TraceSegmentServiceHandler(ModuleManager moduleManager) { + this.moduleManager = moduleManager; + } + @Override public StreamObserver collect(StreamObserver responseObserver) { return new StreamObserver() { @Override public void onNext(UpstreamSegment segment) { logger.debug("receive segment"); -// SegmentParse segmentParse = new SegmentParse(); -// segmentParse.parse(segment, SegmentParse.Source.Agent); + SegmentParse segmentParse = new SegmentParse(moduleManager); + segmentParse.parse(segment, SegmentParse.Source.Agent); } @Override public void onError(Throwable throwable) { diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/AgentStreamSingleton.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/AgentStreamSingleton.java index 06a845e9e4a98fdb63ef3de28f260db09e9bbf1c..91afecb5a24e63878229da9795337a364934a0c7 100644 --- a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/AgentStreamSingleton.java +++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/AgentStreamSingleton.java @@ -56,6 +56,7 @@ public class AgentStreamSingleton { PersistenceTimer timer = new PersistenceTimer(); timer.start(moduleManager, workerCreateListener.getPersistenceWorkers()); + } private void createJVMGraph() { diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/graph/JvmMetricStreamGraph.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/graph/JvmMetricStreamGraph.java index defdcca6a11edc740c75a18135b4341ebe0f40c4..355b17c8672f9cf3504f9aa3c08ce75c82860e99 100644 --- a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/graph/JvmMetricStreamGraph.java +++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/graph/JvmMetricStreamGraph.java @@ -55,47 +55,42 @@ public class JvmMetricStreamGraph { } @SuppressWarnings("unchecked") - public Graph createGcMetricGraph() { + public void createGcMetricGraph() { QueueCreatorService queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class); Graph graph = GraphManager.INSTANCE.createIfAbsent(GC_METRIC_GRAPH_ID, GCMetric.class); graph.addNode(new GCMetricPersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener)); - return graph; } @SuppressWarnings("unchecked") - public Graph createCpuMetricGraph() { + public void createCpuMetricGraph() { QueueCreatorService queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class); Graph graph = GraphManager.INSTANCE.createIfAbsent(CPU_METRIC_GRAPH_ID, CpuMetric.class); graph.addNode(new CpuMetricPersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener)); - return graph; } @SuppressWarnings("unchecked") - public Graph createMemoryMetricGraph() { + public void createMemoryMetricGraph() { QueueCreatorService queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class); Graph graph = GraphManager.INSTANCE.createIfAbsent(MEMORY_METRIC_GRAPH_ID, MemoryMetric.class); graph.addNode(new MemoryMetricPersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener)); - return graph; } @SuppressWarnings("unchecked") - public Graph createMemoryPoolMetricGraph() { + public void createMemoryPoolMetricGraph() { QueueCreatorService queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class); Graph graph = GraphManager.INSTANCE.createIfAbsent(MEMORY_POOL_METRIC_GRAPH_ID, MemoryPoolMetric.class); graph.addNode(new MemoryPoolMetricPersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener)); - return graph; } @SuppressWarnings("unchecked") - public Graph createHeartBeatGraph() { + public void createHeartBeatGraph() { QueueCreatorService queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class); Graph graph = GraphManager.INSTANCE.createIfAbsent(INST_HEART_BEAT_GRAPH_ID, Instance.class); graph.addNode(new InstHeartBeatPersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener)); - return graph; } } diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/graph/RegisterStreamGraph.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/graph/RegisterStreamGraph.java index 915259f580e7a2d82905bf5ebfdb7c1c3dd19c4b..9e5d7af081038b7be37b61470794b2492a47789d 100644 --- a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/graph/RegisterStreamGraph.java +++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/graph/RegisterStreamGraph.java @@ -54,7 +54,7 @@ public class RegisterStreamGraph { } @SuppressWarnings("unchecked") - public Graph createApplicationRegisterGraph() { + public void createApplicationRegisterGraph() { RemoteSenderService remoteSenderService = moduleManager.find(RemoteModule.NAME).getService(RemoteSenderService.class); QueueCreatorService queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class); @@ -62,11 +62,10 @@ public class RegisterStreamGraph { Graph graph = GraphManager.INSTANCE.createIfAbsent(APPLICATION_REGISTER_GRAPH_ID, Application.class); graph.addNode(new ApplicationRegisterRemoteWorker.Factory(moduleManager, remoteSenderService, APPLICATION_REGISTER_GRAPH_ID).create(workerCreateListener)) .addNext(new ApplicationRegisterSerialWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener)); - return graph; } @SuppressWarnings("unchecked") - public Graph createInstanceRegisterGraph() { + public void createInstanceRegisterGraph() { RemoteSenderService remoteSenderService = moduleManager.find(RemoteModule.NAME).getService(RemoteSenderService.class); QueueCreatorService queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class); @@ -74,11 +73,10 @@ public class RegisterStreamGraph { Graph graph = GraphManager.INSTANCE.createIfAbsent(INSTANCE_REGISTER_GRAPH_ID, Instance.class); graph.addNode(new InstanceRegisterRemoteWorker.Factory(moduleManager, remoteSenderService, INSTANCE_REGISTER_GRAPH_ID).create(workerCreateListener)) .addNext(new InstanceRegisterSerialWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener)); - return graph; } @SuppressWarnings("unchecked") - public Graph createServiceNameRegisterGraph() { + public void createServiceNameRegisterGraph() { RemoteSenderService remoteSenderService = moduleManager.find(RemoteModule.NAME).getService(RemoteSenderService.class); QueueCreatorService queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class); @@ -86,6 +84,5 @@ public class RegisterStreamGraph { Graph graph = GraphManager.INSTANCE.createIfAbsent(SERVICE_NAME_REGISTER_GRAPH_ID, ServiceName.class); graph.addNode(new ServiceNameRegisterRemoteWorker.Factory(moduleManager, remoteSenderService, SERVICE_NAME_REGISTER_GRAPH_ID).create(workerCreateListener)) .addNext(new ServiceNameRegisterSerialWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener)); - return graph; } } diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/graph/TraceStreamGraph.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/graph/TraceStreamGraph.java index 06d18f85319c15cfb0475e61c5bce4b27f0e40e5..49cb88d5b97e4c9868c41a34e9916c2796103365 100644 --- a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/graph/TraceStreamGraph.java +++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/graph/TraceStreamGraph.java @@ -18,6 +18,7 @@ package org.skywalking.apm.collector.agent.stream.graph; +import org.skywalking.apm.collector.agent.stream.parser.standardization.SegmentStandardization; import org.skywalking.apm.collector.agent.stream.parser.standardization.SegmentStandardizationWorker; import org.skywalking.apm.collector.agent.stream.worker.trace.global.GlobalTracePersistenceWorker; import org.skywalking.apm.collector.agent.stream.worker.trace.instance.InstPerformancePersistenceWorker; @@ -55,7 +56,6 @@ import org.skywalking.apm.collector.storage.table.segment.SegmentCost; import org.skywalking.apm.collector.storage.table.service.ServiceEntry; import org.skywalking.apm.collector.storage.table.serviceref.ServiceReference; import org.skywalking.apm.collector.stream.worker.base.WorkerCreateListener; -import org.skywalking.apm.network.proto.UpstreamSegment; /** * @author peng-yongsheng @@ -82,34 +82,31 @@ public class TraceStreamGraph { } @SuppressWarnings("unchecked") - public Graph createSegmentStandardizationGraph() { - QueueCreatorService queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class); + public void createSegmentStandardizationGraph() { + QueueCreatorService queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class); - Graph graph = GraphManager.INSTANCE.createIfAbsent(SEGMENT_STANDARDIZATION_GRAPH_ID, UpstreamSegment.class); + Graph graph = GraphManager.INSTANCE.createIfAbsent(SEGMENT_STANDARDIZATION_GRAPH_ID, SegmentStandardization.class); graph.addNode(new SegmentStandardizationWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener)); - return graph; } @SuppressWarnings("unchecked") - public Graph createGlobalTraceGraph() { + public void createGlobalTraceGraph() { QueueCreatorService queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class); Graph graph = GraphManager.INSTANCE.createIfAbsent(GLOBAL_TRACE_GRAPH_ID, GlobalTrace.class); graph.addNode(new GlobalTracePersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener)); - return graph; } @SuppressWarnings("unchecked") - public Graph createInstPerformanceGraph() { + public void createInstPerformanceGraph() { QueueCreatorService queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class); Graph graph = GraphManager.INSTANCE.createIfAbsent(INST_PERFORMANCE_GRAPH_ID, InstPerformance.class); graph.addNode(new InstPerformancePersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener)); - return graph; } @SuppressWarnings("unchecked") - public Graph createNodeComponentGraph() { + public void createNodeComponentGraph() { QueueCreatorService queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class); RemoteSenderService remoteSenderService = moduleManager.find(RemoteModule.NAME).getService(RemoteSenderService.class); @@ -117,11 +114,10 @@ public class TraceStreamGraph { graph.addNode(new NodeComponentAggregationWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener)) .addNext(new NodeComponentRemoteWorker.Factory(moduleManager, remoteSenderService, NODE_COMPONENT_GRAPH_ID).create(workerCreateListener)) .addNext(new NodeComponentPersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener)); - return graph; } @SuppressWarnings("unchecked") - public Graph createNodeMappingGraph() { + public void createNodeMappingGraph() { QueueCreatorService queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class); RemoteSenderService remoteSenderService = moduleManager.find(RemoteModule.NAME).getService(RemoteSenderService.class); @@ -129,11 +125,10 @@ public class TraceStreamGraph { graph.addNode(new NodeMappingAggregationWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener)) .addNext(new NodeMappingRemoteWorker.Factory(moduleManager, remoteSenderService, NODE_MAPPING_GRAPH_ID).create(workerCreateListener)) .addNext(new NodeMappingPersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener)); - return graph; } @SuppressWarnings("unchecked") - public Graph createNodeReferenceGraph() { + public void createNodeReferenceGraph() { QueueCreatorService queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class); RemoteSenderService remoteSenderService = moduleManager.find(RemoteModule.NAME).getService(RemoteSenderService.class); @@ -141,11 +136,10 @@ public class TraceStreamGraph { graph.addNode(new NodeReferenceAggregationWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener)) .addNext(new NodeReferenceRemoteWorker.Factory(moduleManager, remoteSenderService, NODE_REFERENCE_GRAPH_ID).create(workerCreateListener)) .addNext(new NodeReferencePersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener)); - return graph; } @SuppressWarnings("unchecked") - public Graph createServiceEntryGraph() { + public void createServiceEntryGraph() { QueueCreatorService queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class); RemoteSenderService remoteSenderService = moduleManager.find(RemoteModule.NAME).getService(RemoteSenderService.class); @@ -153,11 +147,10 @@ public class TraceStreamGraph { graph.addNode(new ServiceEntryAggregationWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener)) .addNext(new ServiceEntryRemoteWorker.Factory(moduleManager, remoteSenderService, SERVICE_ENTRY_GRAPH_ID).create(workerCreateListener)) .addNext(new ServiceEntryPersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener)); - return graph; } @SuppressWarnings("unchecked") - public Graph createServiceReferenceGraph() { + public void createServiceReferenceGraph() { QueueCreatorService queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class); RemoteSenderService remoteSenderService = moduleManager.find(RemoteModule.NAME).getService(RemoteSenderService.class); @@ -165,24 +158,21 @@ public class TraceStreamGraph { graph.addNode(new ServiceReferenceAggregationWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener)) .addNext(new ServiceReferenceRemoteWorker.Factory(moduleManager, remoteSenderService, SERVICE_REFERENCE_GRAPH_ID).create(workerCreateListener)) .addNext(new ServiceReferencePersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener)); - return graph; } @SuppressWarnings("unchecked") - public Graph createSegmentGraph() { + public void createSegmentGraph() { QueueCreatorService queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class); Graph graph = GraphManager.INSTANCE.createIfAbsent(SEGMENT_GRAPH_ID, Segment.class); graph.addNode(new SegmentPersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener)); - return graph; } @SuppressWarnings("unchecked") - public Graph createSegmentCostGraph() { + public void createSegmentCostGraph() { QueueCreatorService queueCreatorService = moduleManager.find(QueueModule.NAME).getService(QueueCreatorService.class); Graph graph = GraphManager.INSTANCE.createIfAbsent(SEGMENT_COST_GRAPH_ID, SegmentCost.class); graph.addNode(new SegmentCostPersistenceWorker.Factory(moduleManager, queueCreatorService).create(workerCreateListener)); - return graph; } } diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/SegmentParse.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/SegmentParse.java index 982850f38879b45751c89b6fc04ce44058566561..ddb3f2ce020934a2fc8c4f087a098a7d7c4df7a5 100644 --- a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/SegmentParse.java +++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/SegmentParse.java @@ -25,6 +25,7 @@ import org.skywalking.apm.collector.agent.stream.graph.TraceStreamGraph; import org.skywalking.apm.collector.agent.stream.parser.standardization.ReferenceDecorator; import org.skywalking.apm.collector.agent.stream.parser.standardization.ReferenceIdExchanger; import org.skywalking.apm.collector.agent.stream.parser.standardization.SegmentDecorator; +import org.skywalking.apm.collector.agent.stream.parser.standardization.SegmentStandardization; import org.skywalking.apm.collector.agent.stream.parser.standardization.SpanDecorator; import org.skywalking.apm.collector.agent.stream.parser.standardization.SpanIdExchanger; import org.skywalking.apm.collector.agent.stream.worker.trace.global.GlobalTraceSpanListener; @@ -159,8 +160,10 @@ public class SegmentParse { private void writeToBufferFile(String id, UpstreamSegment upstreamSegment) { logger.debug("send to segment buffer write worker, id: {}", id); - Graph graph = GraphManager.INSTANCE.createIfAbsent(TraceStreamGraph.SEGMENT_STANDARDIZATION_GRAPH_ID, UpstreamSegment.class); - graph.start(upstreamSegment); + SegmentStandardization standardization = new SegmentStandardization(id); + standardization.setUpstreamSegment(upstreamSegment); + Graph graph = GraphManager.INSTANCE.createIfAbsent(TraceStreamGraph.SEGMENT_STANDARDIZATION_GRAPH_ID, SegmentStandardization.class); + graph.start(standardization); } private void notifyListenerToBuild() { diff --git a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/LocalWorkerProviderDefinitionFile.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/standardization/SegmentStandardization.java similarity index 55% rename from apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/LocalWorkerProviderDefinitionFile.java rename to apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/standardization/SegmentStandardization.java index 0f6c9f439a6343776f59de1c239339a97e40387f..b3f8125d14aeaf21cd28dc82e2f7b19fa9670465 100644 --- a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/LocalWorkerProviderDefinitionFile.java +++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/standardization/SegmentStandardization.java @@ -16,15 +16,27 @@ * Project repository: https://github.com/OpenSkywalking/skywalking */ -package org.skywalking.apm.collector.stream.worker.base; +package org.skywalking.apm.collector.agent.stream.parser.standardization; -import org.skywalking.apm.collector.core.define.DefinitionFile; +import org.skywalking.apm.collector.core.data.EndOfBatchQueueMessage; +import org.skywalking.apm.network.proto.UpstreamSegment; /** * @author peng-yongsheng */ -public class LocalWorkerProviderDefinitionFile extends DefinitionFile { - @Override protected String fileName() { - return "local_worker_provider.define"; +public class SegmentStandardization extends EndOfBatchQueueMessage { + + public SegmentStandardization(String key) { + super(key); + } + + private UpstreamSegment upstreamSegment; + + public UpstreamSegment getUpstreamSegment() { + return upstreamSegment; + } + + public void setUpstreamSegment(UpstreamSegment upstreamSegment) { + this.upstreamSegment = upstreamSegment; } } diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/standardization/SegmentStandardizationWorker.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/standardization/SegmentStandardizationWorker.java index 441b3680533c89cef4a7f55b53c9df29dd1f1b4b..8fe297ad253d5dd1b3233acf49d930ed7b6546ff 100644 --- a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/standardization/SegmentStandardizationWorker.java +++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/parser/standardization/SegmentStandardizationWorker.java @@ -18,20 +18,21 @@ package org.skywalking.apm.collector.agent.stream.parser.standardization; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import org.skywalking.apm.collector.agent.stream.buffer.SegmentBufferManager; import org.skywalking.apm.collector.core.module.ModuleManager; import org.skywalking.apm.collector.queue.service.QueueCreatorService; import org.skywalking.apm.collector.stream.worker.base.AbstractLocalAsyncWorker; import org.skywalking.apm.collector.stream.worker.base.AbstractLocalAsyncWorkerProvider; import org.skywalking.apm.collector.stream.worker.base.WorkerException; -import org.skywalking.apm.network.proto.UpstreamSegment; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * @author peng-yongsheng */ -public class SegmentStandardizationWorker extends AbstractLocalAsyncWorker { +public class SegmentStandardizationWorker extends AbstractLocalAsyncWorker { private final Logger logger = LoggerFactory.getLogger(SegmentStandardizationWorker.class); @@ -44,25 +45,32 @@ public class SegmentStandardizationWorker extends AbstractLocalAsyncWorker { - public Factory(ModuleManager moduleManager, QueueCreatorService queueCreatorService) { + public static class Factory extends AbstractLocalAsyncWorkerProvider { + + public Factory(ModuleManager moduleManager, QueueCreatorService queueCreatorService) { super(moduleManager, queueCreatorService); } @Override public SegmentStandardizationWorker workerInstance(ModuleManager moduleManager) { - return new SegmentStandardizationWorker(moduleManager); + SegmentStandardizationWorker standardizationWorker = new SegmentStandardizationWorker(moduleManager); + startTimer(standardizationWorker); + return standardizationWorker; } @Override public int queueSize() { return 1024; } + + private void startTimer(SegmentStandardizationWorker standardizationWorker) { + Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(standardizationWorker::flushAndSwitch, 10, 3, TimeUnit.SECONDS); + } } } diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/register/ServiceNameService.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/register/ServiceNameService.java index cbf98107aa89f2dd47349290a00f5baf64b5bd71..738a595f42b4607a71fec6a02a4cf7e77b167245 100644 --- a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/register/ServiceNameService.java +++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/worker/register/ServiceNameService.java @@ -40,7 +40,7 @@ public class ServiceNameService { public ServiceNameService(ModuleManager moduleManager) { this.moduleManager = moduleManager; - this.serviceNameRegisterGraph = GraphManager.INSTANCE.createIfAbsent(RegisterStreamGraph.APPLICATION_REGISTER_GRAPH_ID, ServiceName.class); + this.serviceNameRegisterGraph = GraphManager.INSTANCE.createIfAbsent(RegisterStreamGraph.SERVICE_NAME_REGISTER_GRAPH_ID, ServiceName.class); } public int getOrCreate(int applicationId, String serviceName) { diff --git a/apm-collector/apm-collector-boot/src/main/resources/log4j2.xml b/apm-collector/apm-collector-boot/src/main/resources/log4j2.xml index 2665f80d2d298a3579bce617bac25b70e4d55482..db54a640289e294f832cc722087e4659121d5a7d 100644 --- a/apm-collector/apm-collector-boot/src/main/resources/log4j2.xml +++ b/apm-collector/apm-collector-boot/src/main/resources/log4j2.xml @@ -26,7 +26,8 @@ - + + diff --git a/apm-collector/apm-collector-queue/collector-queue-define/src/main/java/org/skywalking/apm/collector/queue/base/MessageHolder.java b/apm-collector/apm-collector-queue/collector-queue-define/src/main/java/org/skywalking/apm/collector/queue/base/MessageHolder.java index bcae667074594e20d05dd5661f7890f623b1f702..e6ea0f045b40f0086ab3cb7f53b40241779cf2cf 100644 --- a/apm-collector/apm-collector-queue/collector-queue-define/src/main/java/org/skywalking/apm/collector/queue/base/MessageHolder.java +++ b/apm-collector/apm-collector-queue/collector-queue-define/src/main/java/org/skywalking/apm/collector/queue/base/MessageHolder.java @@ -18,12 +18,12 @@ package org.skywalking.apm.collector.queue.base; -import org.skywalking.apm.collector.core.data.Data; +import org.skywalking.apm.collector.core.data.EndOfBatchQueueMessage; /** * @author peng-yongsheng */ -public class MessageHolder { +public class MessageHolder { private MESSAGE message; public MESSAGE getMessage() { diff --git a/apm-collector/apm-collector-queue/collector-queue-disruptor-provider/src/main/java/org/skywalking/apm/collector/queue/disruptor/base/DisruptorEventHandler.java b/apm-collector/apm-collector-queue/collector-queue-disruptor-provider/src/main/java/org/skywalking/apm/collector/queue/disruptor/base/DisruptorEventHandler.java index ad2131cf3b9e84f488a8c552fef1cf4f1b9727fc..c0b876a985273f1619ac6aff13b9ad5870e1ce57 100644 --- a/apm-collector/apm-collector-queue/collector-queue-disruptor-provider/src/main/java/org/skywalking/apm/collector/queue/disruptor/base/DisruptorEventHandler.java +++ b/apm-collector/apm-collector-queue/collector-queue-disruptor-provider/src/main/java/org/skywalking/apm/collector/queue/disruptor/base/DisruptorEventHandler.java @@ -22,6 +22,7 @@ import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.RingBuffer; import org.skywalking.apm.collector.core.CollectorException; import org.skywalking.apm.collector.core.data.Data; +import org.skywalking.apm.collector.core.data.EndOfBatchQueueMessage; import org.skywalking.apm.collector.queue.base.MessageHolder; import org.skywalking.apm.collector.queue.base.QueueEventHandler; import org.skywalking.apm.collector.queue.base.QueueExecutor; @@ -31,7 +32,7 @@ import org.slf4j.LoggerFactory; /** * @author peng-yongsheng */ -public class DisruptorEventHandler implements EventHandler>, QueueEventHandler { +public class DisruptorEventHandler implements EventHandler>, QueueEventHandler { private final Logger logger = LoggerFactory.getLogger(DisruptorEventHandler.class); diff --git a/apm-collector/apm-collector-queue/collector-queue-disruptor-provider/src/main/java/org/skywalking/apm/collector/queue/disruptor/base/DisruptorQueueCreator.java b/apm-collector/apm-collector-queue/collector-queue-disruptor-provider/src/main/java/org/skywalking/apm/collector/queue/disruptor/base/DisruptorQueueCreator.java index f2834709ed9c2923b4ffee284ac7e6c99e8bcdae..c63cff78aa6e91dc03b8a410fbb79debbc777787 100644 --- a/apm-collector/apm-collector-queue/collector-queue-disruptor-provider/src/main/java/org/skywalking/apm/collector/queue/disruptor/base/DisruptorQueueCreator.java +++ b/apm-collector/apm-collector-queue/collector-queue-disruptor-provider/src/main/java/org/skywalking/apm/collector/queue/disruptor/base/DisruptorQueueCreator.java @@ -38,7 +38,7 @@ public class DisruptorQueueCreator implements QueueCreator { } // Construct the Disruptor - Disruptor disruptor = new Disruptor(MessageHolderFactory.INSTANCE, queueSize, DaemonThreadFactory.INSTANCE); + Disruptor disruptor = new Disruptor<>(MessageHolderFactory.INSTANCE, queueSize, DaemonThreadFactory.INSTANCE); RingBuffer ringBuffer = disruptor.getRingBuffer(); DisruptorEventHandler eventHandler = new DisruptorEventHandler(ringBuffer, executor); diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/table/noderef/NodeReference.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/table/noderef/NodeReference.java index 9461b582370cf3bddbe5a7f8fbd1568bb45cd890..53532f9e3a9b1a16c048ed3a7f20c46675921dfd 100644 --- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/table/noderef/NodeReference.java +++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/table/noderef/NodeReference.java @@ -53,6 +53,12 @@ public class NodeReference extends Data { public NodeReference(String id) { super(id, STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BOOLEAN_COLUMNS, BYTE_COLUMNS); + setS1Lte(0); + setS3Lte(0); + setS5Lte(0); + setS5Gt(0); + setError(0); + setSummary(0); } public String getBehindPeer() { diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/table/serviceref/ServiceReference.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/table/serviceref/ServiceReference.java index 769f2bec4a4fde8c41273803bc566c840dae7b6b..2cd84848feb2a54441ec8661ef322fbd7790614f 100644 --- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/table/serviceref/ServiceReference.java +++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/skywalking/apm/collector/storage/table/serviceref/ServiceReference.java @@ -58,6 +58,13 @@ public class ServiceReference extends Data { public ServiceReference(String id) { super(id, STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BOOLEAN_COLUMNS, BYTE_COLUMNS); + setS1Lte(0L); + setS3Lte(0L); + setS5Lte(0L); + setS5Gt(0L); + setError(0L); + setSummary(0L); + setCostSummary(0L); } public String getEntryServiceName() { diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/skywalking/apm/collector/storage/es/StorageModuleEsProvider.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/skywalking/apm/collector/storage/es/StorageModuleEsProvider.java index e483df335b32d17b5becf5791ac23b979dfd9636..43e0e7c0e6bb4240a1570b23032332c27c78a775 100644 --- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/skywalking/apm/collector/storage/es/StorageModuleEsProvider.java +++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/skywalking/apm/collector/storage/es/StorageModuleEsProvider.java @@ -44,6 +44,7 @@ import org.skywalking.apm.collector.storage.dao.IInstanceUIDAO; import org.skywalking.apm.collector.storage.dao.IMemoryMetricPersistenceDAO; import org.skywalking.apm.collector.storage.dao.IMemoryMetricUIDAO; import org.skywalking.apm.collector.storage.dao.IMemoryPoolMetricPersistenceDAO; +import org.skywalking.apm.collector.storage.dao.IMemoryPoolMetricUIDAO; import org.skywalking.apm.collector.storage.dao.INodeComponentPersistenceDAO; import org.skywalking.apm.collector.storage.dao.INodeComponentUIDAO; import org.skywalking.apm.collector.storage.dao.INodeMappingPersistenceDAO; @@ -59,6 +60,7 @@ import org.skywalking.apm.collector.storage.dao.IServiceEntryUIDAO; import org.skywalking.apm.collector.storage.dao.IServiceNameCacheDAO; import org.skywalking.apm.collector.storage.dao.IServiceNameRegisterDAO; import org.skywalking.apm.collector.storage.dao.IServiceReferencePersistenceDAO; +import org.skywalking.apm.collector.storage.dao.IServiceReferenceUIDAO; import org.skywalking.apm.collector.storage.es.base.dao.BatchEsDAO; import org.skywalking.apm.collector.storage.es.base.define.ElasticSearchStorageInstaller; import org.skywalking.apm.collector.storage.es.dao.ApplicationEsCacheDAO; @@ -78,6 +80,7 @@ import org.skywalking.apm.collector.storage.es.dao.InstanceHeartBeatEsPersistenc import org.skywalking.apm.collector.storage.es.dao.MemoryMetricEsPersistenceDAO; import org.skywalking.apm.collector.storage.es.dao.MemoryMetricEsUIDAO; import org.skywalking.apm.collector.storage.es.dao.MemoryPoolMetricEsPersistenceDAO; +import org.skywalking.apm.collector.storage.es.dao.MemoryPoolMetricEsUIDAO; import org.skywalking.apm.collector.storage.es.dao.NodeComponentEsPersistenceDAO; import org.skywalking.apm.collector.storage.es.dao.NodeComponentEsUIDAO; import org.skywalking.apm.collector.storage.es.dao.NodeMappingEsPersistenceDAO; @@ -93,6 +96,7 @@ import org.skywalking.apm.collector.storage.es.dao.ServiceEntryEsUIDAO; import org.skywalking.apm.collector.storage.es.dao.ServiceNameEsCacheDAO; import org.skywalking.apm.collector.storage.es.dao.ServiceNameEsRegisterDAO; import org.skywalking.apm.collector.storage.es.dao.ServiceReferenceEsPersistenceDAO; +import org.skywalking.apm.collector.storage.es.dao.ServiceReferenceEsUIDAO; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -190,7 +194,7 @@ public class StorageModuleEsProvider extends ModuleProvider { this.registerServiceImplementation(ICpuMetricUIDAO.class, new CpuMetricEsUIDAO(elasticSearchClient)); this.registerServiceImplementation(IGCMetricUIDAO.class, new GCMetricEsUIDAO(elasticSearchClient)); this.registerServiceImplementation(IMemoryMetricUIDAO.class, new MemoryMetricEsUIDAO(elasticSearchClient)); -// this.registerServiceImplementation(IMemoryPoolMetricUIDAO.class, new MemoryPoolMetricEsUIDAO(elasticSearchClient)); + this.registerServiceImplementation(IMemoryPoolMetricUIDAO.class, new MemoryPoolMetricEsUIDAO(elasticSearchClient)); this.registerServiceImplementation(IGlobalTraceUIDAO.class, new GlobalTraceEsUIDAO(elasticSearchClient)); this.registerServiceImplementation(IInstPerformanceUIDAO.class, new InstPerformanceEsUIDAO(elasticSearchClient)); @@ -200,6 +204,6 @@ public class StorageModuleEsProvider extends ModuleProvider { this.registerServiceImplementation(ISegmentCostUIDAO.class, new SegmentCostEsUIDAO(elasticSearchClient)); this.registerServiceImplementation(ISegmentUIDAO.class, new SegmentEsUIDAO(elasticSearchClient)); this.registerServiceImplementation(IServiceEntryUIDAO.class, new ServiceEntryEsUIDAO(elasticSearchClient)); -// this.registerServiceImplementation(IServiceReferenceUIDAO.class, new ServiceReferenceEsUIDAO(elasticSearchClient)); + this.registerServiceImplementation(IServiceReferenceUIDAO.class, new ServiceReferenceEsUIDAO(elasticSearchClient)); } } diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/skywalking/apm/collector/storage/es/dao/MemoryPoolMetricEsUIDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/skywalking/apm/collector/storage/es/dao/MemoryPoolMetricEsUIDAO.java new file mode 100644 index 0000000000000000000000000000000000000000..3485a5481306da388374bb237e9cf5c442f746de --- /dev/null +++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/skywalking/apm/collector/storage/es/dao/MemoryPoolMetricEsUIDAO.java @@ -0,0 +1,88 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.storage.es.dao; + +import com.google.gson.JsonArray; +import com.google.gson.JsonObject; +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.get.MultiGetItemResponse; +import org.elasticsearch.action.get.MultiGetRequestBuilder; +import org.elasticsearch.action.get.MultiGetResponse; +import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient; +import org.skywalking.apm.collector.core.util.Const; +import org.skywalking.apm.collector.core.util.TimeBucketUtils; +import org.skywalking.apm.collector.storage.dao.IMemoryPoolMetricUIDAO; +import org.skywalking.apm.collector.storage.es.base.dao.EsDAO; +import org.skywalking.apm.collector.storage.table.jvm.MemoryPoolMetricTable; + +/** + * @author peng-yongsheng + */ +public class MemoryPoolMetricEsUIDAO extends EsDAO implements IMemoryPoolMetricUIDAO { + + public MemoryPoolMetricEsUIDAO(ElasticSearchClient client) { + super(client); + } + + @Override public JsonObject getMetric(int instanceId, long timeBucket, int poolType) { + String id = timeBucket + Const.ID_SPLIT + instanceId + Const.ID_SPLIT + poolType; + GetResponse getResponse = getClient().prepareGet(MemoryPoolMetricTable.TABLE, id).get(); + + JsonObject metric = new JsonObject(); + if (getResponse.isExists()) { + metric.addProperty("max", ((Number)getResponse.getSource().get(MemoryPoolMetricTable.COLUMN_MAX)).intValue()); + metric.addProperty("init", ((Number)getResponse.getSource().get(MemoryPoolMetricTable.COLUMN_INIT)).intValue()); + metric.addProperty("used", ((Number)getResponse.getSource().get(MemoryPoolMetricTable.COLUMN_USED)).intValue()); + } else { + metric.addProperty("max", 0); + metric.addProperty("init", 0); + metric.addProperty("used", 0); + } + return metric; + } + + @Override public JsonObject getMetric(int instanceId, long startTimeBucket, long endTimeBucket, int poolType) { + MultiGetRequestBuilder prepareMultiGet = getClient().prepareMultiGet(); + + long timeBucket = startTimeBucket; + do { + timeBucket = TimeBucketUtils.INSTANCE.addSecondForSecondTimeBucket(TimeBucketUtils.TimeBucketType.SECOND.name(), timeBucket, 1); + String id = timeBucket + Const.ID_SPLIT + instanceId + Const.ID_SPLIT + poolType; + prepareMultiGet.add(MemoryPoolMetricTable.TABLE, MemoryPoolMetricTable.TABLE_TYPE, id); + } + while (timeBucket <= endTimeBucket); + + JsonObject metric = new JsonObject(); + JsonArray usedMetric = new JsonArray(); + MultiGetResponse multiGetResponse = prepareMultiGet.get(); + for (MultiGetItemResponse response : multiGetResponse.getResponses()) { + if (response.getResponse().isExists()) { + metric.addProperty("max", ((Number)response.getResponse().getSource().get(MemoryPoolMetricTable.COLUMN_MAX)).longValue()); + metric.addProperty("init", ((Number)response.getResponse().getSource().get(MemoryPoolMetricTable.COLUMN_INIT)).longValue()); + usedMetric.add(((Number)response.getResponse().getSource().get(MemoryPoolMetricTable.COLUMN_USED)).longValue()); + } else { + metric.addProperty("max", 0); + metric.addProperty("init", 0); + usedMetric.add(0); + } + } + metric.add("used", usedMetric); + return metric; + } +} diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/skywalking/apm/collector/storage/es/dao/ServiceReferenceEsUIDAO.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/skywalking/apm/collector/storage/es/dao/ServiceReferenceEsUIDAO.java new file mode 100644 index 0000000000000000000000000000000000000000..1a4531d1337901cdda1ad7d352891174e053334a --- /dev/null +++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/skywalking/apm/collector/storage/es/dao/ServiceReferenceEsUIDAO.java @@ -0,0 +1,125 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.storage.es.dao; + +import com.google.gson.JsonObject; +import java.util.LinkedHashMap; +import java.util.Map; +import org.elasticsearch.action.search.SearchRequestBuilder; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.SearchType; +import org.elasticsearch.index.query.BoolQueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.aggregations.AggregationBuilders; +import org.elasticsearch.search.aggregations.bucket.terms.Terms; +import org.elasticsearch.search.aggregations.metrics.sum.Sum; +import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient; +import org.skywalking.apm.collector.core.util.ColumnNameUtils; +import org.skywalking.apm.collector.core.util.Const; +import org.skywalking.apm.collector.storage.dao.IServiceReferenceUIDAO; +import org.skywalking.apm.collector.storage.es.base.dao.EsDAO; +import org.skywalking.apm.collector.storage.table.serviceref.ServiceReferenceTable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author peng-yongsheng + */ +public class ServiceReferenceEsUIDAO extends EsDAO implements IServiceReferenceUIDAO { + + private final Logger logger = LoggerFactory.getLogger(ServiceReferenceEsUIDAO.class); + + public ServiceReferenceEsUIDAO(ElasticSearchClient client) { + super(client); + } + + @Override + public Map load(int entryServiceId, long startTime, long endTime) { + SearchRequestBuilder searchRequestBuilder = getClient().prepareSearch(ServiceReferenceTable.TABLE); + searchRequestBuilder.setTypes(ServiceReferenceTable.TABLE_TYPE); + searchRequestBuilder.setSearchType(SearchType.DFS_QUERY_THEN_FETCH); + + BoolQueryBuilder boolQuery = QueryBuilders.boolQuery(); + boolQuery.must().add(QueryBuilders.rangeQuery(ServiceReferenceTable.COLUMN_TIME_BUCKET).gte(startTime).lte(endTime)); + boolQuery.must().add(QueryBuilders.rangeQuery(ServiceReferenceTable.COLUMN_TIME_BUCKET).gte(startTime).lte(endTime)); + boolQuery.must().add(QueryBuilders.matchQuery(ServiceReferenceTable.COLUMN_ENTRY_SERVICE_ID, entryServiceId)); + + searchRequestBuilder.setQuery(boolQuery); + searchRequestBuilder.setSize(0); + + return load(searchRequestBuilder); + } + + private Map load(SearchRequestBuilder searchRequestBuilder) { + searchRequestBuilder.addAggregation(AggregationBuilders.terms(ServiceReferenceTable.COLUMN_FRONT_SERVICE_ID).field(ServiceReferenceTable.COLUMN_FRONT_SERVICE_ID).size(100) + .subAggregation(AggregationBuilders.terms(ServiceReferenceTable.COLUMN_BEHIND_SERVICE_ID).field(ServiceReferenceTable.COLUMN_BEHIND_SERVICE_ID).size(100) + .subAggregation(AggregationBuilders.sum(ServiceReferenceTable.COLUMN_S1_LTE).field(ServiceReferenceTable.COLUMN_S1_LTE)) + .subAggregation(AggregationBuilders.sum(ServiceReferenceTable.COLUMN_S3_LTE).field(ServiceReferenceTable.COLUMN_S3_LTE)) + .subAggregation(AggregationBuilders.sum(ServiceReferenceTable.COLUMN_S5_LTE).field(ServiceReferenceTable.COLUMN_S5_LTE)) + .subAggregation(AggregationBuilders.sum(ServiceReferenceTable.COLUMN_S5_GT).field(ServiceReferenceTable.COLUMN_S5_GT)) + .subAggregation(AggregationBuilders.sum(ServiceReferenceTable.COLUMN_ERROR).field(ServiceReferenceTable.COLUMN_ERROR)) + .subAggregation(AggregationBuilders.sum(ServiceReferenceTable.COLUMN_SUMMARY).field(ServiceReferenceTable.COLUMN_SUMMARY)) + .subAggregation(AggregationBuilders.sum(ServiceReferenceTable.COLUMN_COST_SUMMARY).field(ServiceReferenceTable.COLUMN_COST_SUMMARY)))); + + Map serviceReferenceMap = new LinkedHashMap<>(); + + SearchResponse searchResponse = searchRequestBuilder.get(); + Terms frontServiceIdTerms = searchResponse.getAggregations().get(ServiceReferenceTable.COLUMN_FRONT_SERVICE_ID); + for (Terms.Bucket frontServiceBucket : frontServiceIdTerms.getBuckets()) { + int frontServiceId = frontServiceBucket.getKeyAsNumber().intValue(); + if (frontServiceId != 0) { + parseSubAggregate(serviceReferenceMap, frontServiceBucket, frontServiceId); + } + } + + return serviceReferenceMap; + } + + private void parseSubAggregate(Map serviceReferenceMap, + Terms.Bucket frontServiceBucket, + int frontServiceId) { + Terms behindServiceIdTerms = frontServiceBucket.getAggregations().get(ServiceReferenceTable.COLUMN_BEHIND_SERVICE_ID); + for (Terms.Bucket behindServiceIdBucket : behindServiceIdTerms.getBuckets()) { + int behindServiceId = behindServiceIdBucket.getKeyAsNumber().intValue(); + if (behindServiceId != 0) { + Sum s1LteSum = behindServiceIdBucket.getAggregations().get(ServiceReferenceTable.COLUMN_S1_LTE); + Sum s3LteSum = behindServiceIdBucket.getAggregations().get(ServiceReferenceTable.COLUMN_S3_LTE); + Sum s5LteSum = behindServiceIdBucket.getAggregations().get(ServiceReferenceTable.COLUMN_S5_LTE); + Sum s5GtSum = behindServiceIdBucket.getAggregations().get(ServiceReferenceTable.COLUMN_S5_GT); + Sum error = behindServiceIdBucket.getAggregations().get(ServiceReferenceTable.COLUMN_ERROR); + Sum summary = behindServiceIdBucket.getAggregations().get(ServiceReferenceTable.COLUMN_SUMMARY); + Sum costSum = behindServiceIdBucket.getAggregations().get(ServiceReferenceTable.COLUMN_COST_SUMMARY); + + JsonObject serviceReference = new JsonObject(); + serviceReference.addProperty(ColumnNameUtils.INSTANCE.rename(ServiceReferenceTable.COLUMN_FRONT_SERVICE_ID), frontServiceId); + serviceReference.addProperty(ColumnNameUtils.INSTANCE.rename(ServiceReferenceTable.COLUMN_BEHIND_SERVICE_ID), behindServiceId); + serviceReference.addProperty(ColumnNameUtils.INSTANCE.rename(ServiceReferenceTable.COLUMN_S1_LTE), (long)s1LteSum.getValue()); + serviceReference.addProperty(ColumnNameUtils.INSTANCE.rename(ServiceReferenceTable.COLUMN_S3_LTE), (long)s3LteSum.getValue()); + serviceReference.addProperty(ColumnNameUtils.INSTANCE.rename(ServiceReferenceTable.COLUMN_S5_LTE), (long)s5LteSum.getValue()); + serviceReference.addProperty(ColumnNameUtils.INSTANCE.rename(ServiceReferenceTable.COLUMN_S5_GT), (long)s5GtSum.getValue()); + serviceReference.addProperty(ColumnNameUtils.INSTANCE.rename(ServiceReferenceTable.COLUMN_ERROR), (long)error.getValue()); + serviceReference.addProperty(ColumnNameUtils.INSTANCE.rename(ServiceReferenceTable.COLUMN_SUMMARY), (long)summary.getValue()); + serviceReference.addProperty(ColumnNameUtils.INSTANCE.rename(ServiceReferenceTable.COLUMN_COST_SUMMARY), (long)costSum.getValue()); + + String id = serviceReference.get(ColumnNameUtils.INSTANCE.rename(ServiceReferenceTable.COLUMN_FRONT_SERVICE_ID)) + Const.ID_SPLIT + serviceReference.get(ColumnNameUtils.INSTANCE.rename(ServiceReferenceTable.COLUMN_BEHIND_SERVICE_ID)); + serviceReferenceMap.put(id, serviceReference); + } + } + } +} diff --git a/apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/skywalking/apm/collector/storage/h2/StorageModuleH2Provider.java b/apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/skywalking/apm/collector/storage/h2/StorageModuleH2Provider.java index 955b857ad9dca69c15677332902765b1f101f11d..82b1fc9804b1de0d60e311a6d5dbe4cc3d549a7d 100644 --- a/apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/skywalking/apm/collector/storage/h2/StorageModuleH2Provider.java +++ b/apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/skywalking/apm/collector/storage/h2/StorageModuleH2Provider.java @@ -44,6 +44,7 @@ import org.skywalking.apm.collector.storage.dao.IInstanceUIDAO; import org.skywalking.apm.collector.storage.dao.IMemoryMetricPersistenceDAO; import org.skywalking.apm.collector.storage.dao.IMemoryMetricUIDAO; import org.skywalking.apm.collector.storage.dao.IMemoryPoolMetricPersistenceDAO; +import org.skywalking.apm.collector.storage.dao.IMemoryPoolMetricUIDAO; import org.skywalking.apm.collector.storage.dao.INodeComponentPersistenceDAO; import org.skywalking.apm.collector.storage.dao.INodeComponentUIDAO; import org.skywalking.apm.collector.storage.dao.INodeMappingPersistenceDAO; @@ -59,6 +60,7 @@ import org.skywalking.apm.collector.storage.dao.IServiceEntryUIDAO; import org.skywalking.apm.collector.storage.dao.IServiceNameCacheDAO; import org.skywalking.apm.collector.storage.dao.IServiceNameRegisterDAO; import org.skywalking.apm.collector.storage.dao.IServiceReferencePersistenceDAO; +import org.skywalking.apm.collector.storage.dao.IServiceReferenceUIDAO; import org.skywalking.apm.collector.storage.h2.base.dao.BatchH2DAO; import org.skywalking.apm.collector.storage.h2.base.define.H2StorageInstaller; import org.skywalking.apm.collector.storage.h2.dao.ApplicationH2CacheDAO; @@ -78,6 +80,7 @@ import org.skywalking.apm.collector.storage.h2.dao.InstanceHeartBeatH2Persistenc import org.skywalking.apm.collector.storage.h2.dao.MemoryMetricH2PersistenceDAO; import org.skywalking.apm.collector.storage.h2.dao.MemoryMetricH2UIDAO; import org.skywalking.apm.collector.storage.h2.dao.MemoryPoolMetricH2PersistenceDAO; +import org.skywalking.apm.collector.storage.h2.dao.MemoryPoolMetricH2UIDAO; import org.skywalking.apm.collector.storage.h2.dao.NodeComponentH2PersistenceDAO; import org.skywalking.apm.collector.storage.h2.dao.NodeComponentH2UIDAO; import org.skywalking.apm.collector.storage.h2.dao.NodeMappingH2PersistenceDAO; @@ -93,6 +96,7 @@ import org.skywalking.apm.collector.storage.h2.dao.ServiceEntryH2UIDAO; import org.skywalking.apm.collector.storage.h2.dao.ServiceNameH2CacheDAO; import org.skywalking.apm.collector.storage.h2.dao.ServiceNameH2RegisterDAO; import org.skywalking.apm.collector.storage.h2.dao.ServiceReferenceH2PersistenceDAO; +import org.skywalking.apm.collector.storage.h2.dao.ServiceReferenceH2UIDAO; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -186,7 +190,7 @@ public class StorageModuleH2Provider extends ModuleProvider { this.registerServiceImplementation(ICpuMetricUIDAO.class, new CpuMetricH2UIDAO(h2Client)); this.registerServiceImplementation(IGCMetricUIDAO.class, new GCMetricH2UIDAO(h2Client)); this.registerServiceImplementation(IMemoryMetricUIDAO.class, new MemoryMetricH2UIDAO(h2Client)); -// this.registerServiceImplementation(IMemoryPoolMetricUIDAO.class, new MemoryPoolMetricH2UIDAO(h2Client)); + this.registerServiceImplementation(IMemoryPoolMetricUIDAO.class, new MemoryPoolMetricH2UIDAO(h2Client)); this.registerServiceImplementation(IGlobalTraceUIDAO.class, new GlobalTraceH2UIDAO(h2Client)); this.registerServiceImplementation(IInstPerformanceUIDAO.class, new InstPerformanceH2UIDAO(h2Client)); @@ -196,6 +200,6 @@ public class StorageModuleH2Provider extends ModuleProvider { this.registerServiceImplementation(ISegmentCostUIDAO.class, new SegmentCostH2UIDAO(h2Client)); this.registerServiceImplementation(ISegmentUIDAO.class, new SegmentH2UIDAO(h2Client)); this.registerServiceImplementation(IServiceEntryUIDAO.class, new ServiceEntryH2UIDAO(h2Client)); -// this.registerServiceImplementation(IServiceReferenceUIDAO.class, new ServiceReferenceH2UIDAO(elasticSearchClient)); + this.registerServiceImplementation(IServiceReferenceUIDAO.class, new ServiceReferenceH2UIDAO(h2Client)); } } diff --git a/apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/skywalking/apm/collector/storage/h2/dao/MemoryPoolMetricH2UIDAO.java b/apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/skywalking/apm/collector/storage/h2/dao/MemoryPoolMetricH2UIDAO.java new file mode 100644 index 0000000000000000000000000000000000000000..22cb0228f62342a47cc17be6af3382e9c8a8a3d7 --- /dev/null +++ b/apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/skywalking/apm/collector/storage/h2/dao/MemoryPoolMetricH2UIDAO.java @@ -0,0 +1,106 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.storage.h2.dao; + +import com.google.gson.JsonArray; +import com.google.gson.JsonObject; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import org.skywalking.apm.collector.client.h2.H2Client; +import org.skywalking.apm.collector.client.h2.H2ClientException; +import org.skywalking.apm.collector.core.util.Const; +import org.skywalking.apm.collector.core.util.TimeBucketUtils; +import org.skywalking.apm.collector.storage.base.sql.SqlBuilder; +import org.skywalking.apm.collector.storage.dao.IMemoryPoolMetricUIDAO; +import org.skywalking.apm.collector.storage.h2.base.dao.H2DAO; +import org.skywalking.apm.collector.storage.table.jvm.MemoryPoolMetricTable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author clevertension + */ +public class MemoryPoolMetricH2UIDAO extends H2DAO implements IMemoryPoolMetricUIDAO { + + private final Logger logger = LoggerFactory.getLogger(MemoryPoolMetricH2UIDAO.class); + private static final String GET_MEMORY_POOL_METRIC_SQL = "select * from {0} where {1} = ?"; + + public MemoryPoolMetricH2UIDAO(H2Client client) { + super(client); + } + + @Override public JsonObject getMetric(int instanceId, long timeBucket, int poolType) { + H2Client client = getClient(); + String id = timeBucket + Const.ID_SPLIT + instanceId + Const.ID_SPLIT + poolType; + String sql = SqlBuilder.buildSql(GET_MEMORY_POOL_METRIC_SQL, MemoryPoolMetricTable.TABLE, MemoryPoolMetricTable.COLUMN_ID); + Object[] params = new Object[] {id}; + JsonObject metric = new JsonObject(); + try (ResultSet rs = client.executeQuery(sql, params)) { + if (rs.next()) { + metric.addProperty("max", rs.getInt(MemoryPoolMetricTable.COLUMN_MAX)); + metric.addProperty("init", rs.getInt(MemoryPoolMetricTable.COLUMN_INIT)); + metric.addProperty("used", rs.getInt(MemoryPoolMetricTable.COLUMN_USED)); + } else { + metric.addProperty("max", 0); + metric.addProperty("init", 0); + metric.addProperty("used", 0); + } + } catch (SQLException | H2ClientException e) { + logger.error(e.getMessage(), e); + } + return metric; + } + + @Override public JsonObject getMetric(int instanceId, long startTimeBucket, long endTimeBucket, int poolType) { + H2Client client = getClient(); + String sql = SqlBuilder.buildSql(GET_MEMORY_POOL_METRIC_SQL, MemoryPoolMetricTable.TABLE, MemoryPoolMetricTable.COLUMN_ID); + List idList = new ArrayList<>(); + long timeBucket = startTimeBucket; + do { + timeBucket = TimeBucketUtils.INSTANCE.addSecondForSecondTimeBucket(TimeBucketUtils.TimeBucketType.SECOND.name(), timeBucket, 1); + String id = timeBucket + Const.ID_SPLIT + instanceId + Const.ID_SPLIT + poolType; + idList.add(id); + } + while (timeBucket <= endTimeBucket); + + JsonObject metric = new JsonObject(); + JsonArray usedMetric = new JsonArray(); + + idList.forEach(id -> { + try (ResultSet rs = client.executeQuery(sql, new String[] {id})) { + if (rs.next()) { + metric.addProperty("max", rs.getLong(MemoryPoolMetricTable.COLUMN_MAX)); + metric.addProperty("init", rs.getLong(MemoryPoolMetricTable.COLUMN_INIT)); + usedMetric.add(rs.getLong(MemoryPoolMetricTable.COLUMN_USED)); + } else { + metric.addProperty("max", 0); + metric.addProperty("init", 0); + usedMetric.add(0); + } + } catch (SQLException | H2ClientException e) { + logger.error(e.getMessage(), e); + } + }); + + metric.add("used", usedMetric); + return metric; + } +} diff --git a/apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/skywalking/apm/collector/storage/h2/dao/ServiceReferenceH2UIDAO.java b/apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/skywalking/apm/collector/storage/h2/dao/ServiceReferenceH2UIDAO.java new file mode 100644 index 0000000000000000000000000000000000000000..32b974e29473bca56ac57fbca91a96015baf3777 --- /dev/null +++ b/apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/skywalking/apm/collector/storage/h2/dao/ServiceReferenceH2UIDAO.java @@ -0,0 +1,110 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.storage.h2.dao; + +import com.google.gson.JsonObject; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.LinkedHashMap; +import java.util.Map; +import org.skywalking.apm.collector.client.h2.H2Client; +import org.skywalking.apm.collector.client.h2.H2ClientException; +import org.skywalking.apm.collector.core.util.ColumnNameUtils; +import org.skywalking.apm.collector.core.util.Const; +import org.skywalking.apm.collector.storage.base.sql.SqlBuilder; +import org.skywalking.apm.collector.storage.dao.IServiceReferenceUIDAO; +import org.skywalking.apm.collector.storage.h2.base.dao.H2DAO; +import org.skywalking.apm.collector.storage.table.serviceref.ServiceReferenceTable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author peng-yongsheng, clevertension + */ +public class ServiceReferenceH2UIDAO extends H2DAO implements IServiceReferenceUIDAO { + + private final Logger logger = LoggerFactory.getLogger(ServiceReferenceH2UIDAO.class); + + public ServiceReferenceH2UIDAO(H2Client client) { + super(client); + } + + private static final String GET_SRV_REF_LOAD1 = "select {3}, {4}, sum({5}) as {5}, sum({6}) as {6}, sum({7}) as {7}" + + ",sum({8}) as {8}, sum({9}) as {9}, sum({10}) as {10}, sum({11}) as {11} from {0} where {1} >= ? and {1} <= ? and {2} = ? group by {3}, {4}"; + + @Override + public Map load(int entryServiceId, long startTime, long endTime) { + H2Client client = getClient(); + String sql = SqlBuilder.buildSql(GET_SRV_REF_LOAD1, ServiceReferenceTable.TABLE, + ServiceReferenceTable.COLUMN_TIME_BUCKET, ServiceReferenceTable.COLUMN_ENTRY_SERVICE_ID, + ServiceReferenceTable.COLUMN_FRONT_SERVICE_ID, ServiceReferenceTable.COLUMN_BEHIND_SERVICE_ID, + ServiceReferenceTable.COLUMN_S1_LTE, ServiceReferenceTable.COLUMN_S3_LTE, ServiceReferenceTable.COLUMN_S5_LTE, + ServiceReferenceTable.COLUMN_S5_GT, ServiceReferenceTable.COLUMN_ERROR, ServiceReferenceTable.COLUMN_SUMMARY, + ServiceReferenceTable.COLUMN_COST_SUMMARY); + Object[] params = new Object[] {startTime, endTime, entryServiceId}; + + return load(client, params, sql); + } + + private Map load(H2Client client, Object[] params, String sql) { + Map serviceReferenceMap = new LinkedHashMap<>(); + + try (ResultSet rs = client.executeQuery(sql, params)) { + while (rs.next()) { + int frontServiceId = rs.getInt(ServiceReferenceTable.COLUMN_FRONT_SERVICE_ID); + parseSubAggregate(serviceReferenceMap, rs, frontServiceId); + } + } catch (SQLException | H2ClientException e) { + logger.error(e.getMessage(), e); + } + return serviceReferenceMap; + } + + private void parseSubAggregate(Map serviceReferenceMap, ResultSet rs, + int frontServiceId) { + try { + int behindServiceId = rs.getInt(ServiceReferenceTable.COLUMN_BEHIND_SERVICE_ID); + if (behindServiceId != 0) { + long s1LteSum = rs.getLong(ServiceReferenceTable.COLUMN_S1_LTE); + long s3LteSum = rs.getLong(ServiceReferenceTable.COLUMN_S3_LTE); + long s5LteSum = rs.getLong(ServiceReferenceTable.COLUMN_S5_LTE); + long s5GtSum = rs.getLong(ServiceReferenceTable.COLUMN_S5_GT); + long error = rs.getLong(ServiceReferenceTable.COLUMN_ERROR); + long summary = rs.getLong(ServiceReferenceTable.COLUMN_SUMMARY); + long costSum = rs.getLong(ServiceReferenceTable.COLUMN_COST_SUMMARY); + + JsonObject serviceReference = new JsonObject(); + serviceReference.addProperty(ColumnNameUtils.INSTANCE.rename(ServiceReferenceTable.COLUMN_FRONT_SERVICE_ID), frontServiceId); + serviceReference.addProperty(ColumnNameUtils.INSTANCE.rename(ServiceReferenceTable.COLUMN_BEHIND_SERVICE_ID), behindServiceId); + serviceReference.addProperty(ColumnNameUtils.INSTANCE.rename(ServiceReferenceTable.COLUMN_S1_LTE), s1LteSum); + serviceReference.addProperty(ColumnNameUtils.INSTANCE.rename(ServiceReferenceTable.COLUMN_S3_LTE), s3LteSum); + serviceReference.addProperty(ColumnNameUtils.INSTANCE.rename(ServiceReferenceTable.COLUMN_S5_LTE), s5LteSum); + serviceReference.addProperty(ColumnNameUtils.INSTANCE.rename(ServiceReferenceTable.COLUMN_S5_GT), s5GtSum); + serviceReference.addProperty(ColumnNameUtils.INSTANCE.rename(ServiceReferenceTable.COLUMN_ERROR), error); + serviceReference.addProperty(ColumnNameUtils.INSTANCE.rename(ServiceReferenceTable.COLUMN_SUMMARY), summary); + serviceReference.addProperty(ColumnNameUtils.INSTANCE.rename(ServiceReferenceTable.COLUMN_COST_SUMMARY), costSum); + + String id = serviceReference.get(ColumnNameUtils.INSTANCE.rename(ServiceReferenceTable.COLUMN_FRONT_SERVICE_ID)) + Const.ID_SPLIT + serviceReference.get(ColumnNameUtils.INSTANCE.rename(ServiceReferenceTable.COLUMN_BEHIND_SERVICE_ID)); + serviceReferenceMap.put(id, serviceReference); + } + } catch (SQLException e) { + logger.error(e.getMessage(), e); + } + } +} diff --git a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/AbstractLocalAsyncWorker.java b/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/AbstractLocalAsyncWorker.java index 81c49ca710bb58f64c9fab31308e114a6c7ee180..e1a6785d785f08ddc0df7c6fddf126173745b353 100644 --- a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/AbstractLocalAsyncWorker.java +++ b/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/AbstractLocalAsyncWorker.java @@ -19,7 +19,6 @@ package org.skywalking.apm.collector.stream.worker.base; import org.skywalking.apm.collector.core.module.ModuleManager; -import org.skywalking.apm.collector.queue.base.QueueExecutor; /** * The AbstractLocalAsyncWorker implementations represent workers, @@ -28,23 +27,9 @@ import org.skywalking.apm.collector.queue.base.QueueExecutor; * @author peng-yongsheng * @since v3.0-2017 */ -public abstract class AbstractLocalAsyncWorker extends AbstractWorker implements QueueExecutor { +public abstract class AbstractLocalAsyncWorker extends AbstractWorker { public AbstractLocalAsyncWorker(ModuleManager moduleManager) { super(moduleManager); } - - /** - * Receive message - * - * @param message The persistence data or metric data. - * @throws WorkerException The Exception happen in {@link #onWork(INPUT)} - */ - final public void allocateJob(INPUT message) throws WorkerException { - onWork(message); - } - - @Override public final void execute(INPUT message) throws WorkerException { - onWork(message); - } } diff --git a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/AbstractLocalAsyncWorkerProvider.java b/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/AbstractLocalAsyncWorkerProvider.java index 64d89696c40e5fc8752d3a040a529a27f6348a50..c8444aecfd49f3a339429e0b170bd3fd7bcb0e5b 100644 --- a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/AbstractLocalAsyncWorkerProvider.java +++ b/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/AbstractLocalAsyncWorkerProvider.java @@ -20,13 +20,12 @@ package org.skywalking.apm.collector.stream.worker.base; import org.skywalking.apm.collector.core.module.ModuleManager; import org.skywalking.apm.collector.queue.base.QueueEventHandler; -import org.skywalking.apm.collector.queue.base.QueueExecutor; import org.skywalking.apm.collector.queue.service.QueueCreatorService; /** * @author peng-yongsheng */ -public abstract class AbstractLocalAsyncWorkerProvider & QueueExecutor> extends AbstractWorkerProvider { +public abstract class AbstractLocalAsyncWorkerProvider> extends AbstractWorkerProvider { public abstract int queueSize(); @@ -42,7 +41,10 @@ public abstract class AbstractLocalAsyncWorkerProvider queueEventHandler = queueCreatorService.create(queueSize(), localAsyncWorker); - return new LocalAsyncWorkerRef<>(localAsyncWorker, queueEventHandler); + + LocalAsyncWorkerRef localAsyncWorkerRef = new LocalAsyncWorkerRef<>(localAsyncWorker); + QueueEventHandler queueEventHandler = queueCreatorService.create(queueSize(), localAsyncWorkerRef); + localAsyncWorkerRef.setQueueEventHandler(queueEventHandler); + return localAsyncWorkerRef; } } diff --git a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/LocalAsyncWorkerProviderDefineLoader.java b/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/LocalAsyncWorkerProviderDefineLoader.java deleted file mode 100644 index 9c90b4ab1dbc3e6ab8c9edff3e006dd3edb302bb..0000000000000000000000000000000000000000 --- a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/LocalAsyncWorkerProviderDefineLoader.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Copyright 2017, OpenSkywalking Organization All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * Project repository: https://github.com/OpenSkywalking/skywalking - */ - -package org.skywalking.apm.collector.stream.worker.base; - -import java.util.ArrayList; -import java.util.List; -import org.skywalking.apm.collector.core.define.DefineException; -import org.skywalking.apm.collector.core.define.DefinitionLoader; -import org.skywalking.apm.collector.core.define.Loader; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @author peng-yongsheng - */ -public class LocalAsyncWorkerProviderDefineLoader implements Loader> { - - private final Logger logger = LoggerFactory.getLogger(LocalAsyncWorkerProviderDefineLoader.class); - - @Override public List load() throws DefineException { - List providers = new ArrayList<>(); - LocalWorkerProviderDefinitionFile definitionFile = new LocalWorkerProviderDefinitionFile(); - logger.info("local async worker provider definition file name: {}", definitionFile.fileName()); - - DefinitionLoader definitionLoader = DefinitionLoader.load(AbstractLocalAsyncWorkerProvider.class, definitionFile); - - for (AbstractLocalAsyncWorkerProvider provider : definitionLoader) { - logger.info("loaded local async worker provider definition class: {}", provider.getClass().getName()); - providers.add(provider); - } - return providers; - } -} diff --git a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/LocalAsyncWorkerRef.java b/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/LocalAsyncWorkerRef.java index 7639c8bb5be59b6a402cefa831dc14f6aed1bb2a..28d1aebeb8d8975c58cb0b8be16cff6568a241cd 100644 --- a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/LocalAsyncWorkerRef.java +++ b/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/LocalAsyncWorkerRef.java @@ -18,21 +18,30 @@ package org.skywalking.apm.collector.stream.worker.base; +import org.skywalking.apm.collector.core.CollectorException; import org.skywalking.apm.collector.core.graph.NodeProcessor; import org.skywalking.apm.collector.queue.base.QueueEventHandler; +import org.skywalking.apm.collector.queue.base.QueueExecutor; /** * @author peng-yongsheng */ -public class LocalAsyncWorkerRef extends WorkerRef { +public class LocalAsyncWorkerRef extends WorkerRef implements QueueExecutor { - private final QueueEventHandler queueEventHandler; + private QueueEventHandler queueEventHandler; - LocalAsyncWorkerRef(NodeProcessor destinationHandler, QueueEventHandler queueEventHandler) { + LocalAsyncWorkerRef(NodeProcessor destinationHandler) { super(destinationHandler); + } + + public void setQueueEventHandler(QueueEventHandler queueEventHandler) { this.queueEventHandler = queueEventHandler; } + @Override public void execute(INPUT input) throws CollectorException { + out(input); + } + @Override protected void in(INPUT input) { queueEventHandler.tell(input); } diff --git a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/RemoteWorkerProviderDefineLoader.java b/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/RemoteWorkerProviderDefineLoader.java deleted file mode 100644 index 52acf6d322af1ee2a3450cf41c881cb22ffe3054..0000000000000000000000000000000000000000 --- a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/RemoteWorkerProviderDefineLoader.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Copyright 2017, OpenSkywalking Organization All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * Project repository: https://github.com/OpenSkywalking/skywalking - */ - -package org.skywalking.apm.collector.stream.worker.base; - -import java.util.ArrayList; -import java.util.List; -import org.skywalking.apm.collector.core.define.DefineException; -import org.skywalking.apm.collector.core.define.DefinitionLoader; -import org.skywalking.apm.collector.core.define.Loader; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @author peng-yongsheng - */ -public class RemoteWorkerProviderDefineLoader implements Loader> { - - private final Logger logger = LoggerFactory.getLogger(RemoteWorkerProviderDefineLoader.class); - - @Override public List load() throws DefineException { - List providers = new ArrayList<>(); - RemoteWorkerProviderDefinitionFile definitionFile = new RemoteWorkerProviderDefinitionFile(); - logger.info("remote worker provider definition file name: {}", definitionFile.fileName()); - - DefinitionLoader definitionLoader = DefinitionLoader.load(AbstractRemoteWorkerProvider.class, definitionFile); - - for (AbstractRemoteWorkerProvider provider : definitionLoader) { - logger.info("loaded remote worker provider definition class: {}", provider.getClass().getName()); - providers.add(provider); - } - return providers; - } -} diff --git a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/RemoteWorkerProviderDefinitionFile.java b/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/RemoteWorkerProviderDefinitionFile.java deleted file mode 100644 index ed5ee12f4add4c7059fe2c2b86849081931ec7e2..0000000000000000000000000000000000000000 --- a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/RemoteWorkerProviderDefinitionFile.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Copyright 2017, OpenSkywalking Organization All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * Project repository: https://github.com/OpenSkywalking/skywalking - */ - -package org.skywalking.apm.collector.stream.worker.base; - -import org.skywalking.apm.collector.core.define.DefinitionFile; - -/** - * @author peng-yongsheng - */ -public class RemoteWorkerProviderDefinitionFile extends DefinitionFile { - @Override protected String fileName() { - return "remote_worker_provider.define"; - } -} diff --git a/apm-collector/apm-collector-ui/collector-ui-jetty-provider/src/main/java/org/skywalking/apm/collector/ui/service/ServiceTreeService.java b/apm-collector/apm-collector-ui/collector-ui-jetty-provider/src/main/java/org/skywalking/apm/collector/ui/service/ServiceTreeService.java index 7aa5f5dd10d18c018fdb8ed98158f8997d815c47..cb2a1ca74c1145d936e42189505d44e8e4fc8703 100644 --- a/apm-collector/apm-collector-ui/collector-ui-jetty-provider/src/main/java/org/skywalking/apm/collector/ui/service/ServiceTreeService.java +++ b/apm-collector/apm-collector-ui/collector-ui-jetty-provider/src/main/java/org/skywalking/apm/collector/ui/service/ServiceTreeService.java @@ -25,6 +25,7 @@ import java.util.Iterator; import java.util.Map; import org.skywalking.apm.collector.cache.CacheModule; import org.skywalking.apm.collector.cache.service.ApplicationCacheService; +import org.skywalking.apm.collector.cache.service.ServiceNameCacheService; import org.skywalking.apm.collector.core.module.ModuleManager; import org.skywalking.apm.collector.core.util.ColumnNameUtils; import org.skywalking.apm.collector.core.util.Const; @@ -43,11 +44,13 @@ public class ServiceTreeService { private final IServiceEntryUIDAO serviceEntryDAO; private final IServiceReferenceUIDAO serviceReferenceDAO; private final ApplicationCacheService applicationCacheService; + private final ServiceNameCacheService serviceNameCacheService; public ServiceTreeService(ModuleManager moduleManager) { this.serviceEntryDAO = moduleManager.find(StorageModule.NAME).getService(IServiceEntryUIDAO.class); this.serviceReferenceDAO = moduleManager.find(StorageModule.NAME).getService(IServiceReferenceUIDAO.class); this.applicationCacheService = moduleManager.find(CacheModule.NAME).getService(ApplicationCacheService.class); + this.serviceNameCacheService = moduleManager.find(CacheModule.NAME).getService(ServiceNameCacheService.class); } public JsonObject loadEntryService(int applicationId, String entryServiceName, long startTime, long endTime, @@ -66,6 +69,14 @@ public class ServiceTreeService { public JsonArray loadServiceTree(int entryServiceId, long startTime, long endTime) { Map serviceReferenceMap = serviceReferenceDAO.load(entryServiceId, startTime, endTime); + serviceReferenceMap.values().forEach(serviceReference -> { + int frontServiceId = serviceReference.get(ColumnNameUtils.INSTANCE.rename(ServiceReferenceTable.COLUMN_FRONT_SERVICE_ID)).getAsInt(); + int behindServiceId = serviceReference.get(ColumnNameUtils.INSTANCE.rename(ServiceReferenceTable.COLUMN_BEHIND_SERVICE_ID)).getAsInt(); + String frontServiceName = serviceNameCacheService.getSplitServiceName(serviceNameCacheService.get(frontServiceId)); + String behindServiceName = serviceNameCacheService.getSplitServiceName(serviceNameCacheService.get(behindServiceId)); + serviceReference.addProperty(ColumnNameUtils.INSTANCE.rename(ServiceReferenceTable.COLUMN_FRONT_SERVICE_NAME), frontServiceName); + serviceReference.addProperty(ColumnNameUtils.INSTANCE.rename(ServiceReferenceTable.COLUMN_BEHIND_SERVICE_NAME), behindServiceName); + }); return buildTreeData(serviceReferenceMap); }