提交 60cf3c83 编写于 作者: P peng-yongsheng

no message

上级 24a03753
......@@ -36,5 +36,15 @@
<artifactId>collector-agent-define</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>collector-agent-stream</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>collector-grpc-manager-define</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
......@@ -20,15 +20,27 @@ package org.skywalking.apm.collector.agent.grpc;
import java.util.Properties;
import org.skywalking.apm.collector.agent.AgentModule;
import org.skywalking.apm.collector.agent.grpc.handler.JVMMetricsServiceHandler;
import org.skywalking.apm.collector.cluster.ClusterModule;
import org.skywalking.apm.collector.cluster.service.ModuleRegisterService;
import org.skywalking.apm.collector.core.module.Module;
import org.skywalking.apm.collector.core.module.ModuleNotFoundException;
import org.skywalking.apm.collector.core.module.ModuleProvider;
import org.skywalking.apm.collector.core.module.ServiceNotProvidedException;
import org.skywalking.apm.collector.grpc.manager.GRPCManagerModule;
import org.skywalking.apm.collector.grpc.manager.service.GRPCManagerService;
import org.skywalking.apm.collector.server.Server;
import org.skywalking.apm.collector.storage.StorageModule;
import org.skywalking.apm.collector.storage.service.DAOService;
/**
* @author peng-yongsheng
*/
public class AgentModuleGRPCProvider extends ModuleProvider {
private static final String HOST = "host";
private static final String PORT = "port";
@Override public String name() {
return "gRPC";
}
......@@ -42,7 +54,21 @@ public class AgentModuleGRPCProvider extends ModuleProvider {
}
@Override public void start(Properties config) throws ServiceNotProvidedException {
String host = config.getProperty(HOST);
Integer port = (Integer)config.get(PORT);
try {
ModuleRegisterService moduleRegisterService = getManager().find(ClusterModule.NAME).getService(ModuleRegisterService.class);
moduleRegisterService.register(AgentModule.NAME, this.name(), new AgentModuleGRPCRegistration(host, port));
DAOService daoService = getManager().find(StorageModule.NAME).getService(DAOService.class);
GRPCManagerService managerService = getManager().find(GRPCManagerModule.NAME).getService(GRPCManagerService.class);
Server gRPCServer = managerService.getOrCreateIfAbsent(host, port);
addHandlers(daoService, gRPCServer);
} catch (ModuleNotFoundException e) {
throw new ServiceNotProvidedException(e.getMessage());
}
}
@Override public void notifyAfterCompleted() throws ServiceNotProvidedException {
......@@ -52,4 +78,8 @@ public class AgentModuleGRPCProvider extends ModuleProvider {
@Override public String[] requiredModules() {
return new String[0];
}
private void addHandlers(DAOService daoService, Server gRPCServer) {
gRPCServer.addHandler(new JVMMetricsServiceHandler());
}
}
......@@ -16,32 +16,25 @@
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream.graph;
package org.skywalking.apm.collector.agent.grpc;
import org.junit.Test;
import org.skywalking.apm.collector.storage.table.instance.InstPerformance;
import org.skywalking.apm.collector.storage.table.register.Application;
import org.skywalking.apm.collector.cluster.ModuleRegistration;
import org.skywalking.apm.collector.core.util.Const;
/**
* @author peng-yongsheng
*/
public class StreamGraphTestCase {
public class AgentModuleGRPCRegistration extends ModuleRegistration {
@Test
public void test() {
StreamGraph graph = new StreamGraph();
graph.addNode(new Aggregator<InstPerformance, Application>() {
@Override public void process(InstPerformance performance, Next<Application> next) {
Application application = new Application("111");
next.execute(application);
}
}).addNext(new Aggregator<Application, InstPerformance>() {
@Override public void process(Application application, Next<InstPerformance> next) {
private final String host;
private final int port;
}
});
public AgentModuleGRPCRegistration(String host, int port) {
this.host = host;
this.port = port;
}
InstPerformance instPerformance = new InstPerformance("111");
graph.start(instPerformance);
@Override public Value buildValue() {
return new Value(host, port, Const.EMPTY_STRING);
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.grpc.handler;
import io.grpc.stub.StreamObserver;
import java.util.List;
import org.skywalking.apm.collector.agentjvm.worker.cpu.CpuMetricPersistenceWorker;
import org.skywalking.apm.collector.agentjvm.worker.gc.GCMetricPersistenceWorker;
import org.skywalking.apm.collector.agentjvm.worker.heartbeat.InstHeartBeatPersistenceWorker;
import org.skywalking.apm.collector.agentjvm.worker.heartbeat.define.InstanceHeartBeatDataDefine;
import org.skywalking.apm.collector.agentjvm.worker.memory.MemoryMetricPersistenceWorker;
import org.skywalking.apm.collector.agentjvm.worker.memorypool.MemoryPoolMetricPersistenceWorker;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.server.grpc.GRPCHandler;
import org.skywalking.apm.collector.storage.define.jvm.CpuMetricDataDefine;
import org.skywalking.apm.collector.storage.define.jvm.GCMetricDataDefine;
import org.skywalking.apm.collector.storage.define.jvm.MemoryMetricDataDefine;
import org.skywalking.apm.collector.storage.define.jvm.MemoryPoolMetricDataDefine;
import org.skywalking.apm.collector.stream.StreamModuleContext;
import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
import org.skywalking.apm.collector.stream.worker.WorkerInvokeException;
import org.skywalking.apm.collector.stream.worker.WorkerNotFoundException;
import org.skywalking.apm.network.proto.CPU;
import org.skywalking.apm.network.proto.Downstream;
import org.skywalking.apm.network.proto.GC;
import org.skywalking.apm.network.proto.JVMMetrics;
import org.skywalking.apm.network.proto.JVMMetricsServiceGrpc;
import org.skywalking.apm.network.proto.Memory;
import org.skywalking.apm.network.proto.MemoryPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class JVMMetricsServiceHandler extends JVMMetricsServiceGrpc.JVMMetricsServiceImplBase implements GRPCHandler {
private final Logger logger = LoggerFactory.getLogger(JVMMetricsServiceHandler.class);
@Override public void collect(JVMMetrics request, StreamObserver<Downstream> responseObserver) {
int instanceId = request.getApplicationInstanceId();
logger.debug("receive the jvm metric from application instance, id: {}", instanceId);
StreamModuleContext context = (StreamModuleContext)CollectorContextHelper.INSTANCE.getContext(StreamModuleGroupDefine.GROUP_NAME);
request.getMetricsList().forEach(metric -> {
long time = TimeBucketUtils.INSTANCE.getSecondTimeBucket(metric.getTime());
senToInstanceHeartBeatPersistenceWorker(context, instanceId, metric.getTime());
sendToCpuMetricPersistenceWorker(context, instanceId, time, metric.getCpu());
sendToMemoryMetricPersistenceWorker(context, instanceId, time, metric.getMemoryList());
sendToMemoryPoolMetricPersistenceWorker(context, instanceId, time, metric.getMemoryPoolList());
sendToGCMetricPersistenceWorker(context, instanceId, time, metric.getGcList());
});
responseObserver.onNext(Downstream.newBuilder().build());
responseObserver.onCompleted();
}
private void senToInstanceHeartBeatPersistenceWorker(StreamModuleContext context, int instanceId,
long heartBeatTime) {
InstanceHeartBeatDataDefine.InstanceHeartBeat heartBeat = new InstanceHeartBeatDataDefine.InstanceHeartBeat();
heartBeat.setId(String.valueOf(instanceId));
heartBeat.setHeartBeatTime(TimeBucketUtils.INSTANCE.getSecondTimeBucket(heartBeatTime));
heartBeat.setInstanceId(instanceId);
try {
logger.debug("send to instance heart beat persistence worker, id: {}", heartBeat.getId());
context.getClusterWorkerContext().lookup(InstHeartBeatPersistenceWorker.WorkerRole.INSTANCE).tell(heartBeat.toData());
} catch (WorkerInvokeException | WorkerNotFoundException e) {
logger.error(e.getMessage(), e);
}
}
private void sendToCpuMetricPersistenceWorker(StreamModuleContext context, int instanceId,
long timeBucket, CPU cpu) {
CpuMetricDataDefine.CpuMetric cpuMetric = new CpuMetricDataDefine.CpuMetric();
cpuMetric.setId(timeBucket + Const.ID_SPLIT + instanceId);
cpuMetric.setInstanceId(instanceId);
cpuMetric.setUsagePercent(cpu.getUsagePercent());
cpuMetric.setTimeBucket(timeBucket);
try {
logger.debug("send to cpu metric persistence worker, id: {}", cpuMetric.getId());
context.getClusterWorkerContext().lookup(CpuMetricPersistenceWorker.WorkerRole.INSTANCE).tell(cpuMetric.toData());
} catch (WorkerInvokeException | WorkerNotFoundException e) {
logger.error(e.getMessage(), e);
}
}
private void sendToMemoryMetricPersistenceWorker(StreamModuleContext context, int instanceId,
long timeBucket, List<Memory> memories) {
memories.forEach(memory -> {
MemoryMetricDataDefine.MemoryMetric memoryMetric = new MemoryMetricDataDefine.MemoryMetric();
memoryMetric.setId(timeBucket + Const.ID_SPLIT + instanceId + Const.ID_SPLIT + String.valueOf(memory.getIsHeap()));
memoryMetric.setApplicationInstanceId(instanceId);
memoryMetric.setHeap(memory.getIsHeap());
memoryMetric.setInit(memory.getInit());
memoryMetric.setMax(memory.getMax());
memoryMetric.setUsed(memory.getUsed());
memoryMetric.setCommitted(memory.getCommitted());
memoryMetric.setTimeBucket(timeBucket);
try {
logger.debug("send to memory metric persistence worker, id: {}", memoryMetric.getId());
context.getClusterWorkerContext().lookup(MemoryMetricPersistenceWorker.WorkerRole.INSTANCE).tell(memoryMetric.toData());
} catch (WorkerInvokeException | WorkerNotFoundException e) {
logger.error(e.getMessage(), e);
}
});
}
private void sendToMemoryPoolMetricPersistenceWorker(StreamModuleContext context, int instanceId,
long timeBucket, List<MemoryPool> memoryPools) {
memoryPools.forEach(memoryPool -> {
MemoryPoolMetricDataDefine.MemoryPoolMetric memoryPoolMetric = new MemoryPoolMetricDataDefine.MemoryPoolMetric();
memoryPoolMetric.setId(timeBucket + Const.ID_SPLIT + instanceId + Const.ID_SPLIT + String.valueOf(memoryPool.getType().getNumber()));
memoryPoolMetric.setInstanceId(instanceId);
memoryPoolMetric.setPoolType(memoryPool.getType().getNumber());
memoryPoolMetric.setInit(memoryPool.getInit());
memoryPoolMetric.setMax(memoryPool.getMax());
memoryPoolMetric.setUsed(memoryPool.getUsed());
memoryPoolMetric.setCommitted(memoryPool.getCommited());
memoryPoolMetric.setTimeBucket(timeBucket);
try {
logger.debug("send to memory pool metric persistence worker, id: {}", memoryPoolMetric.getId());
context.getClusterWorkerContext().lookup(MemoryPoolMetricPersistenceWorker.WorkerRole.INSTANCE).tell(memoryPoolMetric.toData());
} catch (WorkerInvokeException | WorkerNotFoundException e) {
logger.error(e.getMessage(), e);
}
});
}
private void sendToGCMetricPersistenceWorker(StreamModuleContext context, int instanceId,
long timeBucket, List<GC> gcs) {
gcs.forEach(gc -> {
GCMetricDataDefine.GCMetric gcMetric = new GCMetricDataDefine.GCMetric();
gcMetric.setId(timeBucket + Const.ID_SPLIT + instanceId + Const.ID_SPLIT + String.valueOf(gc.getPhraseValue()));
gcMetric.setInstanceId(instanceId);
gcMetric.setPhrase(gc.getPhraseValue());
gcMetric.setCount(gc.getCount());
gcMetric.setTime(gc.getTime());
gcMetric.setTimeBucket(timeBucket);
try {
logger.debug("send to gc metric persistence worker, id: {}", gcMetric.getId());
context.getClusterWorkerContext().lookup(GCMetricPersistenceWorker.WorkerRole.INSTANCE).tell(gcMetric.toData());
} catch (WorkerInvokeException | WorkerNotFoundException e) {
logger.error(e.getMessage(), e);
}
});
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.grpc.handler;
import io.grpc.stub.StreamObserver;
import org.skywalking.apm.collector.server.grpc.GRPCHandler;
import org.skywalking.apm.network.proto.Downstream;
import org.skywalking.apm.network.proto.TraceSegmentServiceGrpc;
import org.skywalking.apm.network.proto.UpstreamSegment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class TraceSegmentServiceHandler extends TraceSegmentServiceGrpc.TraceSegmentServiceImplBase implements GRPCHandler {
private final Logger logger = LoggerFactory.getLogger(TraceSegmentServiceHandler.class);
@Override public StreamObserver<UpstreamSegment> collect(StreamObserver<Downstream> responseObserver) {
return new StreamObserver<UpstreamSegment>() {
@Override public void onNext(UpstreamSegment segment) {
logger.debug("receive segment");
// SegmentParse segmentParse = new SegmentParse();
// segmentParse.parse(segment, SegmentParse.Source.Agent);
}
@Override public void onError(Throwable throwable) {
logger.error(throwable.getMessage(), throwable);
}
@Override public void onCompleted() {
responseObserver.onNext(Downstream.newBuilder().build());
responseObserver.onCompleted();
}
};
}
}
......@@ -36,5 +36,10 @@
<artifactId>collector-agent-define</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>collector-agent-stream</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
......@@ -28,6 +28,13 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>collector-agent-stream</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>collector-stream-define</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.stream.graph;
import org.skywalking.apm.collector.core.graph.Graph;
import org.skywalking.apm.collector.core.graph.GraphManager;
import org.skywalking.apm.collector.storage.table.jvm.GCMetric;
/**
* @author peng-yongsheng
*/
public class GCMetricStreamGraph {
public Graph<GCMetric> createIfAbsent() {
Graph<GCMetric> graph = GraphManager.INSTANCE.createIfAbsent(1, GCMetric.class);
graph.addNode();
return graph;
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.remote;
/**
* @author peng-yongsheng
*/
public enum RoutingRule {
HashCode, ForeverFirst
}
......@@ -18,8 +18,7 @@
package org.skywalking.apm.collector.stream.worker.base;
import org.skywalking.apm.collector.core.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.base.selector.WorkerSelector;
import org.skywalking.apm.collector.remote.RoutingRule;
/**
* @author peng-yongsheng
......@@ -28,7 +27,5 @@ public interface Role {
String roleName();
WorkerSelector workerSelector();
DataDefine dataDefine();
RoutingRule routingRule();
}
......@@ -19,7 +19,6 @@
package org.skywalking.apm.collector.stream.worker.base;
import java.util.List;
import org.skywalking.apm.collector.stream.worker.base.selector.WorkerSelector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -31,27 +30,24 @@ public class WorkerRefs<T extends WorkerRef> {
private final Logger logger = LoggerFactory.getLogger(WorkerRefs.class);
private List<T> workerRefs;
private WorkerSelector workerSelector;
private Role role;
protected WorkerRefs(List<T> workerRefs, WorkerSelector workerSelector) {
protected WorkerRefs(List<T> workerRefs) {
this.workerRefs = workerRefs;
this.workerSelector = workerSelector;
}
protected WorkerRefs(List<T> workerRefs, WorkerSelector workerSelector, Role role) {
protected WorkerRefs(List<T> workerRefs, Role role) {
this.workerRefs = workerRefs;
this.workerSelector = workerSelector;
this.role = role;
}
public void tell(Object message) throws WorkerInvokeException {
logger.debug("WorkerSelector instance of {}", workerSelector.getClass());
// logger.debug("WorkerSelector instance of {}", workerSelector.getClass());
workerRefs.forEach(workerRef -> {
if (workerRef instanceof RemoteWorkerRef) {
logger.debug("message hashcode: {}, select workers: {}", message.hashCode(), workerRef.toString());
}
});
workerSelector.select(workerRefs, message).tell(message);
// workerSelector.select(workerRefs, message).tell(message);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册