未验证 提交 461672c9 编写于 作者: wu-sheng's avatar wu-sheng 提交者: GitHub

Merge branch 'master' into feature/support-start-script

......@@ -21,13 +21,13 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm-collector-agent</artifactId>
<artifactId>apm-collector-agent-grpc</artifactId>
<groupId>org.skywalking</groupId>
<version>3.2.5-2017</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>collector-agent-define</artifactId>
<artifactId>collector-agent-grpc-define</artifactId>
<packaging>jar</packaging>
</project>
</project>
\ No newline at end of file
......@@ -16,16 +16,16 @@
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream;
package org.skywalking.apm.collector.agent.grpc;
import org.skywalking.apm.collector.core.module.Module;
/**
* @author peng-yongsheng
*/
public class StreamModule extends Module {
public class AgentGRPCModule extends Module {
public static final String NAME = "stream";
public static final String NAME = "agent_gRPC";
@Override public String name() {
return NAME;
......
......@@ -16,4 +16,4 @@
# Project repository: https://github.com/OpenSkywalking/skywalking
#
org.skywalking.apm.collector.agent.AgentModule
\ No newline at end of file
org.skywalking.apm.collector.agent.grpc.AgentGRPCModule
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright 2017, OpenSkywalking Organization All rights reserved.
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
~ Project repository: https://github.com/OpenSkywalking/skywalking
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm-collector-agent-grpc</artifactId>
<groupId>org.skywalking</groupId>
<version>3.2.5-2017</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>collector-agent-grpc-provider</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>collector-agent-grpc-define</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>collector-grpc-manager-define</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>collector-cluster-define</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>collector-naming-define</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>collector-agent-stream-define</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
......@@ -19,7 +19,6 @@
package org.skywalking.apm.collector.agent.grpc;
import java.util.Properties;
import org.skywalking.apm.collector.agent.AgentModule;
import org.skywalking.apm.collector.agent.grpc.handler.ApplicationRegisterServiceHandler;
import org.skywalking.apm.collector.agent.grpc.handler.InstanceDiscoveryServiceHandler;
import org.skywalking.apm.collector.agent.grpc.handler.JVMMetricsServiceHandler;
......@@ -27,8 +26,7 @@ import org.skywalking.apm.collector.agent.grpc.handler.ServiceNameDiscoveryServi
import org.skywalking.apm.collector.agent.grpc.handler.TraceSegmentServiceHandler;
import org.skywalking.apm.collector.agent.grpc.handler.naming.AgentGRPCNamingHandler;
import org.skywalking.apm.collector.agent.grpc.handler.naming.AgentGRPCNamingListener;
import org.skywalking.apm.collector.agent.stream.AgentStreamSingleton;
import org.skywalking.apm.collector.cache.CacheModule;
import org.skywalking.apm.collector.agent.stream.AgentStreamModule;
import org.skywalking.apm.collector.cluster.ClusterModule;
import org.skywalking.apm.collector.cluster.service.ModuleListenerService;
import org.skywalking.apm.collector.cluster.service.ModuleRegisterService;
......@@ -39,10 +37,7 @@ import org.skywalking.apm.collector.grpc.manager.GRPCManagerModule;
import org.skywalking.apm.collector.grpc.manager.service.GRPCManagerService;
import org.skywalking.apm.collector.naming.NamingModule;
import org.skywalking.apm.collector.naming.service.NamingHandlerRegisterService;
import org.skywalking.apm.collector.remote.RemoteModule;
import org.skywalking.apm.collector.server.Server;
import org.skywalking.apm.collector.storage.StorageModule;
import org.skywalking.apm.collector.stream.StreamModule;
/**
* @author peng-yongsheng
......@@ -58,7 +53,7 @@ public class AgentModuleGRPCProvider extends ModuleProvider {
}
@Override public Class<? extends Module> module() {
return AgentModule.class;
return AgentGRPCModule.class;
}
@Override public void prepare(Properties config) throws ServiceNotProvidedException {
......@@ -70,7 +65,7 @@ public class AgentModuleGRPCProvider extends ModuleProvider {
Integer port = (Integer)config.get(PORT);
ModuleRegisterService moduleRegisterService = getManager().find(ClusterModule.NAME).getService(ModuleRegisterService.class);
moduleRegisterService.register(AgentModule.NAME, this.name(), new AgentModuleGRPCRegistration(host, port));
moduleRegisterService.register(AgentGRPCModule.NAME, this.name(), new AgentModuleGRPCRegistration(host, port));
AgentGRPCNamingListener namingListener = new AgentGRPCNamingListener();
ModuleListenerService moduleListenerService = getManager().find(ClusterModule.NAME).getService(ModuleListenerService.class);
......@@ -82,7 +77,6 @@ public class AgentModuleGRPCProvider extends ModuleProvider {
GRPCManagerService managerService = getManager().find(GRPCManagerModule.NAME).getService(GRPCManagerService.class);
Server gRPCServer = managerService.createIfAbsent(host, port);
AgentStreamSingleton.createInstanceIfAbsent(getManager());
addHandlers(gRPCServer);
}
......@@ -91,14 +85,14 @@ public class AgentModuleGRPCProvider extends ModuleProvider {
}
@Override public String[] requiredModules() {
return new String[] {ClusterModule.NAME, NamingModule.NAME, StorageModule.NAME, GRPCManagerModule.NAME, CacheModule.NAME, RemoteModule.NAME, StreamModule.NAME};
return new String[] {ClusterModule.NAME, NamingModule.NAME, GRPCManagerModule.NAME, AgentStreamModule.NAME};
}
private void addHandlers(Server gRPCServer) {
gRPCServer.addHandler(new ApplicationRegisterServiceHandler(getManager()));
gRPCServer.addHandler(new InstanceDiscoveryServiceHandler(getManager()));
gRPCServer.addHandler(new ServiceNameDiscoveryServiceHandler(getManager()));
gRPCServer.addHandler(new JVMMetricsServiceHandler());
gRPCServer.addHandler(new JVMMetricsServiceHandler(getManager()));
gRPCServer.addHandler(new TraceSegmentServiceHandler(getManager()));
}
}
......@@ -20,7 +20,8 @@ package org.skywalking.apm.collector.agent.grpc.handler;
import com.google.protobuf.ProtocolStringList;
import io.grpc.stub.StreamObserver;
import org.skywalking.apm.collector.agent.stream.worker.register.ApplicationIDService;
import org.skywalking.apm.collector.agent.stream.AgentStreamModule;
import org.skywalking.apm.collector.agent.stream.service.register.IApplicationIDService;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.server.grpc.GRPCHandler;
import org.skywalking.apm.network.proto.Application;
......@@ -37,10 +38,10 @@ public class ApplicationRegisterServiceHandler extends ApplicationRegisterServic
private final Logger logger = LoggerFactory.getLogger(ApplicationRegisterServiceHandler.class);
private final ApplicationIDService applicationIDService;
private final IApplicationIDService applicationIDService;
public ApplicationRegisterServiceHandler(ModuleManager moduleManager) {
applicationIDService = new ApplicationIDService(moduleManager);
applicationIDService = moduleManager.find(AgentStreamModule.NAME).getService(IApplicationIDService.class);
}
@Override public void register(Application request, StreamObserver<ApplicationMapping> responseObserver) {
......
......@@ -21,7 +21,8 @@ package org.skywalking.apm.collector.agent.grpc.handler;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import io.grpc.stub.StreamObserver;
import org.skywalking.apm.collector.agent.stream.worker.register.InstanceIDService;
import org.skywalking.apm.collector.agent.stream.AgentStreamModule;
import org.skywalking.apm.collector.agent.stream.service.register.IInstanceIDService;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.server.grpc.GRPCHandler;
......@@ -41,10 +42,10 @@ public class InstanceDiscoveryServiceHandler extends InstanceDiscoveryServiceGrp
private final Logger logger = LoggerFactory.getLogger(InstanceDiscoveryServiceHandler.class);
private final InstanceIDService instanceIDService;
private final IInstanceIDService instanceIDService;
public InstanceDiscoveryServiceHandler(ModuleManager moduleManager) {
this.instanceIDService = new InstanceIDService(moduleManager);
this.instanceIDService = moduleManager.find(AgentStreamModule.NAME).getService(IInstanceIDService.class);
}
@Override
......
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.grpc.handler;
import io.grpc.stub.StreamObserver;
import java.util.List;
import org.skywalking.apm.collector.agent.stream.AgentStreamModule;
import org.skywalking.apm.collector.agent.stream.service.jvm.ICpuMetricService;
import org.skywalking.apm.collector.agent.stream.service.jvm.IGCMetricService;
import org.skywalking.apm.collector.agent.stream.service.jvm.IInstanceHeartBeatService;
import org.skywalking.apm.collector.agent.stream.service.jvm.IMemoryMetricService;
import org.skywalking.apm.collector.agent.stream.service.jvm.IMemoryPoolMetricService;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.server.grpc.GRPCHandler;
import org.skywalking.apm.network.proto.CPU;
import org.skywalking.apm.network.proto.Downstream;
import org.skywalking.apm.network.proto.GC;
import org.skywalking.apm.network.proto.JVMMetrics;
import org.skywalking.apm.network.proto.JVMMetricsServiceGrpc;
import org.skywalking.apm.network.proto.Memory;
import org.skywalking.apm.network.proto.MemoryPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class JVMMetricsServiceHandler extends JVMMetricsServiceGrpc.JVMMetricsServiceImplBase implements GRPCHandler {
private final Logger logger = LoggerFactory.getLogger(JVMMetricsServiceHandler.class);
private final ICpuMetricService cpuMetricService;
private final IGCMetricService gcMetricService;
private final IMemoryMetricService memoryMetricService;
private final IMemoryPoolMetricService memoryPoolMetricService;
private final IInstanceHeartBeatService instanceHeartBeatService;
public JVMMetricsServiceHandler(ModuleManager moduleManager) {
this.cpuMetricService = moduleManager.find(AgentStreamModule.NAME).getService(ICpuMetricService.class);
this.gcMetricService = moduleManager.find(AgentStreamModule.NAME).getService(IGCMetricService.class);
this.memoryMetricService = moduleManager.find(AgentStreamModule.NAME).getService(IMemoryMetricService.class);
this.memoryPoolMetricService = moduleManager.find(AgentStreamModule.NAME).getService(IMemoryPoolMetricService.class);
this.instanceHeartBeatService = moduleManager.find(AgentStreamModule.NAME).getService(IInstanceHeartBeatService.class);
}
@Override public void collect(JVMMetrics request, StreamObserver<Downstream> responseObserver) {
int instanceId = request.getApplicationInstanceId();
logger.debug("receive the jvm metric from application instance, id: {}", instanceId);
request.getMetricsList().forEach(metric -> {
long time = TimeBucketUtils.INSTANCE.getSecondTimeBucket(metric.getTime());
sendToInstanceHeartBeatService(instanceId, metric.getTime());
sendToCpuMetricService(instanceId, time, metric.getCpu());
sendToMemoryMetricService(instanceId, time, metric.getMemoryList());
sendToMemoryPoolMetricService(instanceId, time, metric.getMemoryPoolList());
sendToGCMetricService(instanceId, time, metric.getGcList());
});
responseObserver.onNext(Downstream.newBuilder().build());
responseObserver.onCompleted();
}
private void sendToInstanceHeartBeatService(int instanceId, long heartBeatTime) {
instanceHeartBeatService.send(instanceId, heartBeatTime);
}
private void sendToMemoryMetricService(int instanceId, long timeBucket, List<Memory> memories) {
memories.forEach(memory -> memoryMetricService.send(instanceId, timeBucket, memory.getIsHeap(), memory.getInit(), memory.getMax(), memory.getUsed(), memory.getCommitted()));
}
private void sendToMemoryPoolMetricService(int instanceId, long timeBucket,
List<MemoryPool> memoryPools) {
memoryPools.forEach(memoryPool -> memoryPoolMetricService.send(instanceId, timeBucket, memoryPool.getType().getNumber(), memoryPool.getInit(), memoryPool.getMax(), memoryPool.getUsed(), memoryPool.getCommited()));
}
private void sendToCpuMetricService(int instanceId, long timeBucket, CPU cpu) {
cpuMetricService.send(instanceId, timeBucket, cpu.getUsagePercent());
}
private void sendToGCMetricService(int instanceId, long timeBucket, List<GC> gcs) {
gcs.forEach(gc -> gcMetricService.send(instanceId, timeBucket, gc.getPhraseValue(), gc.getCount(), gc.getTime()));
}
}
......@@ -20,7 +20,8 @@ package org.skywalking.apm.collector.agent.grpc.handler;
import io.grpc.stub.StreamObserver;
import java.util.List;
import org.skywalking.apm.collector.agent.stream.worker.register.ServiceNameService;
import org.skywalking.apm.collector.agent.stream.AgentStreamModule;
import org.skywalking.apm.collector.agent.stream.service.register.IServiceNameService;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.server.grpc.GRPCHandler;
import org.skywalking.apm.network.proto.ServiceNameCollection;
......@@ -38,10 +39,10 @@ public class ServiceNameDiscoveryServiceHandler extends ServiceNameDiscoveryServ
private final Logger logger = LoggerFactory.getLogger(ServiceNameDiscoveryServiceHandler.class);
private final ServiceNameService serviceNameService;
private final IServiceNameService serviceNameService;
public ServiceNameDiscoveryServiceHandler(ModuleManager moduleManager) {
this.serviceNameService = new ServiceNameService(moduleManager);
this.serviceNameService = moduleManager.find(AgentStreamModule.NAME).getService(IServiceNameService.class);
}
@Override public void discovery(ServiceNameCollection request,
......
......@@ -19,7 +19,8 @@
package org.skywalking.apm.collector.agent.grpc.handler;
import io.grpc.stub.StreamObserver;
import org.skywalking.apm.collector.agent.stream.parser.SegmentParse;
import org.skywalking.apm.collector.agent.stream.AgentStreamModule;
import org.skywalking.apm.collector.agent.stream.service.trace.ITraceSegmentService;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.server.grpc.GRPCHandler;
import org.skywalking.apm.network.proto.Downstream;
......@@ -35,18 +36,17 @@ public class TraceSegmentServiceHandler extends TraceSegmentServiceGrpc.TraceSeg
private final Logger logger = LoggerFactory.getLogger(TraceSegmentServiceHandler.class);
private final ModuleManager moduleManager;
private final ITraceSegmentService traceSegmentService;
public TraceSegmentServiceHandler(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
this.traceSegmentService = moduleManager.find(AgentStreamModule.NAME).getService(ITraceSegmentService.class);
}
@Override public StreamObserver<UpstreamSegment> collect(StreamObserver<Downstream> responseObserver) {
return new StreamObserver<UpstreamSegment>() {
@Override public void onNext(UpstreamSegment segment) {
logger.debug("receive segment");
SegmentParse segmentParse = new SegmentParse(moduleManager);
segmentParse.parse(segment, SegmentParse.Source.Agent);
traceSegmentService.send(segment);
}
@Override public void onError(Throwable throwable) {
......
......@@ -37,8 +37,7 @@ public class AgentGRPCNamingHandler extends JettyHandler {
}
@Override public String pathSpec() {
// return "/agent/gRPC";
return "/agentstream/grpc";
return "/agent/gRPC";
}
@Override protected JsonElement doGet(HttpServletRequest req) throws ArgumentsParseException {
......
......@@ -18,7 +18,7 @@
package org.skywalking.apm.collector.agent.grpc.handler.naming;
import org.skywalking.apm.collector.agent.AgentModule;
import org.skywalking.apm.collector.agent.grpc.AgentGRPCModule;
import org.skywalking.apm.collector.agent.grpc.AgentModuleGRPCProvider;
import org.skywalking.apm.collector.cluster.ClusterModuleListener;
......@@ -27,7 +27,7 @@ import org.skywalking.apm.collector.cluster.ClusterModuleListener;
*/
public class AgentGRPCNamingListener extends ClusterModuleListener {
public static final String PATH = "/" + AgentModule.NAME + "/" + AgentModuleGRPCProvider.NAME;
public static final String PATH = "/" + AgentGRPCModule.NAME + "/" + AgentModuleGRPCProvider.NAME;
@Override public String path() {
return PATH;
......
......@@ -21,25 +21,24 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm-collector-agent</artifactId>
<artifactId>apm-collector</artifactId>
<groupId>org.skywalking</groupId>
<version>3.2.5-2017</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>collector-agent-stream</artifactId>
<packaging>jar</packaging>
<artifactId>apm-collector-agent-grpc</artifactId>
<packaging>pom</packaging>
<modules>
<module>collector-agent-grpc-define</module>
<module>collector-agent-grpc-provider</module>
</modules>
<dependencies>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>collector-stream-provider</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>collector-cache-define</artifactId>
<artifactId>apm-collector-core</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
</project>
\ No newline at end of file
......@@ -21,12 +21,13 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm-collector-stream</artifactId>
<artifactId>apm-collector-agent-jetty</artifactId>
<groupId>org.skywalking</groupId>
<version>3.2.5-2017</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>collector-stream-define</artifactId>
<artifactId>collector-agent-jetty-define</artifactId>
<packaging>jar</packaging>
</project>
\ No newline at end of file
......@@ -16,16 +16,16 @@
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent;
package org.skywalking.apm.collector.agent.jetty;
import org.skywalking.apm.collector.core.module.Module;
/**
* @author peng-yongsheng
*/
public class AgentModule extends Module {
public class AgentJettyModule extends Module {
public static final String NAME = "agent";
public static final String NAME = "agent_jetty";
@Override public String name() {
return NAME;
......
......@@ -16,4 +16,4 @@
# Project repository: https://github.com/OpenSkywalking/skywalking
#
org.skywalking.apm.collector.stream.StreamModule
\ No newline at end of file
org.skywalking.apm.collector.agent.jetty.AgentJettyModule
\ No newline at end of file
......@@ -21,7 +21,7 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm-collector-agent</artifactId>
<artifactId>apm-collector-agent-jetty</artifactId>
<groupId>org.skywalking</groupId>
<version>3.2.5-2017</version>
</parent>
......@@ -33,17 +33,27 @@
<dependencies>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>collector-agent-define</artifactId>
<artifactId>collector-agent-jetty-define</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>collector-agent-stream</artifactId>
<artifactId>collector-jetty-manager-define</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>collector-jetty-manager-define</artifactId>
<artifactId>collector-cluster-define</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>collector-naming-define</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>collector-agent-stream-define</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
......
......@@ -19,14 +19,13 @@
package org.skywalking.apm.collector.agent.jetty;
import java.util.Properties;
import org.skywalking.apm.collector.agent.AgentModule;
import org.skywalking.apm.collector.agent.jetty.handler.ApplicationRegisterServletHandler;
import org.skywalking.apm.collector.agent.jetty.handler.InstanceDiscoveryServletHandler;
import org.skywalking.apm.collector.agent.jetty.handler.ServiceNameDiscoveryServiceHandler;
import org.skywalking.apm.collector.agent.jetty.handler.TraceSegmentServletHandler;
import org.skywalking.apm.collector.agent.jetty.handler.naming.AgentJettyNamingHandler;
import org.skywalking.apm.collector.agent.jetty.handler.naming.AgentJettyNamingListener;
import org.skywalking.apm.collector.cache.CacheModule;
import org.skywalking.apm.collector.agent.stream.AgentStreamModule;
import org.skywalking.apm.collector.cluster.ClusterModule;
import org.skywalking.apm.collector.cluster.service.ModuleListenerService;
import org.skywalking.apm.collector.cluster.service.ModuleRegisterService;
......@@ -38,8 +37,6 @@ import org.skywalking.apm.collector.jetty.manager.service.JettyManagerService;
import org.skywalking.apm.collector.naming.NamingModule;
import org.skywalking.apm.collector.naming.service.NamingHandlerRegisterService;
import org.skywalking.apm.collector.server.Server;
import org.skywalking.apm.collector.storage.StorageModule;
import org.skywalking.apm.collector.stream.StreamModule;
/**
* @author peng-yongsheng
......@@ -56,7 +53,7 @@ public class AgentModuleJettyProvider extends ModuleProvider {
}
@Override public Class<? extends Module> module() {
return AgentModule.class;
return AgentJettyModule.class;
}
@Override public void prepare(Properties config) throws ServiceNotProvidedException {
......@@ -69,7 +66,7 @@ public class AgentModuleJettyProvider extends ModuleProvider {
String contextPath = config.getProperty(CONTEXT_PATH);
ModuleRegisterService moduleRegisterService = getManager().find(ClusterModule.NAME).getService(ModuleRegisterService.class);
moduleRegisterService.register(AgentModule.NAME, this.name(), new AgentModuleJettyRegistration(host, port, contextPath));
moduleRegisterService.register(AgentJettyModule.NAME, this.name(), new AgentModuleJettyRegistration(host, port, contextPath));
AgentJettyNamingListener namingListener = new AgentJettyNamingListener();
ModuleListenerService moduleListenerService = getManager().find(ClusterModule.NAME).getService(ModuleListenerService.class);
......@@ -88,7 +85,7 @@ public class AgentModuleJettyProvider extends ModuleProvider {
}
@Override public String[] requiredModules() {
return new String[] {ClusterModule.NAME, NamingModule.NAME, StorageModule.NAME, JettyManagerModule.NAME, CacheModule.NAME, StreamModule.NAME};
return new String[] {ClusterModule.NAME, NamingModule.NAME, JettyManagerModule.NAME, AgentStreamModule.NAME};
}
private void addHandlers(Server jettyServer) {
......
......@@ -24,7 +24,8 @@ import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import java.io.IOException;
import javax.servlet.http.HttpServletRequest;
import org.skywalking.apm.collector.agent.stream.worker.register.ApplicationIDService;
import org.skywalking.apm.collector.agent.stream.AgentStreamModule;
import org.skywalking.apm.collector.agent.stream.service.register.IApplicationIDService;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.server.jetty.ArgumentsParseException;
import org.skywalking.apm.collector.server.jetty.JettyHandler;
......@@ -38,13 +39,13 @@ public class ApplicationRegisterServletHandler extends JettyHandler {
private final Logger logger = LoggerFactory.getLogger(ApplicationRegisterServletHandler.class);
private final ApplicationIDService applicationIDService;
private final IApplicationIDService applicationIDService;
private Gson gson = new Gson();
private static final String APPLICATION_CODE = "c";
private static final String APPLICATION_ID = "i";
public ApplicationRegisterServletHandler(ModuleManager moduleManager) {
this.applicationIDService = new ApplicationIDService(moduleManager);
this.applicationIDService = moduleManager.find(AgentStreamModule.NAME).getService(IApplicationIDService.class);
}
@Override public String pathSpec() {
......
......@@ -23,7 +23,8 @@ import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import java.io.IOException;
import javax.servlet.http.HttpServletRequest;
import org.skywalking.apm.collector.agent.stream.worker.register.InstanceIDService;
import org.skywalking.apm.collector.agent.stream.AgentStreamModule;
import org.skywalking.apm.collector.agent.stream.service.register.IInstanceIDService;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.server.jetty.ArgumentsParseException;
import org.skywalking.apm.collector.server.jetty.JettyHandler;
......@@ -37,8 +38,8 @@ public class InstanceDiscoveryServletHandler extends JettyHandler {
private final Logger logger = LoggerFactory.getLogger(InstanceDiscoveryServletHandler.class);
private final InstanceIDService instanceIDService;
private Gson gson = new Gson();
private final IInstanceIDService instanceIDService;
private final Gson gson = new Gson();
private static final String APPLICATION_ID = "ai";
private static final String AGENT_UUID = "au";
......@@ -47,7 +48,7 @@ public class InstanceDiscoveryServletHandler extends JettyHandler {
private static final String OS_INFO = "oi";
public InstanceDiscoveryServletHandler(ModuleManager moduleManager) {
this.instanceIDService = new InstanceIDService(moduleManager);
this.instanceIDService = moduleManager.find(AgentStreamModule.NAME).getService(IInstanceIDService.class);
}
@Override public String pathSpec() {
......
......@@ -24,7 +24,8 @@ import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import java.io.IOException;
import javax.servlet.http.HttpServletRequest;
import org.skywalking.apm.collector.agent.stream.worker.register.ServiceNameService;
import org.skywalking.apm.collector.agent.stream.AgentStreamModule;
import org.skywalking.apm.collector.agent.stream.service.register.IServiceNameService;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.server.jetty.ArgumentsParseException;
import org.skywalking.apm.collector.server.jetty.JettyHandler;
......@@ -38,8 +39,8 @@ public class ServiceNameDiscoveryServiceHandler extends JettyHandler {
private final Logger logger = LoggerFactory.getLogger(ServiceNameDiscoveryServiceHandler.class);
private final ServiceNameService serviceNameService;
private Gson gson = new Gson();
private final IServiceNameService serviceNameService;
private final Gson gson = new Gson();
private static final String APPLICATION_ID = "ai";
private static final String SERVICE_NAME = "sn";
......@@ -47,7 +48,7 @@ public class ServiceNameDiscoveryServiceHandler extends JettyHandler {
private static final String ELEMENT = "el";
public ServiceNameDiscoveryServiceHandler(ModuleManager moduleManager) {
this.serviceNameService = new ServiceNameService(moduleManager);
this.serviceNameService = moduleManager.find(AgentStreamModule.NAME).getService(IServiceNameService.class);
}
@Override public String pathSpec() {
......
......@@ -25,7 +25,8 @@ import java.io.IOException;
import javax.servlet.http.HttpServletRequest;
import org.skywalking.apm.collector.agent.jetty.handler.reader.TraceSegment;
import org.skywalking.apm.collector.agent.jetty.handler.reader.TraceSegmentJsonReader;
import org.skywalking.apm.collector.agent.stream.parser.SegmentParse;
import org.skywalking.apm.collector.agent.stream.AgentStreamModule;
import org.skywalking.apm.collector.agent.stream.service.trace.ITraceSegmentService;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.server.jetty.ArgumentsParseException;
import org.skywalking.apm.collector.server.jetty.JettyHandler;
......@@ -39,10 +40,10 @@ public class TraceSegmentServletHandler extends JettyHandler {
private final Logger logger = LoggerFactory.getLogger(TraceSegmentServletHandler.class);
private final ModuleManager moduleManager;
private final ITraceSegmentService traceSegmentService;
public TraceSegmentServletHandler(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
this.traceSegmentService = moduleManager.find(AgentStreamModule.NAME).getService(ITraceSegmentService.class);
}
@Override public String pathSpec() {
......@@ -71,9 +72,8 @@ public class TraceSegmentServletHandler extends JettyHandler {
reader.beginArray();
while (reader.hasNext()) {
SegmentParse segmentParse = new SegmentParse(moduleManager);
TraceSegment traceSegment = jsonReader.read(reader);
segmentParse.parse(traceSegment.getUpstreamSegment(), SegmentParse.Source.Agent);
traceSegmentService.send(traceSegment.getUpstreamSegment());
}
reader.endArray();
}
......
......@@ -18,7 +18,7 @@
package org.skywalking.apm.collector.agent.jetty.handler.naming;
import org.skywalking.apm.collector.agent.AgentModule;
import org.skywalking.apm.collector.agent.jetty.AgentJettyModule;
import org.skywalking.apm.collector.agent.jetty.AgentModuleJettyProvider;
import org.skywalking.apm.collector.cluster.ClusterModuleListener;
......@@ -27,7 +27,7 @@ import org.skywalking.apm.collector.cluster.ClusterModuleListener;
*/
public class AgentJettyNamingListener extends ClusterModuleListener {
public static final String PATH = "/" + AgentModule.NAME + "/" + AgentModuleJettyProvider.NAME;
public static final String PATH = "/" + AgentJettyModule.NAME + "/" + AgentModuleJettyProvider.NAME;
@Override public String path() {
return PATH;
......
......@@ -40,8 +40,11 @@ public class SegmentPost {
serviceNameRegisterPost.send("json/servicename-register-provider.json");
JsonElement provider = JsonFileReader.INSTANCE.read("json/dubbox-provider.json");
HttpClientTools.INSTANCE.post("http://localhost:12800/segments", provider.toString());
JsonElement consumer = JsonFileReader.INSTANCE.read("json/dubbox-consumer.json");
HttpClientTools.INSTANCE.post("http://localhost:12800/segments", consumer.toString());
for (int i = 0; i < 1000; i++) {
HttpClientTools.INSTANCE.post("http://localhost:12800/segments", provider.toString());
HttpClientTools.INSTANCE.post("http://localhost:12800/segments", consumer.toString());
}
}
}
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright 2017, OpenSkywalking Organization All rights reserved.
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
~ Project repository: https://github.com/OpenSkywalking/skywalking
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm-collector</artifactId>
<groupId>org.skywalking</groupId>
<version>3.2.5-2017</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>apm-collector-agent-jetty</artifactId>
<packaging>pom</packaging>
<modules>
<module>collector-agent-jetty-define</module>
<module>collector-agent-jetty-provider</module>
</modules>
<dependencies>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-collector-core</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
......@@ -21,20 +21,20 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm-collector-stream</artifactId>
<artifactId>apm-collector-agent-stream</artifactId>
<groupId>org.skywalking</groupId>
<version>3.2.5-2017</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>collector-stream-provider</artifactId>
<artifactId>collector-agent-stream-define</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>collector-stream-define</artifactId>
<artifactId>apm-network</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
</project>
\ No newline at end of file
......@@ -16,37 +16,53 @@
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.stream;
package org.skywalking.apm.collector.agent.stream;
import java.util.Properties;
import java.util.ArrayList;
import java.util.List;
import org.skywalking.apm.collector.agent.stream.service.jvm.ICpuMetricService;
import org.skywalking.apm.collector.agent.stream.service.jvm.IGCMetricService;
import org.skywalking.apm.collector.agent.stream.service.jvm.IInstanceHeartBeatService;
import org.skywalking.apm.collector.agent.stream.service.jvm.IMemoryMetricService;
import org.skywalking.apm.collector.agent.stream.service.jvm.IMemoryPoolMetricService;
import org.skywalking.apm.collector.agent.stream.service.register.IApplicationIDService;
import org.skywalking.apm.collector.agent.stream.service.register.IInstanceIDService;
import org.skywalking.apm.collector.agent.stream.service.register.IServiceNameService;
import org.skywalking.apm.collector.agent.stream.service.trace.ITraceSegmentService;
import org.skywalking.apm.collector.core.module.Module;
import org.skywalking.apm.collector.core.module.ModuleProvider;
import org.skywalking.apm.collector.core.module.ServiceNotProvidedException;
/**
* @author peng-yongsheng
*/
public class StreamModuleProvider extends ModuleProvider {
public class AgentStreamModule extends Module {
public static final String NAME = "agent_stream";
@Override public String name() {
return "worker";
return NAME;
}
@Override public Class<? extends Module> module() {
return StreamModule.class;
}
@Override public Class[] services() {
List<Class> classes = new ArrayList<>();
@Override public void prepare(Properties config) throws ServiceNotProvidedException {
}
addRegisterService(classes);
addJVMService(classes);
classes.add(ITraceSegmentService.class);
@Override public void start(Properties config) throws ServiceNotProvidedException {
return classes.toArray(new Class[] {});
}
@Override public void notifyAfterCompleted() throws ServiceNotProvidedException {
private void addRegisterService(List<Class> classes) {
classes.add(IApplicationIDService.class);
classes.add(IInstanceIDService.class);
classes.add(IServiceNameService.class);
}
@Override public String[] requiredModules() {
return new String[] {};
private void addJVMService(List<Class> classes) {
classes.add(ICpuMetricService.class);
classes.add(IGCMetricService.class);
classes.add(IMemoryMetricService.class);
classes.add(IMemoryPoolMetricService.class);
classes.add(IInstanceHeartBeatService.class);
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.stream.service.jvm;
import org.skywalking.apm.collector.core.module.Service;
/**
* @author peng-yongsheng
*/
public interface ICpuMetricService extends Service {
void send(int instanceId, long timeBucket, double usagePercent);
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.stream.service.jvm;
import org.skywalking.apm.collector.core.module.Service;
/**
* @author peng-yongsheng
*/
public interface IGCMetricService extends Service {
void send(int instanceId, long timeBucket, int phraseValue, long count, long time);
}
......@@ -16,13 +16,13 @@
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.stream.buffer;
package org.skywalking.apm.collector.agent.stream.service.jvm;
import org.skywalking.apm.collector.core.config.SystemConfig;
import org.skywalking.apm.collector.core.module.Service;
/**
* @author peng-yongsheng
*/
public class SegmentBufferConfig {
public static String BUFFER_PATH = SystemConfig.DATA_PATH + "/buffer/";
public interface IInstanceHeartBeatService extends Service {
void send(int instanceId, long heartBeatTime);
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.stream.service.jvm;
import org.skywalking.apm.collector.core.module.Service;
/**
* @author peng-yongsheng
*/
public interface IMemoryMetricService extends Service {
void send(int instanceId, long timeBucket, boolean isHeap, long init, long max, long used, long commited);
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.stream.service.jvm;
import org.skywalking.apm.collector.core.module.Service;
/**
* @author peng-yongsheng
*/
public interface IMemoryPoolMetricService extends Service {
void send(int instanceId, long timeBucket, int poolType, long init, long max, long used, long commited);
}
......@@ -16,13 +16,13 @@
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.remote.service;
package org.skywalking.apm.collector.agent.stream.service.register;
import org.skywalking.apm.collector.core.module.Service;
/**
* @author peng-yongsheng
*/
public interface RemoteServerService extends Service {
void registerReceiver(DataReceiver receiver);
public interface IApplicationIDService extends Service {
int getOrCreate(String applicationCode);
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.stream.service.register;
import org.skywalking.apm.collector.core.module.Service;
/**
* @author peng-yongsheng
*/
public interface IInstanceIDService extends Service {
int getOrCreate(int applicationId, String agentUUID, long registerTime, String osInfo);
void recover(int instanceId, int applicationId, long registerTime, String osInfo);
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.stream.service.register;
import org.skywalking.apm.collector.core.module.Service;
/**
* @author peng-yongsheng
*/
public interface IServiceNameService extends Service {
int getOrCreate(int applicationId, String serviceName);
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.stream.service.trace;
import org.skywalking.apm.collector.core.module.Service;
import org.skywalking.apm.network.proto.UpstreamSegment;
/**
* @author peng-yongsheng
*/
public interface ITraceSegmentService extends Service {
void send(UpstreamSegment segment);
}
......@@ -16,4 +16,4 @@
# Project repository: https://github.com/OpenSkywalking/skywalking
#
org.skywalking.apm.collector.stream.StreamModuleProvider
\ No newline at end of file
org.skywalking.apm.collector.agent.stream.AgentStreamModule
\ No newline at end of file
......@@ -21,30 +21,35 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm-collector-agent</artifactId>
<artifactId>apm-collector-agent-stream</artifactId>
<groupId>org.skywalking</groupId>
<version>3.2.5-2017</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>collector-agent-grpc-provider</artifactId>
<artifactId>collector-agent-stream-provider</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>collector-agent-define</artifactId>
<artifactId>collector-agent-stream-define</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>collector-agent-stream</artifactId>
<artifactId>collector-storage-define</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>collector-grpc-manager-define</artifactId>
<artifactId>collector-cache-define</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-collector-stream</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
</project>
\ No newline at end of file
......@@ -22,34 +22,23 @@ import org.skywalking.apm.collector.agent.stream.graph.JvmMetricStreamGraph;
import org.skywalking.apm.collector.agent.stream.graph.RegisterStreamGraph;
import org.skywalking.apm.collector.agent.stream.graph.TraceStreamGraph;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.core.util.ObjectUtils;
import org.skywalking.apm.collector.stream.timer.PersistenceTimer;
import org.skywalking.apm.collector.stream.worker.base.WorkerCreateListener;
/**
* @author peng-yongsheng
*/
public class AgentStreamSingleton {
private static AgentStreamSingleton INSTANCE;
public class AgentStreamBootStartup {
private final ModuleManager moduleManager;
private final WorkerCreateListener workerCreateListener;
private AgentStreamSingleton(ModuleManager moduleManager) {
public AgentStreamBootStartup(ModuleManager moduleManager) {
this.moduleManager = moduleManager;
this.workerCreateListener = new WorkerCreateListener();
this.create();
}
public static synchronized AgentStreamSingleton createInstanceIfAbsent(ModuleManager moduleManager) {
if (ObjectUtils.isEmpty(INSTANCE)) {
INSTANCE = new AgentStreamSingleton(moduleManager);
}
return INSTANCE;
}
private void create() {
public void start() {
createJVMGraph();
createRegisterGraph();
createTraceGraph();
......
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.stream;
import java.util.Properties;
import org.skywalking.apm.collector.agent.stream.buffer.BufferFileConfig;
import org.skywalking.apm.collector.agent.stream.service.jvm.ICpuMetricService;
import org.skywalking.apm.collector.agent.stream.service.jvm.IGCMetricService;
import org.skywalking.apm.collector.agent.stream.service.jvm.IInstanceHeartBeatService;
import org.skywalking.apm.collector.agent.stream.service.jvm.IMemoryMetricService;
import org.skywalking.apm.collector.agent.stream.service.jvm.IMemoryPoolMetricService;
import org.skywalking.apm.collector.agent.stream.service.register.IApplicationIDService;
import org.skywalking.apm.collector.agent.stream.service.register.IInstanceIDService;
import org.skywalking.apm.collector.agent.stream.service.register.IServiceNameService;
import org.skywalking.apm.collector.agent.stream.service.trace.ITraceSegmentService;
import org.skywalking.apm.collector.agent.stream.worker.AgentStreamRemoteDataRegister;
import org.skywalking.apm.collector.agent.stream.worker.jvm.CpuMetricService;
import org.skywalking.apm.collector.agent.stream.worker.jvm.GCMetricService;
import org.skywalking.apm.collector.agent.stream.worker.jvm.InstanceHeartBeatService;
import org.skywalking.apm.collector.agent.stream.worker.jvm.MemoryMetricService;
import org.skywalking.apm.collector.agent.stream.worker.jvm.MemoryPoolMetricService;
import org.skywalking.apm.collector.agent.stream.worker.register.ApplicationIDService;
import org.skywalking.apm.collector.agent.stream.worker.register.InstanceIDService;
import org.skywalking.apm.collector.agent.stream.worker.register.ServiceNameService;
import org.skywalking.apm.collector.agent.stream.worker.trace.TraceSegmentService;
import org.skywalking.apm.collector.cache.CacheModule;
import org.skywalking.apm.collector.core.module.Module;
import org.skywalking.apm.collector.core.module.ModuleProvider;
import org.skywalking.apm.collector.core.module.ServiceNotProvidedException;
import org.skywalking.apm.collector.remote.RemoteModule;
import org.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
import org.skywalking.apm.collector.storage.StorageModule;
/**
* @author peng-yongsheng
*/
public class AgentStreamModuleProvider extends ModuleProvider {
@Override public String name() {
return "default";
}
@Override public Class<? extends Module> module() {
return AgentStreamModule.class;
}
@Override public void prepare(Properties config) throws ServiceNotProvidedException {
this.registerServiceImplementation(IApplicationIDService.class, new ApplicationIDService(getManager()));
this.registerServiceImplementation(IInstanceIDService.class, new InstanceIDService(getManager()));
this.registerServiceImplementation(IServiceNameService.class, new ServiceNameService(getManager()));
this.registerServiceImplementation(ICpuMetricService.class, new CpuMetricService());
this.registerServiceImplementation(IGCMetricService.class, new GCMetricService());
this.registerServiceImplementation(IMemoryMetricService.class, new MemoryMetricService());
this.registerServiceImplementation(IMemoryPoolMetricService.class, new MemoryPoolMetricService());
this.registerServiceImplementation(IInstanceHeartBeatService.class, new InstanceHeartBeatService());
this.registerServiceImplementation(ITraceSegmentService.class, new TraceSegmentService(getManager()));
BufferFileConfig.Parser parser = new BufferFileConfig.Parser();
parser.parse(config);
}
@Override public void start(Properties config) throws ServiceNotProvidedException {
RemoteDataRegisterService remoteDataRegisterService = getManager().find(RemoteModule.NAME).getService(RemoteDataRegisterService.class);
AgentStreamRemoteDataRegister agentStreamRemoteDataRegister = new AgentStreamRemoteDataRegister(remoteDataRegisterService);
agentStreamRemoteDataRegister.register();
AgentStreamBootStartup bootStartup = new AgentStreamBootStartup(getManager());
bootStartup.start();
}
@Override public void notifyAfterCompleted() throws ServiceNotProvidedException {
}
@Override public String[] requiredModules() {
return new String[] {StorageModule.NAME, CacheModule.NAME};
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.stream.buffer;
import java.util.Properties;
/**
* @author peng-yongsheng
*/
public class BufferFileConfig {
static int BUFFER_OFFSET_MAX_FILE_SIZE = 10 * 1024 * 1024;
static int BUFFER_SEGMENT_MAX_FILE_SIZE = 10 * 1024 * 1024;
static String BUFFER_PATH = "../buffer/";
private static final String BUFFER_PATH_KEY = "buffer_file_path";
private static final String BUFFER_OFFSET_MAX_FILE_SIZE_KEY = "buffer_offset_max_file_size";
private static final String BUFFER_SEGMENT_MAX_FILE_SIZE_KEY = "buffer_segment_max_file_size";
public static class Parser {
public void parse(Properties config) {
if (config.containsKey(BUFFER_PATH_KEY)) {
BUFFER_PATH = config.getProperty(BUFFER_PATH_KEY);
}
if (config.containsKey(BUFFER_OFFSET_MAX_FILE_SIZE_KEY)) {
String sizeStr = config.getProperty(BUFFER_OFFSET_MAX_FILE_SIZE_KEY).toUpperCase();
if (sizeStr.endsWith("K")) {
int size = Integer.parseInt(sizeStr.replace("K", ""));
BUFFER_OFFSET_MAX_FILE_SIZE = size * 1024;
} else if (sizeStr.endsWith("KB")) {
int size = Integer.parseInt(sizeStr.replace("KB", ""));
BUFFER_OFFSET_MAX_FILE_SIZE = size * 1024;
} else if (sizeStr.endsWith("M")) {
int size = Integer.parseInt(sizeStr.replace("M", ""));
BUFFER_OFFSET_MAX_FILE_SIZE = size * 1024 * 1024;
} else if (sizeStr.endsWith("MB")) {
int size = Integer.parseInt(sizeStr.replace("MB", ""));
BUFFER_OFFSET_MAX_FILE_SIZE = size * 1024 * 1024;
} else {
BUFFER_OFFSET_MAX_FILE_SIZE = 10 * 1024 * 1024;
}
} else {
BUFFER_OFFSET_MAX_FILE_SIZE = 10 * 1024 * 1024;
}
if (config.containsKey(BUFFER_SEGMENT_MAX_FILE_SIZE_KEY)) {
String sizeStr = config.getProperty(BUFFER_SEGMENT_MAX_FILE_SIZE_KEY).toUpperCase();
if (sizeStr.endsWith("K")) {
int size = Integer.parseInt(sizeStr.replace("K", ""));
BUFFER_SEGMENT_MAX_FILE_SIZE = size * 1024;
} else if (sizeStr.endsWith("KB")) {
int size = Integer.parseInt(sizeStr.replace("KB", ""));
BUFFER_SEGMENT_MAX_FILE_SIZE = size * 1024;
} else if (sizeStr.endsWith("M")) {
int size = Integer.parseInt(sizeStr.replace("M", ""));
BUFFER_SEGMENT_MAX_FILE_SIZE = size * 1024 * 1024;
} else if (sizeStr.endsWith("MB")) {
int size = Integer.parseInt(sizeStr.replace("MB", ""));
BUFFER_SEGMENT_MAX_FILE_SIZE = size * 1024 * 1024;
} else {
BUFFER_SEGMENT_MAX_FILE_SIZE = 1024 * 1024;
}
} else {
BUFFER_SEGMENT_MAX_FILE_SIZE = 1024 * 1024;
}
}
}
}
......@@ -49,7 +49,7 @@ public enum OffsetManager {
public synchronized void initialize() throws IOException {
if (!initialized) {
this.offset = new Offset();
File dataPath = new File(SegmentBufferConfig.BUFFER_PATH);
File dataPath = new File(BufferFileConfig.BUFFER_PATH);
if (dataPath.mkdirs()) {
createOffsetFile();
} else {
......@@ -70,14 +70,14 @@ public enum OffsetManager {
offset.deserialize(offsetRecord);
initialized = true;
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> flush(), 10, 3, TimeUnit.SECONDS);
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(this::flush, 10, 3, TimeUnit.SECONDS);
}
}
private void createOffsetFile() throws IOException {
String timeBucket = String.valueOf(TimeBucketUtils.INSTANCE.getSecondTimeBucket(System.currentTimeMillis()));
String offsetFileName = OFFSET_FILE_PREFIX + "_" + timeBucket + "." + Const.FILE_SUFFIX;
offsetFile = new File(SegmentBufferConfig.BUFFER_PATH + offsetFileName);
offsetFile = new File(BufferFileConfig.BUFFER_PATH + offsetFileName);
this.offset.getWriteOffset().setWriteFileName(Const.EMPTY_STRING);
this.offset.getWriteOffset().setWriteFileOffset(0);
this.offset.getReadOffset().setReadFileName(Const.EMPTY_STRING);
......@@ -99,7 +99,7 @@ public enum OffsetManager {
private void nextFile() {
String timeBucket = String.valueOf(TimeBucketUtils.INSTANCE.getSecondTimeBucket(System.currentTimeMillis()));
String offsetFileName = OFFSET_FILE_PREFIX + "_" + timeBucket + "." + Const.FILE_SUFFIX;
File newOffsetFile = new File(SegmentBufferConfig.BUFFER_PATH + offsetFileName);
File newOffsetFile = new File(BufferFileConfig.BUFFER_PATH + offsetFileName);
offsetFile.delete();
offsetFile = newOffsetFile;
this.flush();
......
......@@ -44,14 +44,14 @@ public enum SegmentBufferManager {
logger.info("segment buffer initialize");
try {
OffsetManager.INSTANCE.initialize();
if (new File(SegmentBufferConfig.BUFFER_PATH).mkdirs()) {
if (new File(BufferFileConfig.BUFFER_PATH).mkdirs()) {
newDataFile();
} else {
String writeFileName = OffsetManager.INSTANCE.getWriteFileName();
if (StringUtils.isNotEmpty(writeFileName)) {
File dataFile = new File(SegmentBufferConfig.BUFFER_PATH + writeFileName);
File dataFile = new File(BufferFileConfig.BUFFER_PATH + writeFileName);
if (dataFile.exists()) {
outputStream = new FileOutputStream(new File(SegmentBufferConfig.BUFFER_PATH + writeFileName), true);
outputStream = new FileOutputStream(new File(BufferFileConfig.BUFFER_PATH + writeFileName), true);
} else {
newDataFile();
}
......@@ -83,7 +83,7 @@ public enum SegmentBufferManager {
logger.debug("create new segment buffer file");
String timeBucket = String.valueOf(TimeBucketUtils.INSTANCE.getSecondTimeBucket(System.currentTimeMillis()));
String writeFileName = DATA_FILE_PREFIX + "_" + timeBucket + "." + Const.FILE_SUFFIX;
File dataFile = new File(SegmentBufferConfig.BUFFER_PATH + writeFileName);
File dataFile = new File(BufferFileConfig.BUFFER_PATH + writeFileName);
dataFile.createNewFile();
OffsetManager.INSTANCE.setWriteOffset(writeFileName, 0);
try {
......@@ -98,6 +98,5 @@ public enum SegmentBufferManager {
}
public synchronized void flush() {
}
}
......@@ -53,7 +53,7 @@ public enum SegmentBufferReader {
private void preRead() {
String readFileName = OffsetManager.INSTANCE.getReadFileName();
if (StringUtils.isNotEmpty(readFileName)) {
File readFile = new File(SegmentBufferConfig.BUFFER_PATH + readFileName);
File readFile = new File(BufferFileConfig.BUFFER_PATH + readFileName);
if (readFile.exists()) {
deleteTheDataFilesBeforeReadFile(readFileName);
long readFileOffset = OffsetManager.INSTANCE.getReadFileOffset();
......@@ -69,7 +69,7 @@ public enum SegmentBufferReader {
}
private void deleteTheDataFilesBeforeReadFile(String readFileName) {
File[] dataFiles = new File(SegmentBufferConfig.BUFFER_PATH).listFiles(new PrefixFileNameFilter());
File[] dataFiles = new File(BufferFileConfig.BUFFER_PATH).listFiles(new PrefixFileNameFilter());
long readFileCreateTime = getFileCreateTime(readFileName);
for (File dataFile : dataFiles) {
......@@ -90,7 +90,7 @@ public enum SegmentBufferReader {
private void readEarliestCreateDataFile() {
String readFileName = OffsetManager.INSTANCE.getReadFileName();
File[] dataFiles = new File(SegmentBufferConfig.BUFFER_PATH).listFiles(new PrefixFileNameFilter());
File[] dataFiles = new File(BufferFileConfig.BUFFER_PATH).listFiles(new PrefixFileNameFilter());
if (CollectionUtils.isNotEmpty(dataFiles)) {
if (dataFiles[0].getName().equals(readFileName)) {
......
......@@ -163,7 +163,7 @@ public class SegmentParse {
}
private void writeToBufferFile(String id, UpstreamSegment upstreamSegment) {
logger.debug("send to segment buffer write worker, id: {}", id);
logger.debug("push to segment buffer write worker, id: {}", id);
SegmentStandardization standardization = new SegmentStandardization(id);
standardization.setUpstreamSegment(upstreamSegment);
Graph<SegmentStandardization> graph = GraphManager.INSTANCE.createIfAbsent(TraceStreamGraph.SEGMENT_STANDARDIZATION_GRAPH_ID, SegmentStandardization.class);
......
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.stream.worker;
import org.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
import org.skywalking.apm.collector.storage.table.node.NodeComponent;
import org.skywalking.apm.collector.storage.table.node.NodeMapping;
import org.skywalking.apm.collector.storage.table.noderef.NodeReference;
import org.skywalking.apm.collector.storage.table.register.Application;
import org.skywalking.apm.collector.storage.table.register.Instance;
import org.skywalking.apm.collector.storage.table.register.ServiceName;
import org.skywalking.apm.collector.storage.table.service.ServiceEntry;
import org.skywalking.apm.collector.storage.table.serviceref.ServiceReference;
/**
* @author peng-yongsheng
*/
public class AgentStreamRemoteDataRegister {
private final RemoteDataRegisterService remoteDataRegisterService;
public AgentStreamRemoteDataRegister(RemoteDataRegisterService remoteDataRegisterService) {
this.remoteDataRegisterService = remoteDataRegisterService;
}
public void register() {
remoteDataRegisterService.register(Application.class, Application::new);
remoteDataRegisterService.register(Instance.class, Instance::new);
remoteDataRegisterService.register(ServiceName.class, ServiceName::new);
remoteDataRegisterService.register(NodeComponent.class, NodeComponent::new);
remoteDataRegisterService.register(NodeMapping.class, NodeMapping::new);
remoteDataRegisterService.register(NodeReference.class, NodeReference::new);
remoteDataRegisterService.register(ServiceEntry.class, ServiceEntry::new);
remoteDataRegisterService.register(ServiceReference.class, ServiceReference::new);
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.stream.worker.jvm;
import org.skywalking.apm.collector.agent.stream.graph.JvmMetricStreamGraph;
import org.skywalking.apm.collector.agent.stream.service.jvm.ICpuMetricService;
import org.skywalking.apm.collector.core.graph.Graph;
import org.skywalking.apm.collector.core.graph.GraphManager;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.core.util.ObjectUtils;
import org.skywalking.apm.collector.storage.table.jvm.CpuMetric;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class CpuMetricService implements ICpuMetricService {
private final Logger logger = LoggerFactory.getLogger(CpuMetricService.class);
private Graph<CpuMetric> cpuMetricGraph;
private Graph<CpuMetric> getCpuMetricGraph() {
if (ObjectUtils.isEmpty(cpuMetricGraph)) {
cpuMetricGraph = GraphManager.INSTANCE.createIfAbsent(JvmMetricStreamGraph.CPU_METRIC_GRAPH_ID, CpuMetric.class);
}
return cpuMetricGraph;
}
@Override public void send(int instanceId, long timeBucket, double usagePercent) {
CpuMetric cpuMetric = new CpuMetric(timeBucket + Const.ID_SPLIT + instanceId);
cpuMetric.setInstanceId(instanceId);
cpuMetric.setUsagePercent(usagePercent);
cpuMetric.setTimeBucket(timeBucket);
logger.debug("push to cpu metric graph, id: {}", cpuMetric.getId());
getCpuMetricGraph().start(cpuMetric);
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.stream.worker.jvm;
import org.skywalking.apm.collector.agent.stream.graph.JvmMetricStreamGraph;
import org.skywalking.apm.collector.agent.stream.service.jvm.IGCMetricService;
import org.skywalking.apm.collector.core.graph.Graph;
import org.skywalking.apm.collector.core.graph.GraphManager;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.core.util.ObjectUtils;
import org.skywalking.apm.collector.storage.table.jvm.GCMetric;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class GCMetricService implements IGCMetricService {
private final Logger logger = LoggerFactory.getLogger(GCMetricService.class);
private Graph<GCMetric> gcMetricGraph;
private Graph<GCMetric> getGcMetricGraph() {
if (ObjectUtils.isEmpty(gcMetricGraph)) {
gcMetricGraph = GraphManager.INSTANCE.createIfAbsent(JvmMetricStreamGraph.GC_METRIC_GRAPH_ID, GCMetric.class);
}
return gcMetricGraph;
}
@Override public void send(int instanceId, long timeBucket, int phraseValue, long count, long time) {
GCMetric gcMetric = new GCMetric(timeBucket + Const.ID_SPLIT + instanceId + Const.ID_SPLIT + String.valueOf(phraseValue));
gcMetric.setInstanceId(instanceId);
gcMetric.setPhrase(phraseValue);
gcMetric.setCount(count);
gcMetric.setTime(time);
gcMetric.setTimeBucket(timeBucket);
logger.debug("push to gc metric graph, id: {}", gcMetric.getId());
getGcMetricGraph().start(gcMetric);
}
}
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册