From 60cf3c839e894b61147f1e228cc96fcca2ee9ae3 Mon Sep 17 00:00:00 2001 From: peng-yongsheng <8082209@qq.com> Date: Mon, 6 Nov 2017 16:23:54 +0800 Subject: [PATCH] no message --- .../collector-agent-grpc-provider/pom.xml | 10 ++ .../agent/grpc/AgentModuleGRPCProvider.java | 30 ++++ .../grpc/AgentModuleGRPCRegistration.java | 40 +++++ .../handler/JVMMetricsServiceHandler.java | 167 ++++++++++++++++++ .../handler/TraceSegmentServiceHandler.java | 54 ++++++ .../collector-agent-jetty-provider/pom.xml | 5 + .../collector-agent-stream/pom.xml | 9 +- .../stream/graph/GCMetricStreamGraph.java | 35 ++++ .../apm/collector/remote/RoutingRule.java | 26 +++ .../stream/graph/StreamGraphTestCase.java | 47 ----- .../collector/stream/worker/base/Role.java | 7 +- .../stream/worker/base/WorkerRefs.java | 12 +- 12 files changed, 381 insertions(+), 61 deletions(-) create mode 100644 apm-collector/apm-collector-agent/collector-agent-grpc-provider/src/main/java/org/skywalking/apm/collector/agent/grpc/AgentModuleGRPCRegistration.java create mode 100644 apm-collector/apm-collector-agent/collector-agent-grpc-provider/src/main/java/org/skywalking/apm/collector/agent/grpc/handler/JVMMetricsServiceHandler.java create mode 100644 apm-collector/apm-collector-agent/collector-agent-grpc-provider/src/main/java/org/skywalking/apm/collector/agent/grpc/handler/TraceSegmentServiceHandler.java create mode 100644 apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/graph/GCMetricStreamGraph.java create mode 100644 apm-collector/apm-collector-remote/collector-remote-define/src/main/java/org/skywalking/apm/collector/remote/RoutingRule.java delete mode 100644 apm-collector/apm-collector-stream/collector-stream-define/src/test/java/org/skywalking/apm/collector/stream/graph/StreamGraphTestCase.java diff --git a/apm-collector/apm-collector-agent/collector-agent-grpc-provider/pom.xml b/apm-collector/apm-collector-agent/collector-agent-grpc-provider/pom.xml index 64ca7f9a5..610de903e 100644 --- a/apm-collector/apm-collector-agent/collector-agent-grpc-provider/pom.xml +++ b/apm-collector/apm-collector-agent/collector-agent-grpc-provider/pom.xml @@ -36,5 +36,15 @@ collector-agent-define ${project.version} + + org.skywalking + collector-agent-stream + ${project.version} + + + org.skywalking + collector-grpc-manager-define + ${project.version} + \ No newline at end of file diff --git a/apm-collector/apm-collector-agent/collector-agent-grpc-provider/src/main/java/org/skywalking/apm/collector/agent/grpc/AgentModuleGRPCProvider.java b/apm-collector/apm-collector-agent/collector-agent-grpc-provider/src/main/java/org/skywalking/apm/collector/agent/grpc/AgentModuleGRPCProvider.java index 33a06746e..75d5b7ff9 100644 --- a/apm-collector/apm-collector-agent/collector-agent-grpc-provider/src/main/java/org/skywalking/apm/collector/agent/grpc/AgentModuleGRPCProvider.java +++ b/apm-collector/apm-collector-agent/collector-agent-grpc-provider/src/main/java/org/skywalking/apm/collector/agent/grpc/AgentModuleGRPCProvider.java @@ -20,15 +20,27 @@ package org.skywalking.apm.collector.agent.grpc; import java.util.Properties; import org.skywalking.apm.collector.agent.AgentModule; +import org.skywalking.apm.collector.agent.grpc.handler.JVMMetricsServiceHandler; +import org.skywalking.apm.collector.cluster.ClusterModule; +import org.skywalking.apm.collector.cluster.service.ModuleRegisterService; import org.skywalking.apm.collector.core.module.Module; +import org.skywalking.apm.collector.core.module.ModuleNotFoundException; import org.skywalking.apm.collector.core.module.ModuleProvider; import org.skywalking.apm.collector.core.module.ServiceNotProvidedException; +import org.skywalking.apm.collector.grpc.manager.GRPCManagerModule; +import org.skywalking.apm.collector.grpc.manager.service.GRPCManagerService; +import org.skywalking.apm.collector.server.Server; +import org.skywalking.apm.collector.storage.StorageModule; +import org.skywalking.apm.collector.storage.service.DAOService; /** * @author peng-yongsheng */ public class AgentModuleGRPCProvider extends ModuleProvider { + private static final String HOST = "host"; + private static final String PORT = "port"; + @Override public String name() { return "gRPC"; } @@ -42,7 +54,21 @@ public class AgentModuleGRPCProvider extends ModuleProvider { } @Override public void start(Properties config) throws ServiceNotProvidedException { + String host = config.getProperty(HOST); + Integer port = (Integer)config.get(PORT); + + try { + ModuleRegisterService moduleRegisterService = getManager().find(ClusterModule.NAME).getService(ModuleRegisterService.class); + moduleRegisterService.register(AgentModule.NAME, this.name(), new AgentModuleGRPCRegistration(host, port)); + + DAOService daoService = getManager().find(StorageModule.NAME).getService(DAOService.class); + GRPCManagerService managerService = getManager().find(GRPCManagerModule.NAME).getService(GRPCManagerService.class); + Server gRPCServer = managerService.getOrCreateIfAbsent(host, port); + addHandlers(daoService, gRPCServer); + } catch (ModuleNotFoundException e) { + throw new ServiceNotProvidedException(e.getMessage()); + } } @Override public void notifyAfterCompleted() throws ServiceNotProvidedException { @@ -52,4 +78,8 @@ public class AgentModuleGRPCProvider extends ModuleProvider { @Override public String[] requiredModules() { return new String[0]; } + + private void addHandlers(DAOService daoService, Server gRPCServer) { + gRPCServer.addHandler(new JVMMetricsServiceHandler()); + } } diff --git a/apm-collector/apm-collector-agent/collector-agent-grpc-provider/src/main/java/org/skywalking/apm/collector/agent/grpc/AgentModuleGRPCRegistration.java b/apm-collector/apm-collector-agent/collector-agent-grpc-provider/src/main/java/org/skywalking/apm/collector/agent/grpc/AgentModuleGRPCRegistration.java new file mode 100644 index 000000000..b6cd573d8 --- /dev/null +++ b/apm-collector/apm-collector-agent/collector-agent-grpc-provider/src/main/java/org/skywalking/apm/collector/agent/grpc/AgentModuleGRPCRegistration.java @@ -0,0 +1,40 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.agent.grpc; + +import org.skywalking.apm.collector.cluster.ModuleRegistration; +import org.skywalking.apm.collector.core.util.Const; + +/** + * @author peng-yongsheng + */ +public class AgentModuleGRPCRegistration extends ModuleRegistration { + + private final String host; + private final int port; + + public AgentModuleGRPCRegistration(String host, int port) { + this.host = host; + this.port = port; + } + + @Override public Value buildValue() { + return new Value(host, port, Const.EMPTY_STRING); + } +} diff --git a/apm-collector/apm-collector-agent/collector-agent-grpc-provider/src/main/java/org/skywalking/apm/collector/agent/grpc/handler/JVMMetricsServiceHandler.java b/apm-collector/apm-collector-agent/collector-agent-grpc-provider/src/main/java/org/skywalking/apm/collector/agent/grpc/handler/JVMMetricsServiceHandler.java new file mode 100644 index 000000000..927c18ec6 --- /dev/null +++ b/apm-collector/apm-collector-agent/collector-agent-grpc-provider/src/main/java/org/skywalking/apm/collector/agent/grpc/handler/JVMMetricsServiceHandler.java @@ -0,0 +1,167 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.agent.grpc.handler; + +import io.grpc.stub.StreamObserver; +import java.util.List; +import org.skywalking.apm.collector.agentjvm.worker.cpu.CpuMetricPersistenceWorker; +import org.skywalking.apm.collector.agentjvm.worker.gc.GCMetricPersistenceWorker; +import org.skywalking.apm.collector.agentjvm.worker.heartbeat.InstHeartBeatPersistenceWorker; +import org.skywalking.apm.collector.agentjvm.worker.heartbeat.define.InstanceHeartBeatDataDefine; +import org.skywalking.apm.collector.agentjvm.worker.memory.MemoryMetricPersistenceWorker; +import org.skywalking.apm.collector.agentjvm.worker.memorypool.MemoryPoolMetricPersistenceWorker; +import org.skywalking.apm.collector.core.framework.CollectorContextHelper; +import org.skywalking.apm.collector.core.util.Const; +import org.skywalking.apm.collector.core.util.TimeBucketUtils; +import org.skywalking.apm.collector.server.grpc.GRPCHandler; +import org.skywalking.apm.collector.storage.define.jvm.CpuMetricDataDefine; +import org.skywalking.apm.collector.storage.define.jvm.GCMetricDataDefine; +import org.skywalking.apm.collector.storage.define.jvm.MemoryMetricDataDefine; +import org.skywalking.apm.collector.storage.define.jvm.MemoryPoolMetricDataDefine; +import org.skywalking.apm.collector.stream.StreamModuleContext; +import org.skywalking.apm.collector.stream.StreamModuleGroupDefine; +import org.skywalking.apm.collector.stream.worker.WorkerInvokeException; +import org.skywalking.apm.collector.stream.worker.WorkerNotFoundException; +import org.skywalking.apm.network.proto.CPU; +import org.skywalking.apm.network.proto.Downstream; +import org.skywalking.apm.network.proto.GC; +import org.skywalking.apm.network.proto.JVMMetrics; +import org.skywalking.apm.network.proto.JVMMetricsServiceGrpc; +import org.skywalking.apm.network.proto.Memory; +import org.skywalking.apm.network.proto.MemoryPool; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author peng-yongsheng + */ +public class JVMMetricsServiceHandler extends JVMMetricsServiceGrpc.JVMMetricsServiceImplBase implements GRPCHandler { + + private final Logger logger = LoggerFactory.getLogger(JVMMetricsServiceHandler.class); + + @Override public void collect(JVMMetrics request, StreamObserver responseObserver) { + int instanceId = request.getApplicationInstanceId(); + logger.debug("receive the jvm metric from application instance, id: {}", instanceId); + + StreamModuleContext context = (StreamModuleContext)CollectorContextHelper.INSTANCE.getContext(StreamModuleGroupDefine.GROUP_NAME); + request.getMetricsList().forEach(metric -> { + long time = TimeBucketUtils.INSTANCE.getSecondTimeBucket(metric.getTime()); + senToInstanceHeartBeatPersistenceWorker(context, instanceId, metric.getTime()); + sendToCpuMetricPersistenceWorker(context, instanceId, time, metric.getCpu()); + sendToMemoryMetricPersistenceWorker(context, instanceId, time, metric.getMemoryList()); + sendToMemoryPoolMetricPersistenceWorker(context, instanceId, time, metric.getMemoryPoolList()); + sendToGCMetricPersistenceWorker(context, instanceId, time, metric.getGcList()); + }); + + responseObserver.onNext(Downstream.newBuilder().build()); + responseObserver.onCompleted(); + } + + private void senToInstanceHeartBeatPersistenceWorker(StreamModuleContext context, int instanceId, + long heartBeatTime) { + InstanceHeartBeatDataDefine.InstanceHeartBeat heartBeat = new InstanceHeartBeatDataDefine.InstanceHeartBeat(); + heartBeat.setId(String.valueOf(instanceId)); + heartBeat.setHeartBeatTime(TimeBucketUtils.INSTANCE.getSecondTimeBucket(heartBeatTime)); + heartBeat.setInstanceId(instanceId); + try { + logger.debug("send to instance heart beat persistence worker, id: {}", heartBeat.getId()); + context.getClusterWorkerContext().lookup(InstHeartBeatPersistenceWorker.WorkerRole.INSTANCE).tell(heartBeat.toData()); + } catch (WorkerInvokeException | WorkerNotFoundException e) { + logger.error(e.getMessage(), e); + } + } + + private void sendToCpuMetricPersistenceWorker(StreamModuleContext context, int instanceId, + long timeBucket, CPU cpu) { + CpuMetricDataDefine.CpuMetric cpuMetric = new CpuMetricDataDefine.CpuMetric(); + cpuMetric.setId(timeBucket + Const.ID_SPLIT + instanceId); + cpuMetric.setInstanceId(instanceId); + cpuMetric.setUsagePercent(cpu.getUsagePercent()); + cpuMetric.setTimeBucket(timeBucket); + try { + logger.debug("send to cpu metric persistence worker, id: {}", cpuMetric.getId()); + context.getClusterWorkerContext().lookup(CpuMetricPersistenceWorker.WorkerRole.INSTANCE).tell(cpuMetric.toData()); + } catch (WorkerInvokeException | WorkerNotFoundException e) { + logger.error(e.getMessage(), e); + } + } + + private void sendToMemoryMetricPersistenceWorker(StreamModuleContext context, int instanceId, + long timeBucket, List memories) { + + memories.forEach(memory -> { + MemoryMetricDataDefine.MemoryMetric memoryMetric = new MemoryMetricDataDefine.MemoryMetric(); + memoryMetric.setId(timeBucket + Const.ID_SPLIT + instanceId + Const.ID_SPLIT + String.valueOf(memory.getIsHeap())); + memoryMetric.setApplicationInstanceId(instanceId); + memoryMetric.setHeap(memory.getIsHeap()); + memoryMetric.setInit(memory.getInit()); + memoryMetric.setMax(memory.getMax()); + memoryMetric.setUsed(memory.getUsed()); + memoryMetric.setCommitted(memory.getCommitted()); + memoryMetric.setTimeBucket(timeBucket); + try { + logger.debug("send to memory metric persistence worker, id: {}", memoryMetric.getId()); + context.getClusterWorkerContext().lookup(MemoryMetricPersistenceWorker.WorkerRole.INSTANCE).tell(memoryMetric.toData()); + } catch (WorkerInvokeException | WorkerNotFoundException e) { + logger.error(e.getMessage(), e); + } + }); + } + + private void sendToMemoryPoolMetricPersistenceWorker(StreamModuleContext context, int instanceId, + long timeBucket, List memoryPools) { + + memoryPools.forEach(memoryPool -> { + MemoryPoolMetricDataDefine.MemoryPoolMetric memoryPoolMetric = new MemoryPoolMetricDataDefine.MemoryPoolMetric(); + memoryPoolMetric.setId(timeBucket + Const.ID_SPLIT + instanceId + Const.ID_SPLIT + String.valueOf(memoryPool.getType().getNumber())); + memoryPoolMetric.setInstanceId(instanceId); + memoryPoolMetric.setPoolType(memoryPool.getType().getNumber()); + memoryPoolMetric.setInit(memoryPool.getInit()); + memoryPoolMetric.setMax(memoryPool.getMax()); + memoryPoolMetric.setUsed(memoryPool.getUsed()); + memoryPoolMetric.setCommitted(memoryPool.getCommited()); + memoryPoolMetric.setTimeBucket(timeBucket); + try { + logger.debug("send to memory pool metric persistence worker, id: {}", memoryPoolMetric.getId()); + context.getClusterWorkerContext().lookup(MemoryPoolMetricPersistenceWorker.WorkerRole.INSTANCE).tell(memoryPoolMetric.toData()); + } catch (WorkerInvokeException | WorkerNotFoundException e) { + logger.error(e.getMessage(), e); + } + }); + } + + private void sendToGCMetricPersistenceWorker(StreamModuleContext context, int instanceId, + long timeBucket, List gcs) { + gcs.forEach(gc -> { + GCMetricDataDefine.GCMetric gcMetric = new GCMetricDataDefine.GCMetric(); + gcMetric.setId(timeBucket + Const.ID_SPLIT + instanceId + Const.ID_SPLIT + String.valueOf(gc.getPhraseValue())); + gcMetric.setInstanceId(instanceId); + gcMetric.setPhrase(gc.getPhraseValue()); + gcMetric.setCount(gc.getCount()); + gcMetric.setTime(gc.getTime()); + gcMetric.setTimeBucket(timeBucket); + try { + logger.debug("send to gc metric persistence worker, id: {}", gcMetric.getId()); + context.getClusterWorkerContext().lookup(GCMetricPersistenceWorker.WorkerRole.INSTANCE).tell(gcMetric.toData()); + } catch (WorkerInvokeException | WorkerNotFoundException e) { + logger.error(e.getMessage(), e); + } + }); + } +} diff --git a/apm-collector/apm-collector-agent/collector-agent-grpc-provider/src/main/java/org/skywalking/apm/collector/agent/grpc/handler/TraceSegmentServiceHandler.java b/apm-collector/apm-collector-agent/collector-agent-grpc-provider/src/main/java/org/skywalking/apm/collector/agent/grpc/handler/TraceSegmentServiceHandler.java new file mode 100644 index 000000000..66bdcf15b --- /dev/null +++ b/apm-collector/apm-collector-agent/collector-agent-grpc-provider/src/main/java/org/skywalking/apm/collector/agent/grpc/handler/TraceSegmentServiceHandler.java @@ -0,0 +1,54 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.agent.grpc.handler; + +import io.grpc.stub.StreamObserver; +import org.skywalking.apm.collector.server.grpc.GRPCHandler; +import org.skywalking.apm.network.proto.Downstream; +import org.skywalking.apm.network.proto.TraceSegmentServiceGrpc; +import org.skywalking.apm.network.proto.UpstreamSegment; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author peng-yongsheng + */ +public class TraceSegmentServiceHandler extends TraceSegmentServiceGrpc.TraceSegmentServiceImplBase implements GRPCHandler { + + private final Logger logger = LoggerFactory.getLogger(TraceSegmentServiceHandler.class); + + @Override public StreamObserver collect(StreamObserver responseObserver) { + return new StreamObserver() { + @Override public void onNext(UpstreamSegment segment) { + logger.debug("receive segment"); +// SegmentParse segmentParse = new SegmentParse(); +// segmentParse.parse(segment, SegmentParse.Source.Agent); + } + + @Override public void onError(Throwable throwable) { + logger.error(throwable.getMessage(), throwable); + } + + @Override public void onCompleted() { + responseObserver.onNext(Downstream.newBuilder().build()); + responseObserver.onCompleted(); + } + }; + } +} diff --git a/apm-collector/apm-collector-agent/collector-agent-jetty-provider/pom.xml b/apm-collector/apm-collector-agent/collector-agent-jetty-provider/pom.xml index 38e3ed262..07cf7ea23 100644 --- a/apm-collector/apm-collector-agent/collector-agent-jetty-provider/pom.xml +++ b/apm-collector/apm-collector-agent/collector-agent-jetty-provider/pom.xml @@ -36,5 +36,10 @@ collector-agent-define ${project.version} + + org.skywalking + collector-agent-stream + ${project.version} + \ No newline at end of file diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/pom.xml b/apm-collector/apm-collector-agent/collector-agent-stream/pom.xml index c25629046..c0228e283 100644 --- a/apm-collector/apm-collector-agent/collector-agent-stream/pom.xml +++ b/apm-collector/apm-collector-agent/collector-agent-stream/pom.xml @@ -28,6 +28,13 @@ 4.0.0 collector-agent-stream + jar - + + + org.skywalking + collector-stream-define + ${project.version} + + \ No newline at end of file diff --git a/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/graph/GCMetricStreamGraph.java b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/graph/GCMetricStreamGraph.java new file mode 100644 index 000000000..f30a0acc2 --- /dev/null +++ b/apm-collector/apm-collector-agent/collector-agent-stream/src/main/java/org/skywalking/apm/collector/agent/stream/graph/GCMetricStreamGraph.java @@ -0,0 +1,35 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.agent.stream.graph; + +import org.skywalking.apm.collector.core.graph.Graph; +import org.skywalking.apm.collector.core.graph.GraphManager; +import org.skywalking.apm.collector.storage.table.jvm.GCMetric; + +/** + * @author peng-yongsheng + */ +public class GCMetricStreamGraph { + + public Graph createIfAbsent() { + Graph graph = GraphManager.INSTANCE.createIfAbsent(1, GCMetric.class); + graph.addNode(); + return graph; + } +} diff --git a/apm-collector/apm-collector-remote/collector-remote-define/src/main/java/org/skywalking/apm/collector/remote/RoutingRule.java b/apm-collector/apm-collector-remote/collector-remote-define/src/main/java/org/skywalking/apm/collector/remote/RoutingRule.java new file mode 100644 index 000000000..57f1ac74c --- /dev/null +++ b/apm-collector/apm-collector-remote/collector-remote-define/src/main/java/org/skywalking/apm/collector/remote/RoutingRule.java @@ -0,0 +1,26 @@ +/* + * Copyright 2017, OpenSkywalking Organization All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Project repository: https://github.com/OpenSkywalking/skywalking + */ + +package org.skywalking.apm.collector.remote; + +/** + * @author peng-yongsheng + */ +public enum RoutingRule { + HashCode, ForeverFirst +} diff --git a/apm-collector/apm-collector-stream/collector-stream-define/src/test/java/org/skywalking/apm/collector/stream/graph/StreamGraphTestCase.java b/apm-collector/apm-collector-stream/collector-stream-define/src/test/java/org/skywalking/apm/collector/stream/graph/StreamGraphTestCase.java deleted file mode 100644 index 677647529..000000000 --- a/apm-collector/apm-collector-stream/collector-stream-define/src/test/java/org/skywalking/apm/collector/stream/graph/StreamGraphTestCase.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Copyright 2017, OpenSkywalking Organization All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * Project repository: https://github.com/OpenSkywalking/skywalking - */ - -package org.skywalking.apm.collector.stream.graph; - -import org.junit.Test; -import org.skywalking.apm.collector.storage.table.instance.InstPerformance; -import org.skywalking.apm.collector.storage.table.register.Application; - -/** - * @author peng-yongsheng - */ -public class StreamGraphTestCase { - - @Test - public void test() { - StreamGraph graph = new StreamGraph(); - graph.addNode(new Aggregator() { - @Override public void process(InstPerformance performance, Next next) { - Application application = new Application("111"); - next.execute(application); - } - }).addNext(new Aggregator() { - @Override public void process(Application application, Next next) { - - } - }); - - InstPerformance instPerformance = new InstPerformance("111"); - graph.start(instPerformance); - } -} diff --git a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/Role.java b/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/Role.java index 0e7f42c78..59c7e4239 100644 --- a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/Role.java +++ b/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/Role.java @@ -18,8 +18,7 @@ package org.skywalking.apm.collector.stream.worker.base; -import org.skywalking.apm.collector.core.data.DataDefine; -import org.skywalking.apm.collector.stream.worker.base.selector.WorkerSelector; +import org.skywalking.apm.collector.remote.RoutingRule; /** * @author peng-yongsheng @@ -28,7 +27,5 @@ public interface Role { String roleName(); - WorkerSelector workerSelector(); - - DataDefine dataDefine(); + RoutingRule routingRule(); } diff --git a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/WorkerRefs.java b/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/WorkerRefs.java index 6c180212a..4d6fe7ae9 100644 --- a/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/WorkerRefs.java +++ b/apm-collector/apm-collector-stream/collector-stream-provider/src/main/java/org/skywalking/apm/collector/stream/worker/base/WorkerRefs.java @@ -19,7 +19,6 @@ package org.skywalking.apm.collector.stream.worker.base; import java.util.List; -import org.skywalking.apm.collector.stream.worker.base.selector.WorkerSelector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,27 +30,24 @@ public class WorkerRefs { private final Logger logger = LoggerFactory.getLogger(WorkerRefs.class); private List workerRefs; - private WorkerSelector workerSelector; private Role role; - protected WorkerRefs(List workerRefs, WorkerSelector workerSelector) { + protected WorkerRefs(List workerRefs) { this.workerRefs = workerRefs; - this.workerSelector = workerSelector; } - protected WorkerRefs(List workerRefs, WorkerSelector workerSelector, Role role) { + protected WorkerRefs(List workerRefs, Role role) { this.workerRefs = workerRefs; - this.workerSelector = workerSelector; this.role = role; } public void tell(Object message) throws WorkerInvokeException { - logger.debug("WorkerSelector instance of {}", workerSelector.getClass()); +// logger.debug("WorkerSelector instance of {}", workerSelector.getClass()); workerRefs.forEach(workerRef -> { if (workerRef instanceof RemoteWorkerRef) { logger.debug("message hashcode: {}, select workers: {}", message.hashCode(), workerRef.toString()); } }); - workerSelector.select(workerRefs, message).tell(message); +// workerSelector.select(workerRefs, message).tell(message); } } -- GitLab