提交 f3e85ef6 编写于 作者: P peng-yongsheng

Change apm-collector-stream from collector module to common codes.

Make agent gRPC to be a collector module.
Make agent jetty to be a collector module.
Make agent stream to be a collector module.
上级 8658ca77
......@@ -21,13 +21,13 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm-collector-agent</artifactId>
<artifactId>apm-collector-agent-grpc</artifactId>
<groupId>org.skywalking</groupId>
<version>3.2.5-2017</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>collector-agent-define</artifactId>
<artifactId>collector-agent-grpc-define</artifactId>
<packaging>jar</packaging>
</project>
</project>
\ No newline at end of file
......@@ -16,16 +16,16 @@
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent;
package org.skywalking.apm.collector.agent.grpc;
import org.skywalking.apm.collector.core.module.Module;
/**
* @author peng-yongsheng
*/
public class AgentModule extends Module {
public class AgentGRPCModule extends Module {
public static final String NAME = "agent";
public static final String NAME = "agent_gRPC";
@Override public String name() {
return NAME;
......
......@@ -16,4 +16,4 @@
# Project repository: https://github.com/OpenSkywalking/skywalking
#
org.skywalking.apm.collector.agent.AgentModule
\ No newline at end of file
org.skywalking.apm.collector.agent.grpc.AgentGRPCModule
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright 2017, OpenSkywalking Organization All rights reserved.
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
~ Project repository: https://github.com/OpenSkywalking/skywalking
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm-collector-agent-grpc</artifactId>
<groupId>org.skywalking</groupId>
<version>3.2.5-2017</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>collector-agent-grpc-provider</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>collector-agent-grpc-define</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>collector-grpc-manager-define</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>collector-cluster-define</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>collector-naming-define</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>collector-agent-stream-define</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
......@@ -19,7 +19,6 @@
package org.skywalking.apm.collector.agent.grpc;
import java.util.Properties;
import org.skywalking.apm.collector.agent.AgentModule;
import org.skywalking.apm.collector.agent.grpc.handler.ApplicationRegisterServiceHandler;
import org.skywalking.apm.collector.agent.grpc.handler.InstanceDiscoveryServiceHandler;
import org.skywalking.apm.collector.agent.grpc.handler.JVMMetricsServiceHandler;
......@@ -27,8 +26,7 @@ import org.skywalking.apm.collector.agent.grpc.handler.ServiceNameDiscoveryServi
import org.skywalking.apm.collector.agent.grpc.handler.TraceSegmentServiceHandler;
import org.skywalking.apm.collector.agent.grpc.handler.naming.AgentGRPCNamingHandler;
import org.skywalking.apm.collector.agent.grpc.handler.naming.AgentGRPCNamingListener;
import org.skywalking.apm.collector.agent.stream.AgentStreamSingleton;
import org.skywalking.apm.collector.cache.CacheModule;
import org.skywalking.apm.collector.agent.stream.AgentStreamModule;
import org.skywalking.apm.collector.cluster.ClusterModule;
import org.skywalking.apm.collector.cluster.service.ModuleListenerService;
import org.skywalking.apm.collector.cluster.service.ModuleRegisterService;
......@@ -39,10 +37,7 @@ import org.skywalking.apm.collector.grpc.manager.GRPCManagerModule;
import org.skywalking.apm.collector.grpc.manager.service.GRPCManagerService;
import org.skywalking.apm.collector.naming.NamingModule;
import org.skywalking.apm.collector.naming.service.NamingHandlerRegisterService;
import org.skywalking.apm.collector.remote.RemoteModule;
import org.skywalking.apm.collector.server.Server;
import org.skywalking.apm.collector.storage.StorageModule;
import org.skywalking.apm.collector.stream.StreamModule;
/**
* @author peng-yongsheng
......@@ -58,7 +53,7 @@ public class AgentModuleGRPCProvider extends ModuleProvider {
}
@Override public Class<? extends Module> module() {
return AgentModule.class;
return AgentGRPCModule.class;
}
@Override public void prepare(Properties config) throws ServiceNotProvidedException {
......@@ -70,7 +65,7 @@ public class AgentModuleGRPCProvider extends ModuleProvider {
Integer port = (Integer)config.get(PORT);
ModuleRegisterService moduleRegisterService = getManager().find(ClusterModule.NAME).getService(ModuleRegisterService.class);
moduleRegisterService.register(AgentModule.NAME, this.name(), new AgentModuleGRPCRegistration(host, port));
moduleRegisterService.register(AgentGRPCModule.NAME, this.name(), new AgentModuleGRPCRegistration(host, port));
AgentGRPCNamingListener namingListener = new AgentGRPCNamingListener();
ModuleListenerService moduleListenerService = getManager().find(ClusterModule.NAME).getService(ModuleListenerService.class);
......@@ -82,7 +77,6 @@ public class AgentModuleGRPCProvider extends ModuleProvider {
GRPCManagerService managerService = getManager().find(GRPCManagerModule.NAME).getService(GRPCManagerService.class);
Server gRPCServer = managerService.createIfAbsent(host, port);
AgentStreamSingleton.createInstanceIfAbsent(getManager());
addHandlers(gRPCServer);
}
......@@ -91,14 +85,14 @@ public class AgentModuleGRPCProvider extends ModuleProvider {
}
@Override public String[] requiredModules() {
return new String[] {ClusterModule.NAME, NamingModule.NAME, StorageModule.NAME, GRPCManagerModule.NAME, CacheModule.NAME, RemoteModule.NAME, StreamModule.NAME};
return new String[] {ClusterModule.NAME, NamingModule.NAME, GRPCManagerModule.NAME, AgentStreamModule.NAME};
}
private void addHandlers(Server gRPCServer) {
gRPCServer.addHandler(new ApplicationRegisterServiceHandler(getManager()));
gRPCServer.addHandler(new InstanceDiscoveryServiceHandler(getManager()));
gRPCServer.addHandler(new ServiceNameDiscoveryServiceHandler(getManager()));
gRPCServer.addHandler(new JVMMetricsServiceHandler());
gRPCServer.addHandler(new JVMMetricsServiceHandler(getManager()));
gRPCServer.addHandler(new TraceSegmentServiceHandler(getManager()));
}
}
......@@ -20,7 +20,8 @@ package org.skywalking.apm.collector.agent.grpc.handler;
import com.google.protobuf.ProtocolStringList;
import io.grpc.stub.StreamObserver;
import org.skywalking.apm.collector.agent.stream.worker.register.ApplicationIDService;
import org.skywalking.apm.collector.agent.stream.AgentStreamModule;
import org.skywalking.apm.collector.agent.stream.service.register.IApplicationIDService;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.server.grpc.GRPCHandler;
import org.skywalking.apm.network.proto.Application;
......@@ -37,10 +38,10 @@ public class ApplicationRegisterServiceHandler extends ApplicationRegisterServic
private final Logger logger = LoggerFactory.getLogger(ApplicationRegisterServiceHandler.class);
private final ApplicationIDService applicationIDService;
private final IApplicationIDService applicationIDService;
public ApplicationRegisterServiceHandler(ModuleManager moduleManager) {
applicationIDService = new ApplicationIDService(moduleManager);
applicationIDService = moduleManager.find(AgentStreamModule.NAME).getService(IApplicationIDService.class);
}
@Override public void register(Application request, StreamObserver<ApplicationMapping> responseObserver) {
......
......@@ -21,7 +21,8 @@ package org.skywalking.apm.collector.agent.grpc.handler;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import io.grpc.stub.StreamObserver;
import org.skywalking.apm.collector.agent.stream.worker.register.InstanceIDService;
import org.skywalking.apm.collector.agent.stream.AgentStreamModule;
import org.skywalking.apm.collector.agent.stream.service.register.IInstanceIDService;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.server.grpc.GRPCHandler;
......@@ -41,10 +42,10 @@ public class InstanceDiscoveryServiceHandler extends InstanceDiscoveryServiceGrp
private final Logger logger = LoggerFactory.getLogger(InstanceDiscoveryServiceHandler.class);
private final InstanceIDService instanceIDService;
private final IInstanceIDService instanceIDService;
public InstanceDiscoveryServiceHandler(ModuleManager moduleManager) {
this.instanceIDService = new InstanceIDService(moduleManager);
this.instanceIDService = moduleManager.find(AgentStreamModule.NAME).getService(IInstanceIDService.class);
}
@Override
......
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.grpc.handler;
import io.grpc.stub.StreamObserver;
import java.util.List;
import org.skywalking.apm.collector.agent.stream.AgentStreamModule;
import org.skywalking.apm.collector.agent.stream.service.jvm.ICpuMetricService;
import org.skywalking.apm.collector.agent.stream.service.jvm.IGCMetricService;
import org.skywalking.apm.collector.agent.stream.service.jvm.IInstanceHeartBeatService;
import org.skywalking.apm.collector.agent.stream.service.jvm.IMemoryMetricService;
import org.skywalking.apm.collector.agent.stream.service.jvm.IMemoryPoolMetricService;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.server.grpc.GRPCHandler;
import org.skywalking.apm.network.proto.CPU;
import org.skywalking.apm.network.proto.Downstream;
import org.skywalking.apm.network.proto.GC;
import org.skywalking.apm.network.proto.JVMMetrics;
import org.skywalking.apm.network.proto.JVMMetricsServiceGrpc;
import org.skywalking.apm.network.proto.Memory;
import org.skywalking.apm.network.proto.MemoryPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class JVMMetricsServiceHandler extends JVMMetricsServiceGrpc.JVMMetricsServiceImplBase implements GRPCHandler {
private final Logger logger = LoggerFactory.getLogger(JVMMetricsServiceHandler.class);
private final ICpuMetricService cpuMetricService;
private final IGCMetricService gcMetricService;
private final IMemoryMetricService memoryMetricService;
private final IMemoryPoolMetricService memoryPoolMetricService;
private final IInstanceHeartBeatService instanceHeartBeatService;
public JVMMetricsServiceHandler(ModuleManager moduleManager) {
this.cpuMetricService = moduleManager.find(AgentStreamModule.NAME).getService(ICpuMetricService.class);
this.gcMetricService = moduleManager.find(AgentStreamModule.NAME).getService(IGCMetricService.class);
this.memoryMetricService = moduleManager.find(AgentStreamModule.NAME).getService(IMemoryMetricService.class);
this.memoryPoolMetricService = moduleManager.find(AgentStreamModule.NAME).getService(IMemoryPoolMetricService.class);
this.instanceHeartBeatService = moduleManager.find(AgentStreamModule.NAME).getService(IInstanceHeartBeatService.class);
}
@Override public void collect(JVMMetrics request, StreamObserver<Downstream> responseObserver) {
int instanceId = request.getApplicationInstanceId();
logger.debug("receive the jvm metric from application instance, id: {}", instanceId);
request.getMetricsList().forEach(metric -> {
long time = TimeBucketUtils.INSTANCE.getSecondTimeBucket(metric.getTime());
sendToInstanceHeartBeatService(instanceId, metric.getTime());
sendToCpuMetricService(instanceId, time, metric.getCpu());
sendToMemoryMetricService(instanceId, time, metric.getMemoryList());
sendToMemoryPoolMetricService(instanceId, time, metric.getMemoryPoolList());
sendToGCMetricService(instanceId, time, metric.getGcList());
});
responseObserver.onNext(Downstream.newBuilder().build());
responseObserver.onCompleted();
}
private void sendToInstanceHeartBeatService(int instanceId, long heartBeatTime) {
instanceHeartBeatService.send(instanceId, heartBeatTime);
}
private void sendToMemoryMetricService(int instanceId, long timeBucket, List<Memory> memories) {
memories.forEach(memory -> memoryMetricService.send(instanceId, timeBucket, memory.getIsHeap(), memory.getInit(), memory.getMax(), memory.getUsed(), memory.getCommitted()));
}
private void sendToMemoryPoolMetricService(int instanceId, long timeBucket,
List<MemoryPool> memoryPools) {
memoryPools.forEach(memoryPool -> memoryPoolMetricService.send(instanceId, timeBucket, memoryPool.getType().getNumber(), memoryPool.getInit(), memoryPool.getMax(), memoryPool.getUsed(), memoryPool.getCommited()));
}
private void sendToCpuMetricService(int instanceId, long timeBucket, CPU cpu) {
cpuMetricService.send(instanceId, timeBucket, cpu.getUsagePercent());
}
private void sendToGCMetricService(int instanceId, long timeBucket, List<GC> gcs) {
gcs.forEach(gc -> gcMetricService.send(instanceId, timeBucket, gc.getPhraseValue(), gc.getCount(), gc.getTime()));
}
}
......@@ -20,7 +20,8 @@ package org.skywalking.apm.collector.agent.grpc.handler;
import io.grpc.stub.StreamObserver;
import java.util.List;
import org.skywalking.apm.collector.agent.stream.worker.register.ServiceNameService;
import org.skywalking.apm.collector.agent.stream.AgentStreamModule;
import org.skywalking.apm.collector.agent.stream.service.register.IServiceNameService;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.server.grpc.GRPCHandler;
import org.skywalking.apm.network.proto.ServiceNameCollection;
......@@ -38,10 +39,10 @@ public class ServiceNameDiscoveryServiceHandler extends ServiceNameDiscoveryServ
private final Logger logger = LoggerFactory.getLogger(ServiceNameDiscoveryServiceHandler.class);
private final ServiceNameService serviceNameService;
private final IServiceNameService serviceNameService;
public ServiceNameDiscoveryServiceHandler(ModuleManager moduleManager) {
this.serviceNameService = new ServiceNameService(moduleManager);
this.serviceNameService = moduleManager.find(AgentStreamModule.NAME).getService(IServiceNameService.class);
}
@Override public void discovery(ServiceNameCollection request,
......
......@@ -19,7 +19,8 @@
package org.skywalking.apm.collector.agent.grpc.handler;
import io.grpc.stub.StreamObserver;
import org.skywalking.apm.collector.agent.stream.parser.SegmentParse;
import org.skywalking.apm.collector.agent.stream.AgentStreamModule;
import org.skywalking.apm.collector.agent.stream.service.trace.ITraceSegmentService;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.server.grpc.GRPCHandler;
import org.skywalking.apm.network.proto.Downstream;
......@@ -35,18 +36,17 @@ public class TraceSegmentServiceHandler extends TraceSegmentServiceGrpc.TraceSeg
private final Logger logger = LoggerFactory.getLogger(TraceSegmentServiceHandler.class);
private final ModuleManager moduleManager;
private final ITraceSegmentService traceSegmentService;
public TraceSegmentServiceHandler(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
this.traceSegmentService = moduleManager.find(AgentStreamModule.NAME).getService(ITraceSegmentService.class);
}
@Override public StreamObserver<UpstreamSegment> collect(StreamObserver<Downstream> responseObserver) {
return new StreamObserver<UpstreamSegment>() {
@Override public void onNext(UpstreamSegment segment) {
logger.debug("receive segment");
SegmentParse segmentParse = new SegmentParse(moduleManager);
segmentParse.parse(segment, SegmentParse.Source.Agent);
traceSegmentService.send(segment);
}
@Override public void onError(Throwable throwable) {
......
......@@ -37,8 +37,7 @@ public class AgentGRPCNamingHandler extends JettyHandler {
}
@Override public String pathSpec() {
// return "/agent/gRPC";
return "/agentstream/grpc";
return "/agent/gRPC";
}
@Override protected JsonElement doGet(HttpServletRequest req) throws ArgumentsParseException {
......
......@@ -18,7 +18,7 @@
package org.skywalking.apm.collector.agent.grpc.handler.naming;
import org.skywalking.apm.collector.agent.AgentModule;
import org.skywalking.apm.collector.agent.grpc.AgentGRPCModule;
import org.skywalking.apm.collector.agent.grpc.AgentModuleGRPCProvider;
import org.skywalking.apm.collector.cluster.ClusterModuleListener;
......@@ -27,7 +27,7 @@ import org.skywalking.apm.collector.cluster.ClusterModuleListener;
*/
public class AgentGRPCNamingListener extends ClusterModuleListener {
public static final String PATH = "/" + AgentModule.NAME + "/" + AgentModuleGRPCProvider.NAME;
public static final String PATH = "/" + AgentGRPCModule.NAME + "/" + AgentModuleGRPCProvider.NAME;
@Override public String path() {
return PATH;
......
......@@ -21,25 +21,24 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm-collector-agent</artifactId>
<artifactId>apm-collector</artifactId>
<groupId>org.skywalking</groupId>
<version>3.2.5-2017</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>collector-agent-stream</artifactId>
<packaging>jar</packaging>
<artifactId>apm-collector-agent-grpc</artifactId>
<packaging>pom</packaging>
<modules>
<module>collector-agent-grpc-define</module>
<module>collector-agent-grpc-provider</module>
</modules>
<dependencies>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>collector-stream-provider</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>collector-cache-define</artifactId>
<artifactId>apm-collector-core</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
</project>
\ No newline at end of file
......@@ -21,12 +21,13 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm-collector-stream</artifactId>
<artifactId>apm-collector-agent-jetty</artifactId>
<groupId>org.skywalking</groupId>
<version>3.2.5-2017</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>collector-stream-define</artifactId>
<artifactId>collector-agent-jetty-define</artifactId>
<packaging>jar</packaging>
</project>
\ No newline at end of file
......@@ -16,16 +16,16 @@
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream;
package org.skywalking.apm.collector.agent.jetty;
import org.skywalking.apm.collector.core.module.Module;
/**
* @author peng-yongsheng
*/
public class StreamModule extends Module {
public class AgentJettyModule extends Module {
public static final String NAME = "stream";
public static final String NAME = "agent_jetty";
@Override public String name() {
return NAME;
......
......@@ -16,4 +16,4 @@
# Project repository: https://github.com/OpenSkywalking/skywalking
#
org.skywalking.apm.collector.stream.StreamModule
\ No newline at end of file
org.skywalking.apm.collector.agent.jetty.AgentJettyModule
\ No newline at end of file
......@@ -21,7 +21,7 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm-collector-agent</artifactId>
<artifactId>apm-collector-agent-jetty</artifactId>
<groupId>org.skywalking</groupId>
<version>3.2.5-2017</version>
</parent>
......@@ -33,17 +33,27 @@
<dependencies>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>collector-agent-define</artifactId>
<artifactId>collector-agent-jetty-define</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>collector-agent-stream</artifactId>
<artifactId>collector-jetty-manager-define</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>collector-jetty-manager-define</artifactId>
<artifactId>collector-cluster-define</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>collector-naming-define</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>collector-agent-stream-define</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
......
......@@ -19,14 +19,13 @@
package org.skywalking.apm.collector.agent.jetty;
import java.util.Properties;
import org.skywalking.apm.collector.agent.AgentModule;
import org.skywalking.apm.collector.agent.jetty.handler.ApplicationRegisterServletHandler;
import org.skywalking.apm.collector.agent.jetty.handler.InstanceDiscoveryServletHandler;
import org.skywalking.apm.collector.agent.jetty.handler.ServiceNameDiscoveryServiceHandler;
import org.skywalking.apm.collector.agent.jetty.handler.TraceSegmentServletHandler;
import org.skywalking.apm.collector.agent.jetty.handler.naming.AgentJettyNamingHandler;
import org.skywalking.apm.collector.agent.jetty.handler.naming.AgentJettyNamingListener;
import org.skywalking.apm.collector.cache.CacheModule;
import org.skywalking.apm.collector.agent.stream.AgentStreamModule;
import org.skywalking.apm.collector.cluster.ClusterModule;
import org.skywalking.apm.collector.cluster.service.ModuleListenerService;
import org.skywalking.apm.collector.cluster.service.ModuleRegisterService;
......@@ -38,8 +37,6 @@ import org.skywalking.apm.collector.jetty.manager.service.JettyManagerService;
import org.skywalking.apm.collector.naming.NamingModule;
import org.skywalking.apm.collector.naming.service.NamingHandlerRegisterService;
import org.skywalking.apm.collector.server.Server;
import org.skywalking.apm.collector.storage.StorageModule;
import org.skywalking.apm.collector.stream.StreamModule;
/**
* @author peng-yongsheng
......@@ -56,7 +53,7 @@ public class AgentModuleJettyProvider extends ModuleProvider {
}
@Override public Class<? extends Module> module() {
return AgentModule.class;
return AgentJettyModule.class;
}
@Override public void prepare(Properties config) throws ServiceNotProvidedException {
......@@ -69,7 +66,7 @@ public class AgentModuleJettyProvider extends ModuleProvider {
String contextPath = config.getProperty(CONTEXT_PATH);
ModuleRegisterService moduleRegisterService = getManager().find(ClusterModule.NAME).getService(ModuleRegisterService.class);
moduleRegisterService.register(AgentModule.NAME, this.name(), new AgentModuleJettyRegistration(host, port, contextPath));
moduleRegisterService.register(AgentJettyModule.NAME, this.name(), new AgentModuleJettyRegistration(host, port, contextPath));
AgentJettyNamingListener namingListener = new AgentJettyNamingListener();
ModuleListenerService moduleListenerService = getManager().find(ClusterModule.NAME).getService(ModuleListenerService.class);
......@@ -88,7 +85,7 @@ public class AgentModuleJettyProvider extends ModuleProvider {
}
@Override public String[] requiredModules() {
return new String[] {ClusterModule.NAME, NamingModule.NAME, StorageModule.NAME, JettyManagerModule.NAME, CacheModule.NAME, StreamModule.NAME};
return new String[] {ClusterModule.NAME, NamingModule.NAME, JettyManagerModule.NAME, AgentStreamModule.NAME};
}
private void addHandlers(Server jettyServer) {
......
......@@ -24,7 +24,8 @@ import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import java.io.IOException;
import javax.servlet.http.HttpServletRequest;
import org.skywalking.apm.collector.agent.stream.worker.register.ApplicationIDService;
import org.skywalking.apm.collector.agent.stream.AgentStreamModule;
import org.skywalking.apm.collector.agent.stream.service.register.IApplicationIDService;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.server.jetty.ArgumentsParseException;
import org.skywalking.apm.collector.server.jetty.JettyHandler;
......@@ -38,13 +39,13 @@ public class ApplicationRegisterServletHandler extends JettyHandler {
private final Logger logger = LoggerFactory.getLogger(ApplicationRegisterServletHandler.class);
private final ApplicationIDService applicationIDService;
private final IApplicationIDService applicationIDService;
private Gson gson = new Gson();
private static final String APPLICATION_CODE = "c";
private static final String APPLICATION_ID = "i";
public ApplicationRegisterServletHandler(ModuleManager moduleManager) {
this.applicationIDService = new ApplicationIDService(moduleManager);
this.applicationIDService = moduleManager.find(AgentStreamModule.NAME).getService(IApplicationIDService.class);
}
@Override public String pathSpec() {
......
......@@ -23,7 +23,8 @@ import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import java.io.IOException;
import javax.servlet.http.HttpServletRequest;
import org.skywalking.apm.collector.agent.stream.worker.register.InstanceIDService;
import org.skywalking.apm.collector.agent.stream.AgentStreamModule;
import org.skywalking.apm.collector.agent.stream.service.register.IInstanceIDService;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.server.jetty.ArgumentsParseException;
import org.skywalking.apm.collector.server.jetty.JettyHandler;
......@@ -37,8 +38,8 @@ public class InstanceDiscoveryServletHandler extends JettyHandler {
private final Logger logger = LoggerFactory.getLogger(InstanceDiscoveryServletHandler.class);
private final InstanceIDService instanceIDService;
private Gson gson = new Gson();
private final IInstanceIDService instanceIDService;
private final Gson gson = new Gson();
private static final String APPLICATION_ID = "ai";
private static final String AGENT_UUID = "au";
......@@ -47,7 +48,7 @@ public class InstanceDiscoveryServletHandler extends JettyHandler {
private static final String OS_INFO = "oi";
public InstanceDiscoveryServletHandler(ModuleManager moduleManager) {
this.instanceIDService = new InstanceIDService(moduleManager);
this.instanceIDService = moduleManager.find(AgentStreamModule.NAME).getService(IInstanceIDService.class);
}
@Override public String pathSpec() {
......
......@@ -24,7 +24,8 @@ import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import java.io.IOException;
import javax.servlet.http.HttpServletRequest;
import org.skywalking.apm.collector.agent.stream.worker.register.ServiceNameService;
import org.skywalking.apm.collector.agent.stream.AgentStreamModule;
import org.skywalking.apm.collector.agent.stream.service.register.IServiceNameService;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.server.jetty.ArgumentsParseException;
import org.skywalking.apm.collector.server.jetty.JettyHandler;
......@@ -38,8 +39,8 @@ public class ServiceNameDiscoveryServiceHandler extends JettyHandler {
private final Logger logger = LoggerFactory.getLogger(ServiceNameDiscoveryServiceHandler.class);
private final ServiceNameService serviceNameService;
private Gson gson = new Gson();
private final IServiceNameService serviceNameService;
private final Gson gson = new Gson();
private static final String APPLICATION_ID = "ai";
private static final String SERVICE_NAME = "sn";
......@@ -47,7 +48,7 @@ public class ServiceNameDiscoveryServiceHandler extends JettyHandler {
private static final String ELEMENT = "el";
public ServiceNameDiscoveryServiceHandler(ModuleManager moduleManager) {
this.serviceNameService = new ServiceNameService(moduleManager);
this.serviceNameService = moduleManager.find(AgentStreamModule.NAME).getService(IServiceNameService.class);
}
@Override public String pathSpec() {
......
......@@ -25,7 +25,8 @@ import java.io.IOException;
import javax.servlet.http.HttpServletRequest;
import org.skywalking.apm.collector.agent.jetty.handler.reader.TraceSegment;
import org.skywalking.apm.collector.agent.jetty.handler.reader.TraceSegmentJsonReader;
import org.skywalking.apm.collector.agent.stream.parser.SegmentParse;
import org.skywalking.apm.collector.agent.stream.AgentStreamModule;
import org.skywalking.apm.collector.agent.stream.service.trace.ITraceSegmentService;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.server.jetty.ArgumentsParseException;
import org.skywalking.apm.collector.server.jetty.JettyHandler;
......@@ -39,10 +40,10 @@ public class TraceSegmentServletHandler extends JettyHandler {
private final Logger logger = LoggerFactory.getLogger(TraceSegmentServletHandler.class);
private final ModuleManager moduleManager;
private final ITraceSegmentService traceSegmentService;
public TraceSegmentServletHandler(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
this.traceSegmentService = moduleManager.find(AgentStreamModule.NAME).getService(ITraceSegmentService.class);
}
@Override public String pathSpec() {
......@@ -71,9 +72,8 @@ public class TraceSegmentServletHandler extends JettyHandler {
reader.beginArray();
while (reader.hasNext()) {
SegmentParse segmentParse = new SegmentParse(moduleManager);
TraceSegment traceSegment = jsonReader.read(reader);
segmentParse.parse(traceSegment.getUpstreamSegment(), SegmentParse.Source.Agent);
traceSegmentService.send(traceSegment.getUpstreamSegment());
}
reader.endArray();
}
......
......@@ -18,7 +18,7 @@
package org.skywalking.apm.collector.agent.jetty.handler.naming;
import org.skywalking.apm.collector.agent.AgentModule;
import org.skywalking.apm.collector.agent.jetty.AgentJettyModule;
import org.skywalking.apm.collector.agent.jetty.AgentModuleJettyProvider;
import org.skywalking.apm.collector.cluster.ClusterModuleListener;
......@@ -27,7 +27,7 @@ import org.skywalking.apm.collector.cluster.ClusterModuleListener;
*/
public class AgentJettyNamingListener extends ClusterModuleListener {
public static final String PATH = "/" + AgentModule.NAME + "/" + AgentModuleJettyProvider.NAME;
public static final String PATH = "/" + AgentJettyModule.NAME + "/" + AgentModuleJettyProvider.NAME;
@Override public String path() {
return PATH;
......
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright 2017, OpenSkywalking Organization All rights reserved.
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
~ Project repository: https://github.com/OpenSkywalking/skywalking
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm-collector</artifactId>
<groupId>org.skywalking</groupId>
<version>3.2.5-2017</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>apm-collector-agent-jetty</artifactId>
<packaging>pom</packaging>
<modules>
<module>collector-agent-jetty-define</module>
<module>collector-agent-jetty-provider</module>
</modules>
<dependencies>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-collector-core</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
......@@ -21,20 +21,20 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm-collector-stream</artifactId>
<artifactId>apm-collector-agent-stream</artifactId>
<groupId>org.skywalking</groupId>
<version>3.2.5-2017</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>collector-stream-provider</artifactId>
<artifactId>collector-agent-stream-define</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>collector-stream-define</artifactId>
<artifactId>apm-network</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
</project>
\ No newline at end of file
......@@ -16,37 +16,53 @@
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream;
package org.skywalking.apm.collector.agent.stream;
import java.util.Properties;
import java.util.ArrayList;
import java.util.List;
import org.skywalking.apm.collector.agent.stream.service.jvm.ICpuMetricService;
import org.skywalking.apm.collector.agent.stream.service.jvm.IGCMetricService;
import org.skywalking.apm.collector.agent.stream.service.jvm.IInstanceHeartBeatService;
import org.skywalking.apm.collector.agent.stream.service.jvm.IMemoryMetricService;
import org.skywalking.apm.collector.agent.stream.service.jvm.IMemoryPoolMetricService;
import org.skywalking.apm.collector.agent.stream.service.register.IApplicationIDService;
import org.skywalking.apm.collector.agent.stream.service.register.IInstanceIDService;
import org.skywalking.apm.collector.agent.stream.service.register.IServiceNameService;
import org.skywalking.apm.collector.agent.stream.service.trace.ITraceSegmentService;
import org.skywalking.apm.collector.core.module.Module;
import org.skywalking.apm.collector.core.module.ModuleProvider;
import org.skywalking.apm.collector.core.module.ServiceNotProvidedException;
/**
* @author peng-yongsheng
*/
public class StreamModuleProvider extends ModuleProvider {
public class AgentStreamModule extends Module {
public static final String NAME = "agent_stream";
@Override public String name() {
return "worker";
return NAME;
}
@Override public Class<? extends Module> module() {
return StreamModule.class;
}
@Override public Class[] services() {
List<Class> classes = new ArrayList<>();
@Override public void prepare(Properties config) throws ServiceNotProvidedException {
}
addRegisterService(classes);
addJVMService(classes);
classes.add(ITraceSegmentService.class);
@Override public void start(Properties config) throws ServiceNotProvidedException {
return classes.toArray(new Class[] {});
}
@Override public void notifyAfterCompleted() throws ServiceNotProvidedException {
private void addRegisterService(List<Class> classes) {
classes.add(IApplicationIDService.class);
classes.add(IInstanceIDService.class);
classes.add(IServiceNameService.class);
}
@Override public String[] requiredModules() {
return new String[] {};
private void addJVMService(List<Class> classes) {
classes.add(ICpuMetricService.class);
classes.add(IGCMetricService.class);
classes.add(IMemoryMetricService.class);
classes.add(IMemoryPoolMetricService.class);
classes.add(IInstanceHeartBeatService.class);
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.stream.service.jvm;
import org.skywalking.apm.collector.core.module.Service;
/**
* @author peng-yongsheng
*/
public interface ICpuMetricService extends Service {
void send(int instanceId, long timeBucket, double usagePercent);
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.stream.service.jvm;
import org.skywalking.apm.collector.core.module.Service;
/**
* @author peng-yongsheng
*/
public interface IGCMetricService extends Service {
void send(int instanceId, long timeBucket, int phraseValue, long count, long time);
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.stream.service.jvm;
import org.skywalking.apm.collector.core.module.Service;
/**
* @author peng-yongsheng
*/
public interface IInstanceHeartBeatService extends Service {
void send(int instanceId, long heartBeatTime);
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.stream.service.jvm;
import org.skywalking.apm.collector.core.module.Service;
/**
* @author peng-yongsheng
*/
public interface IMemoryMetricService extends Service {
void send(int instanceId, long timeBucket, boolean isHeap, long init, long max, long used, long commited);
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.stream.service.jvm;
import org.skywalking.apm.collector.core.module.Service;
/**
* @author peng-yongsheng
*/
public interface IMemoryPoolMetricService extends Service {
void send(int instanceId, long timeBucket, int poolType, long init, long max, long used, long commited);
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.stream.service.register;
import org.skywalking.apm.collector.core.module.Service;
/**
* @author peng-yongsheng
*/
public interface IApplicationIDService extends Service {
int getOrCreate(String applicationCode);
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.stream.service.register;
import org.skywalking.apm.collector.core.module.Service;
/**
* @author peng-yongsheng
*/
public interface IInstanceIDService extends Service {
int getOrCreate(int applicationId, String agentUUID, long registerTime, String osInfo);
void recover(int instanceId, int applicationId, long registerTime, String osInfo);
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.stream.service.register;
import org.skywalking.apm.collector.core.module.Service;
/**
* @author peng-yongsheng
*/
public interface IServiceNameService extends Service {
int getOrCreate(int applicationId, String serviceName);
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.stream.service.trace;
import org.skywalking.apm.collector.core.module.Service;
import org.skywalking.apm.network.proto.UpstreamSegment;
/**
* @author peng-yongsheng
*/
public interface ITraceSegmentService extends Service {
void send(UpstreamSegment segment);
}
......@@ -16,4 +16,4 @@
# Project repository: https://github.com/OpenSkywalking/skywalking
#
org.skywalking.apm.collector.stream.StreamModuleProvider
\ No newline at end of file
org.skywalking.apm.collector.agent.stream.AgentStreamModule
\ No newline at end of file
......@@ -21,30 +21,35 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm-collector-agent</artifactId>
<artifactId>apm-collector-agent-stream</artifactId>
<groupId>org.skywalking</groupId>
<version>3.2.5-2017</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>collector-agent-grpc-provider</artifactId>
<artifactId>collector-agent-stream-provider</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>collector-agent-define</artifactId>
<artifactId>collector-agent-stream-define</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>collector-agent-stream</artifactId>
<artifactId>collector-storage-define</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>collector-grpc-manager-define</artifactId>
<artifactId>collector-cache-define</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-collector-stream</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
</project>
\ No newline at end of file
......@@ -22,34 +22,23 @@ import org.skywalking.apm.collector.agent.stream.graph.JvmMetricStreamGraph;
import org.skywalking.apm.collector.agent.stream.graph.RegisterStreamGraph;
import org.skywalking.apm.collector.agent.stream.graph.TraceStreamGraph;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.core.util.ObjectUtils;
import org.skywalking.apm.collector.stream.timer.PersistenceTimer;
import org.skywalking.apm.collector.stream.worker.base.WorkerCreateListener;
/**
* @author peng-yongsheng
*/
public class AgentStreamSingleton {
private static AgentStreamSingleton INSTANCE;
public class AgentStreamBootStartup {
private final ModuleManager moduleManager;
private final WorkerCreateListener workerCreateListener;
private AgentStreamSingleton(ModuleManager moduleManager) {
public AgentStreamBootStartup(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
this.workerCreateListener = new WorkerCreateListener();
this.create();
}
public static synchronized AgentStreamSingleton createInstanceIfAbsent(ModuleManager moduleManager) {
if (ObjectUtils.isEmpty(INSTANCE)) {
INSTANCE = new AgentStreamSingleton(moduleManager);
}
return INSTANCE;
}
private void create() {
public void start() {
createJVMGraph();
createRegisterGraph();
createTraceGraph();
......
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.stream;
import java.util.Properties;
import org.skywalking.apm.collector.agent.stream.buffer.BufferFileConfig;
import org.skywalking.apm.collector.agent.stream.service.jvm.ICpuMetricService;
import org.skywalking.apm.collector.agent.stream.service.jvm.IGCMetricService;
import org.skywalking.apm.collector.agent.stream.service.jvm.IInstanceHeartBeatService;
import org.skywalking.apm.collector.agent.stream.service.jvm.IMemoryMetricService;
import org.skywalking.apm.collector.agent.stream.service.jvm.IMemoryPoolMetricService;
import org.skywalking.apm.collector.agent.stream.service.register.IApplicationIDService;
import org.skywalking.apm.collector.agent.stream.service.register.IInstanceIDService;
import org.skywalking.apm.collector.agent.stream.service.register.IServiceNameService;
import org.skywalking.apm.collector.agent.stream.service.trace.ITraceSegmentService;
import org.skywalking.apm.collector.agent.stream.worker.jvm.CpuMetricService;
import org.skywalking.apm.collector.agent.stream.worker.jvm.GCMetricService;
import org.skywalking.apm.collector.agent.stream.worker.jvm.InstanceHeartBeatService;
import org.skywalking.apm.collector.agent.stream.worker.jvm.MemoryMetricService;
import org.skywalking.apm.collector.agent.stream.worker.jvm.MemoryPoolMetricService;
import org.skywalking.apm.collector.agent.stream.worker.register.ApplicationIDService;
import org.skywalking.apm.collector.agent.stream.worker.register.InstanceIDService;
import org.skywalking.apm.collector.agent.stream.worker.register.ServiceNameService;
import org.skywalking.apm.collector.agent.stream.worker.trace.TraceSegmentService;
import org.skywalking.apm.collector.cache.CacheModule;
import org.skywalking.apm.collector.core.module.Module;
import org.skywalking.apm.collector.core.module.ModuleProvider;
import org.skywalking.apm.collector.core.module.ServiceNotProvidedException;
import org.skywalking.apm.collector.storage.StorageModule;
/**
* @author peng-yongsheng
*/
public class AgentStreamModuleProvider extends ModuleProvider {
@Override public String name() {
return "default";
}
@Override public Class<? extends Module> module() {
return AgentStreamModule.class;
}
@Override public void prepare(Properties config) throws ServiceNotProvidedException {
this.registerServiceImplementation(IApplicationIDService.class, new ApplicationIDService(getManager()));
this.registerServiceImplementation(IInstanceIDService.class, new InstanceIDService(getManager()));
this.registerServiceImplementation(IServiceNameService.class, new ServiceNameService(getManager()));
this.registerServiceImplementation(ICpuMetricService.class, new CpuMetricService());
this.registerServiceImplementation(IGCMetricService.class, new GCMetricService());
this.registerServiceImplementation(IMemoryMetricService.class, new MemoryMetricService());
this.registerServiceImplementation(IMemoryPoolMetricService.class, new MemoryPoolMetricService());
this.registerServiceImplementation(IInstanceHeartBeatService.class, new InstanceHeartBeatService());
this.registerServiceImplementation(ITraceSegmentService.class, new TraceSegmentService(getManager()));
BufferFileConfig.Parser parser = new BufferFileConfig.Parser();
parser.parse(config);
}
@Override public void start(Properties config) throws ServiceNotProvidedException {
AgentStreamBootStartup bootStartup = new AgentStreamBootStartup(getManager());
bootStartup.start();
}
@Override public void notifyAfterCompleted() throws ServiceNotProvidedException {
}
@Override public String[] requiredModules() {
return new String[] {StorageModule.NAME, CacheModule.NAME};
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.stream.buffer;
import java.util.Properties;
/**
* @author peng-yongsheng
*/
public class BufferFileConfig {
static int BUFFER_OFFSET_MAX_FILE_SIZE = 10 * 1024 * 1024;
static int BUFFER_SEGMENT_MAX_FILE_SIZE = 10 * 1024 * 1024;
private static final String BUFFER_OFFSET_MAX_FILE_SIZE_KEY = "buffer_offset_max_file_size";
private static final String BUFFER_SEGMENT_MAX_FILE_SIZE_KEY = "buffer_segment_max_file_size";
public static class Parser {
public void parse(Properties config) {
if (config.containsKey(BUFFER_OFFSET_MAX_FILE_SIZE_KEY)) {
String sizeStr = config.getProperty(BUFFER_OFFSET_MAX_FILE_SIZE_KEY).toUpperCase();
if (sizeStr.endsWith("K")) {
int size = Integer.parseInt(sizeStr.replace("K", ""));
BUFFER_OFFSET_MAX_FILE_SIZE = size * 1024;
} else if (sizeStr.endsWith("KB")) {
int size = Integer.parseInt(sizeStr.replace("KB", ""));
BUFFER_OFFSET_MAX_FILE_SIZE = size * 1024;
} else if (sizeStr.endsWith("M")) {
int size = Integer.parseInt(sizeStr.replace("M", ""));
BUFFER_OFFSET_MAX_FILE_SIZE = size * 1024 * 1024;
} else if (sizeStr.endsWith("MB")) {
int size = Integer.parseInt(sizeStr.replace("MB", ""));
BUFFER_OFFSET_MAX_FILE_SIZE = size * 1024 * 1024;
} else {
BUFFER_OFFSET_MAX_FILE_SIZE = 10 * 1024 * 1024;
}
} else {
BUFFER_OFFSET_MAX_FILE_SIZE = 10 * 1024 * 1024;
}
if (config.containsKey(BUFFER_SEGMENT_MAX_FILE_SIZE_KEY)) {
String sizeStr = config.getProperty(BUFFER_SEGMENT_MAX_FILE_SIZE_KEY).toUpperCase();
if (sizeStr.endsWith("K")) {
int size = Integer.parseInt(sizeStr.replace("K", ""));
BUFFER_SEGMENT_MAX_FILE_SIZE = size * 1024;
} else if (sizeStr.endsWith("KB")) {
int size = Integer.parseInt(sizeStr.replace("KB", ""));
BUFFER_SEGMENT_MAX_FILE_SIZE = size * 1024;
} else if (sizeStr.endsWith("M")) {
int size = Integer.parseInt(sizeStr.replace("M", ""));
BUFFER_SEGMENT_MAX_FILE_SIZE = size * 1024 * 1024;
} else if (sizeStr.endsWith("MB")) {
int size = Integer.parseInt(sizeStr.replace("MB", ""));
BUFFER_SEGMENT_MAX_FILE_SIZE = size * 1024 * 1024;
} else {
BUFFER_SEGMENT_MAX_FILE_SIZE = 1024 * 1024;
}
} else {
BUFFER_SEGMENT_MAX_FILE_SIZE = 1024 * 1024;
}
}
}
}
......@@ -70,7 +70,7 @@ public enum OffsetManager {
offset.deserialize(offsetRecord);
initialized = true;
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> flush(), 10, 3, TimeUnit.SECONDS);
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(this::flush, 10, 3, TimeUnit.SECONDS);
}
}
......
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.stream.worker.jvm;
import org.skywalking.apm.collector.agent.stream.graph.JvmMetricStreamGraph;
import org.skywalking.apm.collector.agent.stream.service.jvm.ICpuMetricService;
import org.skywalking.apm.collector.core.graph.Graph;
import org.skywalking.apm.collector.core.graph.GraphManager;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.storage.table.jvm.CpuMetric;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class CpuMetricService implements ICpuMetricService {
private final Logger logger = LoggerFactory.getLogger(CpuMetricService.class);
private final Graph<CpuMetric> cpuMetricGraph;
public CpuMetricService() {
cpuMetricGraph = GraphManager.INSTANCE.createIfAbsent(JvmMetricStreamGraph.CPU_METRIC_GRAPH_ID, CpuMetric.class);
}
@Override public void send(int instanceId, long timeBucket, double usagePercent) {
CpuMetric cpuMetric = new CpuMetric(timeBucket + Const.ID_SPLIT + instanceId);
cpuMetric.setInstanceId(instanceId);
cpuMetric.setUsagePercent(usagePercent);
cpuMetric.setTimeBucket(timeBucket);
logger.debug("send to cpu metric graph, id: {}", cpuMetric.getId());
cpuMetricGraph.start(cpuMetric);
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.stream.worker.jvm;
import org.skywalking.apm.collector.agent.stream.graph.JvmMetricStreamGraph;
import org.skywalking.apm.collector.agent.stream.service.jvm.IGCMetricService;
import org.skywalking.apm.collector.core.graph.Graph;
import org.skywalking.apm.collector.core.graph.GraphManager;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.storage.table.jvm.GCMetric;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class GCMetricService implements IGCMetricService {
private final Logger logger = LoggerFactory.getLogger(GCMetricService.class);
private final Graph<GCMetric> gcMetricGraph;
public GCMetricService() {
gcMetricGraph = GraphManager.INSTANCE.createIfAbsent(JvmMetricStreamGraph.GC_METRIC_GRAPH_ID, GCMetric.class);
}
@Override public void send(int instanceId, long timeBucket, int phraseValue, long count, long time) {
GCMetric gcMetric = new GCMetric(timeBucket + Const.ID_SPLIT + instanceId + Const.ID_SPLIT + String.valueOf(phraseValue));
gcMetric.setInstanceId(instanceId);
gcMetric.setPhrase(phraseValue);
gcMetric.setCount(count);
gcMetric.setTime(time);
gcMetric.setTimeBucket(timeBucket);
logger.debug("send to gc metric graph, id: {}", gcMetric.getId());
gcMetricGraph.start(gcMetric);
}
}
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册